You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:05 UTC
[09/52] [abbrv] incubator-eagle git commit: [EAGLE-496] fix code
style of jpm
[EAGLE-496] fix code style of jpm
Author: wujinhu <wu...@126.com>
Closes #383 from wujinhu/EAGLE-496.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/0b852cbc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/0b852cbc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/0b852cbc
Branch: refs/heads/master
Commit: 0b852cbcd4243e71f51ae7a42c68e1ce7571545e
Parents: 6a55b59
Author: wujinhu <wu...@126.com>
Authored: Wed Aug 24 20:22:58 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Aug 24 20:22:58 2016 +0800
----------------------------------------------------------------------
.../queue/common/HadoopYarnResourceUtils.java | 2 +-
.../queue/common/YarnURLSelectorImpl.java | 2 +-
.../storm/HadoopQueueRunningExtractor.java | 2 +-
.../eagle/jpm/mr/historyentity/JobConfig.java | 2 +-
.../jpm/mr/historyentity/JobConfigSerDeser.java | 15 +-
.../JobConfigurationAPIEntity.java | 13 +-
.../jpm/mr/historyentity/JobEventAPIEntity.java | 4 +-
.../mr/historyentity/JobExecutionAPIEntity.java | 23 +-
.../JobProcessTimeStampEntity.java | 3 +-
.../TaskAttemptCounterAPIEntity.java | 7 +-
.../TaskAttemptExecutionAPIEntity.java | 17 +-
.../historyentity/TaskExecutionAPIEntity.java | 17 +-
.../TaskFailureCountAPIEntity.java | 3 +-
.../mr/runningentity/JobExecutionAPIEntity.java | 10 +-
.../TaskAttemptExecutionAPIEntity.java | 6 +-
.../runningentity/TaskExecutionAPIEntity.java | 6 +-
.../jpm/spark/crawl/JHFInputStreamReader.java | 3 +-
.../eagle/jpm/spark/crawl/JHFParserBase.java | 4 +-
.../jpm/spark/crawl/JHFSparkEventReader.java | 82 +++--
.../eagle/jpm/spark/crawl/JHFSparkParser.java | 12 +-
.../SparkFilesystemInputStreamReaderImpl.java | 4 +-
.../eagle/jpm/spark/entity/JobConfig.java | 3 +-
.../apache/eagle/jpm/spark/entity/SparkApp.java | 11 +-
.../eagle/jpm/spark/entity/SparkExecutor.java | 4 +-
.../apache/eagle/jpm/spark/entity/SparkJob.java | 24 +-
.../eagle/jpm/spark/entity/SparkStage.java | 32 +-
.../eagle/jpm/spark/entity/SparkTask.java | 4 +-
.../history/crawler/AbstractJobHistoryDAO.java | 61 ++--
.../crawler/DefaultJHFInputStreamCallback.java | 24 +-
.../history/crawler/EagleOutputCollector.java | 6 +-
.../mr/history/crawler/JHFCrawlerDriver.java | 4 +-
.../history/crawler/JHFCrawlerDriverImpl.java | 193 +++++------
.../history/crawler/JHFInputStreamCallback.java | 4 +-
.../crawler/JobHistoryContentFilter.java | 5 +
.../crawler/JobHistoryContentFilterBuilder.java | 53 +--
.../crawler/JobHistoryContentFilterImpl.java | 42 +--
.../mr/history/crawler/JobHistoryDAOImpl.java | 80 ++---
.../jpm/mr/history/crawler/JobHistoryLCM.java | 24 +-
.../JobHistorySpoutCollectorInterceptor.java | 8 +-
.../HistoryJobEntityCreationListener.java | 6 +-
.../mr/history/parser/JHFEventReaderBase.java | 319 ++++++++++---------
.../mr/history/parser/JHFMRVer1EventReader.java | 10 +-
.../jpm/mr/history/parser/JHFMRVer1Parser.java | 15 +-
.../mr/history/parser/JHFMRVer2EventReader.java | 5 +-
.../jpm/mr/history/parser/JHFMRVer2Parser.java | 10 +-
.../jpm/mr/history/parser/JHFParserBase.java | 5 +-
.../parser/JHFWriteNotCompletedException.java | 6 +-
...JobConfigurationCreationServiceListener.java | 8 +-
.../JobEntityCreationEagleServiceListener.java | 2 +-
.../parser/JobEntityCreationPublisher.java | 5 -
.../parser/JobEntityLifecycleAggregator.java | 68 ++--
.../mr/history/parser/TaskFailureListener.java | 13 +-
.../eagle/jpm/mr/running/MRRunningJobMain.java | 11 +-
.../running/config/MRRunningConfigManager.java | 13 +-
.../jpm/mr/running/parser/MRJobParser.java | 62 ++--
.../mr/running/recover/MRRunningJobManager.java | 8 +-
.../running/storm/MRRunningJobFetchSpout.java | 6 +-
.../mr/running/storm/MRRunningJobParseBolt.java | 6 +-
.../history/config/SparkHistoryCrawlConfig.java | 3 +-
.../status/JobHistoryZKStateManager.java | 153 +++++----
.../spark/history/status/ZKStateConstant.java | 2 +-
.../history/storm/FinishedSparkJobSpout.java | 23 +-
.../history/storm/SparkHistoryTopology.java | 14 +-
.../spark/history/storm/SparkJobParseBolt.java | 16 +-
.../eagle/jpm/spark/history/storm/TestHDFS.java | 4 +-
.../jpm/spark/running/SparkRunningJobApp.java | 9 +-
.../spark/running/SparkRunningJobAppConfig.java | 17 +-
.../spark/running/entities/SparkAppEntity.java | 11 +-
.../running/entities/SparkExecutorEntity.java | 2 +-
.../spark/running/entities/SparkJobEntity.java | 2 +-
.../running/entities/SparkStageEntity.java | 24 +-
.../spark/running/entities/SparkTaskEntity.java | 2 +-
.../running/parser/SparkApplicationParser.java | 44 +--
.../running/recover/SparkRunningJobManager.java | 7 +-
.../storm/SparkRunningJobFetchSpout.java | 6 +-
.../running/storm/SparkRunningJobParseBolt.java | 6 +-
.../org/apache/eagle/jpm/util/Constants.java | 35 +-
.../org/apache/eagle/jpm/util/HDFSUtil.java | 12 +-
.../org/apache/eagle/jpm/util/JSONUtil.java | 30 +-
.../eagle/jpm/util/JobNameNormalization.java | 178 ++++++-----
.../org/apache/eagle/jpm/util/MRJobTagName.java | 5 +-
.../eagle/jpm/util/SparkEntityConstant.java | 4 +-
.../apache/eagle/jpm/util/SparkJobTagName.java | 5 +-
.../java/org/apache/eagle/jpm/util/Utils.java | 16 +-
.../util/jobcounter/CounterGroupDictionary.java | 13 +-
.../jpm/util/jobcounter/CounterGroupKey.java | 7 +-
.../eagle/jpm/util/jobcounter/CounterKey.java | 5 +-
.../util/jobcounter/JobCounterException.java | 12 +-
.../eagle/jpm/util/jobcounter/JobCounters.java | 2 +-
.../util/jobcounter/JobCountersSerDeser.java | 2 +-
.../jpm/util/jobrecover/RunningJobManager.java | 15 +-
.../util/resourceFetch/RMResourceFetcher.java | 305 +++++++++---------
.../jpm/util/resourceFetch/ResourceFetcher.java | 4 +-
.../SparkHistoryServerResourceFetcher.java | 29 +-
.../connection/InputStreamUtils.java | 77 +++--
.../util/resourceFetch/connection/JobUtils.java | 42 +--
.../connection/URLConnectionUtils.java | 121 ++++---
.../resourceFetch/ha/AbstractURLSelector.java | 32 +-
.../util/resourceFetch/ha/HAURLSelector.java | 14 +-
.../resourceFetch/ha/HAURLSelectorImpl.java | 150 ++++-----
.../jpm/util/resourceFetch/model/AppInfo.java | 251 ++++++++-------
.../util/resourceFetch/model/Applications.java | 20 +-
.../util/resourceFetch/model/AppsWrapper.java | 22 +-
.../util/resourceFetch/model/ClusterInfo.java | 4 +-
.../resourceFetch/model/ClusterInfoWrapper.java | 4 +-
.../resourceFetch/model/JobCounterGroup.java | 8 +-
.../resourceFetch/model/JobCounterItem.java | 11 +-
.../util/resourceFetch/model/JobCounters.java | 8 +-
.../resourceFetch/model/JobCountersWrapper.java | 4 +-
.../jpm/util/resourceFetch/model/MRJob.java | 4 +-
.../util/resourceFetch/model/MRJobsWrapper.java | 4 +-
.../jpm/util/resourceFetch/model/MRTask.java | 4 +-
.../util/resourceFetch/model/MRTaskAttempt.java | 4 +-
.../model/MRTaskAttemptWrapper.java | 4 +-
.../resourceFetch/model/MRTaskAttempts.java | 4 +-
.../jpm/util/resourceFetch/model/MRTasks.java | 4 +-
.../resourceFetch/model/MRTasksWrapper.java | 4 +-
.../jpm/util/resourceFetch/model/MrJobs.java | 4 +-
.../resourceFetch/model/SparkApplication.java | 4 +-
.../model/SparkApplicationAttempt.java | 4 +-
.../model/SparkApplicationWrapper.java | 4 +-
.../util/resourceFetch/model/SparkExecutor.java | 4 +-
.../jpm/util/resourceFetch/model/SparkJob.java | 4 +-
.../util/resourceFetch/model/SparkStage.java | 4 +-
.../jpm/util/resourceFetch/model/SparkTask.java | 4 +-
.../model/SparkTaskInputMetrics.java | 4 +-
.../resourceFetch/model/SparkTaskMetrics.java | 4 +-
.../model/SparkTaskShuffleReadMetrics.java | 4 +-
.../model/SparkTaskShuffleWriteMetrics.java | 4 +-
.../resourceFetch/model/TaskCounterGroup.java | 5 +-
.../resourceFetch/model/TaskCounterItem.java | 4 +-
.../util/resourceFetch/model/TaskCounters.java | 5 +-
.../model/TaskCountersWrapper.java | 4 +-
.../url/JobListServiceURLBuilderImpl.java | 56 ++--
.../resourceFetch/url/ServiceURLBuilder.java | 4 +-
.../SparkCompleteJobServiceURLBuilderImpl.java | 2 +-
.../url/SparkJobServiceURLBuilderImpl.java | 2 +-
.../jpm/util/resourceFetch/url/URLUtil.java | 2 +-
.../hive/jobrunning/HiveJobFetchSpout.java | 12 +-
139 files changed, 1783 insertions(+), 1582 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java
index f2c4b1f..2802449 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java
@@ -20,7 +20,7 @@ package org.apache.eagle.hadoop.queue.common;
import com.typesafe.config.Config;
import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java
index 05e3be9..02f67d4 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java
@@ -19,7 +19,7 @@
package org.apache.eagle.hadoop.queue.common;
import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourceFetch.ha.AbstractURLSelector;
+import org.apache.eagle.jpm.util.resourcefetch.ha.AbstractURLSelector;
public class YarnURLSelectorImpl extends AbstractURLSelector {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
index 975e633..3c4391b 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
@@ -27,7 +27,7 @@ import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils;
import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder;
import org.apache.eagle.hadoop.queue.common.YarnURLSelectorImpl;
import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourceFetch.ha.HAURLSelector;
+import org.apache.eagle.jpm.util.resourcefetch.ha.HAURLSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfig.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfig.java
index 97ebd50..35f346b 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfig.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfig.java
@@ -32,7 +32,7 @@ public final class JobConfig {
this.config = config;
}
- public String toString(){
+ public String toString() {
return config.toString();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigSerDeser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigSerDeser.java
index 5af4377..cfa50f9 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigSerDeser.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigSerDeser.java
@@ -35,11 +35,15 @@ public class JobConfigSerDeser implements EntitySerDeser<JobConfig> {
String sb = Bytes.toString(bytes);
String[] keyValue = sb.split(",");
for (String pair : keyValue) {
- String str[] = pair.split(":");
- if (pair.equals("") || str[0].equals("")) continue;
+ String[] str = pair.split(":");
+ if (pair.equals("") || str[0].equals("")) {
+ continue;
+ }
String key = str[0];
String value = "";
- if (str.length == 2) value = str[1];
+ if (str.length == 2) {
+ value = str[1];
+ }
map.put(key, value);
}
return jc;
@@ -49,14 +53,15 @@ public class JobConfigSerDeser implements EntitySerDeser<JobConfig> {
public byte[] serialize(JobConfig conf) {
Map<String, String> map = conf.getConfig();
StringBuilder sb = new StringBuilder();
- for (Entry<String, String> entry : map.entrySet())
+ for (Entry<String, String> entry : map.entrySet()) {
sb.append(entry.getKey() + ":" + entry.getValue() + ",");
+ }
sb.deleteCharAt(sb.length() - 1);
return sb.toString().getBytes();
}
@Override
- public Class<JobConfig> type(){
+ public Class<JobConfig> type() {
return JobConfig.class;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigurationAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigurationAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigurationAPIEntity.java
index 3a09c5f..d186fd4 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigurationAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigurationAPIEntity.java
@@ -22,7 +22,7 @@ import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("eaglejpa")
@ColumnFamily("f")
@Prefix("jconf")
@@ -30,9 +30,9 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
@TimeSeries(true)
@Partition({"site"})
@Indexes({
- @Index(name="Index_1_jobId", columns = { "jobId" }, unique = true),
- @Index(name="Index_2_jobDefId", columns = { "jobDefId" }, unique = false)
-})
+ @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = true),
+ @Index(name = "Index_2_jobDefId", columns = { "jobDefId" }, unique = false)
+ })
public class JobConfigurationAPIEntity extends JobBaseAPIEntity {
@Column("a")
@@ -45,20 +45,25 @@ public class JobConfigurationAPIEntity extends JobBaseAPIEntity {
public JobConfig getJobConfig() {
return jobConfig;
}
+
public void setJobConfig(JobConfig jobConfig) {
this.jobConfig = jobConfig;
_pcs.firePropertyChange("jobConfig", null, null);
}
+
public String getConfigJobName() {
return configJobName;
}
+
public void setConfigJobName(String configJobName) {
this.configJobName = configJobName;
_pcs.firePropertyChange("configJobName", null, null);
}
+
public String getAlertEmailList() {
return alertEmailList;
}
+
public void setAlertEmailList(String alertEmailList) {
this.alertEmailList = alertEmailList;
_pcs.firePropertyChange("alertEmailList", null, null);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobEventAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobEventAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobEventAPIEntity.java
index b289a9c..c6bb8e4 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobEventAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobEventAPIEntity.java
@@ -22,7 +22,7 @@ import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("eaglejpa")
@ColumnFamily("f")
@Prefix("jevent")
@@ -30,13 +30,13 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
@TimeSeries(true)
@Partition({"site"})
public class JobEventAPIEntity extends JobBaseAPIEntity {
-
@Column("a")
private String eventType;
public String getEventType() {
return eventType;
}
+
public void setEventType(String eventType) {
this.eventType = eventType;
_pcs.firePropertyChange("eventType", null, null);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
index d9093ff..1f75f07 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
@@ -23,7 +23,7 @@ import org.apache.eagle.jpm.util.jobcounter.JobCounters;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("eaglejpa")
@ColumnFamily("f")
@Prefix("jexec")
@@ -31,8 +31,8 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
@TimeSeries(true)
@Partition({"site"})
@Indexes({
- @Index(name="Index_1_jobId", columns = { "jobId" }, unique = true),
- @Index(name="Index_2_jobDefId", columns = { "jobDefId" }, unique = false)
+ @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = true),
+ @Index(name = "Index_2_jobDefId", columns = { "jobDefId" }, unique = false)
})
public class JobExecutionAPIEntity extends JobBaseAPIEntity {
@Column("a")
@@ -85,6 +85,7 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity {
public long getDurationTime() {
return durationTime;
}
+
public void setDurationTime(long durationTime) {
this.durationTime = durationTime;
valueChanged("durationTime");
@@ -93,59 +94,75 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity {
public String getCurrentState() {
return currentState;
}
+
public void setCurrentState(String currentState) {
this.currentState = currentState;
_pcs.firePropertyChange("currentState", null, null);
}
+
public long getStartTime() {
return startTime;
}
+
public void setStartTime(long startTime) {
this.startTime = startTime;
_pcs.firePropertyChange("startTime", null, null);
}
+
public long getEndTime() {
return endTime;
}
+
public void setEndTime(long endTime) {
this.endTime = endTime;
_pcs.firePropertyChange("endTime", null, null);
}
+
public int getNumTotalMaps() {
return numTotalMaps;
}
+
public void setNumTotalMaps(int numTotalMaps) {
this.numTotalMaps = numTotalMaps;
_pcs.firePropertyChange("numTotalMaps", null, null);
}
+
public int getNumFailedMaps() {
return numFailedMaps;
}
+
public void setNumFailedMaps(int numFailedMaps) {
this.numFailedMaps = numFailedMaps;
_pcs.firePropertyChange("numFailedMaps", null, null);
}
+
public int getNumFinishedMaps() {
return numFinishedMaps;
}
+
public void setNumFinishedMaps(int numFinishedMaps) {
this.numFinishedMaps = numFinishedMaps;
_pcs.firePropertyChange("numFinishedMaps", null, null);
}
+
public int getNumTotalReduces() {
return numTotalReduces;
}
+
public void setNumTotalReduces(int numTotalReduces) {
this.numTotalReduces = numTotalReduces;
_pcs.firePropertyChange("numTotalReduces", null, null);
}
+
public int getNumFailedReduces() {
return numFailedReduces;
}
+
public void setNumFailedReduces(int numFailedReduces) {
this.numFailedReduces = numFailedReduces;
_pcs.firePropertyChange("numFailedReduces", null, null);
}
+
public int getNumFinishedReduces() {
return numFinishedReduces;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobProcessTimeStampEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobProcessTimeStampEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobProcessTimeStampEntity.java
index df57657..6afe347 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobProcessTimeStampEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobProcessTimeStampEntity.java
@@ -23,7 +23,7 @@ import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("eaglejpa_process")
@ColumnFamily("f")
@Prefix("process")
@@ -37,6 +37,7 @@ public class JobProcessTimeStampEntity extends TaggedLogAPIEntity {
public long getCurrentTimeStamp() {
return currentTimeStamp;
}
+
public void setCurrentTimeStamp(long currentTimeStamp) {
this.currentTimeStamp = currentTimeStamp;
_pcs.firePropertyChange("currentTimeStamp", null, null);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptCounterAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptCounterAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptCounterAPIEntity.java
index 89272bf..e526f45 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptCounterAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptCounterAPIEntity.java
@@ -22,7 +22,7 @@ import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("eaglejpa_anomaly")
@ColumnFamily("f")
@Prefix("tacount")
@@ -40,20 +40,25 @@ public class TaskAttemptCounterAPIEntity extends JobBaseAPIEntity {
public int getKilledCount() {
return killedCount;
}
+
public void setKilledCount(int killedCount) {
this.killedCount = killedCount;
_pcs.firePropertyChange("killedCount", null, null);
}
+
public int getFailedCount() {
return failedCount;
}
+
public void setFailedCount(int failedCount) {
this.failedCount = failedCount;
_pcs.firePropertyChange("failedCount", null, null);
}
+
public int getTotalCount() {
return totalCount;
}
+
public void setTotalCount(int totalCount) {
this.totalCount = totalCount;
_pcs.firePropertyChange("totalCount", null, null);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
index be5566b..620ee1f 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
@@ -23,7 +23,7 @@ import org.apache.eagle.jpm.util.jobcounter.JobCounters;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("eaglejpa_task")
@ColumnFamily("f")
@Prefix("taexec")
@@ -31,7 +31,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
@TimeSeries(true)
@Partition({"site"})
@Indexes({
- @Index(name="Index_1_jobId", columns = { "jobID" }, unique = false)
+ @Index(name = "Index_1_jobId", columns = { "jobID" }, unique = false)
})
public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
@Column("a")
@@ -52,48 +52,61 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
public String getTaskStatus() {
return taskStatus;
}
+
public void setTaskStatus(String taskStatus) {
this.taskStatus = taskStatus;
_pcs.firePropertyChange("taskStatus", null, null);
}
+
public long getStartTime() {
return startTime;
}
+
public void setStartTime(long startTime) {
this.startTime = startTime;
_pcs.firePropertyChange("startTime", null, null);
}
+
public long getEndTime() {
return endTime;
}
+
public void setEndTime(long endTime) {
this.endTime = endTime;
_pcs.firePropertyChange("endTime", null, null);
}
+
public long getDuration() {
return duration;
}
+
public void setDuration(long duration) {
this.duration = duration;
_pcs.firePropertyChange("duration", null, null);
}
+
public String getError() {
return error;
}
+
public void setError(String error) {
this.error = error;
_pcs.firePropertyChange("error", null, null);
}
+
public JobCounters getJobCounters() {
return jobCounters;
}
+
public void setJobCounters(JobCounters jobCounters) {
this.jobCounters = jobCounters;
_pcs.firePropertyChange("jobCounters", null, null);
}
+
public String getTaskAttemptID() {
return taskAttemptID;
}
+
public void setTaskAttemptID(String taskAttemptID) {
this.taskAttemptID = taskAttemptID;
_pcs.firePropertyChange("taskAttemptID", null, null);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskExecutionAPIEntity.java
index 9de8b05..bf559d4 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskExecutionAPIEntity.java
@@ -23,7 +23,7 @@ import org.apache.eagle.jpm.util.jobcounter.JobCounters;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("eaglejpa_task")
@ColumnFamily("f")
@Prefix("texec")
@@ -31,8 +31,8 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
@TimeSeries(true)
@Partition({"site"})
@Indexes({
- @Index(name="Index_1_jobId", columns = { "jobId" }, unique = false)
-})
+ @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = false)
+ })
public class TaskExecutionAPIEntity extends JobBaseAPIEntity {
@Column("a")
private String taskStatus;
@@ -50,41 +50,52 @@ public class TaskExecutionAPIEntity extends JobBaseAPIEntity {
public String getTaskStatus() {
return taskStatus;
}
+
public void setTaskStatus(String taskStatus) {
this.taskStatus = taskStatus;
_pcs.firePropertyChange("taskStatus", null, null);
}
+
public long getStartTime() {
return startTime;
}
+
public void setStartTime(long startTime) {
this.startTime = startTime;
_pcs.firePropertyChange("startTime", null, null);
}
+
public long getEndTime() {
return endTime;
}
+
public void setEndTime(long endTime) {
this.endTime = endTime;
_pcs.firePropertyChange("endTime", null, null);
}
+
public long getDuration() {
return duration;
}
+
public void setDuration(long duration) {
this.duration = duration;
_pcs.firePropertyChange("duration", null, null);
}
+
public String getError() {
return error;
}
+
public void setError(String error) {
this.error = error;
_pcs.firePropertyChange("error", null, null);
}
+
public JobCounters getJobCounters() {
return jobCounters;
}
+
public void setJobCounters(JobCounters jobCounters) {
this.jobCounters = jobCounters;
_pcs.firePropertyChange("jobCounters", null, null);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskFailureCountAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskFailureCountAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskFailureCountAPIEntity.java
index 1445a24..31f96da 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskFailureCountAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskFailureCountAPIEntity.java
@@ -22,7 +22,7 @@ import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("eaglejpa_anomaly")
@ColumnFamily("f")
@Prefix("taskfailurecount")
@@ -37,7 +37,6 @@ public class TaskFailureCountAPIEntity extends JobBaseAPIEntity {
@Column("c")
private String taskStatus;
-
public String getTaskStatus() {
return taskStatus;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
index 653f1c9..86b6554 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
@@ -20,12 +20,12 @@ package org.apache.eagle.jpm.mr.runningentity;
import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.jobcounter.JobCounters;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("eagleMRRunningJobs")
@ColumnFamily("f")
@Prefix("jobs")
@@ -33,9 +33,9 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
@TimeSeries(true)
@Partition({"site"})
@Indexes({
- @Index(name="Index_1_jobId", columns = { "jobId" }, unique = true),
- @Index(name="Index_2_jobDefId", columns = { "jobDefId" }, unique = false)
-})
+ @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = true),
+ @Index(name = "Index_2_jobDefId", columns = { "jobDefId" }, unique = false)
+ })
@Tags({"site", "jobId", "jobName", "jobDefId", "jobType", "user", "queue"})
public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
@Column("a")
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskAttemptExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskAttemptExecutionAPIEntity.java
index 11a8b4c..088869f 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskAttemptExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskAttemptExecutionAPIEntity.java
@@ -23,7 +23,7 @@ import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("eagleMRRunningTasks")
@ColumnFamily("f")
@Prefix("tasks_exec_attempt")
@@ -31,8 +31,8 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
@TimeSeries(true)
@Partition({"site"})
@Indexes({
- @Index(name="Index_1_jobId", columns = { "jobId" }, unique = false)
-})
+ @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = false)
+ })
@Tags({"site", "jobId", "JobName", "jobDefId", "jobType", "taskType", "taskId", "user", "queue", "host", "rack"})
public class TaskAttemptExecutionAPIEntity extends TaggedLogAPIEntity {
@Column("a")
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskExecutionAPIEntity.java
index 50e042f..d1d62ee 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskExecutionAPIEntity.java
@@ -24,7 +24,7 @@ import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("eagleMRRunningTasks")
@ColumnFamily("f")
@Prefix("tasks_exec")
@@ -32,8 +32,8 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
@TimeSeries(true)
@Partition({"site"})
@Indexes({
- @Index(name="Index_1_jobId", columns = { "jobId" }, unique = false)
-})
+ @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = false)
+ })
@Tags({"site", "jobId", "JobName", "jobDefId", "jobType", "taskType", "taskId", "user", "queue", "hostname"})
public class TaskExecutionAPIEntity extends TaggedLogAPIEntity {
@Column("a")
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
index feeee7b..8a8d0db 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
@@ -20,6 +20,5 @@ package org.apache.eagle.jpm.spark.crawl;
import java.io.InputStream;
public interface JHFInputStreamReader {
- public void read(InputStream is) throws Exception;
-
+ void read(InputStream is) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
index 48701f7..62ba7d9 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
@@ -21,9 +21,9 @@ import java.io.InputStream;
public interface JHFParserBase {
/**
- * this method will ensure to close the inputstream
+ * this method will ensure to close the inputStream.
* @param is
* @throws Exception
*/
- public void parse(InputStream is) throws Exception;
+ void parse(InputStream is) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
index 1b75e81..e298fa3 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
@@ -17,9 +17,6 @@
package org.apache.eagle.jpm.spark.crawl;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.jpm.spark.entity.JobConfig;
import org.apache.eagle.jpm.spark.entity.*;
import org.apache.eagle.jpm.util.JSONUtil;
import org.apache.eagle.jpm.util.JobNameNormalization;
@@ -28,6 +25,8 @@ import org.apache.eagle.jpm.util.SparkJobTagName;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
@@ -116,8 +115,8 @@ public class JHFSparkEventReader {
List<String> jobConfs = conf.getStringList("basic.jobConf.additional.info");
String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port",
- "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory",
- "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"};
+ "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory",
+ "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"};
jobConfs.addAll(Arrays.asList(props));
for (String prop : jobConfs) {
if (sparkProps.containsKey(prop)) {
@@ -363,9 +362,9 @@ public class JHFSparkEventReader {
stage.setCompleteTime(JSONUtil.getLong(stageInfo, "Completion Time"));
if (stageInfo.containsKey("Failure Reason")) {
- stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.FAILED.toString());
+ stage.setStatus(SparkEntityConstant.SparkStageStatus.FAILED.toString());
} else {
- stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString());
+ stage.setStatus(SparkEntityConstant.SparkStageStatus.COMPLETE.toString());
}
}
@@ -383,9 +382,9 @@ public class JHFSparkEventReader {
JSONObject jobResult = JSONUtil.getJSONObject(event, "Job Result");
String result = JSONUtil.getString(jobResult, "Result");
if (result.equalsIgnoreCase("JobSucceeded")) {
- job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.SUCCEEDED.toString());
+ job.setStatus(SparkEntityConstant.SparkJobStatus.SUCCEEDED.toString());
} else {
- job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.FAILED.toString());
+ job.setStatus(SparkEntityConstant.SparkJobStatus.FAILED.toString());
}
}
@@ -429,15 +428,23 @@ public class JHFSparkEventReader {
app.setExecutors(executors.values().size());
long executorMemory = parseExecutorMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName()));
- long driverMemory = parseExecutorMemory(this.isClientMode(app.getConfig()) ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName()) : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName()));
- int executoreCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName());
- int driverCore = this.isClientMode(app.getConfig()) ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName()) : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName());
+ long driverMemory = parseExecutorMemory(this.isClientMode(app.getConfig())
+ ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName())
+ : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName()));
+
+ int executorCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName());
+ int driverCore = this.isClientMode(app.getConfig())
+ ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName())
+ : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName());
+
long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead");
- long driverMemoryOverhead = this.isClientMode(app.getConfig()) ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead") : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead");
+ long driverMemoryOverhead = this.isClientMode(app.getConfig())
+ ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead")
+ : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead");
app.setExecMemoryBytes(executorMemory);
app.setDriveMemoryBytes(driverMemory);
- app.setExecutorCores(executoreCore);
+ app.setExecutorCores(executorCore);
app.setDriverCores(driverCore);
app.setExecutorMemoryOverhead(executorMemoryOverhead);
app.setDriverMemoryOverhead(driverMemoryOverhead);
@@ -450,11 +457,12 @@ public class JHFSparkEventReader {
executor.setMemoryOverhead(driverMemoryOverhead);
} else {
executor.setExecMemoryBytes(executorMemory);
- executor.setCores(executoreCore);
+ executor.setCores(executorCore);
executor.setMemoryOverhead(executorMemoryOverhead);
}
- if (executor.getEndTime() == 0)
+ if (executor.getEndTime() == 0) {
executor.setEndTime(app.getEndTime());
+ }
this.aggregateExecutorToApp(executor);
}
this.flushEntities(executors.values(), false);
@@ -464,16 +472,16 @@ public class JHFSparkEventReader {
}
private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) {
- long result = 0l;
+ long result = 0L;
if (config.getConfig().containsKey(fieldName)) {
result = this.parseExecutorMemory(config.getConfig().get(fieldName) + "m");
- if(result == 0l){
- result = this.parseExecutorMemory(config.getConfig().get(fieldName));
+ if (result == 0L) {
+ result = this.parseExecutorMemory(config.getConfig().get(fieldName));
}
}
- if(result == 0l){
- result = Math.max(this.parseExecutorMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")), executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100);
+ if (result == 0L) {
+ result = Math.max(this.parseExecutorMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")), executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100);
}
return result;
}
@@ -588,7 +596,7 @@ public class JHFSparkEventReader {
job.setNumTask(job.getNumTask() + stage.getNumTasks());
- if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString())) {
+ if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
//if multiple attempts succeed, just count one
if (!hasStagePriorAttemptSuccess(stage)) {
job.setNumCompletedStages(job.getNumCompletedStages() + 1);
@@ -603,7 +611,7 @@ public class JHFSparkEventReader {
Integer stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()));
for (Integer i = 0; i < stageAttemptId; i++) {
SparkStage previousStage = stages.get(this.generateStageKey(stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), i.toString()));
- if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString())) {
+ if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
return true;
}
}
@@ -659,22 +667,22 @@ public class JHFSparkEventReader {
if (memory.endsWith("g") || memory.endsWith("G")) {
int executorGB = Integer.parseInt(memory.substring(0, memory.length() - 1));
- return 1024l * 1024 * 1024 * executorGB;
+ return 1024L * 1024 * 1024 * executorGB;
} else if (memory.endsWith("m") || memory.endsWith("M")) {
int executorMB = Integer.parseInt(memory.substring(0, memory.length() - 1));
- return 1024l * 1024 * executorMB;
+ return 1024L * 1024 * executorMB;
} else if (memory.endsWith("k") || memory.endsWith("K")) {
int executorKB = Integer.parseInt(memory.substring(0, memory.length() - 1));
- return 1024l * executorKB;
+ return 1024L * executorKB;
} else if (memory.endsWith("t") || memory.endsWith("T")) {
int executorTB = Integer.parseInt(memory.substring(0, memory.length() - 1));
- return 1024l * 1024 * 1024 * 1024 * executorTB;
+ return 1024L * 1024 * 1024 * 1024 * executorTB;
} else if (memory.endsWith("p") || memory.endsWith("P")) {
int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1));
- return 1024l * 1024 * 1024 * 1024 * 1024 * executorPB;
+ return 1024L * 1024 * 1024 * 1024 * 1024 * executorPB;
}
- LOG.info("Cannot parse memory info " + memory);
- return 0l;
+ LOG.info("Cannot parse memory info " + memory);
+ return 0L;
}
private void flushEntities(Object entity, boolean forceFlush) {
@@ -709,20 +717,6 @@ public class JHFSparkEventReader {
private void doFlush(List entities) throws Exception {
LOG.info("start flushing entities of total number " + entities.size());
-// client.create(entities);
LOG.info("finish flushing entities of total number " + entities.size());
-// for(Object entity: entities){
-// if(entity instanceof SparkApp){
-// for (Field field : entity.getClass().getDeclaredFields()) {
-// field.setAccessible(true); // You might want to set modifier to public first.
-// Object value = field.get(entity);
-// if (value != null) {
-// System.out.println(field.getName() + "=" + value);
-// }
-// }
-// }
-// }
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
index 171cb0f..da049ea 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
@@ -33,7 +33,7 @@ public class JHFSparkParser implements JHFParserBase {
JHFSparkEventReader eventReader;
- public JHFSparkParser(JHFSparkEventReader reader){
+ public JHFSparkParser(JHFSparkEventReader reader) {
this.eventReader = reader;
}
@@ -41,22 +41,22 @@ public class JHFSparkParser implements JHFParserBase {
public void parse(InputStream is) throws Exception {
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
try {
- String line = null;
+ String line;
JSONParser parser = new JSONParser();
- while((line = reader.readLine()) != null){
- try{
+ while ((line = reader.readLine()) != null) {
+ try {
JSONObject eventObj = (JSONObject) parser.parse(line);
String eventType = (String) eventObj.get("Event");
logger.info("Event type: " + eventType);
this.eventReader.read(eventObj);
- }catch(Exception e){
+ } catch (Exception e) {
logger.error(String.format("Invalid json string. Fail to parse %s.", line), e);
}
}
this.eventReader.clearReader();
} finally {
- if(reader != null){
+ if (reader != null) {
reader.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
index f1d2cd1..3964454 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
@@ -31,7 +31,7 @@ public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReade
private SparkApplicationInfo app;
- public SparkFilesystemInputStreamReaderImpl(String site, SparkApplicationInfo app){
+ public SparkFilesystemInputStreamReaderImpl(String site, SparkApplicationInfo app) {
this.site = site;
this.app = app;
}
@@ -45,7 +45,7 @@ public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReade
parser.parse(is);
}
- public static void main(String[] args) throws Exception{
+ public static void main(String[] args) throws Exception {
SparkFilesystemInputStreamReaderImpl impl = new SparkFilesystemInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo());
impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1")));
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfig.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfig.java
index 11c4a22..0664954 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfig.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfig.java
@@ -32,8 +32,9 @@ public class JobConfig implements Serializable {
public void setConfig(Map<String, String> config) {
this.config = config;
}
+
@Override
- public String toString(){
+ public String toString() {
return config.toString();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkApp.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkApp.java
index 528a91f..58697a1 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkApp.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkApp.java
@@ -18,10 +18,10 @@
package org.apache.eagle.jpm.spark.entity;
+import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.apache.eagle.jpm.util.Constants;
@Table("eglesprk_apps")
@ColumnFamily("f")
@@ -31,7 +31,7 @@ import org.apache.eagle.jpm.util.Constants;
@TimeSeries(true)
@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName","user", "queue"})
@Partition({"site"})
-public class SparkApp extends TaggedLogAPIEntity{
+public class SparkApp extends TaggedLogAPIEntity {
@Column("a")
private long startTime;
@@ -222,11 +222,14 @@ public class SparkApp extends TaggedLogAPIEntity{
return driveMemoryBytes;
}
- public int getCompleteTasks(){ return completeTasks;}
+ public int getCompleteTasks() {
+ return completeTasks;
+ }
public JobConfig getConfig() {
return config;
}
+
public void setStartTime(long startTime) {
this.startTime = startTime;
valueChanged("startTime");
@@ -377,7 +380,7 @@ public class SparkApp extends TaggedLogAPIEntity{
valueChanged("driveMemoryBytes");
}
- public void setCompleteTasks(int completeTasks){
+ public void setCompleteTasks(int completeTasks) {
this.completeTasks = completeTasks;
valueChanged("completeTasks");
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkExecutor.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkExecutor.java
index 366e4aa..4b669ef 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkExecutor.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkExecutor.java
@@ -18,10 +18,10 @@
package org.apache.eagle.jpm.spark.entity;
+import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.apache.eagle.jpm.util.Constants;
@Table("eglesprk_executors")
@ColumnFamily("f")
@@ -31,7 +31,7 @@ import org.apache.eagle.jpm.util.Constants;
@TimeSeries(true)
@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "executorId","user", "queue"})
@Partition({"site"})
-public class SparkExecutor extends TaggedLogAPIEntity{
+public class SparkExecutor extends TaggedLogAPIEntity {
@Column("a")
private String hostPort;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkJob.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkJob.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkJob.java
index acecb3a..79ac6da 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkJob.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkJob.java
@@ -18,10 +18,10 @@
package org.apache.eagle.jpm.spark.entity;
+import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.apache.eagle.jpm.util.Constants;
@Table("eglesprk_jobs")
@ColumnFamily("f")
@@ -31,34 +31,34 @@ import org.apache.eagle.jpm.util.Constants;
@TimeSeries(true)
@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId","user", "queue"})
@Partition({"site"})
-public class SparkJob extends TaggedLogAPIEntity{
+public class SparkJob extends TaggedLogAPIEntity {
@Column("a")
private long submissionTime;
@Column("b")
private long completionTime;
@Column("c")
- private int numStages=0;
+ private int numStages = 0;
@Column("d")
private String status;
@Column("e")
- private int numTask=0;
+ private int numTask = 0;
@Column("f")
- private int numActiveTasks=0;
+ private int numActiveTasks = 0;
@Column("g")
- private int numCompletedTasks=0;
+ private int numCompletedTasks = 0;
@Column("h")
- private int numSkippedTasks=0;
+ private int numSkippedTasks = 0;
@Column("i")
- private int numFailedTasks=0;
+ private int numFailedTasks = 0;
@Column("j")
- private int numActiveStages=0;
+ private int numActiveStages = 0;
@Column("k")
- private int numCompletedStages=0;
+ private int numCompletedStages = 0;
@Column("l")
- private int numSkippedStages=0;
+ private int numSkippedStages = 0;
@Column("m")
- private int numFailedStages=0;
+ private int numFailedStages = 0;
public long getSubmissionTime() {
return submissionTime;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkStage.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkStage.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkStage.java
index fcca889..3f56da6 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkStage.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkStage.java
@@ -18,10 +18,10 @@
package org.apache.eagle.jpm.spark.entity;
+import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.apache.eagle.jpm.util.Constants;
@Table("eglesprk_stages")
@ColumnFamily("f")
@@ -31,38 +31,38 @@ import org.apache.eagle.jpm.util.Constants;
@TimeSeries(true)
@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId", "stageId","stageAttemptId","user", "queue"})
@Partition({"site"})
-public class SparkStage extends TaggedLogAPIEntity{
+public class SparkStage extends TaggedLogAPIEntity {
@Column("a")
private String status;
@Column("b")
- private int numActiveTasks=0;
+ private int numActiveTasks = 0;
@Column("c")
- private int numCompletedTasks=0;
+ private int numCompletedTasks = 0;
@Column("d")
- private int numFailedTasks=0;
+ private int numFailedTasks = 0;
@Column("e")
- private long executorRunTime=0l;
+ private long executorRunTime = 0L;
@Column("f")
- private long inputBytes=0l;
+ private long inputBytes = 0L;
@Column("g")
- private long inputRecords=0l;
+ private long inputRecords = 0L;
@Column("h")
- private long outputBytes=0l;
+ private long outputBytes = 0L;
@Column("i")
- private long outputRecords=0l;
+ private long outputRecords = 0L;
@Column("j")
- private long shuffleReadBytes=0l;
+ private long shuffleReadBytes = 0L;
@Column("k")
- private long shuffleReadRecords=0l;
+ private long shuffleReadRecords = 0L;
@Column("l")
- private long shuffleWriteBytes=0l;
+ private long shuffleWriteBytes = 0L;
@Column("m")
- private long shuffleWriteRecords=0l;
+ private long shuffleWriteRecords = 0L;
@Column("n")
- private long memoryBytesSpilled=0l;
+ private long memoryBytesSpilled = 0L;
@Column("o")
- private long diskBytesSpilled=0l;
+ private long diskBytesSpilled = 0L;
@Column("p")
private String name;
@Column("q")
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java
index 6ef7c69..fb2fce5 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java
@@ -18,10 +18,10 @@
package org.apache.eagle.jpm.spark.entity;
+import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.apache.eagle.jpm.util.Constants;
@Table("eglesprk_tasks")
@ColumnFamily("f")
@@ -31,7 +31,7 @@ import org.apache.eagle.jpm.util.Constants;
@TimeSeries(true)
@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId", "jobName", "stageId","stageAttemptId","taskIndex","taskAttemptId","user", "queue"})
@Partition({"site"})
-public class SparkTask extends TaggedLogAPIEntity{
+public class SparkTask extends TaggedLogAPIEntity {
@Column("a")
private int taskId;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java
index 5b330fc..74489cd 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java
@@ -30,48 +30,49 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
- * job history is the resource
+ * job history is the resource.
*/
public abstract class AbstractJobHistoryDAO implements JobHistoryLCM {
private static final Logger LOG = LoggerFactory.getLogger(AbstractJobHistoryDAO.class);
- private final static String YEAR_URL_FORMAT = "/%4d";
- private final static String MONTH_URL_FORMAT = "/%02d";
- private final static String DAY_URL_FORMAT = "/%02d";
- private final static String YEAR_MONTH_DAY_URL_FORMAT = YEAR_URL_FORMAT + MONTH_URL_FORMAT + DAY_URL_FORMAT;
- protected final static String SERIAL_URL_FORMAT = "/%06d";
- protected final static String FILE_URL_FORMAT = "/%s";
+ private static final String YEAR_URL_FORMAT = "/%4d";
+ private static final String MONTH_URL_FORMAT = "/%02d";
+ private static final String DAY_URL_FORMAT = "/%02d";
+ private static final String YEAR_MONTH_DAY_URL_FORMAT = YEAR_URL_FORMAT + MONTH_URL_FORMAT + DAY_URL_FORMAT;
+ protected static final String SERIAL_URL_FORMAT = "/%06d";
+ protected static final String FILE_URL_FORMAT = "/%s";
private static final Pattern JOBTRACKERNAME_PATTERN = Pattern.compile("^.*_(\\d+)_$");
protected static final Pattern JOBID_PATTERN = Pattern.compile("job_\\d+_\\d+");
- protected final String m_basePath;
- protected volatile String m_jobTrackerName;
+ protected final String basePath;
+ protected volatile String jobTrackerName;
public static final String JOB_CONF_POSTFIX = "_conf.xml";
- private final static Timer timer = new Timer(true);
- private final static long JOB_TRACKER_SYNC_DURATION = 10 * 60 * 1000; // 10 minutes
+ private static final Timer timer = new Timer(true);
+ private static final long JOB_TRACKER_SYNC_DURATION = 10 * 60 * 1000; // 10 minutes
- private boolean m_pathContainsJobTrackerName;
+ private boolean pathContainsJobTrackerName;
public AbstractJobHistoryDAO(String basePath, boolean pathContainsJobTrackerName, String startingJobTrackerName) throws Exception {
- m_basePath = basePath;
- m_pathContainsJobTrackerName = pathContainsJobTrackerName;
- m_jobTrackerName = startingJobTrackerName;
- if (m_pathContainsJobTrackerName) {
- if (startingJobTrackerName == null || startingJobTrackerName.isEmpty())
+ this.basePath = basePath;
+ this.pathContainsJobTrackerName = pathContainsJobTrackerName;
+ jobTrackerName = startingJobTrackerName;
+ if (this.pathContainsJobTrackerName) {
+ if (startingJobTrackerName == null || startingJobTrackerName.isEmpty()) {
throw new IllegalStateException("startingJobTrackerName should not be null or empty");
+ }
// start background thread to check what is current job tracker
- startThread(m_basePath);
+ startThread(this.basePath);
}
}
protected String buildWholePathToYearMonthDay(int year, int month, int day) {
StringBuilder sb = new StringBuilder();
- sb.append(m_basePath);
- if (!m_pathContainsJobTrackerName && m_jobTrackerName != null && !m_jobTrackerName.isEmpty()) {
+ sb.append(basePath);
+ if (!pathContainsJobTrackerName && jobTrackerName != null && !jobTrackerName.isEmpty()) {
sb.append("/");
- sb.append(m_jobTrackerName);
+ sb.append(jobTrackerName);
}
sb.append(String.format(YEAR_MONTH_DAY_URL_FORMAT, year, month, day));
return sb.toString();
@@ -105,7 +106,7 @@ public abstract class AbstractJobHistoryDAO implements JobHistoryLCM {
sb.append(JOB_CONF_POSTFIX);
return sb.toString();
}
- LOG.warn("Illegal job history file name: "+jobHistFileName);
+ LOG.warn("Illegal job history file name: " + jobHistFileName);
return null;
}
@@ -118,11 +119,11 @@ public abstract class AbstractJobHistoryDAO implements JobHistoryLCM {
try {
LOG.info("regularly checking current jobTrackerName in background");
final String _jobTrackerName = calculateJobTrackerName(basePath);
- if (_jobTrackerName != null && !_jobTrackerName.equals(m_jobTrackerName)) {
- LOG.info("jobTrackerName changed from " + m_jobTrackerName +" to " + _jobTrackerName);
- m_jobTrackerName = _jobTrackerName;
+ if (_jobTrackerName != null && !_jobTrackerName.equals(jobTrackerName)) {
+ LOG.info("jobTrackerName changed from " + jobTrackerName + " to " + _jobTrackerName);
+ jobTrackerName = _jobTrackerName;
}
- LOG.info("Current jobTrackerName is: " + m_jobTrackerName);
+ LOG.info("Current jobTrackerName is: " + jobTrackerName);
} catch (Exception e) {
LOG.error("failed to figure out current job tracker name that is not configured due to: " + e.getMessage(), e);
} catch (Throwable t) {
@@ -139,7 +140,7 @@ public abstract class AbstractJobHistoryDAO implements JobHistoryLCM {
try {
downloadIs = getJHFFileContentAsStream(year, month, day, serialNumber, jobHistoryFileName);
} catch (FileNotFoundException ex) {
- LOG.error("job history file not found " + jobHistoryFileName+", ignore and will NOT process any more");
+ LOG.error("job history file not found " + jobHistoryFileName + ", ignore and will NOT process any more");
return;
}
@@ -147,7 +148,7 @@ public abstract class AbstractJobHistoryDAO implements JobHistoryLCM {
try {
downloadJobConfIs = getJHFConfContentAsStream(year, month, day, serialNumber, jobHistoryFileName);
} catch (FileNotFoundException ex) {
- LOG.warn("job configuration file of "+ jobHistoryFileName+" not found , ignore and use empty configuration");
+ LOG.warn("job configuration file of " + jobHistoryFileName + " not found , ignore and use empty configuration");
}
org.apache.hadoop.conf.Configuration conf = null;
@@ -164,12 +165,12 @@ public abstract class AbstractJobHistoryDAO implements JobHistoryLCM {
} catch (Exception ex) {
LOG.error("fail reading job history file", ex);
throw ex;
- } catch(Throwable t) {
+ } catch (Throwable t) {
LOG.error("fail reading job history file", t);
throw new Exception(t);
} finally {
try {
- if(downloadJobConfIs != null) {
+ if (downloadJobConfIs != null) {
downloadJobConfIs.close();
}
if (downloadIs != null) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
index aeb35fd..87cd4e0 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
@@ -32,30 +32,32 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback {
private static final Logger LOG = LoggerFactory.getLogger(DefaultJHFInputStreamCallback.class);
- private JobHistoryContentFilter m_filter;
- private MRHistoryJobConfig m_configManager;
+ private JobHistoryContentFilter filter;
+ private MRHistoryJobConfig configManager;
public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, MRHistoryJobConfig configManager, EagleOutputCollector eagleCollector) {
- this.m_filter = filter;
- this.m_configManager = configManager;
+ this.filter = filter;
+ this.configManager = configManager;
}
@Override
public void onInputStream(InputStream jobFileInputStream, org.apache.hadoop.conf.Configuration conf) throws Exception {
- final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = m_configManager.getJobExtractorConfig();
+ final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
@SuppressWarnings("serial")
- Map<String, String> baseTags = new HashMap<String, String>() { {
- put("site", jobExtractorConfig.site);
- } };
+ Map<String, String> baseTags = new HashMap<String, String>() {
+ {
+ put("site", jobExtractorConfig.site);
+ }
+ };
- if (!m_filter.acceptJobFile()) {
+ if (!filter.acceptJobFile()) {
// close immediately if we don't need job file
jobFileInputStream.close();
} else {
//get parser and parse, do not need to emit data now
- JHFParserBase parser = JHFParserFactory.getParser(m_configManager,
+ JHFParserBase parser = JHFParserFactory.getParser(configManager,
baseTags,
- conf, m_filter);
+ conf, filter);
parser.parse(jobFileInputStream);
jobFileInputStream.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java
index 693e876..70eab38 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java
@@ -21,9 +21,9 @@ import org.apache.eagle.dataproc.impl.storm.ValuesArray;
import java.io.Serializable;
/**
- * expose simple interface for streaming executor to populate output data
+ * expose simple interface for streaming executor to populate output data.
*
*/
-public interface EagleOutputCollector extends Serializable{
- void collect(ValuesArray t);
+public interface EagleOutputCollector extends Serializable {
+ void collect(ValuesArray t);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java
index 3edde5b..69eb94a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java
@@ -20,8 +20,8 @@ package org.apache.eagle.jpm.mr.history.crawler;
public interface JHFCrawlerDriver {
/**
- * return -1 if failed or there is no file to crawl
- * return modified time of the file if succeed
+ * return -1 if failed or there is no file to crawl.
+ * return modified time of the file if succeed.
*/
long crawl() throws Exception;
}