You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/08/09 05:25:31 UTC
[3/8] incubator-eagle git commit: [EAGLE-422] Eagle support for MR & Spark running job monitoring
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
new file mode 100644
index 0000000..bb76213
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.parser;
+
import org.apache.commons.lang3.tuple.Pair;
import org.apache.eagle.jpm.spark.crawl.EventType;
import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
import org.apache.eagle.jpm.spark.running.entities.*;
import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.HDFSUtil;
import org.apache.eagle.jpm.util.SparkJobTagName;
import org.apache.eagle.jpm.util.Utils;
import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
import org.apache.eagle.jpm.util.resourceFetch.model.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Function;
+
/**
 * Polls the Spark application-master REST endpoint of one running yarn
 * application and turns applications / executors / jobs / stages / tasks
 * into Eagle entities. Instances are {@link Runnable} so a scheduler can
 * drive repeated fetch cycles; {@link #run()} serializes cycles on an
 * internal lock.
 */
public class SparkApplicationParser implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(SparkApplicationParser.class);

    /** Lifecycle of a parser instance, advanced by fetch cycles. */
    public enum ParserStatus {
        RUNNING,       // a fetch cycle is in progress
        FINISHED,      // last cycle done; parser may be scheduled again
        APP_FINISHED   // the yarn app itself is done; no further cycles needed
    }

    // Yarn application this parser tracks.
    private AppInfo app;
    // How many times each REST/HDFS fetch is retried before giving up.
    private static final int MAX_RETRY_TIMES = 2;
    private SparkAppEntityCreationHandler sparkAppEntityCreationHandler;
    //<sparkAppId, SparkAppEntity>
    private Map<String, SparkAppEntity> sparkAppEntityMap;
    // Cached job config per spark app id, so the HDFS event log is read once.
    private Map<String, JobConfig> sparkJobConfigs;
    // stageId -> (jobId, (submissionTime, completionTime)), filled by the jobs fetch.
    private Map<Integer, Pair<Integer, Pair<Long, Long>>> stagesTime;
    // Stage ids already seen in COMPLETE state; skipped on later cycles.
    private Set<Integer> completeStages;
    private Configuration hdfsConf;
    private SparkRunningConfigManager.EndpointConfig endpointConfig;
    // Guards run() so overlapping scheduler firings do not interleave.
    private final Object lock = new Object();
    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
    // Tags shared by every entity emitted for this app (site, user, queue, ...).
    private Map<String, String> commonTags = new HashMap<>();
    private SparkRunningJobManager sparkRunningJobManager;
    private ParserStatus parserStatus;
    private ResourceFetcher rmResourceFetcher;
    // Number of attempts reported by the REST API on the latest fetch.
    private int currentAttempt;
    // True until the first cycle completes; stages/tasks are skipped on the first cycle.
    private boolean first;

    static {
        // Spark emits NaN/Infinity in some metrics; accept them when parsing JSON.
        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
    }
+
    /**
     * @param eagleServiceConfig  destination Eagle service for emitted entities
     * @param endpointConfig      namenode endpoint, kerberos credentials and event-log dir
     * @param jobExtractorConfig  supplies the site tag
     * @param app                 the yarn application to track
     * @param sparkApp            previously recovered entities keyed by spark app id; may be null
     * @param sparkRunningJobManager zookeeper-backed recovery state
     * @param rmResourceFetcher   used to probe resource-manager reachability on fetch failures
     */
    public SparkApplicationParser(SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig,
                                  SparkRunningConfigManager.EndpointConfig endpointConfig,
                                  SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig,
                                  AppInfo app, Map<String, SparkAppEntity> sparkApp,
                                  SparkRunningJobManager sparkRunningJobManager, ResourceFetcher rmResourceFetcher) {
        this.sparkAppEntityCreationHandler = new SparkAppEntityCreationHandler(eagleServiceConfig);
        this.endpointConfig = endpointConfig;
        this.app = app;
        this.sparkJobConfigs = new HashMap<>();
        this.stagesTime = new HashMap<>();
        this.completeStages = new HashSet<>();
        this.sparkAppEntityMap = sparkApp;
        if (this.sparkAppEntityMap == null) {
            this.sparkAppEntityMap = new HashMap<>();
        }
        this.rmResourceFetcher = rmResourceFetcher;
        this.currentAttempt = 1;
        this.first = true;
        this.hdfsConf = new Configuration();
        this.hdfsConf.set("fs.defaultFS", endpointConfig.nnEndpoint);
        // Disable the FileSystem cache: each getJobConfig() call opens and closes
        // its own FileSystem, and a cached instance would be closed underneath others.
        this.hdfsConf.setBoolean("fs.hdfs.impl.disable.cache", true);
        this.hdfsConf.set("hdfs.kerberos.principal", endpointConfig.principal);
        this.hdfsConf.set("hdfs.keytab.file", endpointConfig.keyTab);

        this.commonTags.put(SparkJobTagName.SITE.toString(), jobExtractorConfig.site);
        this.commonTags.put(SparkJobTagName.SPARK_USER.toString(), app.getUser());
        this.commonTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue());
        this.parserStatus = ParserStatus.FINISHED;
        this.sparkRunningJobManager = sparkRunningJobManager;
    }
+
+ public ParserStatus status() {
+ return this.parserStatus;
+ }
+
+ public void setStatus(ParserStatus status) {
+ this.parserStatus = status;
+ }
+
+ private void finishSparkApp(String sparkAppId) {
+ SparkAppEntity attemptEntity = sparkAppEntityMap.get(sparkAppId);
+ attemptEntity.setYarnState(Constants.AppState.FINISHED.toString());
+ attemptEntity.setYarnStatus(Constants.AppStatus.FAILED.toString());
+ sparkJobConfigs.remove(sparkAppId);
+ if (sparkJobConfigs.size() == 0) {
+ this.parserStatus = ParserStatus.APP_FINISHED;
+ }
+ stagesTime.clear();
+ LOG.info("spark application {} has been finished", sparkAppId);
+ }
+
    /**
     * Runs one full fetch cycle: the application list first, then per-app
     * executors, jobs and (after the first cycle) stages/tasks, each with up to
     * {@link #MAX_RETRY_TIMES} retries. When retries are exhausted, a probe of
     * the resource manager is issued and the affected app(s) are finished.
     */
    private void fetchSparkRunningInfo() throws Exception {
        for (int i = 0; i < MAX_RETRY_TIMES; i++) {
            if (fetchSparkApps()) {
                break;
            } else if (i == MAX_RETRY_TIMES - 1) {
                //check whether the app has finished. if we test that we can connect rm, then we consider the app has finished
                //if we get here either because of cannot connect rm or the app has finished
                rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_SPARK_JOB);
                sparkAppEntityMap.keySet().forEach(this::finishSparkApp);
                return;
            }
        }

        // Ordered pipeline of per-app fetches; each returns false on a retryable failure.
        List<Function<String, Boolean>> functions = new ArrayList<>();
        functions.add(fetchSparkExecutors);
        functions.add(fetchSparkJobs);
        if (!first) {
            // Stage/task fetch needs stagesTime populated by a previous jobs fetch,
            // so it is skipped on the very first cycle.
            functions.add(fetchSparkStagesAndTasks);
        }

        this.first = false;
        for (String sparkAppId : sparkAppEntityMap.keySet()) {
            for (Function<String, Boolean> function : functions) {
                int i = 0;
                for (; i < MAX_RETRY_TIMES; i++) {
                    if (function.apply(sparkAppId)) {
                        break;
                    }
                }
                // i only reaches MAX_RETRY_TIMES when every retry failed.
                if (i >= MAX_RETRY_TIMES) {
                    //may caused by rm unreachable
                    rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_SPARK_JOB);
                    finishSparkApp(sparkAppId);
                    break;
                }
            }
        }
    }
