You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/08/09 05:25:32 UTC

[4/8] incubator-eagle git commit: [EAGLE-422] eagle support for mr & spark running job monitoring

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/assembly/eagle-jpm-spark-running-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/assembly/eagle-jpm-spark-running-assembly.xml b/eagle-jpm/eagle-jpm-spark-running/src/assembly/eagle-jpm-spark-running-assembly.xml
new file mode 100644
index 0000000..66133a0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/assembly/eagle-jpm-spark-running-assembly.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+    <id>assembly</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>false</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+            <!--includes>
+                <include>org.apache.hadoop:hadoop-common</include>
+                <include>org.apache.hadoop:hadoop-hdfs</include>
+                <include>org.apache.hadoop:hadoop-client</include>
+                <include>org.apache.hadoop:hadoop-auth</include>
+                <include>org.apache.eagle:eagle-stream-process-api</include>
+                <include>org.apache.eagle:eagle-stream-process-base</include>
+                <include>org.jsoup:jsoup</include>
+            </includes-->
+            <excludes>
+                <exclude>org.wso2.orbit.com.lmax:disruptor</exclude>
+                <exclude>asm:asm</exclude>
+                <exclude>org.apache.storm:storm-core</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
+    <fileSets>
+        <fileSet>
+            <directory>${project.build.outputDirectory}/</directory>
+            <outputDirectory>/</outputDirectory>
+            <!--<includes>-->
+            <!--<include>*.conf</include>-->
+            <!--<include>*.xml</include>-->
+            <!--<include>*.properties</include>-->
+            <!--<include>*.config</include>-->
+            <!--<include>classes/META-INF/*</include>-->
+            <!--</includes>-->
+
+            <excludes>
+                <exclude>*.yaml</exclude>
+            </excludes>
+        </fileSet>
+    </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java
new file mode 100644
index 0000000..749f4d1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobFetchSpout;
+import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobParseBolt;
+
+public class SparkRunningJobMain {
+    public static void main(String[] args) {
+        try {
+            //1. trigger init conf
+            SparkRunningConfigManager sparkRunningConfigManager = SparkRunningConfigManager.getInstance(args);
+
+            //2. init topology
+            TopologyBuilder topologyBuilder = new TopologyBuilder();
+            String topologyName = sparkRunningConfigManager.getConfig().getString("envContextConfig.topologyName");
+            String spoutName = "sparkRunningJobFetchSpout";
+            String boltName = "sparkRunningJobParseBolt";
+            int parallelism = sparkRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
+            int tasks = sparkRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + spoutName);
+            if (parallelism > tasks) {
+                parallelism = tasks;
+            }
+            topologyBuilder.setSpout(
+                    spoutName,
+                    new SparkRunningJobFetchSpout(
+                            sparkRunningConfigManager.getJobExtractorConfig(),
+                            sparkRunningConfigManager.getEndpointConfig(),
+                            sparkRunningConfigManager.getZkStateConfig()),
+                    parallelism
+            ).setNumTasks(tasks);
+
+            parallelism = sparkRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + boltName);
+            tasks = sparkRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + boltName);
+            if (parallelism > tasks) {
+                parallelism = tasks;
+            }
+            topologyBuilder.setBolt(boltName,
+                    new SparkRunningJobParseBolt(
+                            sparkRunningConfigManager.getZkStateConfig(),
+                            sparkRunningConfigManager.getEagleServiceConfig(),
+                            sparkRunningConfigManager.getEndpointConfig(),
+                            sparkRunningConfigManager.getJobExtractorConfig()),
+                    parallelism).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
+
+            backtype.storm.Config config = new backtype.storm.Config();
+            config.setNumWorkers(sparkRunningConfigManager.getConfig().getInt("envContextConfig.workers"));
+            config.put(Config.TOPOLOGY_DEBUG, true);
+            if (!sparkRunningConfigManager.getEnv().equals("local")) {
+                //cluster mode
+                //parse conf here
+                StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
+            } else {
+                //local mode
+                LocalCluster cluster = new LocalCluster();
+                cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java
new file mode 100644
index 0000000..b05d12e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.common;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.util.ConfigOptionParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class SparkRunningConfigManager implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkRunningConfigManager.class);
+
+    public String getEnv() {
+        return env;
+    }
+    private String env;
+
+    public ZKStateConfig getZkStateConfig() { return zkStateConfig; }
+    private ZKStateConfig zkStateConfig;
+
+    public EagleServiceConfig getEagleServiceConfig() {
+        return eagleServiceConfig;
+    }
+    private EagleServiceConfig eagleServiceConfig;
+
+    public JobExtractorConfig getJobExtractorConfig() {
+        return jobExtractorConfig;
+    }
+    private JobExtractorConfig jobExtractorConfig;
+
+    public EndpointConfig getEndpointConfig() {
+        return endpointConfig;
+    }
+    private EndpointConfig endpointConfig;
+
+    public static class ZKStateConfig implements Serializable {
+        public String zkQuorum;
+        public String zkRoot;
+        public int zkSessionTimeoutMs;
+        public int zkRetryTimes;
+        public int zkRetryInterval;
+        public String zkPort;
+        public boolean recoverEnabled;
+    }
+
+    public static class EagleServiceConfig implements Serializable {
+        public String eagleServiceHost;
+        public int eagleServicePort;
+        public int readTimeoutSeconds;
+        public int maxFlushNum;
+        public String username;
+        public String password;
+    }
+
+    public static class JobExtractorConfig implements Serializable {
+        public String site;
+        public int fetchRunningJobInterval;
+        public int parseThreadPoolSize;
+    }
+
+    public static class EndpointConfig implements Serializable {
+        public String nnEndpoint;
+        public String eventLog;
+        public String[] rmUrls;
+        public String principal;
+        public String keyTab;
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+    private Config config;
+
+    private static SparkRunningConfigManager manager = new SparkRunningConfigManager();
+
+    private SparkRunningConfigManager() {
+        this.eagleServiceConfig = new EagleServiceConfig();
+        this.jobExtractorConfig = new JobExtractorConfig();
+        this.endpointConfig = new EndpointConfig();
+        this.zkStateConfig = new ZKStateConfig();
+    }
+
+    public static SparkRunningConfigManager getInstance(String[] args) {
+        manager.init(args);
+        return manager;
+    }
+
+    private void init(String[] args) {
+        try {
+            LOG.info("Loading from configuration file");
+            this.config = new ConfigOptionParser().load(args);
+        } catch (Exception e) {
+            LOG.error("failed to load config");
+        }
+
+        this.env = config.getString("envContextConfig.env");
+
+        this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
+        this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
+        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes");
+        this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval");
+        this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
+        this.zkStateConfig.recoverEnabled = config.getBoolean("zookeeperConfig.recoverEnabled");
+
+        // parse eagle service endpoint
+        this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+        String port = config.getString("eagleProps.eagleService.port");
+        this.eagleServiceConfig.eagleServicePort = (port == null ? 8080 : Integer.parseInt(port));
+        this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
+        this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
+        this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds");
+        this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum");
+
+        //parse job extractor
+        this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
+        this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
+        this.jobExtractorConfig.parseThreadPoolSize = config.getInt("jobExtractorConfig.parseThreadPoolSize");
+
+        //parse data source config
+        this.endpointConfig.eventLog = config.getString("dataSourceConfig.eventLog");
+        this.endpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint");
+        this.endpointConfig.keyTab = config.getString("dataSourceConfig.keytab");
+        this.endpointConfig.principal = config.getString("dataSourceConfig.principal");
+        this.endpointConfig.rmUrls = config.getStringList("dataSourceConfig.rmUrls").toArray(new String[0]);
+
+        LOG.info("Successfully initialized SparkRunningConfigManager");
+        LOG.info("env: " + this.env);
+        LOG.info("site: " + this.jobExtractorConfig.site);
+        LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
+        LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/Util.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/Util.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/Util.java
new file mode 100644
index 0000000..6c9f8f5
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/Util.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.running.common;
+
+import org.apache.hadoop.fs.Path;
+
+public class Util {
+    public static String getAppAttemptLogName(String appId, String attemptId) {
+        if (attemptId.equals("0")) {
+            return appId;
+        }
+        return appId + "_" + attemptId;
+    }
+
+    public static Path getFilePath(String baseDir, String appAttemptLogName) {
+        String attemptLogDir = baseDir + "/" + appAttemptLogName;
+        return new Path(attemptLogDir);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
new file mode 100644
index 0000000..5d1cfaa
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+public class JPMEntityRepository extends EntityRepository {
+    public JPMEntityRepository() {
+        entitySet.add(SparkAppEntity.class);
+        entitySet.add(SparkJobEntity.class);
+        entitySet.add(SparkStageEntity.class);
+        entitySet.add(SparkTaskEntity.class);
+        entitySet.add(SparkExecutorEntity.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
new file mode 100644
index 0000000..e18f1e7
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+public class JobConfig extends HashMap<String, String> implements Serializable {
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
new file mode 100644
index 0000000..7b8f648
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eagleSparkRunningApps")
+@ColumnFamily("f")
+@Prefix("sparkApp")
+@Service(Constants.RUNNING_SPARK_APP_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "user", "queue"})
+@Partition({"site"})
+public class SparkAppEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private long  startTime;
+    @Column("b")
+    private long endTime;
+    @Column("c")
+    private String yarnState;
+    @Column("d")
+    private String yarnStatus;
+    @Column("e")
+    private JobConfig config;
+    @Column("f")
+    private int numJobs;
+    @Column("g")
+    private int totalStages;
+    @Column("h")
+    private int skippedStages;
+    @Column("i")
+    private int failedStages;
+    @Column("j")
+    private int totalTasks;
+    @Column("k")
+    private int skippedTasks;
+    @Column("l")
+    private int failedTasks;
+    @Column("m")
+    private int executors;
+    @Column("n")
+    private long inputBytes;
+    @Column("o")
+    private long inputRecords;
+    @Column("p")
+    private long outputBytes;
+    @Column("q")
+    private long outputRecords;
+    @Column("r")
+    private long shuffleReadBytes;
+    @Column("s")
+    private long shuffleReadRecords;
+    @Column("t")
+    private long shuffleWriteBytes;
+    @Column("u")
+    private long shuffleWriteRecords;
+    @Column("v")
+    private long executorDeserializeTime;
+    @Column("w")
+    private long executorRunTime;
+    @Column("x")
+    private long resultSize;
+    @Column("y")
+    private long jvmGcTime;
+    @Column("z")
+    private long resultSerializationTime;
+    @Column("ab")
+    private long memoryBytesSpilled;
+    @Column("ac")
+    private long diskBytesSpilled;
+    @Column("ad")
+    private long execMemoryBytes;
+    @Column("ae")
+    private long driveMemoryBytes;
+    @Column("af")
+    private int completeTasks;
+    @Column("ag")
+    private long totalExecutorTime;
+    @Column("ah")
+    private long executorMemoryOverhead;
+    @Column("ai")
+    private long driverMemoryOverhead;
+    @Column("aj")
+    private int executorCores;
+    @Column("ak")
+    private int driverCores;
+    @Column("al")
+    private AppInfo appInfo;
+    @Column("am")
+    private int activeStages;
+    @Column("an")
+    private int completeStages;
+    @Column("ba")
+    private int activeTasks;
+
+    public int getActiveTasks() {
+        return activeTasks;
+    }
+
+    public void setActiveTasks(int activeTasks) {
+        this.activeTasks = activeTasks;
+        valueChanged("activeTasks");
+    }
+
+    public int getCompleteStages() {
+        return completeStages;
+    }
+
+    public void setCompleteStages(int completeStages) {
+        this.completeStages = completeStages;
+        valueChanged("completeStages");
+    }
+
+    public int getActiveStages() {
+        return activeStages;
+    }
+
+    public void setActiveStages(int activeStages) {
+        this.activeStages = activeStages;
+        valueChanged("activeStages");
+    }
+
+    public AppInfo getAppInfo() {
+        return appInfo;
+    }
+
+    public void setAppInfo(AppInfo appInfo) {
+        this.appInfo = appInfo;
+        valueChanged("appInfo");
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public String getYarnState() {
+        return yarnState;
+    }
+
+    public String getYarnStatus() {
+        return yarnStatus;
+    }
+
+    public int getNumJobs() {
+        return numJobs;
+    }
+
+    public int getTotalStages() {
+        return totalStages;
+    }
+
+    public int getSkippedStages() {
+        return skippedStages;
+    }
+
+    public int getFailedStages() {
+        return failedStages;
+    }
+
+    public int getTotalTasks() {
+        return totalTasks;
+    }
+
+    public int getSkippedTasks() {
+        return skippedTasks;
+    }
+
+    public int getFailedTasks() {
+        return failedTasks;
+    }
+
+    public int getExecutors() {
+        return executors;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public long getOutputRecords() {
+        return outputRecords;
+    }
+
+    public long getShuffleReadBytes() {
+        return shuffleReadBytes;
+    }
+
+    public long getShuffleReadRecords() {
+        return shuffleReadRecords;
+    }
+
+    public long getShuffleWriteBytes() {
+        return shuffleWriteBytes;
+    }
+
+    public long getShuffleWriteRecords() {
+        return shuffleWriteRecords;
+    }
+
+    public long getExecutorDeserializeTime() {
+        return executorDeserializeTime;
+    }
+
+    public long getExecutorRunTime() {
+        return executorRunTime;
+    }
+
+    public long getResultSize() {
+        return resultSize;
+    }
+
+    public long getJvmGcTime() {
+        return jvmGcTime;
+    }
+
+    public long getResultSerializationTime() {
+        return resultSerializationTime;
+    }
+
+    public long getMemoryBytesSpilled() {
+        return memoryBytesSpilled;
+    }
+
+    public long getDiskBytesSpilled() {
+        return diskBytesSpilled;
+    }
+
+    public long getExecMemoryBytes() {
+        return execMemoryBytes;
+    }
+
+    public long getDriveMemoryBytes() {
+        return driveMemoryBytes;
+    }
+
+    public int getCompleteTasks(){ return completeTasks;}
+
+    public JobConfig getConfig() {
+        return config;
+    }
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        valueChanged("endTime");
+    }
+
+    public void setYarnState(String yarnState) {
+        this.yarnState = yarnState;
+        valueChanged("yarnState");
+    }
+
+    public void setYarnStatus(String yarnStatus) {
+        this.yarnStatus = yarnStatus;
+        valueChanged("yarnStatus");
+    }
+
+    public void setConfig(JobConfig config) {
+        this.config = config;
+        valueChanged("config");
+    }
+
+    public void setNumJobs(int numJobs) {
+        this.numJobs = numJobs;
+        valueChanged("numJobs");
+    }
+
+    public void setTotalStages(int totalStages) {
+        this.totalStages = totalStages;
+        valueChanged("totalStages");
+    }
+
+    public void setSkippedStages(int skippedStages) {
+        this.skippedStages = skippedStages;
+        valueChanged("skippedStages");
+    }
+
+    public void setFailedStages(int failedStages) {
+        this.failedStages = failedStages;
+        valueChanged("failedStages");
+    }
+
+    public void setTotalTasks(int totalTasks) {
+        this.totalTasks = totalTasks;
+        valueChanged("totalTasks");
+    }
+
+    public void setSkippedTasks(int skippedTasks) {
+        this.skippedTasks = skippedTasks;
+        valueChanged("skippedTasks");
+    }
+
+    public void setFailedTasks(int failedTasks) {
+        this.failedTasks = failedTasks;
+        valueChanged("failedTasks");
+    }
+
+    public void setExecutors(int executors) {
+        this.executors = executors;
+        valueChanged("executors");
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+        valueChanged("inputBytes");
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+        valueChanged("inputRecords");
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+        valueChanged("outputBytes");
+    }
+
+    public void setOutputRecords(long outputRecords) {
+        this.outputRecords = outputRecords;
+        valueChanged("outputRecords");
+    }
+
+    public void setShuffleReadBytes(long shuffleReadRemoteBytes) {
+        this.shuffleReadBytes = shuffleReadRemoteBytes;
+        valueChanged("shuffleReadBytes");
+    }
+
+    public void setShuffleReadRecords(long shuffleReadRecords) {
+        this.shuffleReadRecords = shuffleReadRecords;
+        valueChanged("shuffleReadRecords");
+    }
+
+    public void setShuffleWriteBytes(long shuffleWriteBytes) {
+        this.shuffleWriteBytes = shuffleWriteBytes;
+        valueChanged("shuffleWriteBytes");
+    }
+
+    public void setShuffleWriteRecords(long shuffleWriteRecords) {
+        this.shuffleWriteRecords = shuffleWriteRecords;
+        valueChanged("shuffleWriteRecords");
+    }
+
+    public void setExecutorDeserializeTime(long executorDeserializeTime) {
+        this.executorDeserializeTime = executorDeserializeTime;
+        valueChanged("executorDeserializeTime");
+    }
+
+    public void setExecutorRunTime(long executorRunTime) {
+        this.executorRunTime = executorRunTime;
+        valueChanged("executorRunTime");
+    }
+
+    public void setResultSize(long resultSize) {
+        this.resultSize = resultSize;
+        valueChanged("resultSize");
+    }
+
+    public void setJvmGcTime(long jvmGcTime) {
+        this.jvmGcTime = jvmGcTime;
+        valueChanged("jvmGcTime");
+    }
+
+    public void setResultSerializationTime(long resultSerializationTime) {
+        this.resultSerializationTime = resultSerializationTime;
+        valueChanged("resultSerializationTime");
+    }
+
+    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+        this.memoryBytesSpilled = memoryBytesSpilled;
+        valueChanged("memoryBytesSpilled");
+    }
+
+    public void setDiskBytesSpilled(long diskBytesSpilled) {
+        this.diskBytesSpilled = diskBytesSpilled;
+        valueChanged("diskBytesSpilled");
+    }
+
+    public void setExecMemoryBytes(long execMemoryBytes) {
+        this.execMemoryBytes = execMemoryBytes;
+        valueChanged("execMemoryBytes");
+    }
+
+    public void setDriveMemoryBytes(long driveMemoryBytes) {
+        this.driveMemoryBytes = driveMemoryBytes;
+        valueChanged("driveMemoryBytes");
+    }
+
+    public void setCompleteTasks(int completeTasks){
+        this.completeTasks = completeTasks;
+        valueChanged("completeTasks");
+    }
+
+    public long getTotalExecutorTime() {
+        return totalExecutorTime;
+    }
+
+    public void setTotalExecutorTime(long totalExecutorTime) {
+        this.totalExecutorTime = totalExecutorTime;
+        valueChanged("totalExecutorTime");
+    }
+
+    public long getExecutorMemoryOverhead() {
+        return executorMemoryOverhead;
+    }
+
+    public void setExecutorMemoryOverhead(long executorMemoryOverhead) {
+        this.executorMemoryOverhead = executorMemoryOverhead;
+        valueChanged("executorMemoryOverhead");
+    }
+
+    public long getDriverMemoryOverhead() {
+        return driverMemoryOverhead;
+    }
+
+    public void setDriverMemoryOverhead(long driverMemoryOverhead) {
+        this.driverMemoryOverhead = driverMemoryOverhead;
+        valueChanged("driverMemoryOverhead");
+    }
+
+    public int getExecutorCores() {
+        return executorCores;
+    }
+
+    public void setExecutorCores(int executorCores) {
+        this.executorCores = executorCores;
+        valueChanged("executorCores");
+    }
+
+    public int getDriverCores() {
+        return driverCores;
+    }
+
+    public void setDriverCores(int driverCores) {
+        this.driverCores = driverCores;
+        valueChanged("driverCores");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
new file mode 100644
index 0000000..f4de84c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eagleSparkRunningExecutors")
+@ColumnFamily("f")
+@Prefix("sparkExecutor")
+@Service(Constants.RUNNING_SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "executorId","user", "queue"})
+@Partition({"site"})
+public class SparkExecutorEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private String hostPort;
+    @Column("b")
+    private int rddBlocks;
+    @Column("c")
+    private long memoryUsed;
+    @Column("d")
+    private long diskUsed;
+    @Column("e")
+    private int activeTasks = 0;
+    @Column("f")
+    private int failedTasks = 0;
+    @Column("g")
+    private int completedTasks = 0;
+    @Column("h")
+    private int totalTasks = 0;
+    @Column("i")
+    private long totalDuration = 0;
+    @Column("j")
+    private long totalInputBytes = 0;
+    @Column("k")
+    private long totalShuffleRead = 0;
+    @Column("l")
+    private long totalShuffleWrite = 0;
+    @Column("m")
+    private long maxMemory;
+    @Column("n")
+    private long startTime;
+    @Column("o")
+    private long endTime = 0;
+    @Column("p")
+    private long execMemoryBytes;
+    @Column("q")
+    private int cores;
+    @Column("r")
+    private long memoryOverhead;
+
+    public String getHostPort() {
+        return hostPort;
+    }
+
+    public void setHostPort(String hostPort) {
+        this.hostPort = hostPort;
+        this.valueChanged("hostPort");
+    }
+
+    public int getRddBlocks() {
+        return rddBlocks;
+    }
+
+    public void setRddBlocks(int rddBlocks) {
+        this.rddBlocks = rddBlocks;
+        this.valueChanged("rddBlocks");
+    }
+
+    public long getMemoryUsed() {
+        return memoryUsed;
+    }
+
+    public void setMemoryUsed(long memoryUsed) {
+        this.memoryUsed = memoryUsed;
+        this.valueChanged("memoryUsed");
+    }
+
+    public long getDiskUsed() {
+        return diskUsed;
+    }
+
+    public void setDiskUsed(long diskUsed) {
+        this.diskUsed = diskUsed;
+        this.valueChanged("diskUsed");
+    }
+
+    public int getActiveTasks() {
+        return activeTasks;
+    }
+
+    public void setActiveTasks(int activeTasks) {
+        this.activeTasks = activeTasks;
+        this.valueChanged("activeTasks");
+    }
+
+    public int getFailedTasks() {
+        return failedTasks;
+    }
+
+    public void setFailedTasks(int failedTasks) {
+        this.failedTasks = failedTasks;
+        this.valueChanged("failedTasks");
+    }
+
+    public int getCompletedTasks() {
+        return completedTasks;
+    }
+
+    public void setCompletedTasks(int completedTasks) {
+        this.completedTasks = completedTasks;
+        this.valueChanged("completedTasks");
+    }
+
+    public int getTotalTasks() {
+        return totalTasks;
+    }
+
+    public void setTotalTasks(int totalTasks) {
+        this.totalTasks = totalTasks;
+        this.valueChanged("totalTasks");
+    }
+
+    public long getTotalDuration() {
+        return totalDuration;
+    }
+
+    public void setTotalDuration(long totalDuration) {
+        this.totalDuration = totalDuration;
+        this.valueChanged("totalDuration");
+    }
+
+    public long getTotalInputBytes() {
+        return totalInputBytes;
+    }
+
+    public void setTotalInputBytes(long totalInputBytes) {
+        this.totalInputBytes = totalInputBytes;
+        this.valueChanged("totalInputBytes");
+    }
+
+    public long getTotalShuffleRead() {
+        return totalShuffleRead;
+    }
+
+    public void setTotalShuffleRead(long totalShuffleRead) {
+        this.totalShuffleRead = totalShuffleRead;
+        this.valueChanged("totalShuffleRead");
+    }
+
+    public long getTotalShuffleWrite() {
+        return totalShuffleWrite;
+    }
+
+    public void setTotalShuffleWrite(long totalShuffleWrite) {
+        this.totalShuffleWrite = totalShuffleWrite;
+        this.valueChanged("totalShuffleWrite");
+    }
+
+    public long getMaxMemory() {
+        return maxMemory;
+    }
+
+    public void setMaxMemory(long maxMemory) {
+        this.maxMemory = maxMemory;
+        this.valueChanged("maxMemory");
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        this.valueChanged("endTime");
+    }
+
+    public long getExecMemoryBytes() {
+        return execMemoryBytes;
+    }
+
+    public void setExecMemoryBytes(long execMemoryBytes) {
+        this.execMemoryBytes = execMemoryBytes;
+        this.valueChanged("execMemoryBytes");
+    }
+
+    public int getCores() {
+        return cores;
+    }
+
+    public void setCores(int cores) {
+        this.cores = cores;
+        valueChanged("cores");
+    }
+
+    public long getMemoryOverhead() {
+        return memoryOverhead;
+    }
+
+    public void setMemoryOverhead(long memoryOverhead) {
+        this.memoryOverhead = memoryOverhead;
+        valueChanged("memoryOverhead");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
new file mode 100644
index 0000000..1c2caa4
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+import java.util.List;
+
+@Table("eagleSparkRunningJobs")
+@ColumnFamily("f")
+@Prefix("sparkJob")
+@Service(Constants.RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId","user", "queue"})
+@Partition({"site"})
+public class SparkJobEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private long  submissionTime;
+    @Column("b")
+    private long completionTime;
+    @Column("c")
+    private int numStages = 0;
+    @Column("d")
+    private String status;
+    @Column("e")
+    private int numTask = 0;
+    @Column("f")
+    private int numActiveTasks = 0;
+    @Column("g")
+    private int numCompletedTasks = 0;
+    @Column("h")
+    private int numSkippedTasks = 0;
+    @Column("i")
+    private int numFailedTasks = 0;
+    @Column("j")
+    private int numActiveStages = 0;
+    @Column("k")
+    private int numCompletedStages = 0;
+    @Column("l")
+    private int numSkippedStages = 0;
+    @Column("m")
+    private int numFailedStages = 0;
+    @Column("n")
+    private List<Integer> stages;
+
+    public List<Integer> getStages() {
+        return stages;
+    }
+
+    public void setStages(List<Integer> stages) {
+        this.stages = stages;
+        this.valueChanged("stages");
+    }
+
+    public long getSubmissionTime() {
+        return submissionTime;
+    }
+
+    public long getCompletionTime() {
+        return completionTime;
+    }
+
+    public int getNumStages() {
+        return numStages;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public int getNumTask() {
+        return numTask;
+    }
+
+    public int getNumActiveTasks() {
+        return numActiveTasks;
+    }
+
+    public int getNumCompletedTasks() {
+        return numCompletedTasks;
+    }
+
+    public int getNumSkippedTasks() {
+        return numSkippedTasks;
+    }
+
+    public int getNumFailedTasks() {
+        return numFailedTasks;
+    }
+
+    public int getNumActiveStages() {
+        return numActiveStages;
+    }
+
+    public int getNumCompletedStages() {
+        return numCompletedStages;
+    }
+
+    public int getNumSkippedStages() {
+        return numSkippedStages;
+    }
+
+    public int getNumFailedStages() {
+        return numFailedStages;
+    }
+
+    public void setSubmissionTime(long submissionTime) {
+        this.submissionTime = submissionTime;
+        this.valueChanged("submissionTime");
+    }
+
+    public void setCompletionTime(long completionTime) {
+        this.completionTime = completionTime;
+        this.valueChanged("completionTime");
+    }
+
+    public void setNumStages(int numStages) {
+        this.numStages = numStages;
+        this.valueChanged("numStages");
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        this.valueChanged("status");
+    }
+
+    public void setNumTask(int numTask) {
+        this.numTask = numTask;
+        this.valueChanged("numTask");
+    }
+
+    public void setNumActiveTasks(int numActiveTasks) {
+        this.numActiveTasks = numActiveTasks;
+        this.valueChanged("numActiveTasks");
+    }
+
+    public void setNumCompletedTasks(int numCompletedTasks) {
+        this.numCompletedTasks = numCompletedTasks;
+        this.valueChanged("numCompletedTasks");
+    }
+
+    public void setNumSkippedTasks(int numSkippedTasks) {
+        this.numSkippedTasks = numSkippedTasks;
+        this.valueChanged("numSkippedTasks");
+    }
+
+    public void setNumFailedTasks(int numFailedTasks) {
+        this.numFailedTasks = numFailedTasks;
+        this.valueChanged("numFailedTasks");
+    }
+
+    public void setNumActiveStages(int numActiveStages) {
+        this.numActiveStages = numActiveStages;
+        this.valueChanged("numActiveStages");
+    }
+
+    public void setNumCompletedStages(int numCompletedStages) {
+        this.numCompletedStages = numCompletedStages;
+        this.valueChanged("numCompletedStages");
+    }
+
+    public void setNumSkippedStages(int numSkippedStages) {
+        this.numSkippedStages = numSkippedStages;
+        this.valueChanged("numSkippedStages");
+    }
+
+    public void setNumFailedStages(int numFailedStages) {
+        this.numFailedStages = numFailedStages;
+        this.valueChanged("numFailedStages");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
new file mode 100644
index 0000000..72dbe40
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eagleSparkRunningStages")
+@ColumnFamily("f")
+@Prefix("sparkStage")
+@Service(Constants.RUNNING_SPARK_STAGE_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", "stageId","stageAttemptId","user", "queue"})
+@Partition({"site"})
+public class SparkStageEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private String status;
+    @Column("b")
+    private int numActiveTasks = 0;
+    @Column("c")
+    private int numCompletedTasks = 0;
+    @Column("d")
+    private int numFailedTasks = 0;
+    @Column("e")
+    private long executorRunTime = 0l;
+    @Column("f")
+    private long inputBytes = 0l;
+    @Column("g")
+    private long inputRecords = 0l;
+    @Column("h")
+    private long outputBytes = 0l;
+    @Column("i")
+    private long outputRecords = 0l;
+    @Column("j")
+    private long shuffleReadBytes = 0l;
+    @Column("k")
+    private long shuffleReadRecords = 0l;
+    @Column("l")
+    private long shuffleWriteBytes = 0l;
+    @Column("m")
+    private long shuffleWriteRecords = 0l;
+    @Column("n")
+    private long memoryBytesSpilled = 0l;
+    @Column("o")
+    private long diskBytesSpilled = 0l;
+    @Column("p")
+    private String name;
+    @Column("q")
+    private String schedulingPool;
+    @Column("r")
+    private long submitTime;
+    @Column("s")
+    private long completeTime;
+    @Column("t")
+    private int numTasks;
+    @Column("u")
+    private long executorDeserializeTime;
+    @Column("v")
+    private long resultSize;
+    @Column("w")
+    private long jvmGcTime;
+    @Column("x")
+    private long resultSerializationTime;
+
+    public String getStatus() {
+        return status;
+    }
+
+    public int getNumActiveTasks() {
+        return numActiveTasks;
+    }
+
+    public int getNumCompletedTasks() {
+        return numCompletedTasks;
+    }
+
+    public int getNumFailedTasks() {
+        return numFailedTasks;
+    }
+
+    public long getExecutorRunTime() {
+        return executorRunTime;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public long getOutputRecords() {
+        return outputRecords;
+    }
+
+    public long getShuffleReadBytes() {
+        return shuffleReadBytes;
+    }
+
+    public long getShuffleReadRecords() {
+        return shuffleReadRecords;
+    }
+
+    public long getShuffleWriteBytes() {
+        return shuffleWriteBytes;
+    }
+
+    public long getShuffleWriteRecords() {
+        return shuffleWriteRecords;
+    }
+
+    public long getMemoryBytesSpilled() {
+        return memoryBytesSpilled;
+    }
+
+    public long getDiskBytesSpilled() {
+        return diskBytesSpilled;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getSchedulingPool() {
+        return schedulingPool;
+    }
+
+    public long getSubmitTime() {
+        return submitTime;
+    }
+
+    public long getCompleteTime() {
+        return completeTime;
+    }
+
+    public int getNumTasks() {
+        return numTasks;
+    }
+
+    public long getExecutorDeserializeTime() {
+        return executorDeserializeTime;
+    }
+
+    public long getResultSize() {
+        return resultSize;
+    }
+
+    public long getJvmGcTime() {
+        return jvmGcTime;
+    }
+
+    public long getResultSerializationTime() {
+        return resultSerializationTime;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        this.valueChanged("status");
+    }
+
+    public void setNumActiveTasks(int numActiveTasks) {
+        this.numActiveTasks = numActiveTasks;
+        this.valueChanged("numActiveTasks");
+    }
+
+    public void setNumCompletedTasks(int numCompletedTasks) {
+        this.numCompletedTasks = numCompletedTasks;
+        this.valueChanged("numCompletedTasks");
+    }
+
+    public void setNumFailedTasks(int numFailedTasks) {
+        this.numFailedTasks = numFailedTasks;
+        this.valueChanged("numFailedTasks");
+    }
+
+    public void setExecutorRunTime(long executorRunTime) {
+        this.executorRunTime = executorRunTime;
+        this.valueChanged("executorRunTime");
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+        this.valueChanged("inputBytes");
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+        this.valueChanged("inputRecords");
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+        this.valueChanged("outputBytes");
+    }
+
+    public void setOutputRecords(long outputRecords) {
+        this.outputRecords = outputRecords;
+        this.valueChanged("outputRecords");
+    }
+
+    public void setShuffleReadBytes(long shuffleReadBytes) {
+        this.shuffleReadBytes = shuffleReadBytes;
+        this.valueChanged("shuffleReadBytes");
+    }
+
+    public void setShuffleReadRecords(long shuffleReadRecords) {
+        this.shuffleReadRecords = shuffleReadRecords;
+        this.valueChanged("shuffleReadRecords");
+    }
+
+    public void setShuffleWriteBytes(long shuffleWriteBytes) {
+        this.shuffleWriteBytes = shuffleWriteBytes;
+        this.valueChanged("shuffleWriteBytes");
+    }
+
+    public void setShuffleWriteRecords(long shuffleWriteRecords) {
+        this.shuffleWriteRecords = shuffleWriteRecords;
+        this.valueChanged("shuffleWriteRecords");
+    }
+
+    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+        this.memoryBytesSpilled = memoryBytesSpilled;
+        this.valueChanged("memoryBytesSpilled");
+    }
+
+    public void setDiskBytesSpilled(long diskBytesSpilled) {
+        this.diskBytesSpilled = diskBytesSpilled;
+        this.valueChanged("diskBytesSpilled");
+    }
+
+    public void setName(String name) {
+        this.name = name;
+        this.valueChanged("name");
+    }
+
+    public void setSchedulingPool(String schedulingPool) {
+        this.schedulingPool = schedulingPool;
+        this.valueChanged("schedulingPool");
+    }
+
+    public void setSubmitTime(long submitTime) {
+        this.submitTime = submitTime;
+        this.valueChanged("submitTime");
+    }
+
+    public void setCompleteTime(long completeTime) {
+        this.completeTime = completeTime;
+        this.valueChanged("completeTime");
+    }
+
+    public void setNumTasks(int numTasks) {
+        this.numTasks = numTasks;
+        valueChanged("numTasks");
+    }
+
+    public void setExecutorDeserializeTime(long executorDeserializeTime) {
+        this.executorDeserializeTime = executorDeserializeTime;
+        valueChanged("executorDeserializeTime");
+    }
+
+    public void setResultSize(long resultSize) {
+        this.resultSize = resultSize;
+        valueChanged("resultSize");
+    }
+
+    public void setJvmGcTime(long jvmGcTime) {
+        this.jvmGcTime = jvmGcTime;
+        valueChanged("jvmGcTime");
+    }
+
+    public void setResultSerializationTime(long resultSerializationTime) {
+        this.resultSerializationTime = resultSerializationTime;
+        valueChanged("resultSerializationTime");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
new file mode 100644
index 0000000..183a62a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eagleSparkRunningTasks")
+@ColumnFamily("f")
+@Prefix("sparkTask")
+@Service(Constants.RUNNING_SPARK_TASK_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", "jobName", "stageId","stageAttemptId","taskIndex","taskAttemptId","user", "queue"})
+@Partition({"site"})
+public class SparkTaskEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private int taskId;
+    @Column("b")
+    private long launchTime;
+    @Column("c")
+    private String executorId;
+    @Column("d")
+    private String host;
+    @Column("e")
+    private String taskLocality;
+    @Column("f")
+    private boolean speculative;
+    @Column("g")
+    private long executorDeserializeTime;
+    @Column("h")
+    private long executorRunTime;
+    @Column("i")
+    private long resultSize;
+    @Column("j")
+    private long jvmGcTime;
+    @Column("k")
+    private long resultSerializationTime;
+    @Column("l")
+    private long memoryBytesSpilled;
+    @Column("m")
+    private long diskBytesSpilled;
+    @Column("n")
+    private long inputBytes;
+    @Column("o")
+    private long inputRecords;
+    @Column("p")
+    private long outputBytes;
+    @Column("q")
+    private long outputRecords;
+    @Column("r")
+    private long shuffleReadRemoteBytes;
+    @Column("x")
+    private long shuffleReadLocalBytes;
+    @Column("s")
+    private long shuffleReadRecords;
+    @Column("t")
+    private long shuffleWriteBytes;
+    @Column("u")
+    private long shuffleWriteRecords;
+    @Column("v")
+    private boolean failed;
+
+    public int getTaskId() {
+        return taskId;
+    }
+
+    public long getLaunchTime() {
+        return launchTime;
+    }
+
+    public String getExecutorId() {
+        return executorId;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public String getTaskLocality() {
+        return taskLocality;
+    }
+
+    public boolean isSpeculative() {
+        return speculative;
+    }
+
+    public long getExecutorDeserializeTime() {
+        return executorDeserializeTime;
+    }
+
+    public long getExecutorRunTime() {
+        return executorRunTime;
+    }
+
+    public long getResultSize() {
+        return resultSize;
+    }
+
+    public long getJvmGcTime() {
+        return jvmGcTime;
+    }
+
+    public long getResultSerializationTime() {
+        return resultSerializationTime;
+    }
+
+    public long getMemoryBytesSpilled() {
+        return memoryBytesSpilled;
+    }
+
+    public long getDiskBytesSpilled() {
+        return diskBytesSpilled;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public long getOutputRecords() {
+        return outputRecords;
+    }
+
+    public long getShuffleReadRecords() {
+        return shuffleReadRecords;
+    }
+
+    public long getShuffleWriteBytes() {
+        return shuffleWriteBytes;
+    }
+
+    public long getShuffleWriteRecords() {
+        return shuffleWriteRecords;
+    }
+
+    public boolean isFailed() {
+        return failed;
+    }
+
+    public long getShuffleReadRemoteBytes() {
+        return shuffleReadRemoteBytes;
+    }
+
+    public long getShuffleReadLocalBytes() {
+        return shuffleReadLocalBytes;
+    }
+
+    public void setFailed(boolean failed) {
+        this.failed = failed;
+        valueChanged("failed");
+    }
+
+    public void setTaskId(int taskId) {
+        this.taskId = taskId;
+        valueChanged("taskId");
+    }
+
+    public void setLaunchTime(long launchTime) {
+        this.launchTime = launchTime;
+        valueChanged("launchTime");
+    }
+
+    public void setExecutorId(String executorId) {
+        this.executorId = executorId;
+        valueChanged("executorId");
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+        this.valueChanged("host");
+    }
+
+    public void setTaskLocality(String taskLocality) {
+        this.taskLocality = taskLocality;
+        this.valueChanged("taskLocality");
+    }
+
+    public void setSpeculative(boolean speculative) {
+        this.speculative = speculative;
+        this.valueChanged("speculative");
+    }
+
+    public void setExecutorDeserializeTime(long executorDeserializeTime) {
+        this.executorDeserializeTime = executorDeserializeTime;
+        this.valueChanged("executorDeserializeTime");
+    }
+
+    public void setExecutorRunTime(long executorRunTime) {
+        this.executorRunTime = executorRunTime;
+        this.valueChanged("executorRunTime");
+    }
+
+    public void setResultSize(long resultSize) {
+        this.resultSize = resultSize;
+        this.valueChanged("resultSize");
+    }
+
+    public void setJvmGcTime(long jvmGcTime) {
+        this.jvmGcTime = jvmGcTime;
+        this.valueChanged("jvmGcTime");
+    }
+
+    public void setResultSerializationTime(long resultSerializationTime) {
+        this.resultSerializationTime = resultSerializationTime;
+        this.valueChanged("resultSerializationTime");
+    }
+
+    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+        this.memoryBytesSpilled = memoryBytesSpilled;
+        this.valueChanged("memoryBytesSpilled");
+    }
+
+    public void setDiskBytesSpilled(long diskBytesSpilled) {
+        this.diskBytesSpilled = diskBytesSpilled;
+        this.valueChanged("diskBytesSpilled");
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+        this.valueChanged("inputBytes");
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+        this.valueChanged("inputRecords");
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+        this.valueChanged("outputBytes");
+    }
+
+    public void setOutputRecords(long outputRecords) {
+        this.outputRecords = outputRecords;
+        this.valueChanged("outputRecords");
+    }
+
+
+
+    public void setShuffleReadRecords(long shuffleReadRecords) {
+        this.shuffleReadRecords = shuffleReadRecords;
+        this.valueChanged("shuffleReadRecords");
+    }
+
+    public void setShuffleWriteBytes(long shuffleWriteBytes) {
+        this.shuffleWriteBytes = shuffleWriteBytes;
+        this.valueChanged("shuffleWriteBytes");
+    }
+
+    public void setShuffleWriteRecords(long shuffleWriteRecords) {
+        this.shuffleWriteRecords = shuffleWriteRecords;
+        this.valueChanged("shuffleWriteRecords");
+    }
+
+    public void setShuffleReadRemoteBytes(long shuffleReadRemoteBytes) {
+        this.shuffleReadRemoteBytes = shuffleReadRemoteBytes;
+        this.valueChanged("shuffleReadRemoteBytes");
+    }
+
+    public void setShuffleReadLocalBytes(long shuffleReadLocalBytes) {
+        this.shuffleReadLocalBytes = shuffleReadLocalBytes;
+        this.valueChanged("shuffleReadLocalBytes");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
new file mode 100644
index 0000000..5491a80
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.parser;
+
+import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SparkAppEntityCreationHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkAppEntityCreationHandler.class);
+
+    private List<TaggedLogAPIEntity> entities = new ArrayList<>();
+    private SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig;
+
+    public SparkAppEntityCreationHandler(SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig) {
+        this.eagleServiceConfig = eagleServiceConfig;
+    }
+
+    public void add(TaggedLogAPIEntity entity) {
+        entities.add(entity);
+        if (entities.size() >= eagleServiceConfig.maxFlushNum) {
+            this.flush();
+        }
+    }
+
+    public boolean flush() {
+        //need flush right now
+        if (entities.size() == 0) {
+            return true;
+        }
+
+
+        try (IEagleServiceClient client = new EagleServiceClientImpl(
+                eagleServiceConfig.eagleServiceHost,
+                eagleServiceConfig.eagleServicePort,
+                eagleServiceConfig.username,
+                eagleServiceConfig.password)) {
+            client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+            LOG.info("start to flush spark app entities, size {}", entities.size());
+            client.create(entities);
+            LOG.info("finish flushing spark app entities, size {}", entities.size());
+            entities.clear();
+        } catch (Exception e) {
+            LOG.warn("exception found when flush entities, {}", e);
+            e.printStackTrace();
+            return false;
+        }
+
+        return true;
+    }
+}