You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/08/09 05:25:32 UTC
[4/8] incubator-eagle git commit: [EAGLE-422] eagle support for mr &
spark running job monitoring
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/assembly/eagle-jpm-spark-running-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/assembly/eagle-jpm-spark-running-assembly.xml b/eagle-jpm/eagle-jpm-spark-running/src/assembly/eagle-jpm-spark-running-assembly.xml
new file mode 100644
index 0000000..66133a0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/assembly/eagle-jpm-spark-running-assembly.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>assembly</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>/</outputDirectory>
+ <useProjectArtifact>false</useProjectArtifact>
+ <unpack>true</unpack>
+ <scope>runtime</scope>
+ <!--includes>
+ <include>org.apache.hadoop:hadoop-common</include>
+ <include>org.apache.hadoop:hadoop-hdfs</include>
+ <include>org.apache.hadoop:hadoop-client</include>
+ <include>org.apache.hadoop:hadoop-auth</include>
+ <include>org.apache.eagle:eagle-stream-process-api</include>
+ <include>org.apache.eagle:eagle-stream-process-base</include>
+ <include>org.jsoup:jsoup</include>
+ </includes-->
+ <excludes>
+ <exclude>org.wso2.orbit.com.lmax:disruptor</exclude>
+ <exclude>asm:asm</exclude>
+ <exclude>org.apache.storm:storm-core</exclude>
+ </excludes>
+ </dependencySet>
+ </dependencySets>
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.outputDirectory}/</directory>
+ <outputDirectory>/</outputDirectory>
+ <!--<includes>-->
+ <!--<include>*.conf</include>-->
+ <!--<include>*.xml</include>-->
+ <!--<include>*.properties</include>-->
+ <!--<include>*.config</include>-->
+ <!--<include>classes/META-INF/*</include>-->
+ <!--</includes>-->
+
+ <excludes>
+ <exclude>*.yaml</exclude>
+ </excludes>
+ </fileSet>
+ </fileSets>
+</assembly>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java
new file mode 100644
index 0000000..749f4d1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobFetchSpout;
+import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobParseBolt;
+
+public class SparkRunningJobMain {
+ public static void main(String[] args) {
+ try {
+ //1. trigger init conf
+ SparkRunningConfigManager sparkRunningConfigManager = SparkRunningConfigManager.getInstance(args);
+
+ //2. init topology
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+ String topologyName = sparkRunningConfigManager.getConfig().getString("envContextConfig.topologyName");
+ String spoutName = "sparkRunningJobFetchSpout";
+ String boltName = "sparkRunningJobParseBolt";
+ int parallelism = sparkRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
+ int tasks = sparkRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + spoutName);
+ if (parallelism > tasks) {
+ parallelism = tasks;
+ }
+ topologyBuilder.setSpout(
+ spoutName,
+ new SparkRunningJobFetchSpout(
+ sparkRunningConfigManager.getJobExtractorConfig(),
+ sparkRunningConfigManager.getEndpointConfig(),
+ sparkRunningConfigManager.getZkStateConfig()),
+ parallelism
+ ).setNumTasks(tasks);
+
+ parallelism = sparkRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + boltName);
+ tasks = sparkRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + boltName);
+ if (parallelism > tasks) {
+ parallelism = tasks;
+ }
+ topologyBuilder.setBolt(boltName,
+ new SparkRunningJobParseBolt(
+ sparkRunningConfigManager.getZkStateConfig(),
+ sparkRunningConfigManager.getEagleServiceConfig(),
+ sparkRunningConfigManager.getEndpointConfig(),
+ sparkRunningConfigManager.getJobExtractorConfig()),
+ parallelism).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
+
+ backtype.storm.Config config = new backtype.storm.Config();
+ config.setNumWorkers(sparkRunningConfigManager.getConfig().getInt("envContextConfig.workers"));
+ config.put(Config.TOPOLOGY_DEBUG, true);
+ if (!sparkRunningConfigManager.getEnv().equals("local")) {
+ //cluster mode
+ //parse conf here
+ StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
+ } else {
+ //local mode
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java
new file mode 100644
index 0000000..b05d12e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.common;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.util.ConfigOptionParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class SparkRunningConfigManager implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(SparkRunningConfigManager.class);
+
+ public String getEnv() {
+ return env;
+ }
+ private String env;
+
+ public ZKStateConfig getZkStateConfig() { return zkStateConfig; }
+ private ZKStateConfig zkStateConfig;
+
+ public EagleServiceConfig getEagleServiceConfig() {
+ return eagleServiceConfig;
+ }
+ private EagleServiceConfig eagleServiceConfig;
+
+ public JobExtractorConfig getJobExtractorConfig() {
+ return jobExtractorConfig;
+ }
+ private JobExtractorConfig jobExtractorConfig;
+
+ public EndpointConfig getEndpointConfig() {
+ return endpointConfig;
+ }
+ private EndpointConfig endpointConfig;
+
+ public static class ZKStateConfig implements Serializable {
+ public String zkQuorum;
+ public String zkRoot;
+ public int zkSessionTimeoutMs;
+ public int zkRetryTimes;
+ public int zkRetryInterval;
+ public String zkPort;
+ public boolean recoverEnabled;
+ }
+
+ public static class EagleServiceConfig implements Serializable {
+ public String eagleServiceHost;
+ public int eagleServicePort;
+ public int readTimeoutSeconds;
+ public int maxFlushNum;
+ public String username;
+ public String password;
+ }
+
+ public static class JobExtractorConfig implements Serializable {
+ public String site;
+ public int fetchRunningJobInterval;
+ public int parseThreadPoolSize;
+ }
+
+ public static class EndpointConfig implements Serializable {
+ public String nnEndpoint;
+ public String eventLog;
+ public String[] rmUrls;
+ public String principal;
+ public String keyTab;
+ }
+
+ public Config getConfig() {
+ return config;
+ }
+ private Config config;
+
+ private static SparkRunningConfigManager manager = new SparkRunningConfigManager();
+
+ private SparkRunningConfigManager() {
+ this.eagleServiceConfig = new EagleServiceConfig();
+ this.jobExtractorConfig = new JobExtractorConfig();
+ this.endpointConfig = new EndpointConfig();
+ this.zkStateConfig = new ZKStateConfig();
+ }
+
+ public static SparkRunningConfigManager getInstance(String[] args) {
+ manager.init(args);
+ return manager;
+ }
+
+ private void init(String[] args) {
+ try {
+ LOG.info("Loading from configuration file");
+ this.config = new ConfigOptionParser().load(args);
+ } catch (Exception e) {
+ LOG.error("failed to load config");
+ }
+
+ this.env = config.getString("envContextConfig.env");
+
+ this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
+ this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
+ this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs");
+ this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes");
+ this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval");
+ this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
+ this.zkStateConfig.recoverEnabled = config.getBoolean("zookeeperConfig.recoverEnabled");
+
+ // parse eagle service endpoint
+ this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+ String port = config.getString("eagleProps.eagleService.port");
+ this.eagleServiceConfig.eagleServicePort = (port == null ? 8080 : Integer.parseInt(port));
+ this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
+ this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
+ this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds");
+ this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum");
+
+ //parse job extractor
+ this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
+ this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
+ this.jobExtractorConfig.parseThreadPoolSize = config.getInt("jobExtractorConfig.parseThreadPoolSize");
+
+ //parse data source config
+ this.endpointConfig.eventLog = config.getString("dataSourceConfig.eventLog");
+ this.endpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint");
+ this.endpointConfig.keyTab = config.getString("dataSourceConfig.keytab");
+ this.endpointConfig.principal = config.getString("dataSourceConfig.principal");
+ this.endpointConfig.rmUrls = config.getStringList("dataSourceConfig.rmUrls").toArray(new String[0]);
+
+ LOG.info("Successfully initialized SparkRunningConfigManager");
+ LOG.info("env: " + this.env);
+ LOG.info("site: " + this.jobExtractorConfig.site);
+ LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
+ LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/Util.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/Util.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/Util.java
new file mode 100644
index 0000000..6c9f8f5
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/Util.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.running.common;
+
+import org.apache.hadoop.fs.Path;
+
+public class Util {
+ public static String getAppAttemptLogName(String appId, String attemptId) {
+ if (attemptId.equals("0")) {
+ return appId;
+ }
+ return appId + "_" + attemptId;
+ }
+
+ public static Path getFilePath(String baseDir, String appAttemptLogName) {
+ String attemptLogDir = baseDir + "/" + appAttemptLogName;
+ return new Path(attemptLogDir);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
new file mode 100644
index 0000000..5d1cfaa
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+public class JPMEntityRepository extends EntityRepository {
+ public JPMEntityRepository() {
+ entitySet.add(SparkAppEntity.class);
+ entitySet.add(SparkJobEntity.class);
+ entitySet.add(SparkStageEntity.class);
+ entitySet.add(SparkTaskEntity.class);
+ entitySet.add(SparkExecutorEntity.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
new file mode 100644
index 0000000..e18f1e7
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+public class JobConfig extends HashMap<String, String> implements Serializable {
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
new file mode 100644
index 0000000..7b8f648
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eagleSparkRunningApps")
+@ColumnFamily("f")
+@Prefix("sparkApp")
+@Service(Constants.RUNNING_SPARK_APP_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "user", "queue"})
+@Partition({"site"})
+public class SparkAppEntity extends TaggedLogAPIEntity {
+ @Column("a")
+ private long startTime;
+ @Column("b")
+ private long endTime;
+ @Column("c")
+ private String yarnState;
+ @Column("d")
+ private String yarnStatus;
+ @Column("e")
+ private JobConfig config;
+ @Column("f")
+ private int numJobs;
+ @Column("g")
+ private int totalStages;
+ @Column("h")
+ private int skippedStages;
+ @Column("i")
+ private int failedStages;
+ @Column("j")
+ private int totalTasks;
+ @Column("k")
+ private int skippedTasks;
+ @Column("l")
+ private int failedTasks;
+ @Column("m")
+ private int executors;
+ @Column("n")
+ private long inputBytes;
+ @Column("o")
+ private long inputRecords;
+ @Column("p")
+ private long outputBytes;
+ @Column("q")
+ private long outputRecords;
+ @Column("r")
+ private long shuffleReadBytes;
+ @Column("s")
+ private long shuffleReadRecords;
+ @Column("t")
+ private long shuffleWriteBytes;
+ @Column("u")
+ private long shuffleWriteRecords;
+ @Column("v")
+ private long executorDeserializeTime;
+ @Column("w")
+ private long executorRunTime;
+ @Column("x")
+ private long resultSize;
+ @Column("y")
+ private long jvmGcTime;
+ @Column("z")
+ private long resultSerializationTime;
+ @Column("ab")
+ private long memoryBytesSpilled;
+ @Column("ac")
+ private long diskBytesSpilled;
+ @Column("ad")
+ private long execMemoryBytes;
+ @Column("ae")
+ private long driveMemoryBytes;
+ @Column("af")
+ private int completeTasks;
+ @Column("ag")
+ private long totalExecutorTime;
+ @Column("ah")
+ private long executorMemoryOverhead;
+ @Column("ai")
+ private long driverMemoryOverhead;
+ @Column("aj")
+ private int executorCores;
+ @Column("ak")
+ private int driverCores;
+ @Column("al")
+ private AppInfo appInfo;
+ @Column("am")
+ private int activeStages;
+ @Column("an")
+ private int completeStages;
+ @Column("ba")
+ private int activeTasks;
+
+ public int getActiveTasks() {
+ return activeTasks;
+ }
+
+ public void setActiveTasks(int activeTasks) {
+ this.activeTasks = activeTasks;
+ valueChanged("activeTasks");
+ }
+
+ public int getCompleteStages() {
+ return completeStages;
+ }
+
+ public void setCompleteStages(int completeStages) {
+ this.completeStages = completeStages;
+ valueChanged("completeStages");
+ }
+
+ public int getActiveStages() {
+ return activeStages;
+ }
+
+ public void setActiveStages(int activeStages) {
+ this.activeStages = activeStages;
+ valueChanged("activeStages");
+ }
+
+ public AppInfo getAppInfo() {
+ return appInfo;
+ }
+
+ public void setAppInfo(AppInfo appInfo) {
+ this.appInfo = appInfo;
+ valueChanged("appInfo");
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public String getYarnState() {
+ return yarnState;
+ }
+
+ public String getYarnStatus() {
+ return yarnStatus;
+ }
+
+ public int getNumJobs() {
+ return numJobs;
+ }
+
+ public int getTotalStages() {
+ return totalStages;
+ }
+
+ public int getSkippedStages() {
+ return skippedStages;
+ }
+
+ public int getFailedStages() {
+ return failedStages;
+ }
+
+ public int getTotalTasks() {
+ return totalTasks;
+ }
+
+ public int getSkippedTasks() {
+ return skippedTasks;
+ }
+
+ public int getFailedTasks() {
+ return failedTasks;
+ }
+
+ public int getExecutors() {
+ return executors;
+ }
+
+ public long getInputBytes() {
+ return inputBytes;
+ }
+
+ public long getInputRecords() {
+ return inputRecords;
+ }
+
+ public long getOutputBytes() {
+ return outputBytes;
+ }
+
+ public long getOutputRecords() {
+ return outputRecords;
+ }
+
+ public long getShuffleReadBytes() {
+ return shuffleReadBytes;
+ }
+
+ public long getShuffleReadRecords() {
+ return shuffleReadRecords;
+ }
+
+ public long getShuffleWriteBytes() {
+ return shuffleWriteBytes;
+ }
+
+ public long getShuffleWriteRecords() {
+ return shuffleWriteRecords;
+ }
+
+ public long getExecutorDeserializeTime() {
+ return executorDeserializeTime;
+ }
+
+ public long getExecutorRunTime() {
+ return executorRunTime;
+ }
+
+ public long getResultSize() {
+ return resultSize;
+ }
+
+ public long getJvmGcTime() {
+ return jvmGcTime;
+ }
+
+ public long getResultSerializationTime() {
+ return resultSerializationTime;
+ }
+
+ public long getMemoryBytesSpilled() {
+ return memoryBytesSpilled;
+ }
+
+ public long getDiskBytesSpilled() {
+ return diskBytesSpilled;
+ }
+
+ public long getExecMemoryBytes() {
+ return execMemoryBytes;
+ }
+
+ public long getDriveMemoryBytes() {
+ return driveMemoryBytes;
+ }
+
+ public int getCompleteTasks(){ return completeTasks;}
+
+ public JobConfig getConfig() {
+ return config;
+ }
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ valueChanged("startTime");
+ }
+
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ valueChanged("endTime");
+ }
+
+ public void setYarnState(String yarnState) {
+ this.yarnState = yarnState;
+ valueChanged("yarnState");
+ }
+
+ public void setYarnStatus(String yarnStatus) {
+ this.yarnStatus = yarnStatus;
+ valueChanged("yarnStatus");
+ }
+
+ public void setConfig(JobConfig config) {
+ this.config = config;
+ valueChanged("config");
+ }
+
+ public void setNumJobs(int numJobs) {
+ this.numJobs = numJobs;
+ valueChanged("numJobs");
+ }
+
+ public void setTotalStages(int totalStages) {
+ this.totalStages = totalStages;
+ valueChanged("totalStages");
+ }
+
+ public void setSkippedStages(int skippedStages) {
+ this.skippedStages = skippedStages;
+ valueChanged("skippedStages");
+ }
+
+ public void setFailedStages(int failedStages) {
+ this.failedStages = failedStages;
+ valueChanged("failedStages");
+ }
+
+ public void setTotalTasks(int totalTasks) {
+ this.totalTasks = totalTasks;
+ valueChanged("totalTasks");
+ }
+
+ public void setSkippedTasks(int skippedTasks) {
+ this.skippedTasks = skippedTasks;
+ valueChanged("skippedTasks");
+ }
+
+ public void setFailedTasks(int failedTasks) {
+ this.failedTasks = failedTasks;
+ valueChanged("failedTasks");
+ }
+
+ public void setExecutors(int executors) {
+ this.executors = executors;
+ valueChanged("executors");
+ }
+
+ public void setInputBytes(long inputBytes) {
+ this.inputBytes = inputBytes;
+ valueChanged("inputBytes");
+ }
+
+ public void setInputRecords(long inputRecords) {
+ this.inputRecords = inputRecords;
+ valueChanged("inputRecords");
+ }
+
+ public void setOutputBytes(long outputBytes) {
+ this.outputBytes = outputBytes;
+ valueChanged("outputBytes");
+ }
+
+ public void setOutputRecords(long outputRecords) {
+ this.outputRecords = outputRecords;
+ valueChanged("outputRecords");
+ }
+
+ public void setShuffleReadBytes(long shuffleReadRemoteBytes) {
+ this.shuffleReadBytes = shuffleReadRemoteBytes;
+ valueChanged("shuffleReadBytes");
+ }
+
+ public void setShuffleReadRecords(long shuffleReadRecords) {
+ this.shuffleReadRecords = shuffleReadRecords;
+ valueChanged("shuffleReadRecords");
+ }
+
+ public void setShuffleWriteBytes(long shuffleWriteBytes) {
+ this.shuffleWriteBytes = shuffleWriteBytes;
+ valueChanged("shuffleWriteBytes");
+ }
+
+ public void setShuffleWriteRecords(long shuffleWriteRecords) {
+ this.shuffleWriteRecords = shuffleWriteRecords;
+ valueChanged("shuffleWriteRecords");
+ }
+
+ public void setExecutorDeserializeTime(long executorDeserializeTime) {
+ this.executorDeserializeTime = executorDeserializeTime;
+ valueChanged("executorDeserializeTime");
+ }
+
+ public void setExecutorRunTime(long executorRunTime) {
+ this.executorRunTime = executorRunTime;
+ valueChanged("executorRunTime");
+ }
+
+ public void setResultSize(long resultSize) {
+ this.resultSize = resultSize;
+ valueChanged("resultSize");
+ }
+
+ public void setJvmGcTime(long jvmGcTime) {
+ this.jvmGcTime = jvmGcTime;
+ valueChanged("jvmGcTime");
+ }
+
+ public void setResultSerializationTime(long resultSerializationTime) {
+ this.resultSerializationTime = resultSerializationTime;
+ valueChanged("resultSerializationTime");
+ }
+
+ public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+ this.memoryBytesSpilled = memoryBytesSpilled;
+ valueChanged("memoryBytesSpilled");
+ }
+
+ public void setDiskBytesSpilled(long diskBytesSpilled) {
+ this.diskBytesSpilled = diskBytesSpilled;
+ valueChanged("diskBytesSpilled");
+ }
+
+ public void setExecMemoryBytes(long execMemoryBytes) {
+ this.execMemoryBytes = execMemoryBytes;
+ valueChanged("execMemoryBytes");
+ }
+
+ public void setDriveMemoryBytes(long driveMemoryBytes) {
+ this.driveMemoryBytes = driveMemoryBytes;
+ valueChanged("driveMemoryBytes");
+ }
+
+ public void setCompleteTasks(int completeTasks){
+ this.completeTasks = completeTasks;
+ valueChanged("completeTasks");
+ }
+
+ public long getTotalExecutorTime() {
+ return totalExecutorTime;
+ }
+
+ public void setTotalExecutorTime(long totalExecutorTime) {
+ this.totalExecutorTime = totalExecutorTime;
+ valueChanged("totalExecutorTime");
+ }
+
+ public long getExecutorMemoryOverhead() {
+ return executorMemoryOverhead;
+ }
+
+ public void setExecutorMemoryOverhead(long executorMemoryOverhead) {
+ this.executorMemoryOverhead = executorMemoryOverhead;
+ valueChanged("executorMemoryOverhead");
+ }
+
+ public long getDriverMemoryOverhead() {
+ return driverMemoryOverhead;
+ }
+
+ public void setDriverMemoryOverhead(long driverMemoryOverhead) {
+ this.driverMemoryOverhead = driverMemoryOverhead;
+ valueChanged("driverMemoryOverhead");
+ }
+
+ public int getExecutorCores() {
+ return executorCores;
+ }
+
+ public void setExecutorCores(int executorCores) {
+ this.executorCores = executorCores;
+ valueChanged("executorCores");
+ }
+
+ public int getDriverCores() {
+ return driverCores;
+ }
+
+ public void setDriverCores(int driverCores) {
+ this.driverCores = driverCores;
+ valueChanged("driverCores");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
new file mode 100644
index 0000000..f4de84c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eagleSparkRunningExecutors")
+@ColumnFamily("f")
+@Prefix("sparkExecutor")
+@Service(Constants.RUNNING_SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "executorId","user", "queue"})
+@Partition({"site"})
+public class SparkExecutorEntity extends TaggedLogAPIEntity {
+ @Column("a")
+ private String hostPort;
+ @Column("b")
+ private int rddBlocks;
+ @Column("c")
+ private long memoryUsed;
+ @Column("d")
+ private long diskUsed;
+ @Column("e")
+ private int activeTasks = 0;
+ @Column("f")
+ private int failedTasks = 0;
+ @Column("g")
+ private int completedTasks = 0;
+ @Column("h")
+ private int totalTasks = 0;
+ @Column("i")
+ private long totalDuration = 0;
+ @Column("j")
+ private long totalInputBytes = 0;
+ @Column("k")
+ private long totalShuffleRead = 0;
+ @Column("l")
+ private long totalShuffleWrite = 0;
+ @Column("m")
+ private long maxMemory;
+ @Column("n")
+ private long startTime;
+ @Column("o")
+ private long endTime = 0;
+ @Column("p")
+ private long execMemoryBytes;
+ @Column("q")
+ private int cores;
+ @Column("r")
+ private long memoryOverhead;
+
+ public String getHostPort() {
+ return hostPort;
+ }
+
+ public void setHostPort(String hostPort) {
+ this.hostPort = hostPort;
+ this.valueChanged("hostPort");
+ }
+
+ public int getRddBlocks() {
+ return rddBlocks;
+ }
+
+ public void setRddBlocks(int rddBlocks) {
+ this.rddBlocks = rddBlocks;
+ this.valueChanged("rddBlocks");
+ }
+
+ public long getMemoryUsed() {
+ return memoryUsed;
+ }
+
+ public void setMemoryUsed(long memoryUsed) {
+ this.memoryUsed = memoryUsed;
+ this.valueChanged("memoryUsed");
+ }
+
+ public long getDiskUsed() {
+ return diskUsed;
+ }
+
+ public void setDiskUsed(long diskUsed) {
+ this.diskUsed = diskUsed;
+ this.valueChanged("diskUsed");
+ }
+
+ public int getActiveTasks() {
+ return activeTasks;
+ }
+
+ public void setActiveTasks(int activeTasks) {
+ this.activeTasks = activeTasks;
+ this.valueChanged("activeTasks");
+ }
+
+ public int getFailedTasks() {
+ return failedTasks;
+ }
+
+ public void setFailedTasks(int failedTasks) {
+ this.failedTasks = failedTasks;
+ this.valueChanged("failedTasks");
+ }
+
+ public int getCompletedTasks() {
+ return completedTasks;
+ }
+
+ public void setCompletedTasks(int completedTasks) {
+ this.completedTasks = completedTasks;
+ this.valueChanged("completedTasks");
+ }
+
+ public int getTotalTasks() {
+ return totalTasks;
+ }
+
+ public void setTotalTasks(int totalTasks) {
+ this.totalTasks = totalTasks;
+ this.valueChanged("totalTasks");
+ }
+
+ public long getTotalDuration() {
+ return totalDuration;
+ }
+
+ public void setTotalDuration(long totalDuration) {
+ this.totalDuration = totalDuration;
+ this.valueChanged("totalDuration");
+ }
+
+ public long getTotalInputBytes() {
+ return totalInputBytes;
+ }
+
+ public void setTotalInputBytes(long totalInputBytes) {
+ this.totalInputBytes = totalInputBytes;
+ this.valueChanged("totalInputBytes");
+ }
+
+ public long getTotalShuffleRead() {
+ return totalShuffleRead;
+ }
+
+ public void setTotalShuffleRead(long totalShuffleRead) {
+ this.totalShuffleRead = totalShuffleRead;
+ this.valueChanged("totalShuffleRead");
+ }
+
+ public long getTotalShuffleWrite() {
+ return totalShuffleWrite;
+ }
+
+ public void setTotalShuffleWrite(long totalShuffleWrite) {
+ this.totalShuffleWrite = totalShuffleWrite;
+ this.valueChanged("totalShuffleWrite");
+ }
+
+ public long getMaxMemory() {
+ return maxMemory;
+ }
+
+ public void setMaxMemory(long maxMemory) {
+ this.maxMemory = maxMemory;
+ this.valueChanged("maxMemory");
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ valueChanged("startTime");
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ this.valueChanged("endTime");
+ }
+
+ public long getExecMemoryBytes() {
+ return execMemoryBytes;
+ }
+
+ public void setExecMemoryBytes(long execMemoryBytes) {
+ this.execMemoryBytes = execMemoryBytes;
+ this.valueChanged("execMemoryBytes");
+ }
+
+ public int getCores() {
+ return cores;
+ }
+
+ public void setCores(int cores) {
+ this.cores = cores;
+ valueChanged("cores");
+ }
+
+ public long getMemoryOverhead() {
+ return memoryOverhead;
+ }
+
+ public void setMemoryOverhead(long memoryOverhead) {
+ this.memoryOverhead = memoryOverhead;
+ valueChanged("memoryOverhead");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
new file mode 100644
index 0000000..1c2caa4
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+import java.util.List;
+
+@Table("eagleSparkRunningJobs")
+@ColumnFamily("f")
+@Prefix("sparkJob")
+@Service(Constants.RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId","user", "queue"})
+@Partition({"site"})
+public class SparkJobEntity extends TaggedLogAPIEntity {
+ @Column("a")
+ private long submissionTime;
+ @Column("b")
+ private long completionTime;
+ @Column("c")
+ private int numStages = 0;
+ @Column("d")
+ private String status;
+ @Column("e")
+ private int numTask = 0;
+ @Column("f")
+ private int numActiveTasks = 0;
+ @Column("g")
+ private int numCompletedTasks = 0;
+ @Column("h")
+ private int numSkippedTasks = 0;
+ @Column("i")
+ private int numFailedTasks = 0;
+ @Column("j")
+ private int numActiveStages = 0;
+ @Column("k")
+ private int numCompletedStages = 0;
+ @Column("l")
+ private int numSkippedStages = 0;
+ @Column("m")
+ private int numFailedStages = 0;
+ @Column("n")
+ private List<Integer> stages;
+
+ public List<Integer> getStages() {
+ return stages;
+ }
+
+ public void setStages(List<Integer> stages) {
+ this.stages = stages;
+ this.valueChanged("stages");
+ }
+
+ public long getSubmissionTime() {
+ return submissionTime;
+ }
+
+ public long getCompletionTime() {
+ return completionTime;
+ }
+
+ public int getNumStages() {
+ return numStages;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public int getNumTask() {
+ return numTask;
+ }
+
+ public int getNumActiveTasks() {
+ return numActiveTasks;
+ }
+
+ public int getNumCompletedTasks() {
+ return numCompletedTasks;
+ }
+
+ public int getNumSkippedTasks() {
+ return numSkippedTasks;
+ }
+
+ public int getNumFailedTasks() {
+ return numFailedTasks;
+ }
+
+ public int getNumActiveStages() {
+ return numActiveStages;
+ }
+
+ public int getNumCompletedStages() {
+ return numCompletedStages;
+ }
+
+ public int getNumSkippedStages() {
+ return numSkippedStages;
+ }
+
+ public int getNumFailedStages() {
+ return numFailedStages;
+ }
+
+ public void setSubmissionTime(long submissionTime) {
+ this.submissionTime = submissionTime;
+ this.valueChanged("submissionTime");
+ }
+
+ public void setCompletionTime(long completionTime) {
+ this.completionTime = completionTime;
+ this.valueChanged("completionTime");
+ }
+
+ public void setNumStages(int numStages) {
+ this.numStages = numStages;
+ this.valueChanged("numStages");
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ this.valueChanged("status");
+ }
+
+ public void setNumTask(int numTask) {
+ this.numTask = numTask;
+ this.valueChanged("numTask");
+ }
+
+ public void setNumActiveTasks(int numActiveTasks) {
+ this.numActiveTasks = numActiveTasks;
+ this.valueChanged("numActiveTasks");
+ }
+
+ public void setNumCompletedTasks(int numCompletedTasks) {
+ this.numCompletedTasks = numCompletedTasks;
+ this.valueChanged("numCompletedTasks");
+ }
+
+ public void setNumSkippedTasks(int numSkippedTasks) {
+ this.numSkippedTasks = numSkippedTasks;
+ this.valueChanged("numSkippedTasks");
+ }
+
+ public void setNumFailedTasks(int numFailedTasks) {
+ this.numFailedTasks = numFailedTasks;
+ this.valueChanged("numFailedTasks");
+ }
+
+ public void setNumActiveStages(int numActiveStages) {
+ this.numActiveStages = numActiveStages;
+ this.valueChanged("numActiveStages");
+ }
+
+ public void setNumCompletedStages(int numCompletedStages) {
+ this.numCompletedStages = numCompletedStages;
+ this.valueChanged("numCompletedStages");
+ }
+
+ public void setNumSkippedStages(int numSkippedStages) {
+ this.numSkippedStages = numSkippedStages;
+ this.valueChanged("numSkippedStages");
+ }
+
+ public void setNumFailedStages(int numFailedStages) {
+ this.numFailedStages = numFailedStages;
+ this.valueChanged("numFailedStages");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
new file mode 100644
index 0000000..72dbe40
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eagleSparkRunningStages")
+@ColumnFamily("f")
+@Prefix("sparkStage")
+@Service(Constants.RUNNING_SPARK_STAGE_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", "stageId","stageAttemptId","user", "queue"})
+@Partition({"site"})
+public class SparkStageEntity extends TaggedLogAPIEntity {
+ @Column("a")
+ private String status;
+ @Column("b")
+ private int numActiveTasks = 0;
+ @Column("c")
+ private int numCompletedTasks = 0;
+ @Column("d")
+ private int numFailedTasks = 0;
+ @Column("e")
+ private long executorRunTime = 0l;
+ @Column("f")
+ private long inputBytes = 0l;
+ @Column("g")
+ private long inputRecords = 0l;
+ @Column("h")
+ private long outputBytes = 0l;
+ @Column("i")
+ private long outputRecords = 0l;
+ @Column("j")
+ private long shuffleReadBytes = 0l;
+ @Column("k")
+ private long shuffleReadRecords = 0l;
+ @Column("l")
+ private long shuffleWriteBytes = 0l;
+ @Column("m")
+ private long shuffleWriteRecords = 0l;
+ @Column("n")
+ private long memoryBytesSpilled = 0l;
+ @Column("o")
+ private long diskBytesSpilled = 0l;
+ @Column("p")
+ private String name;
+ @Column("q")
+ private String schedulingPool;
+ @Column("r")
+ private long submitTime;
+ @Column("s")
+ private long completeTime;
+ @Column("t")
+ private int numTasks;
+ @Column("u")
+ private long executorDeserializeTime;
+ @Column("v")
+ private long resultSize;
+ @Column("w")
+ private long jvmGcTime;
+ @Column("x")
+ private long resultSerializationTime;
+
+ public String getStatus() {
+ return status;
+ }
+
+ public int getNumActiveTasks() {
+ return numActiveTasks;
+ }
+
+ public int getNumCompletedTasks() {
+ return numCompletedTasks;
+ }
+
+ public int getNumFailedTasks() {
+ return numFailedTasks;
+ }
+
+ public long getExecutorRunTime() {
+ return executorRunTime;
+ }
+
+ public long getInputBytes() {
+ return inputBytes;
+ }
+
+ public long getInputRecords() {
+ return inputRecords;
+ }
+
+ public long getOutputBytes() {
+ return outputBytes;
+ }
+
+ public long getOutputRecords() {
+ return outputRecords;
+ }
+
+ public long getShuffleReadBytes() {
+ return shuffleReadBytes;
+ }
+
+ public long getShuffleReadRecords() {
+ return shuffleReadRecords;
+ }
+
+ public long getShuffleWriteBytes() {
+ return shuffleWriteBytes;
+ }
+
+ public long getShuffleWriteRecords() {
+ return shuffleWriteRecords;
+ }
+
+ public long getMemoryBytesSpilled() {
+ return memoryBytesSpilled;
+ }
+
+ public long getDiskBytesSpilled() {
+ return diskBytesSpilled;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getSchedulingPool() {
+ return schedulingPool;
+ }
+
+ public long getSubmitTime() {
+ return submitTime;
+ }
+
+ public long getCompleteTime() {
+ return completeTime;
+ }
+
+ public int getNumTasks() {
+ return numTasks;
+ }
+
+ public long getExecutorDeserializeTime() {
+ return executorDeserializeTime;
+ }
+
+ public long getResultSize() {
+ return resultSize;
+ }
+
+ public long getJvmGcTime() {
+ return jvmGcTime;
+ }
+
+ public long getResultSerializationTime() {
+ return resultSerializationTime;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ this.valueChanged("status");
+ }
+
+ public void setNumActiveTasks(int numActiveTasks) {
+ this.numActiveTasks = numActiveTasks;
+ this.valueChanged("numActiveTasks");
+ }
+
+ public void setNumCompletedTasks(int numCompletedTasks) {
+ this.numCompletedTasks = numCompletedTasks;
+ this.valueChanged("numCompletedTasks");
+ }
+
+ public void setNumFailedTasks(int numFailedTasks) {
+ this.numFailedTasks = numFailedTasks;
+ this.valueChanged("numFailedTasks");
+ }
+
+ public void setExecutorRunTime(long executorRunTime) {
+ this.executorRunTime = executorRunTime;
+ this.valueChanged("executorRunTime");
+ }
+
+ public void setInputBytes(long inputBytes) {
+ this.inputBytes = inputBytes;
+ this.valueChanged("inputBytes");
+ }
+
+ public void setInputRecords(long inputRecords) {
+ this.inputRecords = inputRecords;
+ this.valueChanged("inputRecords");
+ }
+
+ public void setOutputBytes(long outputBytes) {
+ this.outputBytes = outputBytes;
+ this.valueChanged("outputBytes");
+ }
+
+ public void setOutputRecords(long outputRecords) {
+ this.outputRecords = outputRecords;
+ this.valueChanged("outputRecords");
+ }
+
+ public void setShuffleReadBytes(long shuffleReadBytes) {
+ this.shuffleReadBytes = shuffleReadBytes;
+ this.valueChanged("shuffleReadBytes");
+ }
+
+ public void setShuffleReadRecords(long shuffleReadRecords) {
+ this.shuffleReadRecords = shuffleReadRecords;
+ this.valueChanged("shuffleReadRecords");
+ }
+
+ public void setShuffleWriteBytes(long shuffleWriteBytes) {
+ this.shuffleWriteBytes = shuffleWriteBytes;
+ this.valueChanged("shuffleWriteBytes");
+ }
+
+ public void setShuffleWriteRecords(long shuffleWriteRecords) {
+ this.shuffleWriteRecords = shuffleWriteRecords;
+ this.valueChanged("shuffleWriteRecords");
+ }
+
+ public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+ this.memoryBytesSpilled = memoryBytesSpilled;
+ this.valueChanged("memoryBytesSpilled");
+ }
+
+ public void setDiskBytesSpilled(long diskBytesSpilled) {
+ this.diskBytesSpilled = diskBytesSpilled;
+ this.valueChanged("diskBytesSpilled");
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ this.valueChanged("name");
+ }
+
+ public void setSchedulingPool(String schedulingPool) {
+ this.schedulingPool = schedulingPool;
+ this.valueChanged("schedulingPool");
+ }
+
+ public void setSubmitTime(long submitTime) {
+ this.submitTime = submitTime;
+ this.valueChanged("submitTime");
+ }
+
+ public void setCompleteTime(long completeTime) {
+ this.completeTime = completeTime;
+ this.valueChanged("completeTime");
+ }
+
+ public void setNumTasks(int numTasks) {
+ this.numTasks = numTasks;
+ valueChanged("numTasks");
+ }
+
+ public void setExecutorDeserializeTime(long executorDeserializeTime) {
+ this.executorDeserializeTime = executorDeserializeTime;
+ valueChanged("executorDeserializeTime");
+ }
+
+ public void setResultSize(long resultSize) {
+ this.resultSize = resultSize;
+ valueChanged("resultSize");
+ }
+
+ public void setJvmGcTime(long jvmGcTime) {
+ this.jvmGcTime = jvmGcTime;
+ valueChanged("jvmGcTime");
+ }
+
+ public void setResultSerializationTime(long resultSerializationTime) {
+ this.resultSerializationTime = resultSerializationTime;
+ valueChanged("resultSerializationTime");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
new file mode 100644
index 0000000..183a62a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eagleSparkRunningTasks")
+@ColumnFamily("f")
+@Prefix("sparkTask")
+@Service(Constants.RUNNING_SPARK_TASK_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", "jobName", "stageId","stageAttemptId","taskIndex","taskAttemptId","user", "queue"})
+@Partition({"site"})
+public class SparkTaskEntity extends TaggedLogAPIEntity {
+ @Column("a")
+ private int taskId;
+ @Column("b")
+ private long launchTime;
+ @Column("c")
+ private String executorId;
+ @Column("d")
+ private String host;
+ @Column("e")
+ private String taskLocality;
+ @Column("f")
+ private boolean speculative;
+ @Column("g")
+ private long executorDeserializeTime;
+ @Column("h")
+ private long executorRunTime;
+ @Column("i")
+ private long resultSize;
+ @Column("j")
+ private long jvmGcTime;
+ @Column("k")
+ private long resultSerializationTime;
+ @Column("l")
+ private long memoryBytesSpilled;
+ @Column("m")
+ private long diskBytesSpilled;
+ @Column("n")
+ private long inputBytes;
+ @Column("o")
+ private long inputRecords;
+ @Column("p")
+ private long outputBytes;
+ @Column("q")
+ private long outputRecords;
+ @Column("r")
+ private long shuffleReadRemoteBytes;
+ @Column("x")
+ private long shuffleReadLocalBytes;
+ @Column("s")
+ private long shuffleReadRecords;
+ @Column("t")
+ private long shuffleWriteBytes;
+ @Column("u")
+ private long shuffleWriteRecords;
+ @Column("v")
+ private boolean failed;
+
+ public int getTaskId() {
+ return taskId;
+ }
+
+ public long getLaunchTime() {
+ return launchTime;
+ }
+
+ public String getExecutorId() {
+ return executorId;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getTaskLocality() {
+ return taskLocality;
+ }
+
+ public boolean isSpeculative() {
+ return speculative;
+ }
+
+ public long getExecutorDeserializeTime() {
+ return executorDeserializeTime;
+ }
+
+ public long getExecutorRunTime() {
+ return executorRunTime;
+ }
+
+ public long getResultSize() {
+ return resultSize;
+ }
+
+ public long getJvmGcTime() {
+ return jvmGcTime;
+ }
+
+ public long getResultSerializationTime() {
+ return resultSerializationTime;
+ }
+
+ public long getMemoryBytesSpilled() {
+ return memoryBytesSpilled;
+ }
+
+ public long getDiskBytesSpilled() {
+ return diskBytesSpilled;
+ }
+
+ public long getInputBytes() {
+ return inputBytes;
+ }
+
+ public long getInputRecords() {
+ return inputRecords;
+ }
+
+ public long getOutputBytes() {
+ return outputBytes;
+ }
+
+ public long getOutputRecords() {
+ return outputRecords;
+ }
+
+ public long getShuffleReadRecords() {
+ return shuffleReadRecords;
+ }
+
+ public long getShuffleWriteBytes() {
+ return shuffleWriteBytes;
+ }
+
+ public long getShuffleWriteRecords() {
+ return shuffleWriteRecords;
+ }
+
+ public boolean isFailed() {
+ return failed;
+ }
+
+ public long getShuffleReadRemoteBytes() {
+ return shuffleReadRemoteBytes;
+ }
+
+ public long getShuffleReadLocalBytes() {
+ return shuffleReadLocalBytes;
+ }
+
+ public void setFailed(boolean failed) {
+ this.failed = failed;
+ valueChanged("failed");
+ }
+
+ public void setTaskId(int taskId) {
+ this.taskId = taskId;
+ valueChanged("taskId");
+ }
+
+ public void setLaunchTime(long launchTime) {
+ this.launchTime = launchTime;
+ valueChanged("launchTime");
+ }
+
+ public void setExecutorId(String executorId) {
+ this.executorId = executorId;
+ valueChanged("executorId");
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ this.valueChanged("host");
+ }
+
+ public void setTaskLocality(String taskLocality) {
+ this.taskLocality = taskLocality;
+ this.valueChanged("taskLocality");
+ }
+
+ public void setSpeculative(boolean speculative) {
+ this.speculative = speculative;
+ this.valueChanged("speculative");
+ }
+
+ public void setExecutorDeserializeTime(long executorDeserializeTime) {
+ this.executorDeserializeTime = executorDeserializeTime;
+ this.valueChanged("executorDeserializeTime");
+ }
+
+ public void setExecutorRunTime(long executorRunTime) {
+ this.executorRunTime = executorRunTime;
+ this.valueChanged("executorRunTime");
+ }
+
+ public void setResultSize(long resultSize) {
+ this.resultSize = resultSize;
+ this.valueChanged("resultSize");
+ }
+
+ public void setJvmGcTime(long jvmGcTime) {
+ this.jvmGcTime = jvmGcTime;
+ this.valueChanged("jvmGcTime");
+ }
+
+ public void setResultSerializationTime(long resultSerializationTime) {
+ this.resultSerializationTime = resultSerializationTime;
+ this.valueChanged("resultSerializationTime");
+ }
+
+ public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+ this.memoryBytesSpilled = memoryBytesSpilled;
+ this.valueChanged("memoryBytesSpilled");
+ }
+
+ public void setDiskBytesSpilled(long diskBytesSpilled) {
+ this.diskBytesSpilled = diskBytesSpilled;
+ this.valueChanged("diskBytesSpilled");
+ }
+
+ public void setInputBytes(long inputBytes) {
+ this.inputBytes = inputBytes;
+ this.valueChanged("inputBytes");
+ }
+
+ public void setInputRecords(long inputRecords) {
+ this.inputRecords = inputRecords;
+ this.valueChanged("inputRecords");
+ }
+
+ public void setOutputBytes(long outputBytes) {
+ this.outputBytes = outputBytes;
+ this.valueChanged("outputBytes");
+ }
+
+ public void setOutputRecords(long outputRecords) {
+ this.outputRecords = outputRecords;
+ this.valueChanged("outputRecords");
+ }
+
+
+
+ public void setShuffleReadRecords(long shuffleReadRecords) {
+ this.shuffleReadRecords = shuffleReadRecords;
+ this.valueChanged("shuffleReadRecords");
+ }
+
+ public void setShuffleWriteBytes(long shuffleWriteBytes) {
+ this.shuffleWriteBytes = shuffleWriteBytes;
+ this.valueChanged("shuffleWriteBytes");
+ }
+
+ public void setShuffleWriteRecords(long shuffleWriteRecords) {
+ this.shuffleWriteRecords = shuffleWriteRecords;
+ this.valueChanged("shuffleWriteRecords");
+ }
+
+ public void setShuffleReadRemoteBytes(long shuffleReadRemoteBytes) {
+ this.shuffleReadRemoteBytes = shuffleReadRemoteBytes;
+ this.valueChanged("shuffleReadRemoteBytes");
+ }
+
+ public void setShuffleReadLocalBytes(long shuffleReadLocalBytes) {
+ this.shuffleReadLocalBytes = shuffleReadLocalBytes;
+ this.valueChanged("shuffleReadLocalBytes");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
new file mode 100644
index 0000000..5491a80
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.parser;
+
+import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SparkAppEntityCreationHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(SparkAppEntityCreationHandler.class);
+
+ private List<TaggedLogAPIEntity> entities = new ArrayList<>();
+ private SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig;
+
+ public SparkAppEntityCreationHandler(SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig) {
+ this.eagleServiceConfig = eagleServiceConfig;
+ }
+
+ public void add(TaggedLogAPIEntity entity) {
+ entities.add(entity);
+ if (entities.size() >= eagleServiceConfig.maxFlushNum) {
+ this.flush();
+ }
+ }
+
+ public boolean flush() {
+ //need flush right now
+ if (entities.size() == 0) {
+ return true;
+ }
+
+
+ try (IEagleServiceClient client = new EagleServiceClientImpl(
+ eagleServiceConfig.eagleServiceHost,
+ eagleServiceConfig.eagleServicePort,
+ eagleServiceConfig.username,
+ eagleServiceConfig.password)) {
+ client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+ LOG.info("start to flush spark app entities, size {}", entities.size());
+ client.create(entities);
+ LOG.info("finish flushing spark app entities, size {}", entities.size());
+ entities.clear();
+ } catch (Exception e) {
+ LOG.warn("exception found when flush entities, {}", e);
+ e.printStackTrace();
+ return false;
+ }
+
+ return true;
+ }
+}