+
+ @Override
+ public void run() {
+ synchronized (this.lock) {
+ if (this.parserStatus == ParserStatus.APP_FINISHED) {
+ return;
+ }
+
+ LOG.info("start to process yarn application " + app.getId());
+ try {
+ fetchSparkRunningInfo();
+ } catch (Exception e) {
+ LOG.warn("exception found when process application {}, {}", app.getId(), e);
+ e.printStackTrace();
+ } finally {
+ for (String jobId : sparkAppEntityMap.keySet()) {
+ sparkAppEntityCreationHandler.add(sparkAppEntityMap.get(jobId));
+ }
+ if (sparkAppEntityCreationHandler.flush()) { //force flush
+ //we must flush entities before delete from zk in case of missing finish state of jobs
+ //delete from zk if needed
+ sparkAppEntityMap.keySet()
+ .stream()
+ .filter(
+ jobId -> sparkAppEntityMap.get(jobId).getYarnState().equals(Constants.AppState.FINISHED.toString()) ||
+ sparkAppEntityMap.get(jobId).getYarnState().equals(Constants.AppState.FAILED.toString()))
+ .forEach(
+ jobId -> this.sparkRunningJobManager.delete(app.getId(), jobId));
+ }
+
+ LOG.info("finish process yarn application " + app.getId());
+ }
+
+ if (this.parserStatus == ParserStatus.RUNNING) {
+ this.parserStatus = ParserStatus.FINISHED;
+ }
+ }
+ }
+
+ private JobConfig parseJobConfig(InputStream is) throws Exception {
+ JobConfig jobConfig = new JobConfig();
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
+ String line;
+ boolean stop = false;
+ while ((line = reader.readLine()) != null && !stop) {
+ try {
+ JSONParser parser = new JSONParser();
+ JSONObject eventObj = (JSONObject) parser.parse(line);
+
+ if (eventObj != null) {
+ String eventType = (String) eventObj.get("Event");
+ LOG.info("Event type: " + eventType);
+ if (eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString())) {
+ stop = true;
+ JSONObject sparkProps = (JSONObject) eventObj.get("Spark Properties");
+ for (Object key : sparkProps.keySet()) {
+ jobConfig.put((String) key, (String) sparkProps.get(key));
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error(String.format("Fail to parse %s.", line), e);
+ }
+ }
+
+ return jobConfig;
+ }
+ }
+
+ private JobConfig getJobConfig(String sparkAppId, int attemptId) {
+ //TODO: getResourceManagerVersion() and compare version to make attempt id.
+
+ LOG.info("Get job config for sparkAppId {}, attempt {}, appId {}", sparkAppId, attemptId, app.getId());
+ JobConfig jobConfig = null;
+
+ try (FileSystem hdfs = HDFSUtil.getFileSystem(this.hdfsConf)) {
+// // For Yarn version >= 2.7,
+// // log name: "application_1468625664674_0003_appattempt_1468625664674_0003_000001"
+// String attemptIdFormatted = String.format("%06d", attemptId);
+// // remove "application_" to get the number part of appID.
+// String sparkAppIdNum = sparkAppId.substring(12);
+// String attemptIdString = "appattempt_" + sparkAppIdNum + "_" + attemptIdFormatted;
+
+ // For Yarn version 2.4.x
+ // log name: application_1464382345557_269065_1
+ String attemptIdString = Integer.toString(attemptId);
+
+ //test appId_attemptId.inprogress/appId_attemptId/appId.inprogress/appId
+ String eventLogDir = this.endpointConfig.eventLog;
+ Path attemptFile = new Path(eventLogDir + "/" + sparkAppId + "_" + attemptIdString + ".inprogress");
+ if (!hdfs.exists(attemptFile)) {
+ attemptFile = new Path(eventLogDir + "/" + sparkAppId + "_" + attemptIdString);
+ if (!hdfs.exists(attemptFile)) {
+ attemptFile = new Path(eventLogDir + "/" + sparkAppId + ".inprogress");
+ if (!hdfs.exists(attemptFile)) {
+ attemptFile = new Path(eventLogDir + "/" + sparkAppId);
+ }
+ }
+ }
+
+ LOG.info("Attempt File path: " + attemptFile.toString());
+ jobConfig = parseJobConfig(hdfs.open(attemptFile));
+ } catch (Exception e) {
+ LOG.error("Fail to process application {}", sparkAppId, e);
+ }
+
+ return jobConfig;
+ }
+
+ private boolean isClientMode(JobConfig jobConfig) {
+ return jobConfig.containsKey(Constants.SPARK_MASTER_KEY) &&
+ jobConfig.get(Constants.SPARK_MASTER_KEY).equalsIgnoreCase("yarn-client");
+ }
+
+ private boolean fetchSparkApps() {
+ String appURL = app.getTrackingUrl() + Constants.SPARK_APPS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+ InputStream is = null;
+ SparkApplication[] sparkApplications = null;
+ try {
+ is = InputStreamUtils.getInputStream(appURL, null, Constants.CompressionType.NONE);
+ LOG.info("fetch spark application from {}", appURL);
+ sparkApplications = OBJ_MAPPER.readValue(is, SparkApplication[].class);
+ } catch (java.net.ConnectException e) {
+ LOG.warn("fetch spark application from {} failed, {}", appURL, e);
+ e.printStackTrace();
+ return true;
+ } catch (Exception e) {
+ LOG.warn("fetch spark application from {} failed, {}", appURL, e);
+ e.printStackTrace();
+ return false;
+ } finally {
+ Utils.closeInputStream(is);
+ }
+
+ for (SparkApplication sparkApplication : sparkApplications) {
+ String id = sparkApplication.getId();
+ if (id.contains(" ") || !id.startsWith("app")) {
+ //spark version < 1.6.0 and id contains white space, need research again later
+ LOG.warn("skip spark application {}", id);
+ continue;
+ }
+
+ currentAttempt = sparkApplication.getAttempts().size();
+ int lastSavedAttempt = 1;
+ if (sparkAppEntityMap.containsKey(id)) {
+ lastSavedAttempt = Integer.parseInt(sparkAppEntityMap.get(id).getTags().get(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString()));
+ }
+ for (int j = lastSavedAttempt; j <= currentAttempt; j++) {
+ SparkAppEntity attemptEntity = new SparkAppEntity();
+ commonTags.put(SparkJobTagName.SPARK_APP_NAME.toString(), sparkApplication.getName());
+ commonTags.put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), "" + j);
+ commonTags.put(SparkJobTagName.SPARK_APP_ID.toString(), id);
+ attemptEntity.setTags(new HashMap<>(commonTags));
+ attemptEntity.setAppInfo(app);
+
+ attemptEntity.setStartTime(Utils.dateTimeToLong(sparkApplication.getAttempts().get(j - 1).getStartTime()));
+ attemptEntity.setTimestamp(attemptEntity.getStartTime());
+
+ if (sparkJobConfigs.containsKey(id) && j == currentAttempt) {
+ attemptEntity.setConfig(sparkJobConfigs.get(id));
+ }
+
+ if (attemptEntity.getConfig() == null) {
+ attemptEntity.setConfig(getJobConfig(id, j));
+ if (j == currentAttempt) {
+ sparkJobConfigs.put(id, attemptEntity.getConfig());
+ }
+ }
+
+ try {
+ JobConfig jobConfig = attemptEntity.getConfig();
+ attemptEntity.setExecMemoryBytes(Utils.parseMemory(jobConfig.get(Constants.SPARK_EXECUTOR_MEMORY_KEY)));
+
+ attemptEntity.setDriveMemoryBytes(isClientMode(jobConfig) ?
+ 0 :
+ Utils.parseMemory(jobConfig.get(Constants.SPARK_DRIVER_MEMORY_KEY)));
+ attemptEntity.setExecutorCores(Integer.parseInt(jobConfig.get(Constants.SPARK_EXECUTOR_CORES_KEY)));
+ // spark.driver.cores may not be set.
+ String driverCoresStr = jobConfig.get(Constants.SPARK_DRIVER_CORES_KEY);
+ int driverCores = 0;
+ if (driverCoresStr != null && !isClientMode(jobConfig)) {
+ driverCores = Integer.parseInt(driverCoresStr);
+ }
+ attemptEntity.setDriverCores(driverCores);
+ } catch (Exception e) {
+ LOG.warn("add config failed, {}", e);
+ e.printStackTrace();
+ }
+
+ if (j == currentAttempt) {
+ //current attempt
+ attemptEntity.setYarnState(app.getState());
+ attemptEntity.setYarnStatus(app.getFinalStatus());
+ sparkAppEntityMap.put(id, attemptEntity);
+ this.sparkRunningJobManager.update(app.getId(), id, attemptEntity);
+ } else {
+ attemptEntity.setYarnState(Constants.AppState.FINISHED.toString());
+ attemptEntity.setYarnStatus(Constants.AppStatus.FAILED.toString());
+ }
+ sparkAppEntityCreationHandler.add(attemptEntity);
+ }
+ }
+
+ sparkAppEntityCreationHandler.flush();
+ return true;
+ }
+
+ private Function<String, Boolean> fetchSparkExecutors = sparkAppId -> {
+ //only get current attempt
+ SparkAppEntity sparkAppEntity = sparkAppEntityMap.get(sparkAppId);
+ String executorURL = app.getTrackingUrl() + Constants.SPARK_APPS_URL + "/" + sparkAppId + "/" + Constants.SPARK_EXECUTORS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+ InputStream is = null;
+ SparkExecutor[] sparkExecutors = null;
+ try {
+ is = InputStreamUtils.getInputStream(executorURL, null, Constants.CompressionType.NONE);
+ LOG.info("fetch spark executor from {}", executorURL);
+ sparkExecutors = OBJ_MAPPER.readValue(is, SparkExecutor[].class);
+ } catch (java.net.ConnectException e) {
+ LOG.warn("fetch spark application from {} failed, {}", executorURL, e);
+ e.printStackTrace();
+ return true;
+ } catch (Exception e) {
+ LOG.warn("fetch spark executor from {} failed, {}", executorURL, e);
+ e.printStackTrace();
+ return false;
+ } finally {
+ Utils.closeInputStream(is);
+ }
+ sparkAppEntity.setExecutors(sparkExecutors.length);
+
+ for (SparkExecutor executor : sparkExecutors) {
+ SparkExecutorEntity entity = new SparkExecutorEntity();
+ entity.setTags(new HashMap<>(sparkAppEntity.getTags()));
+ entity.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executor.getId());
+ entity.setHostPort(executor.getHostPort());
+ entity.setRddBlocks(executor.getRddBlocks());
+ entity.setMemoryUsed(executor.getMemoryUsed());
+ entity.setDiskUsed(executor.getDiskUsed());
+ entity.setActiveTasks(executor.getActiveTasks());
+ entity.setFailedTasks(executor.getFailedTasks());
+ entity.setCompletedTasks(executor.getCompletedTasks());
+ entity.setTotalTasks(executor.getTotalTasks());
+ entity.setTotalDuration(executor.getTotalDuration());
+ entity.setTotalInputBytes(executor.getTotalInputBytes());
+ entity.setTotalShuffleRead(executor.getTotalShuffleRead());
+ entity.setTotalShuffleWrite(executor.getTotalShuffleWrite());
+ entity.setMaxMemory(executor.getMaxMemory());
+
+ entity.setTimestamp(sparkAppEntity.getTimestamp());
+ entity.setStartTime(sparkAppEntity.getStartTime());
+ if (executor.getId().equalsIgnoreCase("driver")) {
+ entity.setExecMemoryBytes(sparkAppEntity.getDriveMemoryBytes());
+ entity.setCores(sparkAppEntity.getDriverCores());
+ entity.setMemoryOverhead(sparkAppEntity.getDriverMemoryOverhead());
+ } else {
+ entity.setExecMemoryBytes(sparkAppEntity.getExecMemoryBytes());
+ entity.setCores(sparkAppEntity.getExecutorCores());
+ entity.setMemoryOverhead(sparkAppEntity.getExecutorMemoryOverhead());
+ }
+ sparkAppEntityCreationHandler.add(entity);
+ }
+ return true;
+ };
+
+ private Function<String, Boolean> fetchSparkJobs = sparkAppId -> {
+ //only get current attempt
+ SparkAppEntity sparkAppEntity = sparkAppEntityMap.get(sparkAppId);
+ String jobURL = app.getTrackingUrl() + Constants.SPARK_APPS_URL + "/" + sparkAppId + "/" + Constants.SPARK_JOBS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+ InputStream is = null;
+ SparkJob[] sparkJobs = null;
+ try {
+ is = InputStreamUtils.getInputStream(jobURL, null, Constants.CompressionType.NONE);
+ LOG.info("fetch spark job from {}", jobURL);
+ sparkJobs = OBJ_MAPPER.readValue(is, SparkJob[].class);
+ } catch (java.net.ConnectException e) {
+ LOG.warn("fetch spark application from {} failed, {}", jobURL, e);
+ e.printStackTrace();
+ return true;
+ } catch (Exception e) {
+ LOG.warn("fetch spark job from {} failed, {}", jobURL, e);
+ e.printStackTrace();
+ return false;
+ } finally {
+ Utils.closeInputStream(is);
+ }
+
+ sparkAppEntity.setNumJobs(sparkJobs.length);
+ for (SparkJob sparkJob : sparkJobs) {
+ SparkJobEntity entity = new SparkJobEntity();
+ entity.setTags(new HashMap<>(commonTags));
+ entity.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), sparkJob.getJobId() + "");
+ entity.setSubmissionTime(Utils.dateTimeToLong(sparkJob.getSubmissionTime()));
+ if (sparkJob.getCompletionTime() != null) {
+ entity.setCompletionTime(Utils.dateTimeToLong(sparkJob.getCompletionTime()));
+ }
+ entity.setNumStages(sparkJob.getStageIds().size());
+ entity.setStatus(sparkJob.getStatus());
+ entity.setNumTask(sparkJob.getNumTasks());
+ entity.setNumActiveTasks(sparkJob.getNumActiveTasks());
+ entity.setNumCompletedTasks(sparkJob.getNumCompletedTasks());
+ entity.setNumSkippedTasks(sparkJob.getNumSkippedTasks());
+ entity.setNumFailedTasks(sparkJob.getNumFailedTasks());
+ entity.setNumActiveStages(sparkJob.getNumActiveStages());
+ entity.setNumCompletedStages(sparkJob.getNumCompletedStages());
+ entity.setNumSkippedStages(sparkJob.getNumSkippedStages());
+ entity.setNumFailedStages(sparkJob.getNumFailedStages());
+ entity.setStages(sparkJob.getStageIds());
+ entity.setTimestamp(sparkAppEntity.getTimestamp());
+
+ sparkAppEntity.setTotalStages(sparkAppEntity.getTotalStages() + entity.getNumStages());
+ sparkAppEntity.setTotalTasks(sparkAppEntity.getTotalTasks() + entity.getNumTask());
+ sparkAppEntity.setActiveTasks(sparkAppEntity.getActiveTasks() + entity.getNumActiveTasks());
+ sparkAppEntity.setCompleteTasks(sparkAppEntity.getCompleteTasks() + entity.getNumCompletedTasks());
+ sparkAppEntity.setSkippedTasks(sparkAppEntity.getSkippedTasks() + entity.getNumSkippedTasks());
+ sparkAppEntity.setFailedTasks(sparkAppEntity.getFailedStages() + entity.getNumFailedTasks());
+ sparkAppEntity.setActiveStages(sparkAppEntity.getActiveStages() + entity.getNumActiveStages());
+ sparkAppEntity.setCompleteStages(sparkAppEntity.getCompleteStages() + entity.getNumCompletedStages());
+ sparkAppEntity.setSkippedStages(sparkAppEntity.getSkippedStages() + entity.getNumSkippedStages());
+ sparkAppEntity.setFailedStages(sparkAppEntity.getFailedStages() + entity.getNumFailedStages());
+
+ for (Integer stageId : sparkJob.getStageIds()) {
+ stagesTime.put(stageId, Pair.of(sparkJob.getJobId(), Pair.of(entity.getSubmissionTime(), entity.getCompletionTime())));
+ }
+ sparkAppEntityCreationHandler.add(entity);
+ }
+ return true;
+ };
+
+ private Function<String, Boolean> fetchSparkStagesAndTasks = sparkAppId -> {
+ SparkAppEntity sparkAppEntity = sparkAppEntityMap.get(sparkAppId);
+ String stageURL = app.getTrackingUrl() + Constants.SPARK_APPS_URL + "/" + sparkAppId + "/" + Constants.SPARK_STAGES_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+ InputStream is = null;
+ SparkStage[] sparkStages;
+ try {
+ is = InputStreamUtils.getInputStream(stageURL, null, Constants.CompressionType.NONE);
+ LOG.info("fetch spark stage from {}", stageURL);
+ sparkStages = OBJ_MAPPER.readValue(is, SparkStage[].class);
+ } catch (java.net.ConnectException e) {
+ LOG.warn("fetch spark application from {} failed, {}", stageURL, e);
+ e.printStackTrace();
+ return true;
+ } catch (Exception e) {
+ LOG.warn("fetch spark stage from {} failed, {}", stageURL, e);
+ e.printStackTrace();
+ return false;
+ } finally {
+ Utils.closeInputStream(is);
+ }
+
+ for (SparkStage sparkStage : sparkStages) {
+ //TODO
+ //we need a thread pool to handle this if there are many stages
+ SparkStage stage;
+ try {
+ stageURL = app.getTrackingUrl() + Constants.SPARK_APPS_URL + "/" + sparkAppId + "/" + Constants.SPARK_STAGES_URL + "/" + sparkStage.getStageId() + "?" + Constants.ANONYMOUS_PARAMETER;
+ is = InputStreamUtils.getInputStream(stageURL, null, Constants.CompressionType.NONE);
+ LOG.info("fetch spark stage from {}", stageURL);
+ stage = OBJ_MAPPER.readValue(is, SparkStage[].class)[0];
+ } catch (Exception e) {
+ LOG.warn("fetch spark stage from {} failed, {}", stageURL, e);
+ e.printStackTrace();
+ return false;
+ } finally {
+ Utils.closeInputStream(is);
+ }
+
+ if (this.completeStages.contains(stage.getStageId())) {
+ return true;
+ }
+ SparkStageEntity stageEntity = new SparkStageEntity();
+ stageEntity.setTags(new HashMap<>(commonTags));
+ stageEntity.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), stagesTime.get(stage.getStageId()).getLeft() + "");
+ stageEntity.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), stage.getStageId() + "");
+ stageEntity.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), stage.getAttemptId() + "");
+ stageEntity.setStatus(stage.getStatus());
+ stageEntity.setNumActiveTasks(stage.getNumActiveTasks());
+ stageEntity.setNumCompletedTasks(stage.getNumCompleteTasks());
+ stageEntity.setNumFailedTasks(stage.getNumFailedTasks());
+ stageEntity.setExecutorRunTime(stage.getExecutorRunTime());
+ stageEntity.setInputBytes(stage.getInputBytes());
+ stageEntity.setInputRecords(stage.getInputRecords());
+ stageEntity.setOutputBytes(stage.getOutputBytes());
+ stageEntity.setOutputRecords(stage.getOutputRecords());
+ stageEntity.setShuffleReadBytes(stage.getShuffleReadBytes());
+ stageEntity.setShuffleReadRecords(stage.getShuffleReadRecords());
+ stageEntity.setShuffleWriteBytes(stage.getShuffleWriteBytes());
+ stageEntity.setShuffleWriteRecords(stage.getShuffleWriteRecords());
+ stageEntity.setMemoryBytesSpilled(stage.getMemoryBytesSpilled());
+ stageEntity.setDiskBytesSpilled(stage.getDiskBytesSpilled());
+ stageEntity.setName(stage.getName());
+ stageEntity.setSchedulingPool(stage.getSchedulingPool());
+ stageEntity.setSubmitTime(stagesTime.get(stage.getStageId()).getRight().getLeft());
+ stageEntity.setTimestamp(stageEntity.getSubmitTime());
+ stageEntity.setCompleteTime(stagesTime.get(stage.getStageId()).getRight().getRight());
+ stageEntity.setNumTasks(stage.getTasks() == null ? 0 : stage.getTasks().size());
+ fetchTasksFromStage(stageEntity, stage);
+ sparkAppEntityCreationHandler.add(stageEntity);
+ if (stage.getStatus().equals(Constants.StageState.COMPLETE.toString())) {
+ this.completeStages.add(stage.getStageId());
+ LOG.info("stage {} of spark {} has finished", stage.getStageId(), sparkAppId);
+ }
+
+ sparkAppEntity.setInputBytes(sparkAppEntity.getInputBytes() + stageEntity.getInputBytes());
+ sparkAppEntity.setInputRecords(sparkAppEntity.getInputBytes() + stageEntity.getInputRecords());
+ sparkAppEntity.setOutputBytes(sparkAppEntity.getOutputBytes() + stageEntity.getOutputBytes());
+ sparkAppEntity.setOutputRecords(sparkAppEntity.getOutputBytes() + stageEntity.getOutputRecords());
+ sparkAppEntity.setShuffleReadBytes(sparkAppEntity.getShuffleReadBytes() + stageEntity.getShuffleReadBytes());
+ sparkAppEntity.setShuffleReadRecords(sparkAppEntity.getShuffleReadRecords() + stageEntity.getShuffleReadRecords());
+ sparkAppEntity.setShuffleWriteBytes(sparkAppEntity.getShuffleWriteBytes() + stageEntity.getShuffleWriteBytes());
+ sparkAppEntity.setShuffleWriteRecords(sparkAppEntity.getShuffleWriteRecords() + stageEntity.getShuffleWriteRecords());
+ sparkAppEntity.setExecutorRunTime(sparkAppEntity.getExecutorRunTime() + stageEntity.getExecutorRunTime());
+ sparkAppEntity.setExecutorDeserializeTime(sparkAppEntity.getExecutorDeserializeTime() + stageEntity.getExecutorDeserializeTime());
+ sparkAppEntity.setResultSize(sparkAppEntity.getResultSize() + stageEntity.getResultSize());
+ sparkAppEntity.setJvmGcTime(sparkAppEntity.getJvmGcTime() + stageEntity.getJvmGcTime());
+ sparkAppEntity.setResultSerializationTime(sparkAppEntity.getResultSerializationTime() + stageEntity.getResultSerializationTime());
+ sparkAppEntity.setMemoryBytesSpilled(sparkAppEntity.getMemoryBytesSpilled() + stageEntity.getMemoryBytesSpilled());
+ sparkAppEntity.setDiskBytesSpilled(sparkAppEntity.getDiskBytesSpilled() + stageEntity.getDiskBytesSpilled());
+ sparkAppEntity.setCompleteTasks(sparkAppEntity.getCompleteTasks() + stageEntity.getNumCompletedTasks());
+ }
+ return true;
+ };
+
+ private void fetchTasksFromStage(SparkStageEntity stageEntity, SparkStage stage) {
+ Map<String, SparkTask> tasks = stage.getTasks();
+ for (String key : tasks.keySet()) {
+ SparkTask task = tasks.get(key);
+ SparkTaskEntity taskEntity = new SparkTaskEntity();
+ taskEntity.setTags(new HashMap<>(stageEntity.getTags()));
+ taskEntity.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), task.getAttempt() + "");
+ taskEntity.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), task.getIndex() + "");
+ taskEntity.setTaskId(task.getTaskId());
+ taskEntity.setLaunchTime(Utils.dateTimeToLong(task.getLaunchTime()));
+ taskEntity.setHost(task.getHost());
+ taskEntity.setTaskLocality(task.getTaskLocality());
+ taskEntity.setSpeculative(task.isSpeculative());
+ taskEntity.setTimestamp(stageEntity.getTimestamp());
+
+ SparkTaskMetrics taskMetrics = task.getTaskMetrics();
+ taskEntity.setExecutorDeserializeTime(taskMetrics == null ? 0 : taskMetrics.getExecutorDeserializeTime());
+ taskEntity.setExecutorRunTime(taskMetrics == null ? 0 : taskMetrics.getExecutorRunTime());
+ taskEntity.setResultSize(taskMetrics == null ? 0 : taskMetrics.getResultSize());
+ taskEntity.setJvmGcTime(taskMetrics == null ? 0 : taskMetrics.getJvmGcTime());
+ taskEntity.setResultSerializationTime(taskMetrics == null ? 0 : taskMetrics.getResultSerializationTime());
+ taskEntity.setMemoryBytesSpilled(taskMetrics == null ? 0 : taskMetrics.getMemoryBytesSpilled());
+ taskEntity.setDiskBytesSpilled(taskMetrics == null ? 0 : taskMetrics.getDiskBytesSpilled());
+
+ SparkTaskInputMetrics inputMetrics = null;
+ if (taskMetrics != null && taskMetrics.getInputMetrics() != null) {
+ inputMetrics = taskMetrics.getInputMetrics();
+ }
+ taskEntity.setInputBytes(inputMetrics == null ? 0 : inputMetrics.getBytesRead());
+ taskEntity.setInputRecords(inputMetrics == null ? 0 : inputMetrics.getRecordsRead());
+
+ //need to figure outputMetrics
+
+ SparkTaskShuffleReadMetrics shuffleReadMetrics = null;
+ if (taskMetrics != null && taskMetrics.getShuffleReadMetrics() != null) {
+ shuffleReadMetrics = taskMetrics.getShuffleReadMetrics();
+ }
+ taskEntity.setShuffleReadRemoteBytes(shuffleReadMetrics == null ? 0 : shuffleReadMetrics.getRemoteBytesRead());
+ taskEntity.setShuffleReadRecords(shuffleReadMetrics == null ? 0 : shuffleReadMetrics.getRecordsRead());
+
+ SparkTaskShuffleWriteMetrics shuffleWriteMetrics = null;
+ if (taskMetrics != null && taskMetrics.getShuffleWriteMetrics() != null) {
+ shuffleWriteMetrics = taskMetrics.getShuffleWriteMetrics();
+ }
+ taskEntity.setShuffleWriteBytes(shuffleWriteMetrics == null ? 0 : shuffleWriteMetrics.getBytesWritten());
+ taskEntity.setShuffleWriteRecords(shuffleWriteMetrics == null ? 0 : shuffleWriteMetrics.getRecordsWritten());
+
+ stageEntity.setExecutorDeserializeTime(stageEntity.getExecutorDeserializeTime() + taskEntity.getExecutorDeserializeTime());
+ stageEntity.setResultSize(stageEntity.getResultSize() + taskEntity.getResultSize());
+ stageEntity.setJvmGcTime(stageEntity.getJvmGcTime() + taskEntity.getJvmGcTime());
+ stageEntity.setResultSerializationTime(stageEntity.getResultSerializationTime() + taskEntity.getResultSerializationTime());
+
+ this.sparkAppEntityCreationHandler.add(taskEntity);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
new file mode 100644
index 0000000..2b6c62f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.recover;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
+import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+
+import java.io.Serializable;
+import java.util.*;
+
+public class SparkRunningJobManager implements Serializable {
+ private RunningJobManager runningJobManager;
+
+ public SparkRunningJobManager(SparkRunningConfigManager.ZKStateConfig config) {
+ this.runningJobManager = new RunningJobManager(config.zkQuorum,
+ config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot);
+ }
+
+ public Map<String, SparkAppEntity> recoverYarnApp(String appId) throws Exception {
+ Map<String, Pair<Map<String, String>, AppInfo>> result = this.runningJobManager.recoverYarnApp(appId);
+ Map<String, SparkAppEntity> apps = new HashMap<>();
+ for (String jobId : result.keySet()) {
+ Pair<Map<String, String>, AppInfo> job = result.get(jobId);
+ SparkAppEntity sparkAppEntity = new SparkAppEntity();
+ sparkAppEntity.setTags(job.getLeft());
+ sparkAppEntity.setAppInfo(job.getRight());
+ sparkAppEntity.setTimestamp(job.getRight().getStartedTime());
+ apps.put(jobId, sparkAppEntity);
+ }
+ return apps;
+ }
+
+ public Map<String, Map<String, SparkAppEntity>> recover() {
+ //we need read from zookeeper, path looks like /apps/mr/running/yarnAppId/jobId/
+ //<yarnAppId, <jobId, JobExecutionAPIEntity>>
+ Map<String, Map<String, SparkAppEntity>> result = new HashMap<>();
+ Map<String, Map<String, Pair<Map<String, String>, AppInfo>>> apps = this.runningJobManager.recover();
+ for (String appId : apps.keySet()) {
+ result.put(appId, new HashMap<>());
+ Map<String, Pair<Map<String, String>, AppInfo>> jobs = apps.get(appId);
+
+ for (String jobId : jobs.keySet()) {
+ Pair<Map<String, String>, AppInfo> job = jobs.get(jobId);
+ SparkAppEntity sparkAppEntity = new SparkAppEntity();
+ sparkAppEntity.setTags(job.getLeft());
+ sparkAppEntity.setAppInfo(job.getRight());
+ sparkAppEntity.setTimestamp(job.getRight().getStartedTime());
+ result.get(appId).put(jobId, sparkAppEntity);
+ }
+ }
+ return result;
+ }
+
+ public void update(String yarnAppId, String jobId, SparkAppEntity entity) {
+ this.runningJobManager.update(yarnAppId, jobId, entity.getTags(), entity.getAppInfo());
+ }
+
+ public void delete(String yarnAppId, String jobId) {
+ this.runningJobManager.delete(yarnAppId, jobId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
new file mode 100644
index 0000000..6be0cfd
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.storm;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
+import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class SparkRunningJobFetchSpout extends BaseRichSpout {
+ private static final Logger LOG = LoggerFactory.getLogger(SparkRunningJobFetchSpout.class);
+
+ private SparkRunningConfigManager.ZKStateConfig zkStateConfig;
+ private SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig;
+ private SparkRunningConfigManager.EndpointConfig endpointConfig;
+ private ResourceFetcher resourceFetcher;
+ private SpoutOutputCollector collector;
+ private boolean init;
+ private transient SparkRunningJobManager sparkRunningJobManager;
+ private Set<String> runningYarnApps;
+
+ public SparkRunningJobFetchSpout(SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig,
+ SparkRunningConfigManager.EndpointConfig endpointConfig,
+ SparkRunningConfigManager.ZKStateConfig zkStateConfig) {
+ this.jobExtractorConfig = jobExtractorConfig;
+ this.endpointConfig = endpointConfig;
+ this.zkStateConfig = zkStateConfig;
+ this.init = !(zkStateConfig.recoverEnabled);
+ this.runningYarnApps = new HashSet<>();
+ }
+
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ resourceFetcher = new RMResourceFetcher(endpointConfig.rmUrls);
+ collector = spoutOutputCollector;
+ this.sparkRunningJobManager = new SparkRunningJobManager(zkStateConfig);
+ }
+
+ @Override
+ public void nextTuple() {
+ LOG.info("Start to fetch spark running jobs");
+ try {
+ Map<String, Map<String, SparkAppEntity>> sparkApps = null;
+ List<AppInfo> apps;
+ if (!this.init) {
+ sparkApps = recoverRunningApps();
+
+ apps = new ArrayList<>();
+ for (String appId : sparkApps.keySet()) {
+ Map<String, SparkAppEntity> jobs = sparkApps.get(appId);
+ if (jobs.size() > 0) {
+ Set<String> jobIds = jobs.keySet();
+ apps.add(jobs.get(jobIds.iterator().next()).getAppInfo());
+ this.runningYarnApps.add(appId);
+ }
+ }
+ LOG.info("recover {} spark yarn apps from zookeeper", apps.size());
+ this.init = true;
+ } else {
+ apps = resourceFetcher.getResource(Constants.ResourceType.RUNNING_SPARK_JOB);
+ LOG.info("get {} apps from resource manager", apps == null ? 0 : apps.size());
+ Set<String> running = new HashSet<>();
+ Iterator<String> appIdIterator = this.runningYarnApps.iterator();
+ while (appIdIterator.hasNext()) {
+ String appId = appIdIterator.next();
+ boolean hasFinished = true;
+ if (apps != null) {
+ for (AppInfo appInfo : apps) {
+ if (appId.equals(appInfo.getId())) {
+ hasFinished = false;
+ }
+ running.add(appInfo.getId());
+ }
+
+ if (hasFinished) {
+ try {
+ Map<String, SparkAppEntity> result = this.sparkRunningJobManager.recoverYarnApp(appId);
+ if (result.size() > 0) {
+ if (sparkApps == null) {
+ sparkApps = new HashMap<>();
+ }
+ sparkApps.put(appId, result);
+ AppInfo appInfo = result.get(result.keySet().iterator().next()).getAppInfo();
+ appInfo.setState(Constants.AppState.FINISHED.toString());
+ apps.add(appInfo);
+ }
+ } catch (KeeperException.NoNodeException e) {
+ LOG.warn("{}", e);
+ LOG.warn("yarn app {} has finished", appId);
+ }
+ }
+ }
+ }
+
+ this.runningYarnApps = running;
+ LOG.info("get {} total apps(contains finished)", apps == null ? 0 : apps.size());
+ }
+
+ if (apps != null) {
+ for (AppInfo app : apps) {
+ LOG.info("emit spark yarn application " + app.getId());
+ if (sparkApps != null) {
+ //emit (AppInfo, Map<String, SparkAppEntity>)
+ collector.emit(new Values(app.getId(), app, sparkApps.get(app.getId())));
+ } else {
+ collector.emit(new Values(app.getId(), app, null));
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ Thread.sleep(jobExtractorConfig.fetchRunningJobInterval * 1000);
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ private Map<String, Map<String, SparkAppEntity>> recoverRunningApps() {
+ //we need read from zookeeper, path looks like /apps/spark/running/yarnAppId/appId/
+ //content of path /apps/spark/running/yarnAppId/appId is SparkAppEntity(current attempt)
+ //as we know, a yarn application may contains many spark applications
+ //so, the returned results is a Map, key is yarn appId
+ Map<String, Map<String, SparkAppEntity>> result = this.sparkRunningJobManager.recover();
+ return result;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("appId", "appInfo", "sparkAppEntity"));
+ }
+
+ @Override
+ public void fail(Object msgId) {
+
+ }
+
+ @Override
+ public void ack(Object msgId) {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
new file mode 100644
index 0000000..6928240
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
+import org.apache.eagle.jpm.spark.running.parser.SparkApplicationParser;
+import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+
+public class SparkRunningJobParseBolt extends BaseRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(SparkRunningJobParseBolt.class);
+
+ private SparkRunningConfigManager.ZKStateConfig zkStateConfig;
+ private SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig;
+ private SparkRunningConfigManager.EndpointConfig endpointConfig;
+ private SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig;
+ private ExecutorService executorService;
+ private Map<String, SparkApplicationParser> runningSparkParsers;
+ private ResourceFetcher resourceFetcher;
+ public SparkRunningJobParseBolt(SparkRunningConfigManager.ZKStateConfig zkStateConfig,
+ SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig,
+ SparkRunningConfigManager.EndpointConfig endpointConfig,
+ SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig) {
+ this.zkStateConfig = zkStateConfig;
+ this.eagleServiceConfig = eagleServiceConfig;
+ this.endpointConfig = endpointConfig;
+ this.jobExtractorConfig = jobExtractorConfig;
+ this.runningSparkParsers = new HashMap<>();
+ }
+
+ @Override
+ public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+ this.executorService = Executors.newFixedThreadPool(jobExtractorConfig.parseThreadPoolSize);
+ this.resourceFetcher = new RMResourceFetcher(endpointConfig.rmUrls);
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ AppInfo appInfo = (AppInfo)tuple.getValue(1);
+ Map<String, SparkAppEntity> sparkApp = (Map<String, SparkAppEntity>)tuple.getValue(2);
+
+ LOG.info("get spark yarn application " + appInfo.getId());
+
+ SparkApplicationParser applicationParser;
+ if (!runningSparkParsers.containsKey(appInfo.getId())) {
+ applicationParser = new SparkApplicationParser(eagleServiceConfig, endpointConfig, jobExtractorConfig, appInfo, sparkApp, new SparkRunningJobManager(zkStateConfig), resourceFetcher);
+ runningSparkParsers.put(appInfo.getId(), applicationParser);
+ LOG.info("create application parser for {}", appInfo.getId());
+ } else {
+ applicationParser = runningSparkParsers.get(appInfo.getId());
+ }
+
+ Set<String> runningParserIds = new HashSet<>(runningSparkParsers.keySet());
+ runningParserIds.stream()
+ .filter(appId -> runningSparkParsers.get(appId).status() == SparkApplicationParser.ParserStatus.APP_FINISHED)
+ .forEach(appId -> {
+ runningSparkParsers.remove(appId);
+ LOG.info("remove parser {}", appId);
+ });
+
+ if (appInfo.getState().equals(Constants.AppState.FINISHED.toString()) ||
+ applicationParser.status() == SparkApplicationParser.ParserStatus.FINISHED) {
+ applicationParser.setStatus(SparkApplicationParser.ParserStatus.RUNNING);
+ executorService.execute(applicationParser);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ }
+
+ @Override
+ public void cleanup() {
+ super.cleanup();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 0000000..21686a6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.hdfs.DistributedFileSystem
+org.apache.hadoop.hdfs.web.HftpFileSystem
+org.apache.hadoop.hdfs.web.HsftpFileSystem
+org.apache.hadoop.hdfs.web.WebHdfsFileSystem
+org.apache.hadoop.hdfs.web.SWebHdfsFileSystem
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
new file mode 100644
index 0000000..d93a135
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+ "envContextConfig" : {
+ "env" : "local",
+ "topologyName" : "sparkRunningJob",
+ "stormConfigFile" : "storm.yaml",
+ "parallelismConfig" : {
+ "sparkRunningJobFetchSpout" : 1,
+ "sparkRunningJobParseBolt" : 4
+ },
+ "tasks" : {
+ "sparkRunningJobFetchSpout" : 1,
+ "sparkRunningJobParseBolt" : 4
+ },
+ "workers" : 2
+ },
+
+ "jobExtractorConfig" : {
+ "site" : "sandbox",
+ "fetchRunningJobInterval" : 15,
+ "parseThreadPoolSize" : 5
+ },
+
+ "dataSourceConfig" : {
+ "rmUrls": ["http://sandbox.hortonworks.com:8088"],
+ "nnEndpoint" : "hdfs://sandbox.hortonworks.com:8020",
+ "principal" : "", # Kerberos principal; leave empty if not needed
+ "keytab" : "",
+ "eventLog" : "/spark-history"
+ },
+
+ "zookeeperConfig" : {
+ "zkQuorum" : "sandbox.hortonworks.com:2181",
+ "zkPort" : "2181",
+ "zkRoot" : "/apps/spark/running",
+ "recoverEnabled" : false,
+ "zkSessionTimeoutMs" : 15000,
+ "zkRetryTimes" : 3,
+ "zkRetryInterval" : 20000
+ },
+
+ "eagleProps" : {
+ "mailHost" : "abc.com",
+ "mailDebug" : "true",
+ eagleService.host:"sandbox.hortonworks.com",
+ eagleService.port: 9099,
+ eagleService.username: "admin",
+ eagleService.password : "secret",
+ eagleService.readTimeOutSeconds : 20,
+ eagleService.maxFlushNum : 500
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/log4j.properties b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6b8c8d6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout, DRFA
+
+eagle.log.dir=../logs
+eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index 0792f15..a633fd4 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -17,14 +17,23 @@
package org.apache.eagle.jpm.util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class Constants {
+ private final static Logger LOG = LoggerFactory.getLogger(Constants.class);
+ //SPARK
public final static String SPARK_APP_SERVICE_ENDPOINT_NAME = "SparkAppService";
public final static String SPARK_JOB_SERVICE_ENDPOINT_NAME = "SparkJobService";
public final static String SPARK_STAGE_SERVICE_ENDPOINT_NAME = "SparkStageService";
public final static String SPARK_TASK_SERVICE_ENDPOINT_NAME = "SparkTaskService";
public final static String SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "SparkExecutorService";
-
+ public final static String RUNNING_SPARK_APP_SERVICE_ENDPOINT_NAME = "RunningSparkAppService";
+ public final static String RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME = "RunningSparkJobService";
+ public final static String RUNNING_SPARK_STAGE_SERVICE_ENDPOINT_NAME = "RunningSparkStageService";
+ public final static String RUNNING_SPARK_TASK_SERVICE_ENDPOINT_NAME = "RunningSparkTaskService";
+ public final static String RUNNING_SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "RunningSparkExecutorService";
public static final String APPLICATION_PREFIX = "application";
public static final String JOB_PREFIX = "job";
public static final String V2_APPS_URL = "ws/v1/cluster/apps";
@@ -33,17 +42,123 @@ public class Constants {
public static final String V2_APPS_RUNNING_URL = "ws/v1/cluster/apps?state=RUNNING";
public static final String V2_APPS_COMPLETED_URL = "ws/v1/cluster/apps?state=FINISHED";
+ public static final String SPARK_MASTER_KEY = "spark.master";
+ public static final String SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory";
+ public static final String SPARK_DRIVER_MEMORY_KEY = "spark.driver.memory";
+ public static final String SPARK_YARN_AM_MEMORY_KEY = "spark.yarn.am.memory";
+ public static final String SPARK_EXECUTOR_CORES_KEY = "spark.executor.cores";
+ public static final String SPARK_DRIVER_CORES_KEY = "spark.driver.cores";
+ public static final String SPARK_YARN_AM_CORES_KEY = "spark.yarn.am.cores";
+ public static final String SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD_KEY = "spark.yarn.executor.memoryOverhead";
+ public static final String SPARK_YARN_DRIVER_MEMORY_OVERHEAD_KEY = "spark.yarn.driver.memoryOverhead";
+ public static final String SPARK_YARN_am_MEMORY_OVERHEAD_KEY = "spark.yarn.am.memoryOverhead";
+
public static final String SPARK_APPS_URL ="api/v1/applications";
+ public static final String SPARK_EXECUTORS_URL = "executors";
+ public static final String SPARK_JOBS_URL = "jobs";
+ public static final String SPARK_STAGES_URL = "stages";
+ public static final String MR_JOBS_URL = "ws/v1/mapreduce/jobs";
+ public static final String MR_JOB_COUNTERS_URL = "counters";
+ public static final String MR_TASKS_URL = "tasks";
+ public static final String MR_TASK_ATTEMPTS_URL = "attempts";
+ public static final String MR_CONF_URL = "conf";
+
+ public static final String YARN_API_CLUSTER_INFO = "ws/v1/cluster/info";
public enum CompressionType {
GZIP, NONE
}
public enum JobState {
- RUNNING, COMPLETED, ALL
+ NEW, INITED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED, ERROR, FINISHED, ALL
+ }
+ public enum TaskState {
+ NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED
+ }
+ public enum StageState {
+ ACTIVE, COMPLETE, PENDING
+ }
+ public enum AppState {
+ NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
+ }
+ public enum AppStatus {
+ UNDEFINED, SUCCEEDED, FAILED, KILLED
}
-
public enum ResourceType {
- COMPLETE_SPARK_JOB, SPARK_JOB_DETAIL
+ COMPLETE_SPARK_JOB, SPARK_JOB_DETAIL, RUNNING_SPARK_JOB, RUNNING_MR_JOB, CLUSTER_INFO
+ }
+
+ //MR
+ public static final String JPA_JOB_CONFIG_SERVICE_NAME = "JobConfigService";
+ public static final String JPA_JOB_EVENT_SERVICE_NAME = "JobEventService";
+ public static final String JPA_JOB_EXECUTION_SERVICE_NAME = "JobExecutionService";
+ public static final String JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME = "RunningJobExecutionService";
+ public static final String JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "TaskAttemptExecutionService";
+ public static final String JPA_TASK_FAILURE_COUNT_SERVICE_NAME = "TaskFailureCountService";
+ public static final String JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME = "TaskAttemptCounterService";
+ public static final String JPA_TASK_EXECUTION_SERVICE_NAME = "TaskExecutionService";
+ public static final String JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME = "RunningTaskExecutionService";
+ public static final String JPA_RUNNING_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "RunningTaskAttemptExecutionService";
+ public static final String JPA_JOB_PROCESS_TIME_STAMP_NAME = "JobProcessTimeStampService";
+
+ public static final String JOB_TASK_TYPE_TAG = "taskType";
+
+ public static class JobConfiguration {
+ // job type
+ public static final String SCOOBI_JOB = "scoobi.mode";
+ public static final String HIVE_JOB = "hive.query.string";
+ public static final String PIG_JOB = "pig.script";
+ public static final String CASCADING_JOB = "cascading.app.name";
}
+ /**
+ * MR task types
+ */
+ public enum TaskType {
+ SETUP, MAP, REDUCE, CLEANUP
+ }
+
+ public enum JobType {
+ CASCADING("CASCADING"),HIVE("HIVE"),PIG("PIG"),SCOOBI("SCOOBI"),
+ NOTAVALIABLE("N/A")
+ ;
+ private String value;
+ JobType(String value){
+ this.value = value;
+ }
+ @Override
+ public String toString() {
+ return this.value;
+ }
+ }
+
+ public static final String FILE_SYSTEM_COUNTER = "org.apache.hadoop.mapreduce.FileSystemCounter";
+ public static final String TASK_COUNTER = "org.apache.hadoop.mapreduce.TaskCounter";
+ public static final String JOB_COUNTER = "org.apache.hadoop.mapreduce.JobCounter";
+
+ public static final String MAP_TASK_ATTEMPT_COUNTER = "MapTaskAttemptCounter";
+ public static final String REDUCE_TASK_ATTEMPT_COUNTER = "ReduceTaskAttemptCounter";
+
+ public static final String MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER = "MapTaskAttemptFileSystemCounter";
+ public static final String REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER = "ReduceTaskAttemptFileSystemCounter";
+
+ public enum TaskAttemptCounter {
+ TASK_ATTEMPT_DURATION,
+ }
+
+ public enum JobCounter {
+ DATA_LOCAL_MAPS,
+ RACK_LOCAL_MAPS,
+ TOTAL_LAUNCHED_MAPS
+ }
+
+ public static final String metricFormat = "%s.%s";
+ public static final String ALLOCATED_MB = "allocatedmb";
+ public static final String ALLOCATED_VCORES = "allocatedvcores";
+ public static final String RUNNING_CONTAINERS = "runningcontainers";
+ public static final String TASK_EXECUTION_TIME = "taskduration";
+ public static final String JOB_LEVEL = "job";
+ public static final String TASK_LEVEL = "task";
+
+ public static final String JOB_DEFINITION_ID_KEY = "jobDefId";
+
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
index 8adb001..325a92a 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
@@ -29,7 +29,7 @@ public class HDFSUtil {
public static FileSystem getFileSystem(Configuration conf) throws IOException {
HDFSUtil.login(conf);
- return FileSystem.get(conf);
+ return FileSystem.get(conf);
}
public static void login(Configuration kConfig) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
new file mode 100644
index 0000000..ea8e4f4
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util;
+
/**
 * Tag keys used on MR job monitoring entities.  {@link #toString()} returns the
 * literal tag name under which the value is persisted.
 */
public enum MRJobTagName {
    SITE("site"),
    RACK("rack"),
    HOSTNAME("hostname"),
    JOB_NAME("jobName"),
    //NOTE(review): constant name has a typo (JOD vs JOB) but is kept for compatibility
    JOD_DEF_ID("jobDefId"),
    JOB_ID("jobId"),
    TASK_ID("taskId"),
    TASK_ATTEMPT_ID("taskAttemptId"),
    JOB_STATUS("jobStatus"),
    USER("user"),
    TASK_TYPE("taskType"),
    TASK_EXEC_TYPE("taskExecType"),
    ERROR_CATEGORY("errorCategory"),
    JOB_QUEUE("queue"),
    RULE_TYPE("ruleType"),
    JOB_TYPE("jobType");

    private final String tagName;

    MRJobTagName(String tagName) {
        this.tagName = tagName;
    }

    /** Returns the literal tag key, e.g. "jobId". */
    @Override
    public String toString() {
        return this.tagName;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
new file mode 100644
index 0000000..7a613eb
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util;
+
+import jline.internal.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class Utils {
+ private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+ public static void closeInputStream(InputStream is) {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public static void sleep(long seconds) {
+ try {
+ Thread.sleep(seconds * 1000);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static long dateTimeToLong(String date) {
+ // date is like: 2016-07-29T19:35:40.715GMT
+ long timestamp = 0L;
+ try {
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss.SSSzzz");
+ Date parsedDate = dateFormat.parse(date);
+ timestamp = parsedDate.getTime();
+ } catch(ParseException e) {
+ e.printStackTrace();
+ }
+
+ if (timestamp == 0L) {
+ LOG.error("Not able to parse date: " + date);
+ }
+
+ return timestamp;
+ }
+
+ public static long parseMemory(String memory) {
+ if (memory.endsWith("g") || memory.endsWith("G")) {
+ int executorGB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+ return 1024l * 1024 * 1024 * executorGB;
+ } else if (memory.endsWith("m") || memory.endsWith("M")) {
+ int executorMB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+ return 1024l * 1024 * executorMB;
+ } else if (memory.endsWith("k") || memory.endsWith("K")) {
+ int executorKB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+ return 1024l * executorKB;
+ } else if (memory.endsWith("t") || memory.endsWith("T")) {
+ int executorTB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+ return 1024l * 1024 * 1024 * 1024 * executorTB;
+ } else if (memory.endsWith("p") || memory.endsWith("P")) {
+ int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+ return 1024l * 1024 * 1024 * 1024 * 1024 * executorPB;
+ }
+ Log.info("Cannot parse memory info " + memory);
+ return 0l;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java
new file mode 100644
index 0000000..c8572e9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util.jobcounter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * MR Job counter dictionary. It's singlton class that will try to read JobCounter.conf file and configure
+ * counters.
+ *
+ */
+public final class CounterGroupDictionary {
+
+ private final List<CounterGroupKey> groupKeys = new ArrayList<>();
+
+ private static volatile CounterGroupDictionary instance = null;
+ private static final Logger LOG = LoggerFactory.getLogger(CounterGroupDictionary.class);
+
+ private CounterGroupDictionary() {}
+
+ public static CounterGroupDictionary getInstance() throws JobCounterException {
+ if (instance == null) {
+ synchronized (CounterGroupDictionary.class) {
+ if (instance == null) {
+ CounterGroupDictionary tmp = new CounterGroupDictionary();
+ tmp.initialize();
+ instance = tmp;
+ }
+ }
+ }
+ return instance;
+ }
+
+ public CounterGroupKey getCounterGroupByName(String groupName) {
+ for (CounterGroupKey groupKey : groupKeys) {
+ if (groupKey.getName().equalsIgnoreCase(groupName)) {
+ return groupKey;
+ }
+ }
+ return null;
+ }
+
+ public CounterGroupKey getCounterGroupByIndex(int groupIndex) {
+ if (groupIndex < 0 || groupIndex >= groupKeys.size()) {
+ return null;
+ }
+ return groupKeys.get(groupIndex);
+ }
+
+ private void initialize() throws JobCounterException {
+ // load config.properties file from classpath
+ InputStream is = this.getClass().getClassLoader().getResourceAsStream("/JobCounter.conf");
+ try {
+ if (is == null) {
+ is = this.getClass().getClassLoader().getResourceAsStream("JobCounter.conf");
+ if (is == null) {
+ final String errMsg = "Failed to load JobCounter.conf";
+ LOG.error(errMsg);
+ throw new JobCounterException(errMsg);
+ }
+ }
+ final Properties prop = new Properties();
+ try {
+ prop.load(is);
+ } catch(Exception ex) {
+ final String errMsg = "Failed to load JobCounter.conf, reason: " + ex.getMessage();
+ LOG.error(errMsg, ex);
+ throw new JobCounterException(errMsg, ex);
+ }
+ int groupIndex = 0;
+ while (parseGroup(groupIndex, prop)) {
+ ++groupIndex;
+ }
+ } finally {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+ }
+
+ private boolean parseGroup(int groupIndex, Properties prop) {
+ final String groupKeyBase = "counter.group" + groupIndex;
+ final String groupNameKey = groupKeyBase + ".name";
+ final String groupName = prop.getProperty(groupNameKey);
+
+ if (groupName == null) {
+ return false;
+ }
+
+ final String groupDescriptionKey = groupKeyBase + ".description";
+ final String groupDescription = prop.getProperty(groupDescriptionKey);
+ final CounterGroupKeyImpl groupKey = new CounterGroupKeyImpl(groupIndex, groupName, groupDescription);
+ final ArrayList<CounterKey> counters = new ArrayList<CounterKey>();
+
+ int counterIndex = 0;
+ while (parseCounter(groupKey, counterIndex, counters, prop)) {
+ ++counterIndex;
+ }
+ groupKey.setCounterKeys(counters.toArray(new CounterKey[counters.size()]));
+ groupKeys.add(groupKey);
+ return true;
+ }
+
+ private boolean parseCounter(CounterGroupKey groupKey, int counterIndex, List<CounterKey> counters, Properties prop) {
+ final String counterKeyBase = "counter.group" + groupKey.getIndex() + ".counter" + counterIndex;
+ final String counterNameKey = counterKeyBase + ".names";
+ final String counterNamesString = prop.getProperty(counterNameKey);
+
+ if (counterNamesString == null) {
+ return false;
+ }
+ final String[] names = counterNamesString.split(",");
+ final List<String> counterNames = new ArrayList<String>();
+ for (String name : names) {
+ counterNames.add(name.trim());
+ }
+
+ final String counterDescriptionKey = counterKeyBase + ".description";
+ final String counterDescription = prop.getProperty(counterDescriptionKey);
+
+ CounterKey counter = new CounterKeyImpl(counterIndex, counterNames, counterDescription, groupKey);
+ counters.add(counter);
+ return true;
+ }
+
+ private static class CounterKeyImpl implements CounterKey {
+ private final int index;
+ private final List<String> counterNames;
+ private final String description;
+ private final CounterGroupKey groupKey;
+
+ public CounterKeyImpl(int index, List<String> counterNames, String description, CounterGroupKey groupKey) {
+ this.index = index;
+ this.counterNames = counterNames;
+ this.description = description;
+ this.groupKey = groupKey;
+ }
+ @Override
+ public int getIndex() {
+ return index;
+ }
+ @Override
+ public List<String> getNames() {
+ return counterNames;
+ }
+ @Override
+ public String getDescription() {
+ return description;
+ }
+ @Override
+ public CounterGroupKey getGroupKey() {
+ return groupKey;
+ }
+ }
+
+ private static class CounterGroupKeyImpl implements CounterGroupKey {
+ private final int index;
+ private final String name;
+ private final String description;
+ private CounterKey[] counterKeys;
+
+ public CounterGroupKeyImpl(int index, String name, String description) {
+ this.index = index;
+ this.name = name;
+ this.description = description;
+ }
+
+ public void setCounterKeys(CounterKey[] counterKeys) {
+ this.counterKeys = counterKeys;
+ }
+
+ @Override
+ public int getIndex() {
+ return index;
+ }
+ @Override
+ public String getName() {
+ return name;
+ }
+ @Override
+ public String getDescription() {
+ return description;
+ }
+ @Override
+ public int getCounterNumber() {
+ return counterKeys.length;
+ }
+ @Override
+ public List<CounterKey> listCounterKeys() {
+ return Arrays.asList(counterKeys);
+ }
+ @Override
+ public CounterKey getCounterKeyByName(String name) {
+ for (CounterKey counterKey : counterKeys) {
+ for (String n : counterKey.getNames()) {
+ if (n.equalsIgnoreCase(name)) {
+ return counterKey;
+ }
+ }
+ }
+ return null;
+ }
+ @Override
+ public CounterKey getCounterKeyByID(int index) {
+ if (index < 0 || index >= counterKeys.length) {
+ return null;
+ }
+ return counterKeys[index];
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java
new file mode 100644
index 0000000..482623a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util.jobcounter;
+
+import java.util.List;
+
/**
 * A named group of MR job counters parsed from JobCounter.conf. A group is
 * addressable by its position in the dictionary or by name, and exposes the
 * counters it contains.
 */
public interface CounterGroupKey {

    /** Returns the group's name. */
    String getName();

    /** Returns the group's human-readable description (may be {@code null}). */
    String getDescription();

    /** Returns the group's zero-based position in the dictionary. */
    int getIndex();

    /** Returns the number of counters in this group. */
    int getCounterNumber();

    /** Returns all counters of this group. */
    List<CounterKey> listCounterKeys();

    /** Returns the counter matching the given name, or {@code null} if absent. */
    CounterKey getCounterKeyByName(String name);

    /** Returns the counter at the given index, or {@code null} when out of range. */
    CounterKey getCounterKeyByID(int index);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java
new file mode 100644
index 0000000..8e4e519
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util.jobcounter;
+
+import java.util.List;
+
/**
 * A single MR job counter within a {@link CounterGroupKey}. A counter may be
 * known under several alias names.
 */
public interface CounterKey {

    /** Returns all names (aliases) this counter is known by. */
    List<String> getNames();

    /** Returns the counter's human-readable description (may be {@code null}). */
    String getDescription();

    /** Returns the counter's zero-based position within its group. */
    int getIndex();

    /** Returns the group this counter belongs to. */
    CounterGroupKey getGroupKey();

}