You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/08/24 12:23:17 UTC
[3/5] incubator-eagle git commit: [EAGLE-496] fix code style of jpm
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
index 4163f7b..7293c89 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
@@ -33,7 +33,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
private static final Logger logger = LoggerFactory.getLogger(JobConfigurationCreationServiceListener.class);
private static final int MAX_RETRY_TIMES = 3;
private MRHistoryJobConfig configManager;
- private JobConfigurationAPIEntity m_jobConfigurationEntity;
+ private JobConfigurationAPIEntity jobConfigurationEntity;
public JobConfigurationCreationServiceListener(MRHistoryJobConfig configManager) {
this.configManager = configManager;
@@ -43,7 +43,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
if (entity != null) {
if (entity instanceof JobConfigurationAPIEntity) {
- this.m_jobConfigurationEntity = (JobConfigurationAPIEntity) entity;
+ this.jobConfigurationEntity = (JobConfigurationAPIEntity) entity;
}
}
}
@@ -65,7 +65,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
List<JobConfigurationAPIEntity> list = new ArrayList<>();
- list.add(m_jobConfigurationEntity);
+ list.add(jobConfigurationEntity);
int tried = 0;
while (tried <= MAX_RETRY_TIMES) {
@@ -82,7 +82,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
}
} finally {
list.clear();
- m_jobConfigurationEntity = null;
+ jobConfigurationEntity = null;
client.getJerseyClient().destroy();
client.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index abddf3b..e7b8a6b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -62,7 +62,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
}
/**
- * We need save network bandwidth as well
+ * We need save network bandwidth as well.
*/
@Override
public void flush() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
index 8b85b26..a80462d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
@@ -22,11 +22,6 @@ import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
import java.util.Vector;
-/**
- * not thread safe
- *
- * @author yonzhang
- */
public class JobEntityCreationPublisher {
private Vector<HistoryJobEntityCreationListener> listeners = new Vector<>(2);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
index 594d8e2..ea0ba30 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
@@ -18,37 +18,36 @@
package org.apache.eagle.jpm.mr.history.parser;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleListener {
- private final static Logger LOG = LoggerFactory.getLogger(JobEntityLifecycleAggregator.class);
- private JobExecutionAPIEntity m_jobExecutionAPIEntity;
- private final JobCounterAggregateFunction m_mapTaskAttemptCounterAgg;
- private final JobCounterAggregateFunction m_reduceTaskAttemptCounterAgg;
+ private static final Logger LOG = LoggerFactory.getLogger(JobEntityLifecycleAggregator.class);
+ private JobExecutionAPIEntity jobExecutionAPIEntity;
+ private final JobCounterAggregateFunction mapTaskAttemptCounterAgg;
+ private final JobCounterAggregateFunction reduceTaskAttemptCounterAgg;
- private final JobCounterAggregateFunction m_mapFileSystemCounterAgg;
- private final JobCounterAggregateFunction m_reduceFileSystemTaskCounterAgg;
+ private final JobCounterAggregateFunction mapFileSystemCounterAgg;
+ private final JobCounterAggregateFunction reduceFileSystemTaskCounterAgg;
- private long m_mapAttemptDuration = 0;
- private long m_reduceAttemptDuration = 0;
+ private long mapAttemptDuration = 0;
+ private long reduceAttemptDuration = 0;
private boolean jobFinished = false;
public JobEntityLifecycleAggregator() {
- this.m_mapTaskAttemptCounterAgg = new JobCounterSumFunction();
- this.m_reduceTaskAttemptCounterAgg = new JobCounterSumFunction();
- this.m_mapFileSystemCounterAgg = new JobCounterSumFunction();
- this.m_reduceFileSystemTaskCounterAgg = new JobCounterSumFunction();
+ this.mapTaskAttemptCounterAgg = new JobCounterSumFunction();
+ this.reduceTaskAttemptCounterAgg = new JobCounterSumFunction();
+ this.mapFileSystemCounterAgg = new JobCounterSumFunction();
+ this.reduceFileSystemTaskCounterAgg = new JobCounterSumFunction();
}
@Override
@@ -57,7 +56,7 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
if (entity instanceof TaskAttemptExecutionAPIEntity) {
taskAttemptEntityCreated((TaskAttemptExecutionAPIEntity) entity);
} else if (entity instanceof JobExecutionAPIEntity) {
- this.m_jobExecutionAPIEntity = (JobExecutionAPIEntity) entity;
+ this.jobExecutionAPIEntity = (JobExecutionAPIEntity) entity;
}
}
}
@@ -65,44 +64,44 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
@Override
public void jobFinish() {
try {
- if (m_jobExecutionAPIEntity == null) {
+ if (jobExecutionAPIEntity == null) {
throw new IOException("No JobExecutionAPIEntity found before flushing");
}
LOG.debug("Updating aggregated task attempts to job level counters");
- JobCounters jobCounters = m_jobExecutionAPIEntity.getJobCounters();
+ JobCounters jobCounters = jobExecutionAPIEntity.getJobCounters();
if (jobCounters == null) {
- LOG.warn("no job counter found for " + this.m_jobExecutionAPIEntity);
+ LOG.warn("no job counter found for " + this.jobExecutionAPIEntity);
jobCounters = new JobCounters();
}
Map<String, Map<String, Long>> counters = jobCounters.getCounters();
- Map<String, Long> mapTaskAttemptCounter = this.m_mapTaskAttemptCounterAgg.result();
+ Map<String, Long> mapTaskAttemptCounter = this.mapTaskAttemptCounterAgg.result();
if (mapTaskAttemptCounter == null) {
mapTaskAttemptCounter = new HashMap<>();
}
- mapTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_mapAttemptDuration);
+ mapTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.mapAttemptDuration);
counters.put(Constants.MAP_TASK_ATTEMPT_COUNTER, mapTaskAttemptCounter);
- Map<String, Long> reduceTaskAttemptCounter = this.m_reduceTaskAttemptCounterAgg.result();
+ Map<String, Long> reduceTaskAttemptCounter = this.reduceTaskAttemptCounterAgg.result();
if (reduceTaskAttemptCounter == null) {
reduceTaskAttemptCounter = new HashMap<>();
}
- reduceTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_reduceAttemptDuration);
+ reduceTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.reduceAttemptDuration);
counters.put(Constants.REDUCE_TASK_ATTEMPT_COUNTER, reduceTaskAttemptCounter);
- counters.put(Constants.MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_mapFileSystemCounterAgg.result());
- counters.put(Constants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_reduceFileSystemTaskCounterAgg.result());
+ counters.put(Constants.MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.mapFileSystemCounterAgg.result());
+ counters.put(Constants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.reduceFileSystemTaskCounterAgg.result());
jobCounters.setCounters(counters);
- m_jobExecutionAPIEntity.setJobCounters(jobCounters);
+ jobExecutionAPIEntity.setJobCounters(jobCounters);
jobFinished = true;
} catch (Exception e) {
- LOG.error("Failed to update job execution entity: " + this.m_jobExecutionAPIEntity.toString() + ", due to " + e.getMessage(), e);
+ LOG.error("Failed to update job execution entity: " + this.jobExecutionAPIEntity.toString() + ", due to " + e.getMessage(), e);
}
}
@@ -112,14 +111,14 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
if (taskType != null && jobCounters != null && jobCounters.getCounters() != null) {
if (Constants.TaskType.MAP.toString().equals(taskType.toUpperCase())) {
- m_mapAttemptDuration += entity.getDuration();
- this.m_mapTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(Constants.TASK_COUNTER));
- this.m_mapFileSystemCounterAgg.accumulate(jobCounters.getCounters().get(Constants.FILE_SYSTEM_COUNTER));
+ mapAttemptDuration += entity.getDuration();
+ this.mapTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(Constants.TASK_COUNTER));
+ this.mapFileSystemCounterAgg.accumulate(jobCounters.getCounters().get(Constants.FILE_SYSTEM_COUNTER));
return;
} else if (Constants.TaskType.REDUCE.toString().equals(taskType.toUpperCase())) {
- m_reduceAttemptDuration += entity.getDuration();
- this.m_reduceTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(Constants.TASK_COUNTER));
- this.m_reduceFileSystemTaskCounterAgg.accumulate(jobCounters.getCounters().get(Constants.FILE_SYSTEM_COUNTER));
+ reduceAttemptDuration += entity.getDuration();
+ this.reduceTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(Constants.TASK_COUNTER));
+ this.reduceFileSystemTaskCounterAgg.accumulate(jobCounters.getCounters().get(Constants.FILE_SYSTEM_COUNTER));
return;
}
}
@@ -151,10 +150,7 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
public JobCounterSumFunction() {
result = new HashMap<>();
}
-
- /**
- * @param counters
- */
+
@Override
public void accumulate(Map<String, Long> counters) {
if (counters != null) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index e0c3c6b..f95eaa2 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -37,7 +37,7 @@ import java.util.List;
import java.util.Map;
public class TaskFailureListener implements HistoryJobEntityCreationListener {
- private static final Logger logger = LoggerFactory.getLogger(TaskFailureListener.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TaskFailureListener.class);
private static final String MR_ERROR_CATEGORY_CONFIG_FILE_NAME = "MRErrorCategory.config";
private static final int BATCH_SIZE = 1000;
private static final int MAX_RETRY_TIMES = 3;
@@ -53,7 +53,7 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
is = TaskFailureListener.class.getClassLoader().getResourceAsStream(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
URL url = TaskFailureListener.class.getClassLoader().getResource(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
if (url != null) {
- logger.info("Feeder is going to load configuration file: " + url.toString());
+ LOG.info("Feeder is going to load configuration file: " + url.toString());
}
classifier = new MRErrorClassifier(is);
} catch (IOException ex) {
@@ -63,6 +63,7 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
try {
is.close();
} catch (IOException e) {
+ LOG.warn("exception found {}", e);
}
}
}
@@ -121,16 +122,16 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
int tried = 0;
while (tried <= MAX_RETRY_TIMES) {
try {
- logger.info("start flushing entities of total number " + failureTasks.size());
+ LOG.info("start flushing entities of total number " + failureTasks.size());
client.create(failureTasks);
- logger.info("finish flushing entities of total number " + failureTasks.size());
+ LOG.info("finish flushing entities of total number " + failureTasks.size());
failureTasks.clear();
break;
} catch (Exception ex) {
if (tried < MAX_RETRY_TIMES) {
- logger.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
+ LOG.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
} else {
- logger.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
+ LOG.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
throw ex;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
index cef29fe..87079fd 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
@@ -18,18 +18,17 @@
package org.apache.eagle.jpm.mr.running;
+
+import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt;
+import org.apache.eagle.jpm.util.Constants;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
-import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
-import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
-import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt;
-import org.apache.eagle.jpm.util.Constants;
-
import java.util.List;
-import java.util.regex.Pattern;
public class MRRunningJobMain {
public static void main(String[] args) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
index a91a493..42426e4 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
@@ -18,8 +18,8 @@
package org.apache.eagle.jpm.mr.running.config;
-import com.typesafe.config.Config;
import org.apache.eagle.common.config.ConfigOptionParser;
+import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,27 +27,35 @@ import java.io.Serializable;
public class MRRunningConfigManager implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(MRRunningConfigManager.class);
+
public String getEnv() {
return env;
}
+
private String env;
- public ZKStateConfig getZkStateConfig() { return zkStateConfig; }
+ public ZKStateConfig getZkStateConfig() {
+ return zkStateConfig;
+ }
+
private ZKStateConfig zkStateConfig;
public EagleServiceConfig getEagleServiceConfig() {
return eagleServiceConfig;
}
+
private EagleServiceConfig eagleServiceConfig;
public JobExtractorConfig getJobExtractorConfig() {
return jobExtractorConfig;
}
+
private JobExtractorConfig jobExtractorConfig;
public EndpointConfig getEndpointConfig() {
return endpointConfig;
}
+
private EndpointConfig endpointConfig;
public static class ZKStateConfig implements Serializable {
@@ -82,6 +90,7 @@ public class MRRunningConfigManager implements Serializable {
public Config getConfig() {
return config;
}
+
private Config config;
private static MRRunningConfigManager manager = new MRRunningConfigManager();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index b7de79e..3b31d93 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -18,7 +18,6 @@
package org.apache.eagle.jpm.mr.running.parser;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
import org.apache.eagle.jpm.mr.runningentity.JobConfig;
@@ -29,11 +28,12 @@ import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.JobNameNormalization;
import org.apache.eagle.jpm.util.MRJobTagName;
import org.apache.eagle.jpm.util.Utils;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
-import org.apache.eagle.jpm.util.resourceFetch.connection.URLConnectionUtils;
-import org.apache.eagle.jpm.util.resourceFetch.model.*;
-import org.apache.eagle.jpm.util.resourceFetch.model.JobCounters;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourcefetch.connection.URLConnectionUtils;
+import org.apache.eagle.jpm.util.resourcefetch.model.*;
+import org.apache.eagle.jpm.util.resourcefetch.model.JobCounters;
+import org.apache.commons.lang3.tuple.Pair;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
@@ -42,13 +42,12 @@ import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
import java.io.InputStream;
import java.net.URLConnection;
import java.util.*;
import java.util.function.Function;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
public class MRJobParser implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(MRJobParser.class);
@@ -58,6 +57,7 @@ public class MRJobParser implements Runnable {
FINISHED,
APP_FINISHED
}
+
private AppInfo app;
private static final int MAX_RETRY_TIMES = 2;
private MRJobEntityCreationHandler mrJobEntityCreationHandler;
@@ -78,6 +78,7 @@ public class MRJobParser implements Runnable {
private Set<String> finishedTaskIds;
private List<String> configKeys;
private MRRunningConfigManager.JobExtractorConfig jobExtractorConfig;
+
static {
OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
}
@@ -187,11 +188,11 @@ public class MRJobParser implements Runnable {
mrJobEntityMap.put(id, new JobExecutionAPIEntity());
}
- String jobDefId = JobNameNormalization.getInstance().normalize(mrJob.getName());
JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(id);
jobExecutionAPIEntity.setTags(new HashMap<>(commonTags));
jobExecutionAPIEntity.getTags().put(MRJobTagName.JOB_ID.toString(), id);
jobExecutionAPIEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), mrJob.getName());
+ String jobDefId = JobNameNormalization.getInstance().normalize(mrJob.getName());
jobExecutionAPIEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), jobDefId);
if (mrJobConfigs.get(id) != null) {
JobConfig jobConfig = mrJobConfigs.get(id);
@@ -248,7 +249,9 @@ public class MRJobParser implements Runnable {
Utils.closeInputStream(is);
}
- if (jobCounters.getCounterGroup() == null) return true;
+ if (jobCounters.getCounterGroup() == null) {
+ return true;
+ }
JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(jobId);
org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounter = new org.apache.eagle.jpm.util.jobcounter.JobCounters();
Map<String, Map<String, Long>> groups = new HashMap<>();
@@ -261,7 +264,9 @@ public class MRJobParser implements Runnable {
Map<String, Long> counterValues = groups.get(counterGroupName);
List<JobCounterItem> items = jobCounterGroup.getCounter();
- if (items == null) continue;
+ if (items == null) {
+ continue;
+ }
for (JobCounterItem item : items) {
String key = item.getName();
counterValues.put(key, item.getTotalCounterValue());
@@ -290,7 +295,11 @@ public class MRJobParser implements Runnable {
org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounter = new org.apache.eagle.jpm.util.jobcounter.JobCounters();
String jobId = jobAndTaskId.getLeft();
String taskId = jobAndTaskId.getRight();
- String taskCounterURL = app.getTrackingUrl() + Constants.MR_JOBS_URL + "/" + jobId + "/" + Constants.MR_TASKS_URL + "/" + taskId + "/" + Constants.MR_JOB_COUNTERS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+ String taskCounterURL = app.getTrackingUrl()
+ + Constants.MR_JOBS_URL + "/"
+ + jobId + "/" + Constants.MR_TASKS_URL + "/"
+ + taskId + "/" + Constants.MR_JOB_COUNTERS_URL
+ + "?" + Constants.ANONYMOUS_PARAMETER;
InputStream is = null;
TaskCounters taskCounters = null;
try {
@@ -304,7 +313,9 @@ public class MRJobParser implements Runnable {
Utils.closeInputStream(is);
}
- if (taskCounters.getTaskCounterGroup() == null) return jobCounter;
+ if (taskCounters.getTaskCounterGroup() == null) {
+ return jobCounter;
+ }
Map<String, Map<String, Long>> groups = new HashMap<>();
for (TaskCounterGroup taskCounterGroup : taskCounters.getTaskCounterGroup()) {
@@ -314,7 +325,9 @@ public class MRJobParser implements Runnable {
Map<String, Long> counterValues = groups.get(taskCounterGroup.getCounterGroupName());
List<TaskCounterItem> items = taskCounterGroup.getCounter();
- if (items == null) continue;
+ if (items == null) {
+ continue;
+ }
for (TaskCounterItem item : items) {
counterValues.put(item.getName(), item.getValue());
}
@@ -328,7 +341,10 @@ public class MRJobParser implements Runnable {
private Function<Pair<String, String>, TaskAttemptExecutionAPIEntity> fetchTaskAttempt = jobAndTaskId -> {
String jobId = jobAndTaskId.getLeft();
String taskId = jobAndTaskId.getRight();
- String taskAttemptURL = app.getTrackingUrl() + Constants.MR_JOBS_URL + "/" + jobId + "/" + Constants.MR_TASKS_URL + "/" + taskId + "/" + Constants.MR_TASK_ATTEMPTS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+ String taskAttemptURL = app.getTrackingUrl()
+ + Constants.MR_JOBS_URL + "/"
+ + jobId + "/" + Constants.MR_TASKS_URL + "/"
+ + taskId + "/" + Constants.MR_TASK_ATTEMPTS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
InputStream is = null;
List<MRTaskAttempt> taskAttempts = null;
try {
@@ -463,10 +479,10 @@ public class MRJobParser implements Runnable {
mrJobEntityCreationHandler.add(taskExecutionAPIEntity);
- if (task.getState().equals(Constants.TaskState.SUCCEEDED.toString()) ||
- task.getState().equals(Constants.TaskState.FAILED.toString()) ||
- task.getState().equals(Constants.TaskState.KILLED.toString()) ||
- task.getState().equals(Constants.TaskState.KILL_WAIT.toString())) {
+ if (task.getState().equals(Constants.TaskState.SUCCEEDED.toString())
+ || task.getState().equals(Constants.TaskState.FAILED.toString())
+ || task.getState().equals(Constants.TaskState.KILLED.toString())
+ || task.getState().equals(Constants.TaskState.KILL_WAIT.toString())) {
//LOG.info("mr job {} task {} has finished", jobId, task.getId());
this.finishedTaskIds.add(task.getId());
}
@@ -546,10 +562,10 @@ public class MRJobParser implements Runnable {
mrJobEntityMap.keySet()
.stream()
.filter(
- jobId -> mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FINISHED.toString()) ||
- mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FAILED.toString()))
+ jobId -> mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FINISHED.toString())
+ || mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FAILED.toString()))
.forEach(
- jobId -> this.runningJobManager.delete(app.getId(), jobId));
+ jobId -> this.runningJobManager.delete(app.getId(), jobId));
}
LOG.info("finish process yarn application " + app.getId());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
index 978c3ec..75650b7 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
@@ -18,14 +18,14 @@
package org.apache.eagle.jpm.mr.running.recover;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
-
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+import org.apache.commons.lang3.tuple.Pair;
import java.io.Serializable;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
public class MRRunningJobManager implements Serializable {
private RunningJobManager runningJobManager;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
index a701d74..ebb9144 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
@@ -29,9 +29,9 @@ import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.Utils;
-import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
index 51307e1..0dccd70 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
@@ -28,9 +28,9 @@ import org.apache.eagle.jpm.mr.running.parser.MRJobParser;
import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
index 4282a64..e6cd2f6 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
@@ -25,7 +25,7 @@ import com.typesafe.config.ConfigFactory;
import java.io.Serializable;
-public class SparkHistoryCrawlConfig implements Serializable{
+public class SparkHistoryCrawlConfig implements Serializable {
public ZKStateConfig zkStateConfig;
public JobHistoryEndpointConfig jobHistoryConfig;
public HDFSConfig hdfsConfig;
@@ -34,6 +34,7 @@ public class SparkHistoryCrawlConfig implements Serializable{
public StormConfig stormConfig;
private Config config;
+
public Config getConfig() {
return config;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
index 0c475a9..382375f 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
@@ -19,18 +19,17 @@
package org.apache.eagle.jpm.spark.history.status;
+import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.RetryNTimes;
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
-import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -38,7 +37,7 @@ import java.util.List;
public class JobHistoryZKStateManager {
public static final Logger LOG = LoggerFactory.getLogger(JobHistoryZKStateManager.class);
private String zkRoot;
- private CuratorFramework _curator;
+ private CuratorFramework curator;
private static String START_TIMESTAMP = "lastAppTime";
private CuratorFramework newCurator(SparkHistoryCrawlConfig config) throws Exception {
@@ -54,46 +53,46 @@ public class JobHistoryZKStateManager {
this.zkRoot = config.zkStateConfig.zkRoot + "/" + config.info.site;
try {
- _curator = newCurator(config);
- _curator.start();
-; } catch (Exception e) {
+ curator = newCurator(config);
+ curator.start();
+ } catch (Exception e) {
LOG.error("Fail to connect to zookeeper", e);
throw new RuntimeException(e);
}
}
public void close() {
- _curator.close();
- _curator = null;
+ curator.close();
+ curator = null;
}
- public List<String> loadApplications(int limit){
+ public List<String> loadApplications(int limit) {
String jobPath = zkRoot + "/jobs";
List<String> apps = new ArrayList<>();
- InterProcessLock lock = new InterProcessReadWriteLock(_curator,jobPath).writeLock();
- try{
+ InterProcessLock lock = new InterProcessReadWriteLock(curator, jobPath).writeLock();
+ try {
lock.acquire();
- Iterator<String> iter = _curator.getChildren().forPath(jobPath).iterator();
- while(iter.hasNext()) {
+ Iterator<String> iter = curator.getChildren().forPath(jobPath).iterator();
+ while (iter.hasNext()) {
String appId = iter.next();
String path = jobPath + "/" + appId;
- if(_curator.checkExists().forPath(path) != null){
- if(new String(_curator.getData().forPath(path)).equals(ZKStateConstant.AppStatus.INIT.toString())){
+ if (curator.checkExists().forPath(path) != null) {
+ if (new String(curator.getData().forPath(path)).equals(ZKStateConstant.AppStatus.INIT.toString())) {
apps.add(appId);
}
}
- if(apps.size() == limit){
+ if (apps.size() == limit) {
break;
}
}
return apps;
- }catch(Exception e){
+ } catch (Exception e) {
LOG.error("fail to read unprocessed jobs", e);
throw new RuntimeException(e);
- }finally {
- try{
+ } finally {
+ try {
lock.release();
- }catch(Exception e){
+ } catch (Exception e) {
LOG.error("fail to release lock", e);
}
@@ -102,18 +101,19 @@ public class JobHistoryZKStateManager {
public void resetApplications() {
String jobPath = zkRoot + "/jobs";
- InterProcessLock lock = new InterProcessReadWriteLock(_curator,jobPath).writeLock();
+ InterProcessLock lock = new InterProcessReadWriteLock(curator, jobPath).writeLock();
try {
lock.acquire();
- Iterator<String> iter = _curator.getChildren().forPath(jobPath).iterator();
+ Iterator<String> iter = curator.getChildren().forPath(jobPath).iterator();
while (iter.hasNext()) {
String appId = iter.next();
String path = jobPath + "/" + appId;
try {
- if (_curator.checkExists().forPath(path) != null) {
- String status = new String(_curator.getData().forPath(path));
- if(!ZKStateConstant.AppStatus.INIT.toString().equals(status))
- _curator.setData().forPath(path, ZKStateConstant.AppStatus.INIT.toString().getBytes("UTF-8"));
+ if (curator.checkExists().forPath(path) != null) {
+ String status = new String(curator.getData().forPath(path));
+ if (!ZKStateConstant.AppStatus.INIT.toString().equals(status)) {
+ curator.setData().forPath(path, ZKStateConstant.AppStatus.INIT.toString().getBytes("UTF-8"));
+ }
}
} catch (Exception e) {
LOG.error("fail to read unprocessed job", e);
@@ -127,136 +127,133 @@ public class JobHistoryZKStateManager {
} finally {
try {
lock.release();
- } catch(Exception e) {
+ } catch (Exception e) {
LOG.error("fail to release lock", e);
}
}
}
- public SparkApplicationInfo getApplicationInfo(String appId){
+ public SparkApplicationInfo getApplicationInfo(String appId) {
- String appPath = zkRoot + "/jobs/" + appId +"/info";
- try{
+ String appPath = zkRoot + "/jobs/" + appId + "/info";
+ try {
SparkApplicationInfo info = new SparkApplicationInfo();
- if(_curator.checkExists().forPath(appPath)!= null){
- String[] appStatus = new String(_curator.getData().forPath(appPath)).split("/");
+ if (curator.checkExists().forPath(appPath) != null) {
+ String[] appStatus = new String(curator.getData().forPath(appPath)).split("/");
info.setQueue(appStatus[0]);
info.setState(appStatus[1]);
info.setFinalStatus(appStatus[2]);
- if(appStatus.length > 3){
+ if (appStatus.length > 3) {
info.setUser(appStatus[3]);
info.setName(appStatus[4]);
}
}
return info;
- }catch(Exception e){
+ } catch (Exception e) {
LOG.error("fail to read application attempt info", e);
throw new RuntimeException(e);
}
}
- public long readLastFinishedTimestamp(){
+ public long readLastFinishedTimestamp() {
String lastTimeStampPath = zkRoot + "/" + START_TIMESTAMP;
- try{
- if(_curator.checkExists().forPath(lastTimeStampPath) == null){
- return 0l;
- }else{
- return Long.valueOf(new String(_curator.getData().forPath(lastTimeStampPath)));
+ try {
+ if (curator.checkExists().forPath(lastTimeStampPath) == null) {
+ return 0L;
+ } else {
+ return Long.valueOf(new String(curator.getData().forPath(lastTimeStampPath)));
}
- }catch(Exception e){
+ } catch (Exception e) {
LOG.error("fail to read last finished spark job timestamp", e);
throw new RuntimeException(e);
}
}
- public boolean hasApplication(String appId){
+ public boolean hasApplication(String appId) {
String path = zkRoot + "/jobs/" + appId;
try {
- if (_curator.checkExists().forPath(path) != null) {
+ if (curator.checkExists().forPath(path) != null) {
return true;
}
return false;
- }catch (Exception e){
+ } catch (Exception e) {
LOG.error("fail to check whether application exists", e);
throw new RuntimeException(e);
}
}
- public void addFinishedApplication(String appId, String queue, String yarnState, String yarnStatus, String user, String name){
+ public void addFinishedApplication(String appId, String queue, String yarnState, String yarnStatus, String user, String name) {
String path = zkRoot + "/jobs/" + appId;
-
-
- try{
- if(_curator.checkExists().forPath(path) != null){
- _curator.delete().deletingChildrenIfNeeded().forPath(path);
+ try {
+ if (curator.checkExists().forPath(path) != null) {
+ curator.delete().deletingChildrenIfNeeded().forPath(path);
}
name = name.replace("/","_");
- if(name.length() > 50){
+ if (name.length() > 50) {
name = name.substring(0, 50);
}
- CuratorTransactionBridge result = _curator.inTransaction().create().withMode(CreateMode.PERSISTENT).forPath(path, ZKStateConstant.AppStatus.INIT.toString().getBytes("UTF-8"));
+ CuratorTransactionBridge result = curator.inTransaction().create().withMode(CreateMode.PERSISTENT).forPath(path, ZKStateConstant.AppStatus.INIT.toString().getBytes("UTF-8"));
result = result.and().create().withMode(CreateMode.PERSISTENT).forPath(path + "/info", String.format("%s/%s/%s/%s/%s", queue, yarnState, yarnStatus, user, name).getBytes("UTF-8"));
result.and().commit();
- }catch (Exception e){
+ } catch (Exception e) {
LOG.error("fail adding finished application", e);
throw new RuntimeException(e);
}
}
- public void updateLastUpdateTime(Long updateTime){
+ public void updateLastUpdateTime(Long updateTime) {
String lastTimeStampPath = zkRoot + "/" + START_TIMESTAMP;
- try{
- if(_curator.checkExists().forPath(lastTimeStampPath) == null){
- _curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(lastTimeStampPath, updateTime.toString().getBytes("UTF-8"));
- }else{
+ try {
+ if (curator.checkExists().forPath(lastTimeStampPath) == null) {
+ curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(lastTimeStampPath, updateTime.toString().getBytes("UTF-8"));
+ } else {
long originalEndTime = this.readLastFinishedTimestamp();
- if(originalEndTime < updateTime){
- _curator.setData().forPath(lastTimeStampPath, updateTime.toString().getBytes("UTF-8"));
+ if (originalEndTime < updateTime) {
+ curator.setData().forPath(lastTimeStampPath, updateTime.toString().getBytes("UTF-8"));
}
}
- }catch (Exception e){
+ } catch (Exception e) {
LOG.error("fail to update last finished time", e);
throw new RuntimeException(e);
}
}
- public void updateApplicationStatus(String appId, Enum<ZKStateConstant.AppStatus> status){
+ public void updateApplicationStatus(String appId, Enum<ZKStateConstant.AppStatus> status) {
String path = zkRoot + "/jobs/" + appId ;
- InterProcessLock lock = new InterProcessReadWriteLock(_curator,zkRoot+"/jobs").readLock();
- try{
- if(_curator.checkExists().forPath(path) != null){
- if(status.equals(ZKStateConstant.AppStatus.FINISHED)){
+ InterProcessLock lock = new InterProcessReadWriteLock(curator, zkRoot + "/jobs").readLock();
+ try {
+ if (curator.checkExists().forPath(path) != null) {
+ if (status.equals(ZKStateConstant.AppStatus.FINISHED)) {
lock.acquire();
- _curator.delete().deletingChildrenIfNeeded().forPath(path);
- }else{
- _curator.setData().forPath(path, status.toString().getBytes("UTF-8"));
+ curator.delete().deletingChildrenIfNeeded().forPath(path);
+ } else {
+ curator.setData().forPath(path, status.toString().getBytes("UTF-8"));
}
- }else{
+ } else {
String errorMsg = String.format("fail to update for application with path %s", path);
LOG.error(errorMsg);
}
- }catch (Exception e){
+ } catch (Exception e) {
LOG.error("fail to update application status", e);
throw new RuntimeException(e);
- }finally{
- try{
- if(lock.isAcquiredInThisProcess())
+ } finally {
+ try {
+ if (lock.isAcquiredInThisProcess()) {
lock.release();
- }catch (Exception e){
+ }
+ } catch (Exception e) {
LOG.error("fail to release lock",e);
}
}
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java
index 40efa50..578ba3c 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java
@@ -21,7 +21,7 @@ package org.apache.eagle.jpm.spark.history.status;
public class ZKStateConstant {
- public enum AppStatus{
+ public enum AppStatus {
INIT, SENT_FOR_PARSE, FINISHED, FAILED
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
index 8404eda..8965d3d 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
@@ -19,26 +19,24 @@
package org.apache.eagle.jpm.spark.history.storm;
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
-import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
-import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
-import java.util.ArrayList;
import java.util.Map;
public class FinishedSparkJobSpout extends BaseRichSpout {
@@ -52,7 +50,7 @@ public class FinishedSparkJobSpout extends BaseRichSpout {
private static final int FAIL_MAX_TIMES = 5;
- public FinishedSparkJobSpout(SparkHistoryCrawlConfig config){
+ public FinishedSparkJobSpout(SparkHistoryCrawlConfig config) {
this.config = config;
}
@@ -102,15 +100,14 @@ public class FinishedSparkJobSpout extends BaseRichSpout {
}
} catch (Exception e) {
LOG.error("Fail to run next tuple", e);
- // this.takeRest(10);
}
-
}
private void takeRest(int seconds) {
try {
Thread.sleep(seconds * 1000);
- } catch(InterruptedException e) {
+ } catch (InterruptedException e) {
+ LOG.warn("sleep interrupted", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
index bd0eb85..423dbef 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
@@ -19,32 +19,32 @@
package org.apache.eagle.jpm.spark.history.storm;
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
public class SparkHistoryTopology {
- private SparkHistoryCrawlConfig SHConfig;
+ private SparkHistoryCrawlConfig sparkHistoryCrawlConfig;
- public SparkHistoryTopology(SparkHistoryCrawlConfig config){
- this.SHConfig = config;
+ public SparkHistoryTopology(SparkHistoryCrawlConfig config) {
+ this.sparkHistoryCrawlConfig = config;
}
public TopologyBuilder getBuilder() {
TopologyBuilder builder = new TopologyBuilder();
String spoutName = "sparkHistoryJobSpout";
String boltName = "sparkHistoryJobBolt";
- com.typesafe.config.Config config = this.SHConfig.getConfig();
+ com.typesafe.config.Config config = this.sparkHistoryCrawlConfig.getConfig();
builder.setSpout(spoutName,
- new FinishedSparkJobSpout(SHConfig),
+ new FinishedSparkJobSpout(sparkHistoryCrawlConfig),
config.getInt("storm.parallelismConfig." + spoutName)
).setNumTasks(config.getInt("storm.tasks." + spoutName));
builder.setBolt(boltName,
- new SparkJobParseBolt(SHConfig),
+ new SparkJobParseBolt(sparkHistoryCrawlConfig),
config.getInt("storm.parallelismConfig." + boltName)
).setNumTasks(config.getInt("storm.tasks." + boltName)).shuffleGrouping(spoutName);
return builder;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
index 23d5152..f00fa1b 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
@@ -19,11 +19,6 @@
package org.apache.eagle.jpm.spark.history.storm;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
import org.apache.eagle.jpm.spark.crawl.JHFInputStreamReader;
import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
import org.apache.eagle.jpm.spark.crawl.SparkFilesystemInputStreamReaderImpl;
@@ -31,9 +26,14 @@ import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
import org.apache.eagle.jpm.util.HDFSUtil;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.SparkHistoryServerResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplication;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.SparkHistoryServerResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.SparkApplication;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
index eb30f5e..f4284e1 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
@@ -28,9 +28,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestHDFS {
-
private static final Logger LOG = LoggerFactory.getLogger(TestHDFS.class);
- public static void main(String[] args) throws Exception{
+
+ public static void main(String[] args) throws Exception {
SparkHistoryCrawlConfig config = new SparkHistoryCrawlConfig();
Configuration conf = new Configuration();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
index 61c0751..5e21406 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
@@ -16,14 +16,15 @@
*/
package org.apache.eagle.jpm.spark.running;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
+
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobFetchSpout;
import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobParseBolt;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
public class SparkRunningJobApp extends StormApplication {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
index 5988273..d9c66e3 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
@@ -18,8 +18,8 @@
package org.apache.eagle.jpm.spark.running;
-import com.typesafe.config.Config;
import org.apache.eagle.common.config.ConfigOptionParser;
+import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,28 +33,36 @@ public class SparkRunningJobAppConfig implements Serializable {
public String getEnv() {
return env;
}
+
private String env;
- ZKStateConfig getZkStateConfig() { return zkStateConfig; }
+ ZKStateConfig getZkStateConfig() {
+ return zkStateConfig;
+ }
+
private ZKStateConfig zkStateConfig;
private TopologyConfig topologyConfig;
- public TopologyConfig getTopologyConfig(){
+
+ public TopologyConfig getTopologyConfig() {
return topologyConfig;
}
public EagleServiceConfig getEagleServiceConfig() {
return eagleServiceConfig;
}
+
private EagleServiceConfig eagleServiceConfig;
public JobExtractorConfig getJobExtractorConfig() {
return jobExtractorConfig;
}
+
private JobExtractorConfig jobExtractorConfig;
public EndpointConfig getEndpointConfig() {
return endpointConfig;
}
+
private EndpointConfig endpointConfig;
public static class TopologyConfig implements Serializable {
@@ -100,6 +108,7 @@ public class SparkRunningJobAppConfig implements Serializable {
public Config getConfig() {
return config;
}
+
private Config config;
private static SparkRunningJobAppConfig manager = new SparkRunningJobAppConfig();
@@ -127,7 +136,7 @@ public class SparkRunningJobAppConfig implements Serializable {
return manager;
}
- private void init(Config config){
+ private void init(Config config) {
this.config = config;
this.env = config.getString("envContextConfig.env");
this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
index 7b8f648..7de1530 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
@@ -17,11 +17,11 @@
package org.apache.eagle.jpm.spark.running.entities;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.apache.eagle.jpm.util.Constants;
@Table("eagleSparkRunningApps")
@ColumnFamily("f")
@@ -265,11 +265,14 @@ public class SparkAppEntity extends TaggedLogAPIEntity {
return driveMemoryBytes;
}
- public int getCompleteTasks(){ return completeTasks;}
+ public int getCompleteTasks() {
+ return completeTasks;
+ }
public JobConfig getConfig() {
return config;
}
+
public void setStartTime(long startTime) {
this.startTime = startTime;
valueChanged("startTime");
@@ -420,7 +423,7 @@ public class SparkAppEntity extends TaggedLogAPIEntity {
valueChanged("driveMemoryBytes");
}
- public void setCompleteTasks(int completeTasks){
+ public void setCompleteTasks(int completeTasks) {
this.completeTasks = completeTasks;
valueChanged("completeTasks");
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
index f4de84c..89549ca 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
@@ -17,10 +17,10 @@
package org.apache.eagle.jpm.spark.running.entities;
+import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.apache.eagle.jpm.util.Constants;
@Table("eagleSparkRunningExecutors")
@ColumnFamily("f")
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
index 1c2caa4..bb56b52 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
@@ -17,10 +17,10 @@
package org.apache.eagle.jpm.spark.running.entities;
+import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.apache.eagle.jpm.util.Constants;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
index 72dbe40..be0ffd0 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
@@ -17,10 +17,10 @@
package org.apache.eagle.jpm.spark.running.entities;
+import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.apache.eagle.jpm.util.Constants;
@Table("eagleSparkRunningStages")
@ColumnFamily("f")
@@ -40,27 +40,27 @@ public class SparkStageEntity extends TaggedLogAPIEntity {
@Column("d")
private int numFailedTasks = 0;
@Column("e")
- private long executorRunTime = 0l;
+ private long executorRunTime = 0L;
@Column("f")
- private long inputBytes = 0l;
+ private long inputBytes = 0L;
@Column("g")
- private long inputRecords = 0l;
+ private long inputRecords = 0L;
@Column("h")
- private long outputBytes = 0l;
+ private long outputBytes = 0L;
@Column("i")
- private long outputRecords = 0l;
+ private long outputRecords = 0L;
@Column("j")
- private long shuffleReadBytes = 0l;
+ private long shuffleReadBytes = 0L;
@Column("k")
- private long shuffleReadRecords = 0l;
+ private long shuffleReadRecords = 0L;
@Column("l")
- private long shuffleWriteBytes = 0l;
+ private long shuffleWriteBytes = 0L;
@Column("m")
- private long shuffleWriteRecords = 0l;
+ private long shuffleWriteRecords = 0L;
@Column("n")
- private long memoryBytesSpilled = 0l;
+ private long memoryBytesSpilled = 0L;
@Column("o")
- private long diskBytesSpilled = 0l;
+ private long diskBytesSpilled = 0L;
@Column("p")
private String name;
@Column("q")
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
index 183a62a..e531806 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
@@ -17,10 +17,10 @@
package org.apache.eagle.jpm.spark.running.entities;
+import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.apache.eagle.jpm.util.Constants;
@Table("eagleSparkRunningTasks")
@ColumnFamily("f")
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
index b2a5b63..6411018 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
@@ -18,7 +18,6 @@
package org.apache.eagle.jpm.spark.running.parser;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.eagle.jpm.spark.crawl.EventType;
import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
import org.apache.eagle.jpm.spark.running.entities.*;
@@ -27,9 +26,10 @@ import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.HDFSUtil;
import org.apache.eagle.jpm.util.SparkJobTagName;
import org.apache.eagle.jpm.util.Utils;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
-import org.apache.eagle.jpm.util.resourceFetch.model.*;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourcefetch.model.*;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -189,12 +189,12 @@ public class SparkApplicationParser implements Runnable {
//we must flush entities before delete from zk in case of missing finish state of jobs
//delete from zk if needed
sparkAppEntityMap.keySet()
- .stream()
- .filter(
- jobId -> sparkAppEntityMap.get(jobId).getYarnState().equals(Constants.AppState.FINISHED.toString()) ||
- sparkAppEntityMap.get(jobId).getYarnState().equals(Constants.AppState.FAILED.toString()))
- .forEach(
- jobId -> this.sparkRunningJobManager.delete(app.getId(), jobId));
+ .stream()
+ .filter(
+ jobId -> sparkAppEntityMap.get(jobId).getYarnState().equals(Constants.AppState.FINISHED.toString())
+ || sparkAppEntityMap.get(jobId).getYarnState().equals(Constants.AppState.FAILED.toString()))
+ .forEach(
+ jobId -> this.sparkRunningJobManager.delete(app.getId(), jobId));
}
LOG.info("finish process yarn application " + app.getId());
@@ -243,12 +243,12 @@ public class SparkApplicationParser implements Runnable {
JobConfig jobConfig = null;
try (FileSystem hdfs = HDFSUtil.getFileSystem(this.hdfsConf)) {
-// // For Yarn version >= 2.7,
-// // log name: "application_1468625664674_0003_appattempt_1468625664674_0003_000001"
-// String attemptIdFormatted = String.format("%06d", attemptId);
-// // remove "application_" to get the number part of appID.
-// String sparkAppIdNum = sparkAppId.substring(12);
-// String attemptIdString = "appattempt_" + sparkAppIdNum + "_" + attemptIdFormatted;
+ // // For Yarn version >= 2.7,
+ // // log name: "application_1468625664674_0003_appattempt_1468625664674_0003_000001"
+ // String attemptIdFormatted = String.format("%06d", attemptId);
+ // // remove "application_" to get the number part of appID.
+ // String sparkAppIdNum = sparkAppId.substring(12);
+ // String attemptIdString = "appattempt_" + sparkAppIdNum + "_" + attemptIdFormatted;
// For Yarn version 2.4.x
// log name: application_1464382345557_269065_1
@@ -277,8 +277,8 @@ public class SparkApplicationParser implements Runnable {
}
private boolean isClientMode(JobConfig jobConfig) {
- return jobConfig.containsKey(Constants.SPARK_MASTER_KEY) &&
- jobConfig.get(Constants.SPARK_MASTER_KEY).equalsIgnoreCase("yarn-client");
+ return jobConfig.containsKey(Constants.SPARK_MASTER_KEY)
+ && jobConfig.get(Constants.SPARK_MASTER_KEY).equalsIgnoreCase("yarn-client");
}
private boolean fetchSparkApps() {
@@ -315,10 +315,10 @@ public class SparkApplicationParser implements Runnable {
lastSavedAttempt = Integer.parseInt(sparkAppEntityMap.get(id).getTags().get(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString()));
}
for (int j = lastSavedAttempt; j <= currentAttempt; j++) {
- SparkAppEntity attemptEntity = new SparkAppEntity();
commonTags.put(SparkJobTagName.SPARK_APP_NAME.toString(), sparkApplication.getName());
commonTags.put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), "" + j);
commonTags.put(SparkJobTagName.SPARK_APP_ID.toString(), id);
+ SparkAppEntity attemptEntity = new SparkAppEntity();
attemptEntity.setTags(new HashMap<>(commonTags));
attemptEntity.setAppInfo(app);
@@ -340,9 +340,9 @@ public class SparkApplicationParser implements Runnable {
JobConfig jobConfig = attemptEntity.getConfig();
attemptEntity.setExecMemoryBytes(Utils.parseMemory(jobConfig.get(Constants.SPARK_EXECUTOR_MEMORY_KEY)));
- attemptEntity.setDriveMemoryBytes(isClientMode(jobConfig) ?
- 0 :
- Utils.parseMemory(jobConfig.get(Constants.SPARK_DRIVER_MEMORY_KEY)));
+ attemptEntity.setDriveMemoryBytes(isClientMode(jobConfig)
+ ? 0
+ : Utils.parseMemory(jobConfig.get(Constants.SPARK_DRIVER_MEMORY_KEY)));
attemptEntity.setExecutorCores(Integer.parseInt(jobConfig.get(Constants.SPARK_EXECUTOR_CORES_KEY)));
// spark.driver.cores may not be set.
String driverCoresStr = jobConfig.get(Constants.SPARK_DRIVER_CORES_KEY);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
index 11f7909..3fb6371 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
@@ -18,14 +18,15 @@
package org.apache.eagle.jpm.spark.running.recover;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+import org.apache.commons.lang3.tuple.Pair;
import java.io.Serializable;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
public class SparkRunningJobManager implements Serializable {
private RunningJobManager runningJobManager;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
index 256829e..ccdfe79 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
@@ -28,9 +28,9 @@ import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
index 21a6ef2..c41804b 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
@@ -23,9 +23,9 @@ import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
import org.apache.eagle.jpm.spark.running.parser.SparkApplicationParser;
import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index 07850f9..1a6c09c 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -21,19 +21,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Constants {
- private final static Logger LOG = LoggerFactory.getLogger(Constants.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Constants.class);
//SPARK
- public final static String SPARK_APP_SERVICE_ENDPOINT_NAME = "SparkAppService";
- public final static String SPARK_JOB_SERVICE_ENDPOINT_NAME = "SparkJobService";
- public final static String SPARK_STAGE_SERVICE_ENDPOINT_NAME = "SparkStageService";
- public final static String SPARK_TASK_SERVICE_ENDPOINT_NAME = "SparkTaskService";
- public final static String SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "SparkExecutorService";
- public final static String RUNNING_SPARK_APP_SERVICE_ENDPOINT_NAME = "RunningSparkAppService";
- public final static String RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME = "RunningSparkJobService";
- public final static String RUNNING_SPARK_STAGE_SERVICE_ENDPOINT_NAME = "RunningSparkStageService";
- public final static String RUNNING_SPARK_TASK_SERVICE_ENDPOINT_NAME = "RunningSparkTaskService";
- public final static String RUNNING_SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "RunningSparkExecutorService";
+ public static final String SPARK_APP_SERVICE_ENDPOINT_NAME = "SparkAppService";
+ public static final String SPARK_JOB_SERVICE_ENDPOINT_NAME = "SparkJobService";
+ public static final String SPARK_STAGE_SERVICE_ENDPOINT_NAME = "SparkStageService";
+ public static final String SPARK_TASK_SERVICE_ENDPOINT_NAME = "SparkTaskService";
+ public static final String SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "SparkExecutorService";
+ public static final String RUNNING_SPARK_APP_SERVICE_ENDPOINT_NAME = "RunningSparkAppService";
+ public static final String RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME = "RunningSparkJobService";
+ public static final String RUNNING_SPARK_STAGE_SERVICE_ENDPOINT_NAME = "RunningSparkStageService";
+ public static final String RUNNING_SPARK_TASK_SERVICE_ENDPOINT_NAME = "RunningSparkTaskService";
+ public static final String RUNNING_SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "RunningSparkExecutorService";
public static final String APPLICATION_PREFIX = "application";
public static final String JOB_PREFIX = "job";
public static final String V2_APPS_URL = "ws/v1/cluster/apps";
@@ -53,7 +53,7 @@ public class Constants {
public static final String SPARK_YARN_DRIVER_MEMORY_OVERHEAD_KEY = "spark.yarn.driver.memoryOverhead";
public static final String SPARK_YARN_am_MEMORY_OVERHEAD_KEY = "spark.yarn.am.memoryOverhead";
- public static final String SPARK_APPS_URL ="api/v1/applications";
+ public static final String SPARK_APPS_URL = "api/v1/applications";
public static final String SPARK_EXECUTORS_URL = "executors";
public static final String SPARK_JOBS_URL = "jobs";
public static final String SPARK_STAGES_URL = "stages";
@@ -68,21 +68,27 @@ public class Constants {
public enum CompressionType {
GZIP, NONE
}
+
public enum JobState {
NEW, INITED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED, ERROR, FINISHED, ALL
}
+
public enum TaskState {
NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED
}
+
public enum StageState {
ACTIVE, COMPLETE, PENDING
}
+
public enum AppState {
NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
}
+
public enum AppStatus {
UNDEFINED, SUCCEEDED, FAILED, KILLED
}
+
public enum ResourceType {
COMPLETE_SPARK_JOB, SPARK_JOB_DETAIL, RUNNING_SPARK_JOB, RUNNING_MR_JOB, CLUSTER_INFO, JOB_CONFIGURATION,
COMPLETE_MR_JOB
@@ -117,7 +123,7 @@ public class Constants {
public static final String HIVE_QUERY_STRING = "hive.query.string";
/**
- * MR task types
+ * MR task types.
*/
public enum TaskType {
SETUP, MAP, REDUCE, CLEANUP
@@ -128,9 +134,10 @@ public class Constants {
NOTAVALIABLE("N/A")
;
private String value;
- JobType(String value){
+ JobType(String value) {
this.value = value;
}
+
@Override
public String toString() {
return this.value;