You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:36 UTC
[40/52] [abbrv] incubator-eagle git commit: Update spark history job feeder config & refactor the code
Update spark history job feeder config & refactor the code
Author: Qingwen Zhao <qi...@gmail.com>
Closes #416 from qingwen220/sparkHist.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3110c72e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3110c72e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3110c72e
Branch: refs/heads/master
Commit: 3110c72e47f697f5c59b5f7d6559d527cf25db3a
Parents: 8774b85
Author: Qingwen Zhao <qi...@gmail.com>
Authored: Wed Sep 7 10:36:24 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Wed Sep 7 10:36:24 2016 +0800
----------------------------------------------------------------------
.../environment/impl/StormExecutionRuntime.java | 32 +-
.../apache/eagle/jpm/spark/crawl/EventType.java | 24 -
.../jpm/spark/crawl/JHFInputStreamReader.java | 24 -
.../eagle/jpm/spark/crawl/JHFParserBase.java | 29 -
.../jpm/spark/crawl/JHFSparkEventReader.java | 713 -------------------
.../eagle/jpm/spark/crawl/JHFSparkParser.java | 73 --
.../jpm/spark/crawl/SparkApplicationInfo.java | 69 --
.../SparkFilesystemInputStreamReaderImpl.java | 53 --
.../running/entities/JPMEntityRepository.java | 33 +
.../jpm/spark/running/entities/JobConfig.java | 26 +
.../spark/running/entities/SparkAppEntity.java | 476 +++++++++++++
.../running/entities/SparkExecutorEntity.java | 233 ++++++
.../spark/running/entities/SparkJobEntity.java | 191 +++++
.../running/entities/SparkStageEntity.java | 299 ++++++++
.../spark/running/entities/SparkTaskEntity.java | 290 ++++++++
.../spark/history/SparkHistoryJobAppConfig.java | 4 -
.../history/crawl/JHFInputStreamReader.java | 24 +
.../jpm/spark/history/crawl/JHFParserBase.java | 29 +
.../history/crawl/JHFSparkEventReader.java | 713 +++++++++++++++++++
.../jpm/spark/history/crawl/JHFSparkParser.java | 73 ++
.../history/crawl/SparkApplicationInfo.java | 69 ++
.../SparkFilesystemInputStreamReaderImpl.java | 53 ++
.../status/JobHistoryZKStateManager.java | 7 +-
.../history/storm/SparkHistoryJobParseBolt.java | 13 +-
.../history/storm/SparkHistoryJobSpout.java | 5 +-
...spark.history.SparkHistoryJobAppProvider.xml | 18 -
.../src/main/resources/application.conf | 9 +-
.../running/entities/JPMEntityRepository.java | 30 -
.../jpm/spark/running/entities/JobConfig.java | 25 -
.../spark/running/entities/SparkAppEntity.java | 475 ------------
.../running/entities/SparkExecutorEntity.java | 232 ------
.../spark/running/entities/SparkJobEntity.java | 190 -----
.../running/entities/SparkStageEntity.java | 298 --------
.../spark/running/entities/SparkTaskEntity.java | 289 --------
.../running/parser/SparkApplicationParser.java | 8 +-
.../src/main/resources/application.conf | 6 +-
.../apache/eagle/jpm/util/SparkEventType.java | 25 +
.../util/resourcefetch/RMResourceFetcher.java | 2 +-
38 files changed, 2575 insertions(+), 2587 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 04cc19b..e37e8f2 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -34,13 +34,13 @@ import scala.Int;
import storm.trident.spout.RichSpoutBatchExecutor;
public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> {
- private final static Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
private static LocalCluster _localCluster;
private StormEnvironment environment;
- private static LocalCluster getLocalCluster(){
- if(_localCluster == null){
+ private static LocalCluster getLocalCluster() {
+ if (_localCluster == null) {
_localCluster = new LocalCluster();
}
return _localCluster;
@@ -56,13 +56,13 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
return this.environment;
}
- private final static String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
- private final static String STORM_NIMBUS_HOST_DEFAULT = "localhost";
- private final static Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
- private final static String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
+ private static final String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
+ private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost";
+ private static final Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
+ private static final String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
private static final String WORKERS = "workers";
- public backtype.storm.Config getStormConfig(){
+ public backtype.storm.Config getStormConfig() {
backtype.storm.Config conf = new backtype.storm.Config();
conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8));
@@ -71,14 +71,14 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
- if(environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
+ if (environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
} else {
LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
}
Integer nimbusThriftPort = STORM_NIMBUS_THRIFT_DEFAULT;
- if(environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
+ if (environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
} else {
@@ -94,15 +94,15 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
}
@Override
- public void start(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config){
+ public void start(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
String topologyName = config.getString("appId");
- Preconditions.checkNotNull(topologyName,"[appId] is required by null for "+executor.getClass().getCanonicalName());
+ Preconditions.checkNotNull(topologyName,"[appId] is required by null for " + executor.getClass().getCanonicalName());
StormTopology topology = executor.execute(config, environment);
- LOG.info("Starting {} ({}), mode: {}",topologyName,executor.getClass().getCanonicalName(), config.getString("mode"));
+ LOG.info("Starting {} ({}), mode: {}",topologyName, executor.getClass().getCanonicalName(), config.getString("mode"));
Config conf = getStormConfig();
- if(ApplicationEntity.Mode.CLUSTER.name().equalsIgnoreCase(config.getString("mode"))){
+ if (ApplicationEntity.Mode.CLUSTER.name().equalsIgnoreCase(config.getString("mode"))) {
String jarFile = config.hasPath("jarPath") ? config.getString("jarPath") : null;
- if(jarFile == null){
+ if (jarFile == null) {
jarFile = DynamicJarPathFinder.findPath(executor.getClass());
}
synchronized (StormExecutionRuntime.class) {
@@ -129,7 +129,7 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
public void stop(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
String appId = config.getString("appId");
LOG.info("Stopping topology {} ..." + appId);
- if(config.getString("mode") == ApplicationEntity.Mode.CLUSTER.name()){
+ if (config.getString("mode") == ApplicationEntity.Mode.CLUSTER.name()) {
Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig()).getClient();
try {
stormClient.killTopology(appId);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java
deleted file mode 100644
index 1ba15b7..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-public enum EventType {
- SparkListenerBlockManagerAdded, SparkListenerEnvironmentUpdate, SparkListenerApplicationStart,
- SparkListenerExecutorAdded, SparkListenerJobStart,SparkListenerStageSubmitted, SparkListenerTaskStart,SparkListenerBlockManagerRemoved,
- SparkListenerTaskEnd, SparkListenerStageCompleted, SparkListenerJobEnd, SparkListenerApplicationEnd,SparkListenerExecutorRemoved
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
deleted file mode 100644
index 8a8d0db..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import java.io.InputStream;
-
-public interface JHFInputStreamReader {
- void read(InputStream is) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
deleted file mode 100644
index 62ba7d9..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import java.io.InputStream;
-
-public interface JHFParserBase {
- /**
- * this method will ensure to close the inputStream.
- * @param is
- * @throws Exception
- */
- void parse(InputStream is) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
deleted file mode 100644
index 22b715a..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
+++ /dev/null
@@ -1,713 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.eagle.jpm.spark.entity.*;
-import org.apache.eagle.jpm.util.*;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.service.client.EagleServiceClientException;
-import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-public class JHFSparkEventReader {
- private static final Logger LOG = LoggerFactory.getLogger(JHFSparkEventReader.class);
-
- private static final int FLUSH_LIMIT = 500;
- private long firstTaskLaunchTime;
- private long lastEventTime;
-
- private Map<String, SparkExecutor> executors;
- private SparkApp app;
- private Map<Integer, SparkJob> jobs;
- private Map<String, SparkStage> stages;
- private Map<Integer, Set<String>> jobStageMap;
- private Map<Long, SparkTask> tasks;
- private EagleServiceClientImpl client;
- private Map<String, Map<Integer, Boolean>> stageTaskStatusMap;
-
- private List<TaggedLogAPIEntity> createEntities;
-
- private Config conf;
-
- public JHFSparkEventReader(Map<String, String> baseTags, SparkApplicationInfo info) {
- app = new SparkApp();
- app.setTags(new HashMap<String, String>(baseTags));
- app.setYarnState(info.getState());
- app.setYarnStatus(info.getFinalStatus());
- createEntities = new ArrayList<>();
- jobs = new HashMap<Integer, SparkJob>();
- stages = new HashMap<String, SparkStage>();
- jobStageMap = new HashMap<Integer, Set<String>>();
- tasks = new HashMap<Long, SparkTask>();
- executors = new HashMap<String, SparkExecutor>();
- stageTaskStatusMap = new HashMap<>();
- conf = ConfigFactory.load();
- this.initiateClient();
- }
-
- public SparkApp getApp() {
- return this.app;
- }
-
- public void read(JSONObject eventObj) {
- String eventType = (String) eventObj.get("Event");
- if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationStart.toString())) {
- handleAppStarted(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString())) {
- handleEnvironmentSet(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorAdded.toString())) {
- handleExecutorAdd(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerAdded.toString())) {
- handleBlockManagerAdd(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobStart.toString())) {
- handleJobStart(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageSubmitted.toString())) {
- handleStageSubmit(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskStart.toString())) {
- handleTaskStart(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskEnd.toString())) {
- handleTaskEnd(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageCompleted.toString())) {
- handleStageComplete(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobEnd.toString())) {
- handleJobEnd(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorRemoved.toString())) {
- handleExecutorRemoved(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationEnd.toString())) {
- handleAppEnd(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerRemoved.toString())) {
- //nothing to do now
- } else {
- LOG.info("Not registered event type:" + eventType);
- }
-
- }
-
- private void handleEnvironmentSet(JSONObject event) {
- app.setConfig(new JobConfig());
- JSONObject sparkProps = (JSONObject) event.get("Spark Properties");
-
- String[] additionalJobConf = conf.getString("basic.jobConf.additional.info").split(",\\s*");
- String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port",
- "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory",
- "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"};
- String[] jobConf = (String[])ArrayUtils.addAll(additionalJobConf, props);
- for (String prop : jobConf) {
- if (sparkProps.containsKey(prop)) {
- app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop));
- }
- }
- }
-
- private Object getConfigVal(JobConfig config, String configName, String type) {
- if (config.getConfig().containsKey(configName)) {
- Object val = config.getConfig().get(configName);
- if (type.equalsIgnoreCase(Integer.class.getName())) {
- return Integer.parseInt((String) val);
- } else {
- return val;
- }
- } else {
- if (type.equalsIgnoreCase(Integer.class.getName())) {
- return conf.getInt("spark.defaultVal." + configName);
- } else {
- return conf.getString("spark.defaultVal." + configName);
- }
- }
- }
-
- private boolean isClientMode(JobConfig config) {
- return config.getConfig().get("spark.master").equalsIgnoreCase("yarn-client");
- }
-
- private void handleAppStarted(JSONObject event) {
- //need update all entities tag before app start
- List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
- entities.addAll(this.executors.values());
- entities.add(this.app);
-
- long appStartTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
- for (TaggedLogAPIEntity entity : entities) {
- entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), JSONUtils.getString(event, "App ID"));
- entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), JSONUtils.getString(event, "App Name"));
- // In yarn-client mode, attemptId is not available in the log, so we set attemptId = 1.
- String attemptId = isClientMode(this.app.getConfig()) ? "1" : JSONUtils.getString(event, "App Attempt ID");
- entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), attemptId);
- // the second argument of getNormalizeName() is changed to null because the original code contains sensitive text
- // original second argument looks like: this.app.getConfig().getConfig().get("xxx"), "xxx" is the sensitive text
- entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), this.getNormalizedName(JSONUtils.getString(event, "App Name"), null));
- entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), JSONUtils.getString(event, "User"));
-
- entity.setTimestamp(appStartTime);
- }
-
- this.app.setStartTime(appStartTime);
- this.lastEventTime = appStartTime;
- }
-
- private void handleExecutorAdd(JSONObject event) {
- String executorID = (String) event.get("Executor ID");
- long executorAddTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
- this.lastEventTime = executorAddTime;
- SparkExecutor executor = this.initiateExecutor(executorID, executorAddTime);
-
- JSONObject executorInfo = JSONUtils.getJSONObject(event, "Executor Info");
-
- }
-
- private void handleBlockManagerAdd(JSONObject event) {
- long maxMemory = JSONUtils.getLong(event, "Maximum Memory");
- long timestamp = JSONUtils.getLong(event, "Timestamp", lastEventTime);
- this.lastEventTime = timestamp;
- JSONObject blockInfo = JSONUtils.getJSONObject(event, "Block Manager ID");
- String executorID = JSONUtils.getString(blockInfo, "Executor ID");
- String hostAndPort = JSONUtils.getString(blockInfo, "Host") + ":" + JSONUtils.getLong(blockInfo, "Port");
-
- SparkExecutor executor = this.initiateExecutor(executorID, timestamp);
- executor.setMaxMemory(maxMemory);
- executor.setHostPort(hostAndPort);
- }
-
- private void handleTaskStart(JSONObject event) {
- this.initializeTask(event);
- }
-
- private void handleTaskEnd(JSONObject event) {
- JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
- long taskId = JSONUtils.getLong(taskInfo, "Task ID");
- SparkTask task = tasks.get(taskId);
- if (task == null) {
- return;
- }
-
- task.setFailed(JSONUtils.getBoolean(taskInfo, "Failed"));
- JSONObject taskMetrics = JSONUtils.getJSONObject(event, "Task Metrics");
- if (null != taskMetrics) {
- task.setExecutorDeserializeTime(JSONUtils.getLong(taskMetrics, "Executor Deserialize Time", lastEventTime));
- task.setExecutorRunTime(JSONUtils.getLong(taskMetrics, "Executor Run Time", lastEventTime));
- task.setJvmGcTime(JSONUtils.getLong(taskMetrics, "JVM GC Time", lastEventTime));
- task.setResultSize(JSONUtils.getLong(taskMetrics, "Result Size"));
- task.setResultSerializationTime(JSONUtils.getLong(taskMetrics, "Result Serialization Time", lastEventTime));
- task.setMemoryBytesSpilled(JSONUtils.getLong(taskMetrics, "Memory Bytes Spilled"));
- task.setDiskBytesSpilled(JSONUtils.getLong(taskMetrics, "Disk Bytes Spilled"));
-
- JSONObject inputMetrics = JSONUtils.getJSONObject(taskMetrics, "Input Metrics");
- if (null != inputMetrics) {
- task.setInputBytes(JSONUtils.getLong(inputMetrics, "Bytes Read"));
- task.setInputRecords(JSONUtils.getLong(inputMetrics, "Records Read"));
- }
-
- JSONObject outputMetrics = JSONUtils.getJSONObject(taskMetrics, "Output Metrics");
- if (null != outputMetrics) {
- task.setOutputBytes(JSONUtils.getLong(outputMetrics, "Bytes Written"));
- task.setOutputRecords(JSONUtils.getLong(outputMetrics, "Records Written"));
- }
-
- JSONObject shuffleWriteMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Write Metrics");
- if (null != shuffleWriteMetrics) {
- task.setShuffleWriteBytes(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Bytes Written"));
- task.setShuffleWriteRecords(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Records Written"));
- }
-
- JSONObject shuffleReadMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Read Metrics");
- if (null != shuffleReadMetrics) {
- task.setShuffleReadLocalBytes(JSONUtils.getLong(shuffleReadMetrics, "Local Bytes Read"));
- task.setShuffleReadRemoteBytes(JSONUtils.getLong(shuffleReadMetrics, "Remote Bytes Read"));
- task.setShuffleReadRecords(JSONUtils.getLong(shuffleReadMetrics, "Total Records Read"));
- }
- } else {
- //for tasks success without task metrics, save in the end if no other information
- if (!task.isFailed()) {
- return;
- }
- }
-
- aggregateToStage(task);
- aggregateToExecutor(task);
- tasks.remove(taskId);
- this.flushEntities(task, false);
- }
-
-
- private SparkTask initializeTask(JSONObject event) {
- SparkTask task = new SparkTask();
- task.setTags(new HashMap<>(this.app.getTags()));
- task.setTimestamp(app.getTimestamp());
-
- task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage ID")));
- task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage Attempt ID")));
-
- JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
- long taskId = JSONUtils.getLong(taskInfo, "Task ID");
- task.setTaskId(taskId);
-
- task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), Long.toString(JSONUtils.getLong(taskInfo, "Index")));
- task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), Integer.toString(JSONUtils.getInt(taskInfo, "Attempt")));
- long launchTime = JSONUtils.getLong(taskInfo, "Launch Time", lastEventTime);
- this.lastEventTime = launchTime;
- if (taskId == 0) {
- this.setFirstTaskLaunchTime(launchTime);
- }
- task.setLaunchTime(launchTime);
- task.setExecutorId(JSONUtils.getString(taskInfo, "Executor ID"));
- task.setHost(JSONUtils.getString(taskInfo, "Host"));
- task.setTaskLocality(JSONUtils.getString(taskInfo, "Locality"));
- task.setSpeculative(JSONUtils.getBoolean(taskInfo, "Speculative"));
-
- tasks.put(task.getTaskId(), task);
- return task;
- }
-
- private void setFirstTaskLaunchTime(long launchTime) {
- this.firstTaskLaunchTime = launchTime;
- }
-
- private void handleJobStart(JSONObject event) {
- SparkJob job = new SparkJob();
- job.setTags(new HashMap<>(this.app.getTags()));
- job.setTimestamp(app.getTimestamp());
-
- int jobId = JSONUtils.getInt(event, "Job ID");
- job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
- long submissionTime = JSONUtils.getLong(event, "Submission Time", lastEventTime);
- job.setSubmissionTime(submissionTime);
- this.lastEventTime = submissionTime;
-
- //for complete application, no active stages/tasks
- job.setNumActiveStages(0);
- job.setNumActiveTasks(0);
-
- this.jobs.put(jobId, job);
- this.jobStageMap.put(jobId, new HashSet<String>());
-
- JSONArray stages = JSONUtils.getJSONArray(event, "Stage Infos");
- int stagesSize = (stages == null ? 0 : stages.size());
- job.setNumStages(stagesSize);
- for (int i = 0; i < stagesSize; i++) {
- JSONObject stageInfo = (JSONObject) stages.get(i);
- int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
- int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
- String stageName = JSONUtils.getString(stageInfo, "Stage Name");
- int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks");
- this.initiateStage(jobId, stageId, stageAttemptId, stageName, numTasks);
- }
- }
-
- private void handleStageSubmit(JSONObject event) {
- JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
- int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
- int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
- String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
- stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>());
-
- if (!stages.containsKey(key)) {
- //may be further attempt for one stage
- String baseAttempt = this.generateStageKey(Integer.toString(stageId), "0");
- if (stages.containsKey(baseAttempt)) {
- SparkStage stage = stages.get(baseAttempt);
- String jobId = stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString());
-
- String stageName = JSONUtils.getString(event, "Stage Name");
- int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks");
- this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks);
- }
- }
- }
-
- private void handleStageComplete(JSONObject event) {
- JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
- int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
- int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
- String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
- SparkStage stage = stages.get(key);
-
- // If "Submission Time" is not available, use the "Launch Time" of "Task ID" = 0.
- Long submissionTime = JSONUtils.getLong(stageInfo, "Submission Time", firstTaskLaunchTime);
-
- stage.setSubmitTime(submissionTime);
-
- long completeTime = JSONUtils.getLong(stageInfo, "Completion Time", lastEventTime);
- stage.setCompleteTime(completeTime);
- this.lastEventTime = completeTime;
-
- if (stageInfo != null && stageInfo.containsKey("Failure Reason")) {
- stage.setStatus(SparkEntityConstant.SparkStageStatus.FAILED.toString());
- } else {
- stage.setStatus(SparkEntityConstant.SparkStageStatus.COMPLETE.toString());
- }
- }
-
- private void handleExecutorRemoved(JSONObject event) {
- String executorID = JSONUtils.getString(event, "Executor ID");
- SparkExecutor executor = executors.get(executorID);
- long removedTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
- executor.setEndTime(removedTime);
- this.lastEventTime = removedTime;
- }
-
- private void handleJobEnd(JSONObject event) {
- int jobId = JSONUtils.getInt(event, "Job ID");
- SparkJob job = jobs.get(jobId);
-
- long completionTime = JSONUtils.getLong(event, "Completion Time", lastEventTime);
- job.setCompletionTime(completionTime);
- this.lastEventTime = completionTime;
-
- JSONObject jobResult = JSONUtils.getJSONObject(event, "Job Result");
- String result = JSONUtils.getString(jobResult, "Result");
- if (result.equalsIgnoreCase("JobSucceeded")) {
- job.setStatus(SparkEntityConstant.SparkJobStatus.SUCCEEDED.toString());
- } else {
- job.setStatus(SparkEntityConstant.SparkJobStatus.FAILED.toString());
- }
- }
-
- private void handleAppEnd(JSONObject event) {
- long endTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
- app.setEndTime(endTime);
- this.lastEventTime = endTime;
- }
-
- public void clearReader() throws Exception {
- //clear tasks
- for (SparkTask task : tasks.values()) {
- LOG.info("Task {} does not have result or no task metrics.", task.getTaskId());
- task.setFailed(true);
- aggregateToStage(task);
- aggregateToExecutor(task);
- this.flushEntities(task, false);
- }
-
- List<SparkStage> needStoreStages = new ArrayList<>();
- for (SparkStage stage : this.stages.values()) {
- int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
- if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) {
- SparkJob job = this.jobs.get(jobId);
- job.setNumSkippedStages(job.getNumSkippedStages() + 1);
- job.setNumSkippedTasks(job.getNumSkippedTasks() + stage.getNumTasks());
- } else {
- this.aggregateToJob(stage);
- this.aggregateStageToApp(stage);
- needStoreStages.add(stage);
- }
- String stageId = stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
- String stageAttemptId = stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
- this.jobStageMap.get(jobId).remove(this.generateStageKey(stageId, stageAttemptId));
- }
-
- this.flushEntities(needStoreStages, false);
- for (SparkJob job : jobs.values()) {
- this.aggregateJobToApp(job);
- }
- this.flushEntities(jobs.values(), false);
-
- app.setExecutors(executors.values().size());
-
- long executorMemory = Utils.parseMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName()));
- long driverMemory = Utils.parseMemory(this.isClientMode(app.getConfig())
- ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName())
- : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName()));
-
- int executorCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName());
- int driverCore = this.isClientMode(app.getConfig())
- ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName())
- : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName());
-
- long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead");
- long driverMemoryOverhead = this.isClientMode(app.getConfig())
- ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead")
- : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead");
-
- app.setExecMemoryBytes(executorMemory);
- app.setDriveMemoryBytes(driverMemory);
- app.setExecutorCores(executorCore);
- app.setDriverCores(driverCore);
- app.setExecutorMemoryOverhead(executorMemoryOverhead);
- app.setDriverMemoryOverhead(driverMemoryOverhead);
-
- for (SparkExecutor executor : executors.values()) {
- String executorID = executor.getTags().get(SparkJobTagName.SPARK_EXECUTOR_ID.toString());
- if (executorID.equalsIgnoreCase("driver")) {
- executor.setExecMemoryBytes(driverMemory);
- executor.setCores(driverCore);
- executor.setMemoryOverhead(driverMemoryOverhead);
- } else {
- executor.setExecMemoryBytes(executorMemory);
- executor.setCores(executorCore);
- executor.setMemoryOverhead(executorMemoryOverhead);
- }
- if (app.getEndTime() <= 0L) {
- app.setEndTime(this.lastEventTime);
- }
- if (executor.getEndTime() <= 0L) {
- executor.setEndTime(app.getEndTime());
- }
- this.aggregateExecutorToApp(executor);
- }
- this.flushEntities(executors.values(), false);
- //spark code...tricky
- app.setSkippedTasks(app.getCompleteTasks());
- this.flushEntities(app, true);
- }
-
- private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) {
- long result = 0L;
- String fieldValue = config.getConfig().get(fieldName);
- if (fieldValue != null) {
- result = Utils.parseMemory(fieldValue + "m");
- if (result == 0L) {
- result = Utils.parseMemory(fieldValue);
- }
- }
-
- if (result == 0L) {
- result = Math.max(
- Utils.parseMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")),
- executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100);
- }
- return result;
- }
-
- private void aggregateExecutorToApp(SparkExecutor executor) {
- long totalExecutorTime = app.getTotalExecutorTime() + executor.getEndTime() - executor.getStartTime();
- if (totalExecutorTime < 0L) {
- totalExecutorTime = 0L;
- }
- app.setTotalExecutorTime(totalExecutorTime);
- }
-
- private void aggregateJobToApp(SparkJob job) {
- //aggregate job level metrics
- app.setNumJobs(app.getNumJobs() + 1);
- app.setTotalTasks(app.getTotalTasks() + job.getNumTask());
- app.setCompleteTasks(app.getCompleteTasks() + job.getNumCompletedTasks());
- app.setSkippedTasks(app.getSkippedTasks() + job.getNumSkippedTasks());
- app.setFailedTasks(app.getFailedTasks() + job.getNumFailedTasks());
- app.setTotalStages(app.getTotalStages() + job.getNumStages());
- app.setFailedStages(app.getFailedStages() + job.getNumFailedStages());
- app.setSkippedStages(app.getSkippedStages() + job.getNumSkippedStages());
- }
-
- private void aggregateStageToApp(SparkStage stage) {
- //aggregate task level metrics
- app.setDiskBytesSpilled(app.getDiskBytesSpilled() + stage.getDiskBytesSpilled());
- app.setMemoryBytesSpilled(app.getMemoryBytesSpilled() + stage.getMemoryBytesSpilled());
- app.setExecutorRunTime(app.getExecutorRunTime() + stage.getExecutorRunTime());
- app.setJvmGcTime(app.getJvmGcTime() + stage.getJvmGcTime());
- app.setExecutorDeserializeTime(app.getExecutorDeserializeTime() + stage.getExecutorDeserializeTime());
- app.setResultSerializationTime(app.getResultSerializationTime() + stage.getResultSerializationTime());
- app.setResultSize(app.getResultSize() + stage.getResultSize());
- app.setInputRecords(app.getInputRecords() + stage.getInputRecords());
- app.setInputBytes(app.getInputBytes() + stage.getInputBytes());
- app.setOutputRecords(app.getOutputRecords() + stage.getOutputRecords());
- app.setOutputBytes(app.getOutputBytes() + stage.getOutputBytes());
- app.setShuffleWriteRecords(app.getShuffleWriteRecords() + stage.getShuffleWriteRecords());
- app.setShuffleWriteBytes(app.getShuffleWriteBytes() + stage.getShuffleWriteBytes());
- app.setShuffleReadRecords(app.getShuffleReadRecords() + stage.getShuffleReadRecords());
- app.setShuffleReadBytes(app.getShuffleReadBytes() + stage.getShuffleReadBytes());
- }
-
- private void aggregateToStage(SparkTask task) {
- String stageId = task.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
- String stageAttemptId = task.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
- String key = this.generateStageKey(stageId, stageAttemptId);
- SparkStage stage = stages.get(key);
-
- stage.setDiskBytesSpilled(stage.getDiskBytesSpilled() + task.getDiskBytesSpilled());
- stage.setMemoryBytesSpilled(stage.getMemoryBytesSpilled() + task.getMemoryBytesSpilled());
- stage.setExecutorRunTime(stage.getExecutorRunTime() + task.getExecutorRunTime());
- stage.setJvmGcTime(stage.getJvmGcTime() + task.getJvmGcTime());
- stage.setExecutorDeserializeTime(stage.getExecutorDeserializeTime() + task.getExecutorDeserializeTime());
- stage.setResultSerializationTime(stage.getResultSerializationTime() + task.getResultSerializationTime());
- stage.setResultSize(stage.getResultSize() + task.getResultSize());
- stage.setInputRecords(stage.getInputRecords() + task.getInputRecords());
- stage.setInputBytes(stage.getInputBytes() + task.getInputBytes());
- stage.setOutputRecords(stage.getOutputRecords() + task.getOutputRecords());
- stage.setOutputBytes(stage.getOutputBytes() + task.getOutputBytes());
- stage.setShuffleWriteRecords(stage.getShuffleWriteRecords() + task.getShuffleWriteRecords());
- stage.setShuffleWriteBytes(stage.getShuffleWriteBytes() + task.getShuffleWriteBytes());
- stage.setShuffleReadRecords(stage.getShuffleReadRecords() + task.getShuffleReadRecords());
- long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
- stage.setShuffleReadBytes(stage.getShuffleReadBytes() + taskShuffleReadBytes);
-
- boolean success = !task.isFailed();
-
- Integer taskIndex = Integer.parseInt(task.getTags().get(SparkJobTagName.SPARK_TASK_INDEX.toString()));
- if (stageTaskStatusMap.get(key).containsKey(taskIndex)) {
- //has previous task attempt, retrieved from task index in one stage
- boolean previousResult = stageTaskStatusMap.get(key).get(taskIndex);
- success = previousResult || success;
- if (previousResult != success) {
- stage.setNumFailedTasks(stage.getNumFailedTasks() - 1);
- stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
- stageTaskStatusMap.get(key).put(taskIndex, success);
- }
- } else {
- if (success) {
- stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
- } else {
- stage.setNumFailedTasks(stage.getNumFailedTasks() + 1);
- }
- stageTaskStatusMap.get(key).put(taskIndex, success);
- }
-
- }
-
- private void aggregateToExecutor(SparkTask task) {
- String executorId = task.getExecutorId();
- SparkExecutor executor = executors.get(executorId);
-
- if (null != executor) {
- executor.setTotalTasks(executor.getTotalTasks() + 1);
- if (task.isFailed()) {
- executor.setFailedTasks(executor.getFailedTasks() + 1);
- } else {
- executor.setCompletedTasks(executor.getCompletedTasks() + 1);
- }
- long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
- executor.setTotalShuffleRead(executor.getTotalShuffleRead() + taskShuffleReadBytes);
- executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime());
- executor.setTotalInputBytes(executor.getTotalInputBytes() + task.getInputBytes());
- executor.setTotalShuffleWrite(executor.getTotalShuffleWrite() + task.getShuffleWriteBytes());
- executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime());
- }
-
- }
-
- private void aggregateToJob(SparkStage stage) {
- int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
- SparkJob job = jobs.get(jobId);
- job.setNumCompletedTasks(job.getNumCompletedTasks() + stage.getNumCompletedTasks());
- job.setNumFailedTasks(job.getNumFailedTasks() + stage.getNumFailedTasks());
- job.setNumTask(job.getNumTask() + stage.getNumTasks());
-
-
- if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
- //if multiple attempts succeed, just count one
- if (!hasStagePriorAttemptSuccess(stage)) {
- job.setNumCompletedStages(job.getNumCompletedStages() + 1);
- }
- } else {
- job.setNumFailedStages(job.getNumFailedStages() + 1);
- }
- }
-
- private boolean hasStagePriorAttemptSuccess(SparkStage stage) {
- int stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()));
- for (int i = 0; i < stageAttemptId; i++) {
- SparkStage previousStage = stages.get(this.generateStageKey(
- stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), Integer.toString(i)));
- if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
- return true;
- }
- }
- return false;
- }
-
-
- private String generateStageKey(String stageId, String stageAttemptId) {
- return stageId + "-" + stageAttemptId;
- }
-
- private void initiateStage(int jobId, int stageId, int stageAttemptId, String name, int numTasks) {
- SparkStage stage = new SparkStage();
- stage.setTags(new HashMap<>(this.app.getTags()));
- stage.setTimestamp(app.getTimestamp());
- stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
- stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Integer.toString(stageId));
- stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Integer.toString(stageAttemptId));
- stage.setName(name);
- stage.setNumActiveTasks(0);
- stage.setNumTasks(numTasks);
- stage.setSchedulingPool(this.app.getConfig().getConfig().get("spark.scheduler.pool") == null ?
- "default" : this.app.getConfig().getConfig().get("spark.scheduler.pool"));
-
- String stageKey = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
- stages.put(stageKey, stage);
- this.jobStageMap.get(jobId).add(stageKey);
- }
-
-
- private SparkExecutor initiateExecutor(String executorID, long startTime) {
- if (!executors.containsKey(executorID)) {
- SparkExecutor executor = new SparkExecutor();
- executor.setTags(new HashMap<>(this.app.getTags()));
- executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executorID);
- executor.setStartTime(startTime);
- executor.setTimestamp(app.getTimestamp());
-
- this.executors.put(executorID, executor);
- }
-
- return this.executors.get(executorID);
- }
-
- private String getNormalizedName(String jobName, String assignedName) {
- if (null != assignedName) {
- return assignedName;
- } else {
- return JobNameNormalization.getInstance().normalize(jobName);
- }
- }
-
- private void flushEntities(Object entity, boolean forceFlush) {
- this.flushEntities(Collections.singletonList(entity), forceFlush);
- }
-
- private void flushEntities(Collection entities, boolean forceFlush) {
- this.createEntities.addAll(entities);
-
- if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) {
- try {
- this.doFlush(this.createEntities);
- this.createEntities.clear();
- } catch (Exception e) {
- LOG.error("Fail to flush entities", e);
- }
-
- }
- }
-
- private EagleServiceBaseClient initiateClient() {
- String host = conf.getString("eagleProps.eagle.service.host");
- int port = conf.getInt("eagleProps.eagle.service.port");
- String userName = conf.getString("eagleProps.eagle.service.username");
- String pwd = conf.getString("eagleProps.eagle.service.password");
- client = new EagleServiceClientImpl(host, port, userName, pwd);
- int timeout = conf.getInt("eagleProps.eagle.service.read.timeout");
- client.getJerseyClient().setReadTimeout(timeout * 1000);
-
- return client;
- }
-
- private void doFlush(List entities) throws IOException, EagleServiceClientException {
- client.create(entities);
- int size = (entities == null ? 0 : entities.size());
- LOG.info("finish flushing entities of total number " + size);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
deleted file mode 100644
index 02fc5cf..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-public class JHFSparkParser implements JHFParserBase {
-
- private static final Logger logger = LoggerFactory.getLogger(JHFSparkParser.class);
-
- private boolean isValidJson;
-
- private JHFSparkEventReader eventReader;
-
- public JHFSparkParser(JHFSparkEventReader reader) {
- this.eventReader = reader;
- }
-
- @Override
- public void parse(InputStream is) throws Exception {
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
- for (String line = reader.readLine(); line != null; line = reader.readLine()) {
- isValidJson = true;
- JSONObject eventObj = parseAndValidateJSON(line);
- if (isValidJson) {
- try {
- this.eventReader.read(eventObj);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- this.eventReader.clearReader();
- }
- }
-
- private JSONObject parseAndValidateJSON(String line) {
- JSONObject eventObj = null;
- JSONParser parser = new JSONParser();
- try {
- eventObj = (JSONObject) parser.parse(line);
- } catch (ParseException ex) {
- isValidJson = false;
- logger.error(String.format("Invalid json string. Fail to parse %s.", line), ex);
- }
- return eventObj;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java
deleted file mode 100644
index 423d045..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-public class SparkApplicationInfo {
-
- private String state;
- private String finalStatus;
- private String queue;
- private String name;
- private String user;
-
- public String getState() {
- return state;
- }
-
- public void setState(String state) {
- this.state = state;
- }
-
- public String getFinalStatus() {
- return finalStatus;
- }
-
- public void setFinalStatus(String finalStatus) {
- this.finalStatus = finalStatus;
- }
-
- public String getQueue() {
- return queue;
- }
-
- public void setQueue(String queue) {
- this.queue = queue;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getUser() {
- return user;
- }
-
- public void setUser(String user) {
- this.user = user;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
deleted file mode 100644
index 3964454..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import org.apache.eagle.jpm.util.SparkJobTagName;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReader {
-
- private String site;
- private SparkApplicationInfo app;
-
-
- public SparkFilesystemInputStreamReaderImpl(String site, SparkApplicationInfo app) {
- this.site = site;
- this.app = app;
- }
-
- @Override
- public void read(InputStream is) throws Exception {
- Map<String, String> baseTags = new HashMap<>();
- baseTags.put(SparkJobTagName.SITE.toString(), site);
- baseTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue());
- JHFParserBase parser = new JHFSparkParser(new JHFSparkEventReader(baseTags, this.app));
- parser.parse(is);
- }
-
- public static void main(String[] args) throws Exception {
- SparkFilesystemInputStreamReaderImpl impl = new SparkFilesystemInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo());
- impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1")));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
new file mode 100644
index 0000000..81f266b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.mr.runningentity.JobConfigSerDeser;
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+public class JPMEntityRepository extends EntityRepository {
+ public JPMEntityRepository() {
+ entitySet.add(SparkAppEntity.class);
+ entitySet.add(SparkJobEntity.class);
+ entitySet.add(SparkStageEntity.class);
+ entitySet.add(SparkTaskEntity.class);
+ entitySet.add(SparkExecutorEntity.class);
+ serDeserMap.put(JobConfig.class, new JobConfigSerDeser());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
new file mode 100644
index 0000000..0d3a86f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+public class JobConfig extends HashMap<String, String> implements Serializable {
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
new file mode 100644
index 0000000..51c8a50
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@Table("eagleSparkRunningApps")
+@ColumnFamily("f")
+@Prefix("sparkApp")
+@Service(Constants.RUNNING_SPARK_APP_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "user", "queue"})
+@Partition({"site"})
+public class SparkAppEntity extends TaggedLogAPIEntity {
+ @Column("a")
+ private long startTime;
+ @Column("b")
+ private long endTime;
+ @Column("c")
+ private String yarnState;
+ @Column("d")
+ private String yarnStatus;
+ @Column("e")
+ private JobConfig config;
+ @Column("f")
+ private int numJobs;
+ @Column("g")
+ private int totalStages;
+ @Column("h")
+ private int skippedStages;
+ @Column("i")
+ private int failedStages;
+ @Column("j")
+ private int totalTasks;
+ @Column("k")
+ private int skippedTasks;
+ @Column("l")
+ private int failedTasks;
+ @Column("m")
+ private int executors;
+ @Column("n")
+ private long inputBytes;
+ @Column("o")
+ private long inputRecords;
+ @Column("p")
+ private long outputBytes;
+ @Column("q")
+ private long outputRecords;
+ @Column("r")
+ private long shuffleReadBytes;
+ @Column("s")
+ private long shuffleReadRecords;
+ @Column("t")
+ private long shuffleWriteBytes;
+ @Column("u")
+ private long shuffleWriteRecords;
+ @Column("v")
+ private long executorDeserializeTime;
+ @Column("w")
+ private long executorRunTime;
+ @Column("x")
+ private long resultSize;
+ @Column("y")
+ private long jvmGcTime;
+ @Column("z")
+ private long resultSerializationTime;
+ @Column("ab")
+ private long memoryBytesSpilled;
+ @Column("ac")
+ private long diskBytesSpilled;
+ @Column("ad")
+ private long execMemoryBytes;
+ @Column("ae")
+ private long driveMemoryBytes;
+ @Column("af")
+ private int completeTasks;
+ @Column("ag")
+ private long totalExecutorTime;
+ @Column("ah")
+ private long executorMemoryOverhead;
+ @Column("ai")
+ private long driverMemoryOverhead;
+ @Column("aj")
+ private int executorCores;
+ @Column("ak")
+ private int driverCores;
+ @Column("al")
+ private AppInfo appInfo;
+ @Column("am")
+ private int activeStages;
+ @Column("an")
+ private int completeStages;
+ @Column("ba")
+ private int activeTasks;
+
+ public int getActiveTasks() {
+ return activeTasks;
+ }
+
+ public void setActiveTasks(int activeTasks) {
+ this.activeTasks = activeTasks;
+ valueChanged("activeTasks");
+ }
+
+ public int getCompleteStages() {
+ return completeStages;
+ }
+
+ public void setCompleteStages(int completeStages) {
+ this.completeStages = completeStages;
+ valueChanged("completeStages");
+ }
+
+ public int getActiveStages() {
+ return activeStages;
+ }
+
+ public void setActiveStages(int activeStages) {
+ this.activeStages = activeStages;
+ valueChanged("activeStages");
+ }
+
+ public AppInfo getAppInfo() {
+ return appInfo;
+ }
+
+ public void setAppInfo(AppInfo appInfo) {
+ this.appInfo = appInfo;
+ valueChanged("appInfo");
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public String getYarnState() {
+ return yarnState;
+ }
+
+ public String getYarnStatus() {
+ return yarnStatus;
+ }
+
+ public int getNumJobs() {
+ return numJobs;
+ }
+
+ public int getTotalStages() {
+ return totalStages;
+ }
+
+ public int getSkippedStages() {
+ return skippedStages;
+ }
+
+ public int getFailedStages() {
+ return failedStages;
+ }
+
+ public int getTotalTasks() {
+ return totalTasks;
+ }
+
+ public int getSkippedTasks() {
+ return skippedTasks;
+ }
+
+ public int getFailedTasks() {
+ return failedTasks;
+ }
+
+ public int getExecutors() {
+ return executors;
+ }
+
+ public long getInputBytes() {
+ return inputBytes;
+ }
+
+ public long getInputRecords() {
+ return inputRecords;
+ }
+
+ public long getOutputBytes() {
+ return outputBytes;
+ }
+
+ public long getOutputRecords() {
+ return outputRecords;
+ }
+
+ public long getShuffleReadBytes() {
+ return shuffleReadBytes;
+ }
+
+ public long getShuffleReadRecords() {
+ return shuffleReadRecords;
+ }
+
+ public long getShuffleWriteBytes() {
+ return shuffleWriteBytes;
+ }
+
+ public long getShuffleWriteRecords() {
+ return shuffleWriteRecords;
+ }
+
+ public long getExecutorDeserializeTime() {
+ return executorDeserializeTime;
+ }
+
+ public long getExecutorRunTime() {
+ return executorRunTime;
+ }
+
+ public long getResultSize() {
+ return resultSize;
+ }
+
+ public long getJvmGcTime() {
+ return jvmGcTime;
+ }
+
+ public long getResultSerializationTime() {
+ return resultSerializationTime;
+ }
+
+ /**
+ * Returns the value currently held in the {@code memoryBytesSpilled} field.
+ */
+ public long getMemoryBytesSpilled() {
+ return this.memoryBytesSpilled;
+ }
+
+ /**
+ * Returns the value currently held in the {@code diskBytesSpilled} field.
+ */
+ public long getDiskBytesSpilled() {
+ return this.diskBytesSpilled;
+ }
+
+ /**
+ * Returns the value currently held in the {@code execMemoryBytes} field.
+ */
+ public long getExecMemoryBytes() {
+ return this.execMemoryBytes;
+ }
+
+ /**
+ * Returns the value currently held in the {@code driveMemoryBytes} field.
+ */
+ public long getDriveMemoryBytes() {
+ return this.driveMemoryBytes;
+ }
+
+ /**
+ * Returns the value currently held in the {@code completeTasks} field.
+ */
+ public int getCompleteTasks() {
+ return this.completeTasks;
+ }
+
+ /**
+ * Returns the {@code JobConfig} reference currently held in the {@code config} field.
+ */
+ public JobConfig getConfig() {
+ return this.config;
+ }
+
+ /**
+ * Assigns the {@code startTime} field, then calls {@code valueChanged("startTime")}.
+ */
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ this.valueChanged("startTime");
+ }
+
+ /**
+ * Assigns the {@code endTime} field, then calls {@code valueChanged("endTime")}.
+ */
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ this.valueChanged("endTime");
+ }
+
+ /**
+ * Assigns the {@code yarnState} field, then calls {@code valueChanged("yarnState")}.
+ */
+ public void setYarnState(String yarnState) {
+ this.yarnState = yarnState;
+ this.valueChanged("yarnState");
+ }
+
+ /**
+ * Assigns the {@code yarnStatus} field, then calls {@code valueChanged("yarnStatus")}.
+ */
+ public void setYarnStatus(String yarnStatus) {
+ this.yarnStatus = yarnStatus;
+ this.valueChanged("yarnStatus");
+ }
+
+ /**
+ * Assigns the {@code config} field, then calls {@code valueChanged("config")}.
+ */
+ public void setConfig(JobConfig config) {
+ this.config = config;
+ this.valueChanged("config");
+ }
+
+ /**
+ * Assigns the {@code numJobs} field, then calls {@code valueChanged("numJobs")}.
+ */
+ public void setNumJobs(int numJobs) {
+ this.numJobs = numJobs;
+ this.valueChanged("numJobs");
+ }
+
+ /**
+ * Assigns the {@code totalStages} field, then calls {@code valueChanged("totalStages")}.
+ */
+ public void setTotalStages(int totalStages) {
+ this.totalStages = totalStages;
+ this.valueChanged("totalStages");
+ }
+
+ /**
+ * Assigns the {@code skippedStages} field, then calls {@code valueChanged("skippedStages")}.
+ */
+ public void setSkippedStages(int skippedStages) {
+ this.skippedStages = skippedStages;
+ this.valueChanged("skippedStages");
+ }
+
+ /**
+ * Assigns the {@code failedStages} field, then calls {@code valueChanged("failedStages")}.
+ */
+ public void setFailedStages(int failedStages) {
+ this.failedStages = failedStages;
+ this.valueChanged("failedStages");
+ }
+
+ /**
+ * Assigns the {@code totalTasks} field, then calls {@code valueChanged("totalTasks")}.
+ */
+ public void setTotalTasks(int totalTasks) {
+ this.totalTasks = totalTasks;
+ this.valueChanged("totalTasks");
+ }
+
+ /**
+ * Assigns the {@code skippedTasks} field, then calls {@code valueChanged("skippedTasks")}.
+ */
+ public void setSkippedTasks(int skippedTasks) {
+ this.skippedTasks = skippedTasks;
+ this.valueChanged("skippedTasks");
+ }
+
+ /**
+ * Assigns the {@code failedTasks} field, then calls {@code valueChanged("failedTasks")}.
+ */
+ public void setFailedTasks(int failedTasks) {
+ this.failedTasks = failedTasks;
+ this.valueChanged("failedTasks");
+ }
+
+ /**
+ * Assigns the {@code executors} field, then calls {@code valueChanged("executors")}.
+ */
+ public void setExecutors(int executors) {
+ this.executors = executors;
+ this.valueChanged("executors");
+ }
+
+ /**
+ * Assigns the {@code inputBytes} field, then calls {@code valueChanged("inputBytes")}.
+ */
+ public void setInputBytes(long inputBytes) {
+ this.inputBytes = inputBytes;
+ this.valueChanged("inputBytes");
+ }
+
+ /**
+ * Assigns the {@code inputRecords} field, then calls {@code valueChanged("inputRecords")}.
+ */
+ public void setInputRecords(long inputRecords) {
+ this.inputRecords = inputRecords;
+ this.valueChanged("inputRecords");
+ }
+
+ /**
+ * Assigns the {@code outputBytes} field, then calls {@code valueChanged("outputBytes")}.
+ */
+ public void setOutputBytes(long outputBytes) {
+ this.outputBytes = outputBytes;
+ this.valueChanged("outputBytes");
+ }
+
+ /**
+ * Assigns the {@code outputRecords} field, then calls {@code valueChanged("outputRecords")}.
+ */
+ public void setOutputRecords(long outputRecords) {
+ this.outputRecords = outputRecords;
+ this.valueChanged("outputRecords");
+ }
+
+ /**
+ * Assigns the {@code shuffleReadBytes} field, then calls
+ * {@code valueChanged("shuffleReadBytes")}.
+ *
+ * <p>Note that the parameter is named {@code shuffleReadRemoteBytes} while the
+ * field and the change key are both {@code shuffleReadBytes}.
+ */
+ public void setShuffleReadBytes(long shuffleReadRemoteBytes) {
+ this.shuffleReadBytes = shuffleReadRemoteBytes;
+ this.valueChanged("shuffleReadBytes");
+ }
+
+ /**
+ * Assigns the {@code shuffleReadRecords} field, then calls
+ * {@code valueChanged("shuffleReadRecords")}.
+ */
+ public void setShuffleReadRecords(long shuffleReadRecords) {
+ this.shuffleReadRecords = shuffleReadRecords;
+ this.valueChanged("shuffleReadRecords");
+ }
+
+ /**
+ * Assigns the {@code shuffleWriteBytes} field, then calls
+ * {@code valueChanged("shuffleWriteBytes")}.
+ */
+ public void setShuffleWriteBytes(long shuffleWriteBytes) {
+ this.shuffleWriteBytes = shuffleWriteBytes;
+ this.valueChanged("shuffleWriteBytes");
+ }
+
+ /**
+ * Assigns the {@code shuffleWriteRecords} field, then calls
+ * {@code valueChanged("shuffleWriteRecords")}.
+ */
+ public void setShuffleWriteRecords(long shuffleWriteRecords) {
+ this.shuffleWriteRecords = shuffleWriteRecords;
+ this.valueChanged("shuffleWriteRecords");
+ }
+
+ /**
+ * Assigns the {@code executorDeserializeTime} field, then calls
+ * {@code valueChanged("executorDeserializeTime")}.
+ */
+ public void setExecutorDeserializeTime(long executorDeserializeTime) {
+ this.executorDeserializeTime = executorDeserializeTime;
+ this.valueChanged("executorDeserializeTime");
+ }
+
+ /**
+ * Assigns the {@code executorRunTime} field, then calls
+ * {@code valueChanged("executorRunTime")}.
+ */
+ public void setExecutorRunTime(long executorRunTime) {
+ this.executorRunTime = executorRunTime;
+ this.valueChanged("executorRunTime");
+ }
+
+ /**
+ * Assigns the {@code resultSize} field, then calls {@code valueChanged("resultSize")}.
+ */
+ public void setResultSize(long resultSize) {
+ this.resultSize = resultSize;
+ this.valueChanged("resultSize");
+ }
+
+ /**
+ * Assigns the {@code jvmGcTime} field, then calls {@code valueChanged("jvmGcTime")}.
+ */
+ public void setJvmGcTime(long jvmGcTime) {
+ this.jvmGcTime = jvmGcTime;
+ this.valueChanged("jvmGcTime");
+ }
+
+ /**
+ * Assigns the {@code resultSerializationTime} field, then calls
+ * {@code valueChanged("resultSerializationTime")}.
+ */
+ public void setResultSerializationTime(long resultSerializationTime) {
+ this.resultSerializationTime = resultSerializationTime;
+ this.valueChanged("resultSerializationTime");
+ }
+
+ /**
+ * Assigns the {@code memoryBytesSpilled} field, then calls
+ * {@code valueChanged("memoryBytesSpilled")}.
+ */
+ public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+ this.memoryBytesSpilled = memoryBytesSpilled;
+ this.valueChanged("memoryBytesSpilled");
+ }
+
+ /**
+ * Assigns the {@code diskBytesSpilled} field, then calls
+ * {@code valueChanged("diskBytesSpilled")}.
+ */
+ public void setDiskBytesSpilled(long diskBytesSpilled) {
+ this.diskBytesSpilled = diskBytesSpilled;
+ this.valueChanged("diskBytesSpilled");
+ }
+
+ /**
+ * Assigns the {@code execMemoryBytes} field, then calls
+ * {@code valueChanged("execMemoryBytes")}.
+ */
+ public void setExecMemoryBytes(long execMemoryBytes) {
+ this.execMemoryBytes = execMemoryBytes;
+ this.valueChanged("execMemoryBytes");
+ }
+
+ /**
+ * Assigns the {@code driveMemoryBytes} field, then calls
+ * {@code valueChanged("driveMemoryBytes")}.
+ */
+ public void setDriveMemoryBytes(long driveMemoryBytes) {
+ this.driveMemoryBytes = driveMemoryBytes;
+ this.valueChanged("driveMemoryBytes");
+ }
+
+ /**
+ * Assigns the {@code completeTasks} field, then calls {@code valueChanged("completeTasks")}.
+ */
+ public void setCompleteTasks(int completeTasks) {
+ this.completeTasks = completeTasks;
+ this.valueChanged("completeTasks");
+ }
+
+ /**
+ * Returns the value currently held in the {@code totalExecutorTime} field.
+ */
+ public long getTotalExecutorTime() {
+ return this.totalExecutorTime;
+ }
+
+ /**
+ * Assigns the {@code totalExecutorTime} field, then calls
+ * {@code valueChanged("totalExecutorTime")}.
+ */
+ public void setTotalExecutorTime(long totalExecutorTime) {
+ this.totalExecutorTime = totalExecutorTime;
+ this.valueChanged("totalExecutorTime");
+ }
+
+ /**
+ * Returns the value currently held in the {@code executorMemoryOverhead} field.
+ */
+ public long getExecutorMemoryOverhead() {
+ return this.executorMemoryOverhead;
+ }
+
+ /**
+ * Assigns the {@code executorMemoryOverhead} field, then calls
+ * {@code valueChanged("executorMemoryOverhead")}.
+ */
+ public void setExecutorMemoryOverhead(long executorMemoryOverhead) {
+ this.executorMemoryOverhead = executorMemoryOverhead;
+ this.valueChanged("executorMemoryOverhead");
+ }
+
+ /**
+ * Returns the value currently held in the {@code driverMemoryOverhead} field.
+ */
+ public long getDriverMemoryOverhead() {
+ return this.driverMemoryOverhead;
+ }
+
+ /**
+ * Assigns the {@code driverMemoryOverhead} field, then calls
+ * {@code valueChanged("driverMemoryOverhead")}.
+ */
+ public void setDriverMemoryOverhead(long driverMemoryOverhead) {
+ this.driverMemoryOverhead = driverMemoryOverhead;
+ this.valueChanged("driverMemoryOverhead");
+ }
+
+ /**
+ * Returns the value currently held in the {@code executorCores} field.
+ */
+ public int getExecutorCores() {
+ return this.executorCores;
+ }
+
+ /**
+ * Assigns the {@code executorCores} field, then calls {@code valueChanged("executorCores")}.
+ */
+ public void setExecutorCores(int executorCores) {
+ this.executorCores = executorCores;
+ this.valueChanged("executorCores");
+ }
+
+ /**
+ * Returns the value currently held in the {@code driverCores} field.
+ */
+ public int getDriverCores() {
+ return this.driverCores;
+ }
+
+ /**
+ * Assigns the {@code driverCores} field, then calls {@code valueChanged("driverCores")}.
+ */
+ public void setDriverCores(int driverCores) {
+ this.driverCores = driverCores;
+ this.valueChanged("driverCores");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
new file mode 100644
index 0000000..6d0441c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@Table("eagleSparkRunningExecutors")
+@ColumnFamily("f")
+@Prefix("sparkExecutor")
+@Service(Constants.RUNNING_SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "executorId","user", "queue"})
+@Partition({"site"})
+public class SparkExecutorEntity extends TaggedLogAPIEntity {
+ @Column("a")
+ private String hostPort;
+ @Column("b")
+ private int rddBlocks;
+ @Column("c")
+ private long memoryUsed;
+ @Column("d")
+ private long diskUsed;
+ @Column("e")
+ private int activeTasks = 0;
+ @Column("f")
+ private int failedTasks = 0;
+ @Column("g")
+ private int completedTasks = 0;
+ @Column("h")
+ private int totalTasks = 0;
+ @Column("i")
+ private long totalDuration = 0;
+ @Column("j")
+ private long totalInputBytes = 0;
+ @Column("k")
+ private long totalShuffleRead = 0;
+ @Column("l")
+ private long totalShuffleWrite = 0;
+ @Column("m")
+ private long maxMemory;
+ @Column("n")
+ private long startTime;
+ @Column("o")
+ private long endTime = 0;
+ @Column("p")
+ private long execMemoryBytes;
+ @Column("q")
+ private int cores;
+ @Column("r")
+ private long memoryOverhead;
+
+ public String getHostPort() {
+ return hostPort;
+ }
+
+ public void setHostPort(String hostPort) {
+ this.hostPort = hostPort;
+ this.valueChanged("hostPort");
+ }
+
+ public int getRddBlocks() {
+ return rddBlocks;
+ }
+
+ public void setRddBlocks(int rddBlocks) {
+ this.rddBlocks = rddBlocks;
+ this.valueChanged("rddBlocks");
+ }
+
+ public long getMemoryUsed() {
+ return memoryUsed;
+ }
+
+ public void setMemoryUsed(long memoryUsed) {
+ this.memoryUsed = memoryUsed;
+ this.valueChanged("memoryUsed");
+ }
+
+ public long getDiskUsed() {
+ return diskUsed;
+ }
+
+ public void setDiskUsed(long diskUsed) {
+ this.diskUsed = diskUsed;
+ this.valueChanged("diskUsed");
+ }
+
+ public int getActiveTasks() {
+ return activeTasks;
+ }
+
+ public void setActiveTasks(int activeTasks) {
+ this.activeTasks = activeTasks;
+ this.valueChanged("activeTasks");
+ }
+
+ public int getFailedTasks() {
+ return failedTasks;
+ }
+
+ public void setFailedTasks(int failedTasks) {
+ this.failedTasks = failedTasks;
+ this.valueChanged("failedTasks");
+ }
+
+ public int getCompletedTasks() {
+ return completedTasks;
+ }
+
+ public void setCompletedTasks(int completedTasks) {
+ this.completedTasks = completedTasks;
+ this.valueChanged("completedTasks");
+ }
+
+ public int getTotalTasks() {
+ return totalTasks;
+ }
+
+ public void setTotalTasks(int totalTasks) {
+ this.totalTasks = totalTasks;
+ this.valueChanged("totalTasks");
+ }
+
+ public long getTotalDuration() {
+ return totalDuration;
+ }
+
+ public void setTotalDuration(long totalDuration) {
+ this.totalDuration = totalDuration;
+ this.valueChanged("totalDuration");
+ }
+
+ public long getTotalInputBytes() {
+ return totalInputBytes;
+ }
+
+ public void setTotalInputBytes(long totalInputBytes) {
+ this.totalInputBytes = totalInputBytes;
+ this.valueChanged("totalInputBytes");
+ }
+
+ public long getTotalShuffleRead() {
+ return totalShuffleRead;
+ }
+
+ public void setTotalShuffleRead(long totalShuffleRead) {
+ this.totalShuffleRead = totalShuffleRead;
+ this.valueChanged("totalShuffleRead");
+ }
+
+ public long getTotalShuffleWrite() {
+ return totalShuffleWrite;
+ }
+
+ public void setTotalShuffleWrite(long totalShuffleWrite) {
+ this.totalShuffleWrite = totalShuffleWrite;
+ this.valueChanged("totalShuffleWrite");
+ }
+
+ public long getMaxMemory() {
+ return maxMemory;
+ }
+
+ public void setMaxMemory(long maxMemory) {
+ this.maxMemory = maxMemory;
+ this.valueChanged("maxMemory");
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ valueChanged("startTime");
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ this.valueChanged("endTime");
+ }
+
+ public long getExecMemoryBytes() {
+ return execMemoryBytes;
+ }
+
+ public void setExecMemoryBytes(long execMemoryBytes) {
+ this.execMemoryBytes = execMemoryBytes;
+ this.valueChanged("execMemoryBytes");
+ }
+
+ public int getCores() {
+ return cores;
+ }
+
+ public void setCores(int cores) {
+ this.cores = cores;
+ valueChanged("cores");
+ }
+
+ public long getMemoryOverhead() {
+ return memoryOverhead;
+ }
+
+ public void setMemoryOverhead(long memoryOverhead) {
+ this.memoryOverhead = memoryOverhead;
+ valueChanged("memoryOverhead");
+ }
+}