You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this rendering.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/09/07 02:36:40 UTC
[1/3] incubator-eagle git commit: Update spark history job feeder config & refactor the code
Repository: incubator-eagle
Updated Branches:
refs/heads/develop 8774b85cd -> 3110c72e4
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
deleted file mode 100644
index 7de1530..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
+++ /dev/null
@@ -1,475 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-@Table("eagleSparkRunningApps")
-@ColumnFamily("f")
-@Prefix("sparkApp")
-@Service(Constants.RUNNING_SPARK_APP_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(true)
-@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "user", "queue"})
-@Partition({"site"})
-public class SparkAppEntity extends TaggedLogAPIEntity {
- @Column("a")
- private long startTime;
- @Column("b")
- private long endTime;
- @Column("c")
- private String yarnState;
- @Column("d")
- private String yarnStatus;
- @Column("e")
- private JobConfig config;
- @Column("f")
- private int numJobs;
- @Column("g")
- private int totalStages;
- @Column("h")
- private int skippedStages;
- @Column("i")
- private int failedStages;
- @Column("j")
- private int totalTasks;
- @Column("k")
- private int skippedTasks;
- @Column("l")
- private int failedTasks;
- @Column("m")
- private int executors;
- @Column("n")
- private long inputBytes;
- @Column("o")
- private long inputRecords;
- @Column("p")
- private long outputBytes;
- @Column("q")
- private long outputRecords;
- @Column("r")
- private long shuffleReadBytes;
- @Column("s")
- private long shuffleReadRecords;
- @Column("t")
- private long shuffleWriteBytes;
- @Column("u")
- private long shuffleWriteRecords;
- @Column("v")
- private long executorDeserializeTime;
- @Column("w")
- private long executorRunTime;
- @Column("x")
- private long resultSize;
- @Column("y")
- private long jvmGcTime;
- @Column("z")
- private long resultSerializationTime;
- @Column("ab")
- private long memoryBytesSpilled;
- @Column("ac")
- private long diskBytesSpilled;
- @Column("ad")
- private long execMemoryBytes;
- @Column("ae")
- private long driveMemoryBytes;
- @Column("af")
- private int completeTasks;
- @Column("ag")
- private long totalExecutorTime;
- @Column("ah")
- private long executorMemoryOverhead;
- @Column("ai")
- private long driverMemoryOverhead;
- @Column("aj")
- private int executorCores;
- @Column("ak")
- private int driverCores;
- @Column("al")
- private AppInfo appInfo;
- @Column("am")
- private int activeStages;
- @Column("an")
- private int completeStages;
- @Column("ba")
- private int activeTasks;
-
- public int getActiveTasks() {
- return activeTasks;
- }
-
- public void setActiveTasks(int activeTasks) {
- this.activeTasks = activeTasks;
- valueChanged("activeTasks");
- }
-
- public int getCompleteStages() {
- return completeStages;
- }
-
- public void setCompleteStages(int completeStages) {
- this.completeStages = completeStages;
- valueChanged("completeStages");
- }
-
- public int getActiveStages() {
- return activeStages;
- }
-
- public void setActiveStages(int activeStages) {
- this.activeStages = activeStages;
- valueChanged("activeStages");
- }
-
- public AppInfo getAppInfo() {
- return appInfo;
- }
-
- public void setAppInfo(AppInfo appInfo) {
- this.appInfo = appInfo;
- valueChanged("appInfo");
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public long getEndTime() {
- return endTime;
- }
-
- public String getYarnState() {
- return yarnState;
- }
-
- public String getYarnStatus() {
- return yarnStatus;
- }
-
- public int getNumJobs() {
- return numJobs;
- }
-
- public int getTotalStages() {
- return totalStages;
- }
-
- public int getSkippedStages() {
- return skippedStages;
- }
-
- public int getFailedStages() {
- return failedStages;
- }
-
- public int getTotalTasks() {
- return totalTasks;
- }
-
- public int getSkippedTasks() {
- return skippedTasks;
- }
-
- public int getFailedTasks() {
- return failedTasks;
- }
-
- public int getExecutors() {
- return executors;
- }
-
- public long getInputBytes() {
- return inputBytes;
- }
-
- public long getInputRecords() {
- return inputRecords;
- }
-
- public long getOutputBytes() {
- return outputBytes;
- }
-
- public long getOutputRecords() {
- return outputRecords;
- }
-
- public long getShuffleReadBytes() {
- return shuffleReadBytes;
- }
-
- public long getShuffleReadRecords() {
- return shuffleReadRecords;
- }
-
- public long getShuffleWriteBytes() {
- return shuffleWriteBytes;
- }
-
- public long getShuffleWriteRecords() {
- return shuffleWriteRecords;
- }
-
- public long getExecutorDeserializeTime() {
- return executorDeserializeTime;
- }
-
- public long getExecutorRunTime() {
- return executorRunTime;
- }
-
- public long getResultSize() {
- return resultSize;
- }
-
- public long getJvmGcTime() {
- return jvmGcTime;
- }
-
- public long getResultSerializationTime() {
- return resultSerializationTime;
- }
-
- public long getMemoryBytesSpilled() {
- return memoryBytesSpilled;
- }
-
- public long getDiskBytesSpilled() {
- return diskBytesSpilled;
- }
-
- public long getExecMemoryBytes() {
- return execMemoryBytes;
- }
-
- public long getDriveMemoryBytes() {
- return driveMemoryBytes;
- }
-
- public int getCompleteTasks() {
- return completeTasks;
- }
-
- public JobConfig getConfig() {
- return config;
- }
-
- public void setStartTime(long startTime) {
- this.startTime = startTime;
- valueChanged("startTime");
- }
-
- public void setEndTime(long endTime) {
- this.endTime = endTime;
- valueChanged("endTime");
- }
-
- public void setYarnState(String yarnState) {
- this.yarnState = yarnState;
- valueChanged("yarnState");
- }
-
- public void setYarnStatus(String yarnStatus) {
- this.yarnStatus = yarnStatus;
- valueChanged("yarnStatus");
- }
-
- public void setConfig(JobConfig config) {
- this.config = config;
- valueChanged("config");
- }
-
- public void setNumJobs(int numJobs) {
- this.numJobs = numJobs;
- valueChanged("numJobs");
- }
-
- public void setTotalStages(int totalStages) {
- this.totalStages = totalStages;
- valueChanged("totalStages");
- }
-
- public void setSkippedStages(int skippedStages) {
- this.skippedStages = skippedStages;
- valueChanged("skippedStages");
- }
-
- public void setFailedStages(int failedStages) {
- this.failedStages = failedStages;
- valueChanged("failedStages");
- }
-
- public void setTotalTasks(int totalTasks) {
- this.totalTasks = totalTasks;
- valueChanged("totalTasks");
- }
-
- public void setSkippedTasks(int skippedTasks) {
- this.skippedTasks = skippedTasks;
- valueChanged("skippedTasks");
- }
-
- public void setFailedTasks(int failedTasks) {
- this.failedTasks = failedTasks;
- valueChanged("failedTasks");
- }
-
- public void setExecutors(int executors) {
- this.executors = executors;
- valueChanged("executors");
- }
-
- public void setInputBytes(long inputBytes) {
- this.inputBytes = inputBytes;
- valueChanged("inputBytes");
- }
-
- public void setInputRecords(long inputRecords) {
- this.inputRecords = inputRecords;
- valueChanged("inputRecords");
- }
-
- public void setOutputBytes(long outputBytes) {
- this.outputBytes = outputBytes;
- valueChanged("outputBytes");
- }
-
- public void setOutputRecords(long outputRecords) {
- this.outputRecords = outputRecords;
- valueChanged("outputRecords");
- }
-
- public void setShuffleReadBytes(long shuffleReadRemoteBytes) {
- this.shuffleReadBytes = shuffleReadRemoteBytes;
- valueChanged("shuffleReadBytes");
- }
-
- public void setShuffleReadRecords(long shuffleReadRecords) {
- this.shuffleReadRecords = shuffleReadRecords;
- valueChanged("shuffleReadRecords");
- }
-
- public void setShuffleWriteBytes(long shuffleWriteBytes) {
- this.shuffleWriteBytes = shuffleWriteBytes;
- valueChanged("shuffleWriteBytes");
- }
-
- public void setShuffleWriteRecords(long shuffleWriteRecords) {
- this.shuffleWriteRecords = shuffleWriteRecords;
- valueChanged("shuffleWriteRecords");
- }
-
- public void setExecutorDeserializeTime(long executorDeserializeTime) {
- this.executorDeserializeTime = executorDeserializeTime;
- valueChanged("executorDeserializeTime");
- }
-
- public void setExecutorRunTime(long executorRunTime) {
- this.executorRunTime = executorRunTime;
- valueChanged("executorRunTime");
- }
-
- public void setResultSize(long resultSize) {
- this.resultSize = resultSize;
- valueChanged("resultSize");
- }
-
- public void setJvmGcTime(long jvmGcTime) {
- this.jvmGcTime = jvmGcTime;
- valueChanged("jvmGcTime");
- }
-
- public void setResultSerializationTime(long resultSerializationTime) {
- this.resultSerializationTime = resultSerializationTime;
- valueChanged("resultSerializationTime");
- }
-
- public void setMemoryBytesSpilled(long memoryBytesSpilled) {
- this.memoryBytesSpilled = memoryBytesSpilled;
- valueChanged("memoryBytesSpilled");
- }
-
- public void setDiskBytesSpilled(long diskBytesSpilled) {
- this.diskBytesSpilled = diskBytesSpilled;
- valueChanged("diskBytesSpilled");
- }
-
- public void setExecMemoryBytes(long execMemoryBytes) {
- this.execMemoryBytes = execMemoryBytes;
- valueChanged("execMemoryBytes");
- }
-
- public void setDriveMemoryBytes(long driveMemoryBytes) {
- this.driveMemoryBytes = driveMemoryBytes;
- valueChanged("driveMemoryBytes");
- }
-
- public void setCompleteTasks(int completeTasks) {
- this.completeTasks = completeTasks;
- valueChanged("completeTasks");
- }
-
- public long getTotalExecutorTime() {
- return totalExecutorTime;
- }
-
- public void setTotalExecutorTime(long totalExecutorTime) {
- this.totalExecutorTime = totalExecutorTime;
- valueChanged("totalExecutorTime");
- }
-
- public long getExecutorMemoryOverhead() {
- return executorMemoryOverhead;
- }
-
- public void setExecutorMemoryOverhead(long executorMemoryOverhead) {
- this.executorMemoryOverhead = executorMemoryOverhead;
- valueChanged("executorMemoryOverhead");
- }
-
- public long getDriverMemoryOverhead() {
- return driverMemoryOverhead;
- }
-
- public void setDriverMemoryOverhead(long driverMemoryOverhead) {
- this.driverMemoryOverhead = driverMemoryOverhead;
- valueChanged("driverMemoryOverhead");
- }
-
- public int getExecutorCores() {
- return executorCores;
- }
-
- public void setExecutorCores(int executorCores) {
- this.executorCores = executorCores;
- valueChanged("executorCores");
- }
-
- public int getDriverCores() {
- return driverCores;
- }
-
- public void setDriverCores(int driverCores) {
- this.driverCores = driverCores;
- valueChanged("driverCores");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
deleted file mode 100644
index 89549ca..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-@Table("eagleSparkRunningExecutors")
-@ColumnFamily("f")
-@Prefix("sparkExecutor")
-@Service(Constants.RUNNING_SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(true)
-@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "executorId","user", "queue"})
-@Partition({"site"})
-public class SparkExecutorEntity extends TaggedLogAPIEntity {
- @Column("a")
- private String hostPort;
- @Column("b")
- private int rddBlocks;
- @Column("c")
- private long memoryUsed;
- @Column("d")
- private long diskUsed;
- @Column("e")
- private int activeTasks = 0;
- @Column("f")
- private int failedTasks = 0;
- @Column("g")
- private int completedTasks = 0;
- @Column("h")
- private int totalTasks = 0;
- @Column("i")
- private long totalDuration = 0;
- @Column("j")
- private long totalInputBytes = 0;
- @Column("k")
- private long totalShuffleRead = 0;
- @Column("l")
- private long totalShuffleWrite = 0;
- @Column("m")
- private long maxMemory;
- @Column("n")
- private long startTime;
- @Column("o")
- private long endTime = 0;
- @Column("p")
- private long execMemoryBytes;
- @Column("q")
- private int cores;
- @Column("r")
- private long memoryOverhead;
-
- public String getHostPort() {
- return hostPort;
- }
-
- public void setHostPort(String hostPort) {
- this.hostPort = hostPort;
- this.valueChanged("hostPort");
- }
-
- public int getRddBlocks() {
- return rddBlocks;
- }
-
- public void setRddBlocks(int rddBlocks) {
- this.rddBlocks = rddBlocks;
- this.valueChanged("rddBlocks");
- }
-
- public long getMemoryUsed() {
- return memoryUsed;
- }
-
- public void setMemoryUsed(long memoryUsed) {
- this.memoryUsed = memoryUsed;
- this.valueChanged("memoryUsed");
- }
-
- public long getDiskUsed() {
- return diskUsed;
- }
-
- public void setDiskUsed(long diskUsed) {
- this.diskUsed = diskUsed;
- this.valueChanged("diskUsed");
- }
-
- public int getActiveTasks() {
- return activeTasks;
- }
-
- public void setActiveTasks(int activeTasks) {
- this.activeTasks = activeTasks;
- this.valueChanged("activeTasks");
- }
-
- public int getFailedTasks() {
- return failedTasks;
- }
-
- public void setFailedTasks(int failedTasks) {
- this.failedTasks = failedTasks;
- this.valueChanged("failedTasks");
- }
-
- public int getCompletedTasks() {
- return completedTasks;
- }
-
- public void setCompletedTasks(int completedTasks) {
- this.completedTasks = completedTasks;
- this.valueChanged("completedTasks");
- }
-
- public int getTotalTasks() {
- return totalTasks;
- }
-
- public void setTotalTasks(int totalTasks) {
- this.totalTasks = totalTasks;
- this.valueChanged("totalTasks");
- }
-
- public long getTotalDuration() {
- return totalDuration;
- }
-
- public void setTotalDuration(long totalDuration) {
- this.totalDuration = totalDuration;
- this.valueChanged("totalDuration");
- }
-
- public long getTotalInputBytes() {
- return totalInputBytes;
- }
-
- public void setTotalInputBytes(long totalInputBytes) {
- this.totalInputBytes = totalInputBytes;
- this.valueChanged("totalInputBytes");
- }
-
- public long getTotalShuffleRead() {
- return totalShuffleRead;
- }
-
- public void setTotalShuffleRead(long totalShuffleRead) {
- this.totalShuffleRead = totalShuffleRead;
- this.valueChanged("totalShuffleRead");
- }
-
- public long getTotalShuffleWrite() {
- return totalShuffleWrite;
- }
-
- public void setTotalShuffleWrite(long totalShuffleWrite) {
- this.totalShuffleWrite = totalShuffleWrite;
- this.valueChanged("totalShuffleWrite");
- }
-
- public long getMaxMemory() {
- return maxMemory;
- }
-
- public void setMaxMemory(long maxMemory) {
- this.maxMemory = maxMemory;
- this.valueChanged("maxMemory");
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public void setStartTime(long startTime) {
- this.startTime = startTime;
- valueChanged("startTime");
- }
-
- public long getEndTime() {
- return endTime;
- }
-
- public void setEndTime(long endTime) {
- this.endTime = endTime;
- this.valueChanged("endTime");
- }
-
- public long getExecMemoryBytes() {
- return execMemoryBytes;
- }
-
- public void setExecMemoryBytes(long execMemoryBytes) {
- this.execMemoryBytes = execMemoryBytes;
- this.valueChanged("execMemoryBytes");
- }
-
- public int getCores() {
- return cores;
- }
-
- public void setCores(int cores) {
- this.cores = cores;
- valueChanged("cores");
- }
-
- public long getMemoryOverhead() {
- return memoryOverhead;
- }
-
- public void setMemoryOverhead(long memoryOverhead) {
- this.memoryOverhead = memoryOverhead;
- valueChanged("memoryOverhead");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
deleted file mode 100644
index bb56b52..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-import java.util.List;
-
-@Table("eagleSparkRunningJobs")
-@ColumnFamily("f")
-@Prefix("sparkJob")
-@Service(Constants.RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(true)
-@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId","user", "queue"})
-@Partition({"site"})
-public class SparkJobEntity extends TaggedLogAPIEntity {
- @Column("a")
- private long submissionTime;
- @Column("b")
- private long completionTime;
- @Column("c")
- private int numStages = 0;
- @Column("d")
- private String status;
- @Column("e")
- private int numTask = 0;
- @Column("f")
- private int numActiveTasks = 0;
- @Column("g")
- private int numCompletedTasks = 0;
- @Column("h")
- private int numSkippedTasks = 0;
- @Column("i")
- private int numFailedTasks = 0;
- @Column("j")
- private int numActiveStages = 0;
- @Column("k")
- private int numCompletedStages = 0;
- @Column("l")
- private int numSkippedStages = 0;
- @Column("m")
- private int numFailedStages = 0;
- @Column("n")
- private List<Integer> stages;
-
- public List<Integer> getStages() {
- return stages;
- }
-
- public void setStages(List<Integer> stages) {
- this.stages = stages;
- this.valueChanged("stages");
- }
-
- public long getSubmissionTime() {
- return submissionTime;
- }
-
- public long getCompletionTime() {
- return completionTime;
- }
-
- public int getNumStages() {
- return numStages;
- }
-
- public String getStatus() {
- return status;
- }
-
- public int getNumTask() {
- return numTask;
- }
-
- public int getNumActiveTasks() {
- return numActiveTasks;
- }
-
- public int getNumCompletedTasks() {
- return numCompletedTasks;
- }
-
- public int getNumSkippedTasks() {
- return numSkippedTasks;
- }
-
- public int getNumFailedTasks() {
- return numFailedTasks;
- }
-
- public int getNumActiveStages() {
- return numActiveStages;
- }
-
- public int getNumCompletedStages() {
- return numCompletedStages;
- }
-
- public int getNumSkippedStages() {
- return numSkippedStages;
- }
-
- public int getNumFailedStages() {
- return numFailedStages;
- }
-
- public void setSubmissionTime(long submissionTime) {
- this.submissionTime = submissionTime;
- this.valueChanged("submissionTime");
- }
-
- public void setCompletionTime(long completionTime) {
- this.completionTime = completionTime;
- this.valueChanged("completionTime");
- }
-
- public void setNumStages(int numStages) {
- this.numStages = numStages;
- this.valueChanged("numStages");
- }
-
- public void setStatus(String status) {
- this.status = status;
- this.valueChanged("status");
- }
-
- public void setNumTask(int numTask) {
- this.numTask = numTask;
- this.valueChanged("numTask");
- }
-
- public void setNumActiveTasks(int numActiveTasks) {
- this.numActiveTasks = numActiveTasks;
- this.valueChanged("numActiveTasks");
- }
-
- public void setNumCompletedTasks(int numCompletedTasks) {
- this.numCompletedTasks = numCompletedTasks;
- this.valueChanged("numCompletedTasks");
- }
-
- public void setNumSkippedTasks(int numSkippedTasks) {
- this.numSkippedTasks = numSkippedTasks;
- this.valueChanged("numSkippedTasks");
- }
-
- public void setNumFailedTasks(int numFailedTasks) {
- this.numFailedTasks = numFailedTasks;
- this.valueChanged("numFailedTasks");
- }
-
- public void setNumActiveStages(int numActiveStages) {
- this.numActiveStages = numActiveStages;
- this.valueChanged("numActiveStages");
- }
-
- public void setNumCompletedStages(int numCompletedStages) {
- this.numCompletedStages = numCompletedStages;
- this.valueChanged("numCompletedStages");
- }
-
- public void setNumSkippedStages(int numSkippedStages) {
- this.numSkippedStages = numSkippedStages;
- this.valueChanged("numSkippedStages");
- }
-
- public void setNumFailedStages(int numFailedStages) {
- this.numFailedStages = numFailedStages;
- this.valueChanged("numFailedStages");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
deleted file mode 100644
index be0ffd0..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-@Table("eagleSparkRunningStages")
-@ColumnFamily("f")
-@Prefix("sparkStage")
-@Service(Constants.RUNNING_SPARK_STAGE_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(true)
-@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", "stageId","stageAttemptId","user", "queue"})
-@Partition({"site"})
-public class SparkStageEntity extends TaggedLogAPIEntity {
- @Column("a")
- private String status;
- @Column("b")
- private int numActiveTasks = 0;
- @Column("c")
- private int numCompletedTasks = 0;
- @Column("d")
- private int numFailedTasks = 0;
- @Column("e")
- private long executorRunTime = 0L;
- @Column("f")
- private long inputBytes = 0L;
- @Column("g")
- private long inputRecords = 0L;
- @Column("h")
- private long outputBytes = 0L;
- @Column("i")
- private long outputRecords = 0L;
- @Column("j")
- private long shuffleReadBytes = 0L;
- @Column("k")
- private long shuffleReadRecords = 0L;
- @Column("l")
- private long shuffleWriteBytes = 0L;
- @Column("m")
- private long shuffleWriteRecords = 0L;
- @Column("n")
- private long memoryBytesSpilled = 0L;
- @Column("o")
- private long diskBytesSpilled = 0L;
- @Column("p")
- private String name;
- @Column("q")
- private String schedulingPool;
- @Column("r")
- private long submitTime;
- @Column("s")
- private long completeTime;
- @Column("t")
- private int numTasks;
- @Column("u")
- private long executorDeserializeTime;
- @Column("v")
- private long resultSize;
- @Column("w")
- private long jvmGcTime;
- @Column("x")
- private long resultSerializationTime;
-
- public String getStatus() {
- return status;
- }
-
- public int getNumActiveTasks() {
- return numActiveTasks;
- }
-
- public int getNumCompletedTasks() {
- return numCompletedTasks;
- }
-
- public int getNumFailedTasks() {
- return numFailedTasks;
- }
-
- public long getExecutorRunTime() {
- return executorRunTime;
- }
-
- public long getInputBytes() {
- return inputBytes;
- }
-
- public long getInputRecords() {
- return inputRecords;
- }
-
- public long getOutputBytes() {
- return outputBytes;
- }
-
- public long getOutputRecords() {
- return outputRecords;
- }
-
- public long getShuffleReadBytes() {
- return shuffleReadBytes;
- }
-
- public long getShuffleReadRecords() {
- return shuffleReadRecords;
- }
-
- public long getShuffleWriteBytes() {
- return shuffleWriteBytes;
- }
-
- public long getShuffleWriteRecords() {
- return shuffleWriteRecords;
- }
-
- public long getMemoryBytesSpilled() {
- return memoryBytesSpilled;
- }
-
- public long getDiskBytesSpilled() {
- return diskBytesSpilled;
- }
-
- public String getName() {
- return name;
- }
-
- public String getSchedulingPool() {
- return schedulingPool;
- }
-
- public long getSubmitTime() {
- return submitTime;
- }
-
- public long getCompleteTime() {
- return completeTime;
- }
-
- public int getNumTasks() {
- return numTasks;
- }
-
- public long getExecutorDeserializeTime() {
- return executorDeserializeTime;
- }
-
- public long getResultSize() {
- return resultSize;
- }
-
- public long getJvmGcTime() {
- return jvmGcTime;
- }
-
- public long getResultSerializationTime() {
- return resultSerializationTime;
- }
-
- public void setStatus(String status) {
- this.status = status;
- this.valueChanged("status");
- }
-
- public void setNumActiveTasks(int numActiveTasks) {
- this.numActiveTasks = numActiveTasks;
- this.valueChanged("numActiveTasks");
- }
-
- public void setNumCompletedTasks(int numCompletedTasks) {
- this.numCompletedTasks = numCompletedTasks;
- this.valueChanged("numCompletedTasks");
- }
-
- public void setNumFailedTasks(int numFailedTasks) {
- this.numFailedTasks = numFailedTasks;
- this.valueChanged("numFailedTasks");
- }
-
- public void setExecutorRunTime(long executorRunTime) {
- this.executorRunTime = executorRunTime;
- this.valueChanged("executorRunTime");
- }
-
- public void setInputBytes(long inputBytes) {
- this.inputBytes = inputBytes;
- this.valueChanged("inputBytes");
- }
-
- public void setInputRecords(long inputRecords) {
- this.inputRecords = inputRecords;
- this.valueChanged("inputRecords");
- }
-
- public void setOutputBytes(long outputBytes) {
- this.outputBytes = outputBytes;
- this.valueChanged("outputBytes");
- }
-
- public void setOutputRecords(long outputRecords) {
- this.outputRecords = outputRecords;
- this.valueChanged("outputRecords");
- }
-
- public void setShuffleReadBytes(long shuffleReadBytes) {
- this.shuffleReadBytes = shuffleReadBytes;
- this.valueChanged("shuffleReadBytes");
- }
-
- public void setShuffleReadRecords(long shuffleReadRecords) {
- this.shuffleReadRecords = shuffleReadRecords;
- this.valueChanged("shuffleReadRecords");
- }
-
- public void setShuffleWriteBytes(long shuffleWriteBytes) {
- this.shuffleWriteBytes = shuffleWriteBytes;
- this.valueChanged("shuffleWriteBytes");
- }
-
- public void setShuffleWriteRecords(long shuffleWriteRecords) {
- this.shuffleWriteRecords = shuffleWriteRecords;
- this.valueChanged("shuffleWriteRecords");
- }
-
- public void setMemoryBytesSpilled(long memoryBytesSpilled) {
- this.memoryBytesSpilled = memoryBytesSpilled;
- this.valueChanged("memoryBytesSpilled");
- }
-
- public void setDiskBytesSpilled(long diskBytesSpilled) {
- this.diskBytesSpilled = diskBytesSpilled;
- this.valueChanged("diskBytesSpilled");
- }
-
- public void setName(String name) {
- this.name = name;
- this.valueChanged("name");
- }
-
- public void setSchedulingPool(String schedulingPool) {
- this.schedulingPool = schedulingPool;
- this.valueChanged("schedulingPool");
- }
-
- public void setSubmitTime(long submitTime) {
- this.submitTime = submitTime;
- this.valueChanged("submitTime");
- }
-
- public void setCompleteTime(long completeTime) {
- this.completeTime = completeTime;
- this.valueChanged("completeTime");
- }
-
- public void setNumTasks(int numTasks) {
- this.numTasks = numTasks;
- valueChanged("numTasks");
- }
-
- public void setExecutorDeserializeTime(long executorDeserializeTime) {
- this.executorDeserializeTime = executorDeserializeTime;
- valueChanged("executorDeserializeTime");
- }
-
- public void setResultSize(long resultSize) {
- this.resultSize = resultSize;
- valueChanged("resultSize");
- }
-
- public void setJvmGcTime(long jvmGcTime) {
- this.jvmGcTime = jvmGcTime;
- valueChanged("jvmGcTime");
- }
-
- public void setResultSerializationTime(long resultSerializationTime) {
- this.resultSerializationTime = resultSerializationTime;
- valueChanged("resultSerializationTime");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
deleted file mode 100644
index e531806..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-@Table("eagleSparkRunningTasks")
-@ColumnFamily("f")
-@Prefix("sparkTask")
-@Service(Constants.RUNNING_SPARK_TASK_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(true)
-@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", "jobName", "stageId","stageAttemptId","taskIndex","taskAttemptId","user", "queue"})
-@Partition({"site"})
-public class SparkTaskEntity extends TaggedLogAPIEntity {
- @Column("a")
- private int taskId;
- @Column("b")
- private long launchTime;
- @Column("c")
- private String executorId;
- @Column("d")
- private String host;
- @Column("e")
- private String taskLocality;
- @Column("f")
- private boolean speculative;
- @Column("g")
- private long executorDeserializeTime;
- @Column("h")
- private long executorRunTime;
- @Column("i")
- private long resultSize;
- @Column("j")
- private long jvmGcTime;
- @Column("k")
- private long resultSerializationTime;
- @Column("l")
- private long memoryBytesSpilled;
- @Column("m")
- private long diskBytesSpilled;
- @Column("n")
- private long inputBytes;
- @Column("o")
- private long inputRecords;
- @Column("p")
- private long outputBytes;
- @Column("q")
- private long outputRecords;
- @Column("r")
- private long shuffleReadRemoteBytes;
- @Column("x")
- private long shuffleReadLocalBytes;
- @Column("s")
- private long shuffleReadRecords;
- @Column("t")
- private long shuffleWriteBytes;
- @Column("u")
- private long shuffleWriteRecords;
- @Column("v")
- private boolean failed;
-
- public int getTaskId() {
- return taskId;
- }
-
- public long getLaunchTime() {
- return launchTime;
- }
-
- public String getExecutorId() {
- return executorId;
- }
-
- public String getHost() {
- return host;
- }
-
- public String getTaskLocality() {
- return taskLocality;
- }
-
- public boolean isSpeculative() {
- return speculative;
- }
-
- public long getExecutorDeserializeTime() {
- return executorDeserializeTime;
- }
-
- public long getExecutorRunTime() {
- return executorRunTime;
- }
-
- public long getResultSize() {
- return resultSize;
- }
-
- public long getJvmGcTime() {
- return jvmGcTime;
- }
-
- public long getResultSerializationTime() {
- return resultSerializationTime;
- }
-
- public long getMemoryBytesSpilled() {
- return memoryBytesSpilled;
- }
-
- public long getDiskBytesSpilled() {
- return diskBytesSpilled;
- }
-
- public long getInputBytes() {
- return inputBytes;
- }
-
- public long getInputRecords() {
- return inputRecords;
- }
-
- public long getOutputBytes() {
- return outputBytes;
- }
-
- public long getOutputRecords() {
- return outputRecords;
- }
-
- public long getShuffleReadRecords() {
- return shuffleReadRecords;
- }
-
- public long getShuffleWriteBytes() {
- return shuffleWriteBytes;
- }
-
- public long getShuffleWriteRecords() {
- return shuffleWriteRecords;
- }
-
- public boolean isFailed() {
- return failed;
- }
-
- public long getShuffleReadRemoteBytes() {
- return shuffleReadRemoteBytes;
- }
-
- public long getShuffleReadLocalBytes() {
- return shuffleReadLocalBytes;
- }
-
- public void setFailed(boolean failed) {
- this.failed = failed;
- valueChanged("failed");
- }
-
- public void setTaskId(int taskId) {
- this.taskId = taskId;
- valueChanged("taskId");
- }
-
- public void setLaunchTime(long launchTime) {
- this.launchTime = launchTime;
- valueChanged("launchTime");
- }
-
- public void setExecutorId(String executorId) {
- this.executorId = executorId;
- valueChanged("executorId");
- }
-
- public void setHost(String host) {
- this.host = host;
- this.valueChanged("host");
- }
-
- public void setTaskLocality(String taskLocality) {
- this.taskLocality = taskLocality;
- this.valueChanged("taskLocality");
- }
-
- public void setSpeculative(boolean speculative) {
- this.speculative = speculative;
- this.valueChanged("speculative");
- }
-
- public void setExecutorDeserializeTime(long executorDeserializeTime) {
- this.executorDeserializeTime = executorDeserializeTime;
- this.valueChanged("executorDeserializeTime");
- }
-
- public void setExecutorRunTime(long executorRunTime) {
- this.executorRunTime = executorRunTime;
- this.valueChanged("executorRunTime");
- }
-
- public void setResultSize(long resultSize) {
- this.resultSize = resultSize;
- this.valueChanged("resultSize");
- }
-
- public void setJvmGcTime(long jvmGcTime) {
- this.jvmGcTime = jvmGcTime;
- this.valueChanged("jvmGcTime");
- }
-
- public void setResultSerializationTime(long resultSerializationTime) {
- this.resultSerializationTime = resultSerializationTime;
- this.valueChanged("resultSerializationTime");
- }
-
- public void setMemoryBytesSpilled(long memoryBytesSpilled) {
- this.memoryBytesSpilled = memoryBytesSpilled;
- this.valueChanged("memoryBytesSpilled");
- }
-
- public void setDiskBytesSpilled(long diskBytesSpilled) {
- this.diskBytesSpilled = diskBytesSpilled;
- this.valueChanged("diskBytesSpilled");
- }
-
- public void setInputBytes(long inputBytes) {
- this.inputBytes = inputBytes;
- this.valueChanged("inputBytes");
- }
-
- public void setInputRecords(long inputRecords) {
- this.inputRecords = inputRecords;
- this.valueChanged("inputRecords");
- }
-
- public void setOutputBytes(long outputBytes) {
- this.outputBytes = outputBytes;
- this.valueChanged("outputBytes");
- }
-
- public void setOutputRecords(long outputRecords) {
- this.outputRecords = outputRecords;
- this.valueChanged("outputRecords");
- }
-
-
-
- public void setShuffleReadRecords(long shuffleReadRecords) {
- this.shuffleReadRecords = shuffleReadRecords;
- this.valueChanged("shuffleReadRecords");
- }
-
- public void setShuffleWriteBytes(long shuffleWriteBytes) {
- this.shuffleWriteBytes = shuffleWriteBytes;
- this.valueChanged("shuffleWriteBytes");
- }
-
- public void setShuffleWriteRecords(long shuffleWriteRecords) {
- this.shuffleWriteRecords = shuffleWriteRecords;
- this.valueChanged("shuffleWriteRecords");
- }
-
- public void setShuffleReadRemoteBytes(long shuffleReadRemoteBytes) {
- this.shuffleReadRemoteBytes = shuffleReadRemoteBytes;
- this.valueChanged("shuffleReadRemoteBytes");
- }
-
- public void setShuffleReadLocalBytes(long shuffleReadLocalBytes) {
- this.shuffleReadLocalBytes = shuffleReadLocalBytes;
- this.valueChanged("shuffleReadLocalBytes");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
index 6411018..3719325 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
@@ -18,14 +18,10 @@
package org.apache.eagle.jpm.spark.running.parser;
-import org.apache.eagle.jpm.spark.crawl.EventType;
import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
import org.apache.eagle.jpm.spark.running.entities.*;
import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.HDFSUtil;
-import org.apache.eagle.jpm.util.SparkJobTagName;
-import org.apache.eagle.jpm.util.Utils;
+import org.apache.eagle.jpm.util.*;
import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
import org.apache.eagle.jpm.util.resourcefetch.model.*;
@@ -219,7 +215,7 @@ public class SparkApplicationParser implements Runnable {
if (eventObj != null) {
String eventType = (String) eventObj.get("Event");
LOG.info("Event type: " + eventType);
- if (eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString())) {
+ if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerEnvironmentUpdate.toString())) {
stop = true;
JSONObject sparkProps = (JSONObject) eventObj.get("Spark Properties");
for (Object key : sparkProps.keySet()) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
index 9d9f622..4d07b38 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
@@ -14,6 +14,9 @@
# limitations under the License.
{
+ "appId":"sparkRunningJob",
+ "mode":"LOCAL",
+ "workers" : 3,
"envContextConfig" : {
"stormConfigFile" : "storm.yaml",
"parallelismConfig" : {
@@ -24,7 +27,6 @@
"sparkRunningJobFetchSpout" : 1,
"sparkRunningJobParseBolt" : 4
},
- "workers" : 2
},
"jobExtractorConfig" : {
"site" : "sandbox",
@@ -48,8 +50,6 @@
"zkRetryTimes" : 3,
"zkRetryInterval" : 20000
},
- "appId":"sparkRunningJob",
- "mode":"LOCAL",
"eagleProps" : {
"mailHost" : "abc.com",
"mailDebug" : "true",
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEventType.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEventType.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEventType.java
new file mode 100644
index 0000000..7c9f625
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEventType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+/**
+ * Names of the Spark listener events known to the job-performance-monitoring code.
+ *
+ * <p>Constants are matched by name. For example, the running-job parser compares the
+ * {@code "Event"} field of a parsed event JSON object against
+ * {@code SparkListenerEnvironmentUpdate.toString()} using {@code equalsIgnoreCase}.
+ * Constant names therefore mirror the event names verbatim. The declaration order below
+ * is the original one, so {@link #ordinal()} values are unchanged.
+ */
+public enum SparkEventType {
+    SparkListenerBlockManagerAdded,
+    SparkListenerEnvironmentUpdate,
+    SparkListenerApplicationStart,
+    SparkListenerExecutorAdded,
+    SparkListenerJobStart,
+    SparkListenerStageSubmitted,
+    SparkListenerTaskStart,
+    SparkListenerBlockManagerRemoved,
+    SparkListenerTaskEnd,
+    SparkListenerStageCompleted,
+    SparkListenerJobEnd,
+    SparkListenerApplicationEnd,
+    SparkListenerExecutorRemoved
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
index b1881ef..61b2fee 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
@@ -107,7 +107,7 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
private String getMRFinishedJobURL(String lastFinishedTime) {
String url = URLUtil.removeTrailingSlash(selector.getSelectedUrl());
- return url + "/" + "Constants.V2_APPS_URL"
+ return url + "/" + Constants.V2_APPS_URL
+ "?applicationTypes=MAPREDUCE&state=FINISHED&finishedTimeBegin="
+ lastFinishedTime + "&" + Constants.ANONYMOUS_PARAMETER;
}
[2/3] incubator-eagle git commit: Update spark history job feeder
config & refactor the code
Posted by qi...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
new file mode 100644
index 0000000..211d6b7
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import java.util.List;
+
+/**
+ * Entity describing one Spark job of a Spark application, as tracked by the running-job
+ * monitor.
+ *
+ * <p>Mapped to table {@code eagleSparkRunningJobs} (column family {@code f}, prefix
+ * {@code sparkJob}) and registered under
+ * {@link Constants#RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME}. It is marked as time series
+ * and partitioned by the {@code site} tag. Unknown JSON properties are ignored when
+ * deserializing.
+ *
+ * <p>Each setter assigns its field and then reports the property name through the
+ * inherited {@code valueChanged(String)} hook.
+ */
+@Table("eagleSparkRunningJobs")
+@ColumnFamily("f")
+@Prefix("sparkJob")
+@Service(Constants.RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site", "sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", "user", "queue"})
+@Partition({"site"})
+public class SparkJobEntity extends TaggedLogAPIEntity {
+    // Each field's storage column qualifier is the single letter given in its @Column.
+    @Column("a")
+    private long submissionTime;
+    @Column("b")
+    private long completionTime;
+    @Column("c")
+    private int numStages = 0;
+    @Column("d")
+    private String status;
+    @Column("e")
+    private int numTask = 0;
+    @Column("f")
+    private int numActiveTasks = 0;
+    @Column("g")
+    private int numCompletedTasks = 0;
+    @Column("h")
+    private int numSkippedTasks = 0;
+    @Column("i")
+    private int numFailedTasks = 0;
+    @Column("j")
+    private int numActiveStages = 0;
+    @Column("k")
+    private int numCompletedStages = 0;
+    @Column("l")
+    private int numSkippedStages = 0;
+    @Column("m")
+    private int numFailedStages = 0;
+    @Column("n")
+    private List<Integer> stages;
+
+    public List<Integer> getStages() {
+        return stages;
+    }
+
+    // Keeps the caller's list reference as-is (no defensive copy).
+    public void setStages(List<Integer> stages) {
+        this.stages = stages;
+        this.valueChanged("stages");
+    }
+
+    public long getSubmissionTime() {
+        return submissionTime;
+    }
+
+    public void setSubmissionTime(long submissionTime) {
+        this.submissionTime = submissionTime;
+        this.valueChanged("submissionTime");
+    }
+
+    public long getCompletionTime() {
+        return completionTime;
+    }
+
+    public void setCompletionTime(long completionTime) {
+        this.completionTime = completionTime;
+        this.valueChanged("completionTime");
+    }
+
+    public int getNumStages() {
+        return numStages;
+    }
+
+    public void setNumStages(int numStages) {
+        this.numStages = numStages;
+        this.valueChanged("numStages");
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        this.valueChanged("status");
+    }
+
+    public int getNumTask() {
+        return numTask;
+    }
+
+    public void setNumTask(int numTask) {
+        this.numTask = numTask;
+        this.valueChanged("numTask");
+    }
+
+    public int getNumActiveTasks() {
+        return numActiveTasks;
+    }
+
+    public void setNumActiveTasks(int numActiveTasks) {
+        this.numActiveTasks = numActiveTasks;
+        this.valueChanged("numActiveTasks");
+    }
+
+    public int getNumCompletedTasks() {
+        return numCompletedTasks;
+    }
+
+    public void setNumCompletedTasks(int numCompletedTasks) {
+        this.numCompletedTasks = numCompletedTasks;
+        this.valueChanged("numCompletedTasks");
+    }
+
+    public int getNumSkippedTasks() {
+        return numSkippedTasks;
+    }
+
+    public void setNumSkippedTasks(int numSkippedTasks) {
+        this.numSkippedTasks = numSkippedTasks;
+        this.valueChanged("numSkippedTasks");
+    }
+
+    public int getNumFailedTasks() {
+        return numFailedTasks;
+    }
+
+    public void setNumFailedTasks(int numFailedTasks) {
+        this.numFailedTasks = numFailedTasks;
+        this.valueChanged("numFailedTasks");
+    }
+
+    public int getNumActiveStages() {
+        return numActiveStages;
+    }
+
+    public void setNumActiveStages(int numActiveStages) {
+        this.numActiveStages = numActiveStages;
+        this.valueChanged("numActiveStages");
+    }
+
+    public int getNumCompletedStages() {
+        return numCompletedStages;
+    }
+
+    public void setNumCompletedStages(int numCompletedStages) {
+        this.numCompletedStages = numCompletedStages;
+        this.valueChanged("numCompletedStages");
+    }
+
+    public int getNumSkippedStages() {
+        return numSkippedStages;
+    }
+
+    public void setNumSkippedStages(int numSkippedStages) {
+        this.numSkippedStages = numSkippedStages;
+        this.valueChanged("numSkippedStages");
+    }
+
+    public int getNumFailedStages() {
+        return numFailedStages;
+    }
+
+    public void setNumFailedStages(int numFailedStages) {
+        this.numFailedStages = numFailedStages;
+        this.valueChanged("numFailedStages");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
new file mode 100644
index 0000000..0194132
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@Table("eagleSparkRunningStages")
+@ColumnFamily("f")
+@Prefix("sparkStage")
+@Service(Constants.RUNNING_SPARK_STAGE_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", "stageId","stageAttemptId","user", "queue"})
+@Partition({"site"})
+public class SparkStageEntity extends TaggedLogAPIEntity {
+ @Column("a")
+ private String status;
+ @Column("b")
+ private int numActiveTasks = 0;
+ @Column("c")
+ private int numCompletedTasks = 0;
+ @Column("d")
+ private int numFailedTasks = 0;
+ @Column("e")
+ private long executorRunTime = 0L;
+ @Column("f")
+ private long inputBytes = 0L;
+ @Column("g")
+ private long inputRecords = 0L;
+ @Column("h")
+ private long outputBytes = 0L;
+ @Column("i")
+ private long outputRecords = 0L;
+ @Column("j")
+ private long shuffleReadBytes = 0L;
+ @Column("k")
+ private long shuffleReadRecords = 0L;
+ @Column("l")
+ private long shuffleWriteBytes = 0L;
+ @Column("m")
+ private long shuffleWriteRecords = 0L;
+ @Column("n")
+ private long memoryBytesSpilled = 0L;
+ @Column("o")
+ private long diskBytesSpilled = 0L;
+ @Column("p")
+ private String name;
+ @Column("q")
+ private String schedulingPool;
+ @Column("r")
+ private long submitTime;
+ @Column("s")
+ private long completeTime;
+ @Column("t")
+ private int numTasks;
+ @Column("u")
+ private long executorDeserializeTime;
+ @Column("v")
+ private long resultSize;
+ @Column("w")
+ private long jvmGcTime;
+ @Column("x")
+ private long resultSerializationTime;
+
+ public String getStatus() {
+ return status;
+ }
+
+ public int getNumActiveTasks() {
+ return numActiveTasks;
+ }
+
+ public int getNumCompletedTasks() {
+ return numCompletedTasks;
+ }
+
+ public int getNumFailedTasks() {
+ return numFailedTasks;
+ }
+
+ public long getExecutorRunTime() {
+ return executorRunTime;
+ }
+
+ public long getInputBytes() {
+ return inputBytes;
+ }
+
+ public long getInputRecords() {
+ return inputRecords;
+ }
+
+ public long getOutputBytes() {
+ return outputBytes;
+ }
+
+ public long getOutputRecords() {
+ return outputRecords;
+ }
+
+ public long getShuffleReadBytes() {
+ return shuffleReadBytes;
+ }
+
+ public long getShuffleReadRecords() {
+ return shuffleReadRecords;
+ }
+
+ public long getShuffleWriteBytes() {
+ return shuffleWriteBytes;
+ }
+
+ public long getShuffleWriteRecords() {
+ return shuffleWriteRecords;
+ }
+
+ public long getMemoryBytesSpilled() {
+ return memoryBytesSpilled;
+ }
+
+ public long getDiskBytesSpilled() {
+ return diskBytesSpilled;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getSchedulingPool() {
+ return schedulingPool;
+ }
+
+ public long getSubmitTime() {
+ return submitTime;
+ }
+
+ public long getCompleteTime() {
+ return completeTime;
+ }
+
+ public int getNumTasks() {
+ return numTasks;
+ }
+
+ public long getExecutorDeserializeTime() {
+ return executorDeserializeTime;
+ }
+
+ public long getResultSize() {
+ return resultSize;
+ }
+
+ public long getJvmGcTime() {
+ return jvmGcTime;
+ }
+
+ public long getResultSerializationTime() {
+ return resultSerializationTime;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ this.valueChanged("status");
+ }
+
+ public void setNumActiveTasks(int numActiveTasks) {
+ this.numActiveTasks = numActiveTasks;
+ this.valueChanged("numActiveTasks");
+ }
+
+ public void setNumCompletedTasks(int numCompletedTasks) {
+ this.numCompletedTasks = numCompletedTasks;
+ this.valueChanged("numCompletedTasks");
+ }
+
+ public void setNumFailedTasks(int numFailedTasks) {
+ this.numFailedTasks = numFailedTasks;
+ this.valueChanged("numFailedTasks");
+ }
+
+ public void setExecutorRunTime(long executorRunTime) {
+ this.executorRunTime = executorRunTime;
+ this.valueChanged("executorRunTime");
+ }
+
+ public void setInputBytes(long inputBytes) {
+ this.inputBytes = inputBytes;
+ this.valueChanged("inputBytes");
+ }
+
+ public void setInputRecords(long inputRecords) {
+ this.inputRecords = inputRecords;
+ this.valueChanged("inputRecords");
+ }
+
+ public void setOutputBytes(long outputBytes) {
+ this.outputBytes = outputBytes;
+ this.valueChanged("outputBytes");
+ }
+
+ public void setOutputRecords(long outputRecords) {
+ this.outputRecords = outputRecords;
+ this.valueChanged("outputRecords");
+ }
+
+ public void setShuffleReadBytes(long shuffleReadBytes) {
+ this.shuffleReadBytes = shuffleReadBytes;
+ this.valueChanged("shuffleReadBytes");
+ }
+
+ public void setShuffleReadRecords(long shuffleReadRecords) {
+ this.shuffleReadRecords = shuffleReadRecords;
+ this.valueChanged("shuffleReadRecords");
+ }
+
+ public void setShuffleWriteBytes(long shuffleWriteBytes) {
+ this.shuffleWriteBytes = shuffleWriteBytes;
+ this.valueChanged("shuffleWriteBytes");
+ }
+
+ public void setShuffleWriteRecords(long shuffleWriteRecords) {
+ this.shuffleWriteRecords = shuffleWriteRecords;
+ this.valueChanged("shuffleWriteRecords");
+ }
+
+ public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+ this.memoryBytesSpilled = memoryBytesSpilled;
+ this.valueChanged("memoryBytesSpilled");
+ }
+
+ public void setDiskBytesSpilled(long diskBytesSpilled) {
+ this.diskBytesSpilled = diskBytesSpilled;
+ this.valueChanged("diskBytesSpilled");
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ this.valueChanged("name");
+ }
+
+ public void setSchedulingPool(String schedulingPool) {
+ this.schedulingPool = schedulingPool;
+ this.valueChanged("schedulingPool");
+ }
+
+ public void setSubmitTime(long submitTime) {
+ this.submitTime = submitTime;
+ this.valueChanged("submitTime");
+ }
+
+ public void setCompleteTime(long completeTime) {
+ this.completeTime = completeTime;
+ this.valueChanged("completeTime");
+ }
+
+ public void setNumTasks(int numTasks) {
+ this.numTasks = numTasks;
+ valueChanged("numTasks");
+ }
+
+ public void setExecutorDeserializeTime(long executorDeserializeTime) {
+ this.executorDeserializeTime = executorDeserializeTime;
+ valueChanged("executorDeserializeTime");
+ }
+
+ public void setResultSize(long resultSize) {
+ this.resultSize = resultSize;
+ valueChanged("resultSize");
+ }
+
+ public void setJvmGcTime(long jvmGcTime) {
+ this.jvmGcTime = jvmGcTime;
+ valueChanged("jvmGcTime");
+ }
+
+ public void setResultSerializationTime(long resultSerializationTime) {
+ this.resultSerializationTime = resultSerializationTime;
+ valueChanged("resultSerializationTime");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
new file mode 100644
index 0000000..6522c3c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@Table("eagleSparkRunningTasks")
+@ColumnFamily("f")
+@Prefix("sparkTask")
+@Service(Constants.RUNNING_SPARK_TASK_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", "jobName", "stageId","stageAttemptId","taskIndex","taskAttemptId","user", "queue"})
+@Partition({"site"})
+public class SparkTaskEntity extends TaggedLogAPIEntity {
+ @Column("a")
+ private int taskId;
+ @Column("b")
+ private long launchTime;
+ @Column("c")
+ private String executorId;
+ @Column("d")
+ private String host;
+ @Column("e")
+ private String taskLocality;
+ @Column("f")
+ private boolean speculative;
+ @Column("g")
+ private long executorDeserializeTime;
+ @Column("h")
+ private long executorRunTime;
+ @Column("i")
+ private long resultSize;
+ @Column("j")
+ private long jvmGcTime;
+ @Column("k")
+ private long resultSerializationTime;
+ @Column("l")
+ private long memoryBytesSpilled;
+ @Column("m")
+ private long diskBytesSpilled;
+ @Column("n")
+ private long inputBytes;
+ @Column("o")
+ private long inputRecords;
+ @Column("p")
+ private long outputBytes;
+ @Column("q")
+ private long outputRecords;
+ @Column("r")
+ private long shuffleReadRemoteBytes;
+ @Column("x")
+ private long shuffleReadLocalBytes;
+ @Column("s")
+ private long shuffleReadRecords;
+ @Column("t")
+ private long shuffleWriteBytes;
+ @Column("u")
+ private long shuffleWriteRecords;
+ @Column("v")
+ private boolean failed;
+
+ public int getTaskId() {
+ return taskId;
+ }
+
+ public long getLaunchTime() {
+ return launchTime;
+ }
+
+ public String getExecutorId() {
+ return executorId;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getTaskLocality() {
+ return taskLocality;
+ }
+
+ public boolean isSpeculative() {
+ return speculative;
+ }
+
+ public long getExecutorDeserializeTime() {
+ return executorDeserializeTime;
+ }
+
+ public long getExecutorRunTime() {
+ return executorRunTime;
+ }
+
+ public long getResultSize() {
+ return resultSize;
+ }
+
+ public long getJvmGcTime() {
+ return jvmGcTime;
+ }
+
+ public long getResultSerializationTime() {
+ return resultSerializationTime;
+ }
+
+ public long getMemoryBytesSpilled() {
+ return memoryBytesSpilled;
+ }
+
+ public long getDiskBytesSpilled() {
+ return diskBytesSpilled;
+ }
+
+ public long getInputBytes() {
+ return inputBytes;
+ }
+
+ public long getInputRecords() {
+ return inputRecords;
+ }
+
+ public long getOutputBytes() {
+ return outputBytes;
+ }
+
+ public long getOutputRecords() {
+ return outputRecords;
+ }
+
+ public long getShuffleReadRecords() {
+ return shuffleReadRecords;
+ }
+
+ public long getShuffleWriteBytes() {
+ return shuffleWriteBytes;
+ }
+
+ public long getShuffleWriteRecords() {
+ return shuffleWriteRecords;
+ }
+
+ public boolean isFailed() {
+ return failed;
+ }
+
+ /** Returns the shuffle bytes read from remote blocks. */
+ public long getShuffleReadRemoteBytes() {
+ return this.shuffleReadRemoteBytes;
+ }
+
+ /** Returns the shuffle bytes read from local blocks. */
+ public long getShuffleReadLocalBytes() {
+ return this.shuffleReadLocalBytes;
+ }
+
+ /** Sets the failed flag and reports the "failed" field as changed. */
+ public void setFailed(boolean failed) {
+ this.failed = failed;
+ this.valueChanged("failed");
+ }
+
+ /** Sets the task id and reports the "taskId" field as changed. */
+ public void setTaskId(int taskId) {
+ this.taskId = taskId;
+ this.valueChanged("taskId");
+ }
+
+ /** Sets the launch time and reports the "launchTime" field as changed. */
+ public void setLaunchTime(long launchTime) {
+ this.launchTime = launchTime;
+ this.valueChanged("launchTime");
+ }
+
+ /** Sets the executor id and reports the "executorId" field as changed. */
+ public void setExecutorId(String executorId) {
+ this.executorId = executorId;
+ this.valueChanged("executorId");
+ }
+
+ /** Sets the host and reports the "host" field as changed. */
+ public void setHost(String host) {
+ this.host = host;
+ this.valueChanged("host");
+ }
+
+ /** Sets the task locality and reports the "taskLocality" field as changed. */
+ public void setTaskLocality(String taskLocality) {
+ this.taskLocality = taskLocality;
+ this.valueChanged("taskLocality");
+ }
+
+ /** Sets the speculative flag and reports the "speculative" field as changed. */
+ public void setSpeculative(boolean speculative) {
+ this.speculative = speculative;
+ this.valueChanged("speculative");
+ }
+
+ /** Sets the executor deserialize time and reports the field as changed. */
+ public void setExecutorDeserializeTime(long executorDeserializeTime) {
+ this.executorDeserializeTime = executorDeserializeTime;
+ this.valueChanged("executorDeserializeTime");
+ }
+
+ /** Sets the executor run time and reports the field as changed. */
+ public void setExecutorRunTime(long executorRunTime) {
+ this.executorRunTime = executorRunTime;
+ this.valueChanged("executorRunTime");
+ }
+
+ /** Sets the result size and reports the field as changed. */
+ public void setResultSize(long resultSize) {
+ this.resultSize = resultSize;
+ this.valueChanged("resultSize");
+ }
+
+ /** Sets the JVM GC time and reports the field as changed. */
+ public void setJvmGcTime(long jvmGcTime) {
+ this.jvmGcTime = jvmGcTime;
+ this.valueChanged("jvmGcTime");
+ }
+
+ /** Sets the result serialization time and reports the field as changed. */
+ public void setResultSerializationTime(long resultSerializationTime) {
+ this.resultSerializationTime = resultSerializationTime;
+ this.valueChanged("resultSerializationTime");
+ }
+
+ /** Sets the memory bytes spilled and reports the field as changed. */
+ public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+ this.memoryBytesSpilled = memoryBytesSpilled;
+ this.valueChanged("memoryBytesSpilled");
+ }
+
+ /** Sets the disk bytes spilled and reports the field as changed. */
+ public void setDiskBytesSpilled(long diskBytesSpilled) {
+ this.diskBytesSpilled = diskBytesSpilled;
+ this.valueChanged("diskBytesSpilled");
+ }
+
+ /** Sets the input bytes and reports the field as changed. */
+ public void setInputBytes(long inputBytes) {
+ this.inputBytes = inputBytes;
+ this.valueChanged("inputBytes");
+ }
+
+ /** Sets the input record count and reports the field as changed. */
+ public void setInputRecords(long inputRecords) {
+ this.inputRecords = inputRecords;
+ this.valueChanged("inputRecords");
+ }
+
+ /** Sets the output bytes and reports the field as changed. */
+ public void setOutputBytes(long outputBytes) {
+ this.outputBytes = outputBytes;
+ this.valueChanged("outputBytes");
+ }
+
+ /** Sets the output record count and reports the field as changed. */
+ public void setOutputRecords(long outputRecords) {
+ this.outputRecords = outputRecords;
+ this.valueChanged("outputRecords");
+ }
+
+
+
+ /** Sets the shuffle read record count and reports the field as changed. */
+ public void setShuffleReadRecords(long shuffleReadRecords) {
+ this.shuffleReadRecords = shuffleReadRecords;
+ this.valueChanged("shuffleReadRecords");
+ }
+
+ /** Sets the shuffle write bytes and reports the field as changed. */
+ public void setShuffleWriteBytes(long shuffleWriteBytes) {
+ this.shuffleWriteBytes = shuffleWriteBytes;
+ this.valueChanged("shuffleWriteBytes");
+ }
+
+ /** Sets the shuffle write record count and reports the field as changed. */
+ public void setShuffleWriteRecords(long shuffleWriteRecords) {
+ this.shuffleWriteRecords = shuffleWriteRecords;
+ this.valueChanged("shuffleWriteRecords");
+ }
+
+ /** Sets the remote shuffle bytes read and reports the field as changed. */
+ public void setShuffleReadRemoteBytes(long shuffleReadRemoteBytes) {
+ this.shuffleReadRemoteBytes = shuffleReadRemoteBytes;
+ this.valueChanged("shuffleReadRemoteBytes");
+ }
+
+ /** Sets the local shuffle bytes read and reports the field as changed. */
+ public void setShuffleReadLocalBytes(long shuffleReadLocalBytes) {
+ this.shuffleReadLocalBytes = shuffleReadLocalBytes;
+ this.valueChanged("shuffleReadLocalBytes");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
index 0fc74d7..284eeee 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -82,8 +82,6 @@ public class SparkHistoryJobAppConfig implements Serializable {
this.eagleInfo.host = config.getString("eagleProps.eagle.service.host");
this.eagleInfo.port = config.getInt("eagleProps.eagle.service.port");
- this.stormConfig.topologyName = config.getString("storm.name");
- this.stormConfig.workerNo = config.getInt("storm.worker.num");
this.stormConfig.timeoutSec = config.getInt("storm.messageTimeoutSec");
this.stormConfig.spoutPending = config.getInt("storm.pendingSpout");
this.stormConfig.spoutCrawlInterval = config.getInt("storm.spoutCrawlInterval");
@@ -117,9 +115,7 @@ public class SparkHistoryJobAppConfig implements Serializable {
}
public static class StormConfig implements Serializable {
- public int workerNo;
public int timeoutSec;
- public String topologyName;
public int spoutPending;
public int spoutCrawlInterval;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
new file mode 100644
index 0000000..b73b52e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import java.io.InputStream;
+
+/**
+ * Consumer of a JHF (presumably job history file) input stream.
+ */
+public interface JHFInputStreamReader {
+ /**
+ * Reads the content of the given stream.
+ *
+ * @param inputStream the stream to read
+ * @throws Exception if reading fails
+ */
+ void read(InputStream inputStream) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
new file mode 100644
index 0000000..047e2d5
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import java.io.InputStream;
+
+/**
+ * Parser for a JHF (presumably job history file) supplied as an input stream.
+ */
+public interface JHFParserBase {
+ /**
+ * Parses the content of the given stream.
+ *
+ * <p>Implementations are responsible for closing {@code is}; callers need not close it.
+ *
+ * @param is the stream to parse; closed by the implementation
+ * @throws Exception if reading or parsing fails
+ */
+ void parse(InputStream is) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
new file mode 100644
index 0000000..571620a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.eagle.jpm.spark.entity.*;
+import org.apache.eagle.jpm.util.*;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.service.client.EagleServiceClientException;
+import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class JHFSparkEventReader {
+ private static final Logger LOG = LoggerFactory.getLogger(JHFSparkEventReader.class);
+
+ // NOTE(review): not referenced in the visible part of this class; presumably the batch size used by flushEntities — confirm.
+ private static final int FLUSH_LIMIT = 500;
+ // Launch time of the task with id 0; handleStageComplete uses it when a stage lacks "Submission Time".
+ private long firstTaskLaunchTime;
+ // Timestamp of the most recently processed event; the fallback when an event carries no timestamp of its own.
+ private long lastEventTime;
+
+ // Executors keyed by executor id.
+ private Map<String, SparkExecutor> executors;
+ private SparkApp app;
+ // Jobs keyed by Spark job id.
+ private Map<Integer, SparkJob> jobs;
+ // Stages keyed by generateStageKey(stageId, stageAttemptId).
+ private Map<String, SparkStage> stages;
+ // Per job id, a set of stage keys; clearReader removes each stage's key from it.
+ private Map<Integer, Set<String>> jobStageMap;
+ // Started tasks not yet aggregated and flushed, keyed by task id; leftovers are flushed as failed by clearReader.
+ private Map<Long, SparkTask> tasks;
+ private EagleServiceClientImpl client;
+ // An empty map is registered per stage key on stage submission; not otherwise used in the visible code.
+ private Map<String, Map<Integer, Boolean>> stageTaskStatusMap;
+
+ // NOTE(review): presumably the buffer of entities pending write in flushEntities — confirm.
+ private List<TaggedLogAPIEntity> createEntities;
+
+ // Config loaded in the constructor; supplies "basic.jobConf.additional.info" and the "spark.defaultVal.*" defaults.
+ private Config conf;
+
+ /**
+ * Creates a reader for one application's event log.
+ *
+ * @param baseTags tags copied onto the application entity (and from it onto jobs, stages and tasks)
+ * @param info YARN application info supplying the app's state and final status
+ */
+ public JHFSparkEventReader(Map<String, String> baseTags, SparkApplicationInfo info) {
+ // Application entity seeded with the caller's tags and the YARN state/status.
+ this.app = new SparkApp();
+ this.app.setTags(new HashMap<String, String>(baseTags));
+ this.app.setYarnState(info.getState());
+ this.app.setYarnStatus(info.getFinalStatus());
+
+ // Empty per-run bookkeeping.
+ this.createEntities = new ArrayList<>();
+ this.jobs = new HashMap<>();
+ this.stages = new HashMap<>();
+ this.jobStageMap = new HashMap<>();
+ this.tasks = new HashMap<>();
+ this.executors = new HashMap<>();
+ this.stageTaskStatusMap = new HashMap<>();
+
+ // Load config before initiateClient(), which presumably reads it — keep this order.
+ this.conf = ConfigFactory.load();
+ this.initiateClient();
+ }
+
+ /** Returns the application entity being populated by this reader. */
+ public SparkApp getApp() {
+ return app;
+ }
+
+ /**
+ * Dispatches one Spark event-log record to its handler based on its "Event" field.
+ * Unknown or missing event types are logged and ignored.
+ *
+ * @param eventObj one parsed JSON record of the Spark event log
+ */
+ public void read(JSONObject eventObj) {
+ String eventType = (String) eventObj.get("Event");
+ // The constant is the receiver so that a record without "Event" is logged instead of throwing NPE.
+ if (SparkEventType.SparkListenerApplicationStart.toString().equalsIgnoreCase(eventType)) {
+ handleAppStarted(eventObj);
+ } else if (SparkEventType.SparkListenerEnvironmentUpdate.toString().equalsIgnoreCase(eventType)) {
+ handleEnvironmentSet(eventObj);
+ } else if (SparkEventType.SparkListenerExecutorAdded.toString().equalsIgnoreCase(eventType)) {
+ handleExecutorAdd(eventObj);
+ } else if (SparkEventType.SparkListenerBlockManagerAdded.toString().equalsIgnoreCase(eventType)) {
+ handleBlockManagerAdd(eventObj);
+ } else if (SparkEventType.SparkListenerJobStart.toString().equalsIgnoreCase(eventType)) {
+ handleJobStart(eventObj);
+ } else if (SparkEventType.SparkListenerStageSubmitted.toString().equalsIgnoreCase(eventType)) {
+ handleStageSubmit(eventObj);
+ } else if (SparkEventType.SparkListenerTaskStart.toString().equalsIgnoreCase(eventType)) {
+ handleTaskStart(eventObj);
+ } else if (SparkEventType.SparkListenerTaskEnd.toString().equalsIgnoreCase(eventType)) {
+ handleTaskEnd(eventObj);
+ } else if (SparkEventType.SparkListenerStageCompleted.toString().equalsIgnoreCase(eventType)) {
+ handleStageComplete(eventObj);
+ } else if (SparkEventType.SparkListenerJobEnd.toString().equalsIgnoreCase(eventType)) {
+ handleJobEnd(eventObj);
+ } else if (SparkEventType.SparkListenerExecutorRemoved.toString().equalsIgnoreCase(eventType)) {
+ handleExecutorRemoved(eventObj);
+ } else if (SparkEventType.SparkListenerApplicationEnd.toString().equalsIgnoreCase(eventType)) {
+ handleAppEnd(eventObj);
+ } else if (SparkEventType.SparkListenerBlockManagerRemoved.toString().equalsIgnoreCase(eventType)) {
+ //nothing to do now
+ } else {
+ LOG.info("Not registered event type:{}", eventType);
+ }
+ }
+
+ /**
+ * Captures a whitelist of Spark properties from the environment-update event into the app's job config.
+ * The whitelist is the built-in list below plus the comma-separated names configured under
+ * "basic.jobConf.additional.info".
+ *
+ * @param event the SparkListenerEnvironmentUpdate record
+ */
+ private void handleEnvironmentSet(JSONObject event) {
+ app.setConfig(new JobConfig());
+ JSONObject sparkProps = (JSONObject) event.get("Spark Properties");
+ if (sparkProps == null) {
+ // Keep the empty config; getConfigVal() then falls back to the spark.defaultVal.* settings.
+ LOG.warn("Environment update event has no Spark Properties; default job config values will be used");
+ return;
+ }
+
+ String[] additionalJobConf = conf.getString("basic.jobConf.additional.info").split(",\\s*");
+ // spark.driver.cores is read back by clearReader() when not in yarn-client mode, so it must be
+ // captured here; otherwise the configured value is always replaced by the default.
+ String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port",
+ "spark.driver.memory", "spark.driver.cores", "spark.scheduler.pool", "spark.executor.cores",
+ "spark.yarn.am.memory", "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead",
+ "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"};
+ String[] jobConf = (String[]) ArrayUtils.addAll(additionalJobConf, props);
+ for (String prop : jobConf) {
+ if (sparkProps.containsKey(prop)) {
+ app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop));
+ }
+ }
+ }
+
+ /**
+ * Looks up a Spark property captured from the event log, falling back to the reader config's
+ * "spark.defaultVal.&lt;configName&gt;" entry when the property was not captured.
+ *
+ * @param config job config captured from the environment-update event
+ * @param configName Spark property name
+ * @param type {@code Integer.class.getName()} to get an Integer; any other value yields the raw String
+ * @return the property value as an Integer or a String, depending on {@code type}
+ */
+ private Object getConfigVal(JobConfig config, String configName, String type) {
+ if (!config.getConfig().containsKey(configName)) {
+ String defaultKey = "spark.defaultVal." + configName;
+ if (type.equalsIgnoreCase(Integer.class.getName())) {
+ return conf.getInt(defaultKey);
+ }
+ return conf.getString(defaultKey);
+ }
+
+ Object captured = config.getConfig().get(configName);
+ if (type.equalsIgnoreCase(Integer.class.getName())) {
+ return Integer.parseInt((String) captured);
+ }
+ return captured;
+ }
+
+ /**
+ * Tells whether the application ran in yarn-client mode, judged by the captured "spark.master" value.
+ * A missing config or a missing "spark.master" entry counts as not client mode instead of throwing.
+ *
+ * @param config job config captured from the event log; may be null
+ * @return true only when "spark.master" equals "yarn-client" (case-insensitive)
+ */
+ private boolean isClientMode(JobConfig config) {
+ if (config == null || config.getConfig() == null) {
+ return false;
+ }
+ return "yarn-client".equalsIgnoreCase(config.getConfig().get("spark.master"));
+ }
+
+ /**
+ * Handles SparkListenerApplicationStart: back-fills app-level tags (id, name, attempt, normalized
+ * name, user) and the start timestamp on the app and on every executor created so far.
+ *
+ * @param event the SparkListenerApplicationStart record
+ */
+ private void handleAppStarted(JSONObject event) {
+ //need update all entities tag before app start
+ List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>(this.executors.values());
+ entities.add(this.app);
+
+ long appStartTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
+ // These values are the same for every entity, so compute them once instead of per entity.
+ String appId = JSONUtils.getString(event, "App ID");
+ String appName = JSONUtils.getString(event, "App Name");
+ // In yarn-client mode, attemptId is not available in the log, so we set attemptId = 1.
+ String attemptId = isClientMode(this.app.getConfig()) ? "1" : JSONUtils.getString(event, "App Attempt ID");
+ // the second argument of getNormalizeName() is changed to null because the original code contains sensitive text
+ // original second argument looks like: this.app.getConfig().getConfig().get("xxx"), "xxx" is the sensitive text
+ String normalizedName = this.getNormalizedName(appName, null);
+ String user = JSONUtils.getString(event, "User");
+
+ for (TaggedLogAPIEntity entity : entities) {
+ entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), appId);
+ entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), appName);
+ entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), attemptId);
+ entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), normalizedName);
+ entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), user);
+ entity.setTimestamp(appStartTime);
+ }
+
+ this.app.setStartTime(appStartTime);
+ this.lastEventTime = appStartTime;
+ }
+
+ /**
+ * Handles SparkListenerExecutorAdded by registering the executor with the event's timestamp.
+ *
+ * @param event the SparkListenerExecutorAdded record
+ */
+ private void handleExecutorAdd(JSONObject event) {
+ String executorID = (String) event.get("Executor ID");
+ long executorAddTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
+ this.lastEventTime = executorAddTime;
+ // Only the side effect is needed (initiateExecutor presumably creates or looks up the executor
+ // entity); the returned entity and the "Executor Info" payload were never used.
+ this.initiateExecutor(executorID, executorAddTime);
+ }
+
+ /**
+ * Handles SparkListenerBlockManagerAdded: records the executor's maximum memory and its
+ * "host:port" taken from the block manager id.
+ *
+ * @param event the SparkListenerBlockManagerAdded record
+ */
+ private void handleBlockManagerAdd(JSONObject event) {
+ long maxMemory = JSONUtils.getLong(event, "Maximum Memory");
+ long eventTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
+ this.lastEventTime = eventTime;
+
+ JSONObject blockManagerId = JSONUtils.getJSONObject(event, "Block Manager ID");
+ String executorID = JSONUtils.getString(blockManagerId, "Executor ID");
+ String host = JSONUtils.getString(blockManagerId, "Host");
+ long port = JSONUtils.getLong(blockManagerId, "Port");
+
+ SparkExecutor executor = this.initiateExecutor(executorID, eventTime);
+ executor.setMaxMemory(maxMemory);
+ executor.setHostPort(host + ":" + port);
+ }
+
+ /** Handles SparkListenerTaskStart by registering the task via initializeTask. */
+ private void handleTaskStart(JSONObject event) {
+ initializeTask(event);
+ }
+
+ /**
+ * Handles SparkListenerTaskEnd: copies the task's metrics, rolls them up into its stage and
+ * executor, then flushes and forgets the task. A successful task without metrics stays pending;
+ * clearReader() later flushes it as failed.
+ *
+ * @param event the SparkListenerTaskEnd record
+ */
+ private void handleTaskEnd(JSONObject event) {
+ JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
+ long taskId = JSONUtils.getLong(taskInfo, "Task ID");
+ SparkTask task = tasks.get(taskId);
+ if (task == null) {
+ // No matching task-start event was seen; nothing to aggregate.
+ return;
+ }
+
+ task.setFailed(JSONUtils.getBoolean(taskInfo, "Failed"));
+ JSONObject taskMetrics = JSONUtils.getJSONObject(event, "Task Metrics");
+ if (null != taskMetrics) {
+ // These are durations: a missing value must default to 0, not to an event timestamp, or the
+ // stage/executor/app aggregates would be inflated by an epoch-sized amount.
+ task.setExecutorDeserializeTime(JSONUtils.getLong(taskMetrics, "Executor Deserialize Time", 0L));
+ task.setExecutorRunTime(JSONUtils.getLong(taskMetrics, "Executor Run Time", 0L));
+ task.setJvmGcTime(JSONUtils.getLong(taskMetrics, "JVM GC Time", 0L));
+ task.setResultSize(JSONUtils.getLong(taskMetrics, "Result Size"));
+ task.setResultSerializationTime(JSONUtils.getLong(taskMetrics, "Result Serialization Time", 0L));
+ task.setMemoryBytesSpilled(JSONUtils.getLong(taskMetrics, "Memory Bytes Spilled"));
+ task.setDiskBytesSpilled(JSONUtils.getLong(taskMetrics, "Disk Bytes Spilled"));
+
+ JSONObject inputMetrics = JSONUtils.getJSONObject(taskMetrics, "Input Metrics");
+ if (null != inputMetrics) {
+ task.setInputBytes(JSONUtils.getLong(inputMetrics, "Bytes Read"));
+ task.setInputRecords(JSONUtils.getLong(inputMetrics, "Records Read"));
+ }
+
+ JSONObject outputMetrics = JSONUtils.getJSONObject(taskMetrics, "Output Metrics");
+ if (null != outputMetrics) {
+ task.setOutputBytes(JSONUtils.getLong(outputMetrics, "Bytes Written"));
+ task.setOutputRecords(JSONUtils.getLong(outputMetrics, "Records Written"));
+ }
+
+ JSONObject shuffleWriteMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Write Metrics");
+ if (null != shuffleWriteMetrics) {
+ task.setShuffleWriteBytes(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Bytes Written"));
+ task.setShuffleWriteRecords(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Records Written"));
+ }
+
+ JSONObject shuffleReadMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Read Metrics");
+ if (null != shuffleReadMetrics) {
+ task.setShuffleReadLocalBytes(JSONUtils.getLong(shuffleReadMetrics, "Local Bytes Read"));
+ task.setShuffleReadRemoteBytes(JSONUtils.getLong(shuffleReadMetrics, "Remote Bytes Read"));
+ task.setShuffleReadRecords(JSONUtils.getLong(shuffleReadMetrics, "Total Records Read"));
+ }
+ } else if (!task.isFailed()) {
+ //for tasks success without task metrics, save in the end if no other information
+ return;
+ }
+
+ aggregateToStage(task);
+ aggregateToExecutor(task);
+ tasks.remove(taskId);
+ this.flushEntities(task, false);
+ }
+
+
+ /**
+ * Builds a task entity from a SparkListenerTaskStart record, tags it with stage/task identifiers,
+ * and registers it in {@code tasks} until its end event arrives.
+ *
+ * @param event the SparkListenerTaskStart record
+ * @return the registered task
+ */
+ private SparkTask initializeTask(JSONObject event) {
+ SparkTask task = new SparkTask();
+ task.setTags(new HashMap<>(this.app.getTags()));
+ task.setTimestamp(app.getTimestamp());
+
+ String stageIdTag = Long.toString(JSONUtils.getLong(event, "Stage ID"));
+ task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), stageIdTag);
+ String stageAttemptTag = Long.toString(JSONUtils.getLong(event, "Stage Attempt ID"));
+ task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), stageAttemptTag);
+
+ JSONObject info = JSONUtils.getJSONObject(event, "Task Info");
+ long id = JSONUtils.getLong(info, "Task ID");
+ task.setTaskId(id);
+
+ String indexTag = Long.toString(JSONUtils.getLong(info, "Index"));
+ task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), indexTag);
+ String attemptTag = Integer.toString(JSONUtils.getInt(info, "Attempt"));
+ task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), attemptTag);
+
+ long launched = JSONUtils.getLong(info, "Launch Time", lastEventTime);
+ this.lastEventTime = launched;
+ // Task 0's launch time is the fallback stage submit time used by handleStageComplete.
+ if (id == 0) {
+ this.setFirstTaskLaunchTime(launched);
+ }
+ task.setLaunchTime(launched);
+ task.setExecutorId(JSONUtils.getString(info, "Executor ID"));
+ task.setHost(JSONUtils.getString(info, "Host"));
+ task.setTaskLocality(JSONUtils.getString(info, "Locality"));
+ task.setSpeculative(JSONUtils.getBoolean(info, "Speculative"));
+
+ tasks.put(task.getTaskId(), task);
+ return task;
+ }
+
+ /** Records the launch time of task 0. */
+ private void setFirstTaskLaunchTime(long launchTime) {
+ firstTaskLaunchTime = launchTime;
+ }
+
+ /**
+ * Handles SparkListenerJobStart: creates the job entity and initiates every stage listed in
+ * the event's "Stage Infos".
+ *
+ * @param event the SparkListenerJobStart record
+ */
+ private void handleJobStart(JSONObject event) {
+ SparkJob job = new SparkJob();
+ job.setTags(new HashMap<>(this.app.getTags()));
+ job.setTimestamp(app.getTimestamp());
+
+ int jobId = JSONUtils.getInt(event, "Job ID");
+ job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
+ long submitted = JSONUtils.getLong(event, "Submission Time", lastEventTime);
+ job.setSubmissionTime(submitted);
+ this.lastEventTime = submitted;
+
+ // The application is complete, so no stages or tasks are still active.
+ job.setNumActiveStages(0);
+ job.setNumActiveTasks(0);
+
+ this.jobs.put(jobId, job);
+ this.jobStageMap.put(jobId, new HashSet<String>());
+
+ // Named stageInfos (not "stages") to avoid shadowing the stages field.
+ JSONArray stageInfos = JSONUtils.getJSONArray(event, "Stage Infos");
+ if (stageInfos == null) {
+ job.setNumStages(0);
+ return;
+ }
+ job.setNumStages(stageInfos.size());
+ for (Object element : stageInfos) {
+ JSONObject stageInfo = (JSONObject) element;
+ this.initiateStage(jobId,
+ JSONUtils.getInt(stageInfo, "Stage ID"),
+ JSONUtils.getInt(stageInfo, "Stage Attempt ID"),
+ JSONUtils.getString(stageInfo, "Stage Name"),
+ JSONUtils.getInt(stageInfo, "Number of Tasks"));
+ }
+ }
+
+ /**
+ * Handles SparkListenerStageSubmitted. A stage attempt not announced in its job-start event
+ * (i.e. a retry) is initiated under the same job as attempt 0 of that stage.
+ *
+ * @param event the SparkListenerStageSubmitted record
+ */
+ private void handleStageSubmit(JSONObject event) {
+ JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
+ int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
+ int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
+ String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
+ stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>());
+
+ if (stages.containsKey(key)) {
+ return;
+ }
+ //may be further attempt for one stage
+ String baseAttempt = this.generateStageKey(Integer.toString(stageId), "0");
+ SparkStage baseStage = stages.get(baseAttempt);
+ if (baseStage == null) {
+ return;
+ }
+ String jobId = baseStage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString());
+ // "Stage Name" is a field of "Stage Info", not of the top-level event; reading it from the
+ // event gave retry attempts a missing name.
+ String stageName = JSONUtils.getString(stageInfo, "Stage Name");
+ int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks");
+ this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks);
+ }
+
+ /**
+ * Handles SparkListenerStageCompleted: sets the stage's submit/complete times and marks it
+ * FAILED when "Failure Reason" is present, otherwise COMPLETE. Completions for unknown stages
+ * are logged and skipped instead of aborting the parse.
+ *
+ * @param event the SparkListenerStageCompleted record
+ */
+ private void handleStageComplete(JSONObject event) {
+ JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
+ int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
+ int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
+ String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
+
+ long completeTime = JSONUtils.getLong(stageInfo, "Completion Time", lastEventTime);
+ this.lastEventTime = completeTime;
+
+ SparkStage stage = stages.get(key);
+ if (stage == null) {
+ LOG.warn("Ignoring completion of unknown stage {} attempt {}", stageId, stageAttemptId);
+ return;
+ }
+
+ // If "Submission Time" is not available, use the "Launch Time" of "Task ID" = 0.
+ long submissionTime = JSONUtils.getLong(stageInfo, "Submission Time", firstTaskLaunchTime);
+ stage.setSubmitTime(submissionTime);
+ stage.setCompleteTime(completeTime);
+
+ if (stageInfo != null && stageInfo.containsKey("Failure Reason")) {
+ stage.setStatus(SparkEntityConstant.SparkStageStatus.FAILED.toString());
+ } else {
+ stage.setStatus(SparkEntityConstant.SparkStageStatus.COMPLETE.toString());
+ }
+ }
+
+ /**
+ * Handles SparkListenerExecutorRemoved by setting the executor's end time. Removal of an
+ * unknown executor is logged and skipped instead of throwing NPE.
+ *
+ * @param event the SparkListenerExecutorRemoved record
+ */
+ private void handleExecutorRemoved(JSONObject event) {
+ String executorID = JSONUtils.getString(event, "Executor ID");
+ long removedTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
+ this.lastEventTime = removedTime;
+
+ SparkExecutor executor = executors.get(executorID);
+ if (executor == null) {
+ LOG.warn("Ignoring removal of unknown executor {}", executorID);
+ return;
+ }
+ executor.setEndTime(removedTime);
+ }
+
+ /**
+ * Handles SparkListenerJobEnd: sets the job's completion time and status. Only a "JobSucceeded"
+ * result counts as success; a missing result is treated as failure. Unknown jobs are logged and skipped.
+ *
+ * @param event the SparkListenerJobEnd record
+ */
+ private void handleJobEnd(JSONObject event) {
+ int jobId = JSONUtils.getInt(event, "Job ID");
+ long completionTime = JSONUtils.getLong(event, "Completion Time", lastEventTime);
+ this.lastEventTime = completionTime;
+
+ SparkJob job = jobs.get(jobId);
+ if (job == null) {
+ LOG.warn("Ignoring end of unknown job {}", jobId);
+ return;
+ }
+ job.setCompletionTime(completionTime);
+
+ JSONObject jobResult = JSONUtils.getJSONObject(event, "Job Result");
+ String result = JSONUtils.getString(jobResult, "Result");
+ if ("JobSucceeded".equalsIgnoreCase(result)) {
+ job.setStatus(SparkEntityConstant.SparkJobStatus.SUCCEEDED.toString());
+ } else {
+ job.setStatus(SparkEntityConstant.SparkJobStatus.FAILED.toString());
+ }
+ }
+
+ /** Handles SparkListenerApplicationEnd by recording the app end time (defaulting to the last event time). */
+ private void handleAppEnd(JSONObject event) {
+ this.lastEventTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
+ app.setEndTime(this.lastEventTime);
+ }
+
+ /**
+ * Finalizes the parse after the whole event log has been read. It:
+ * <ul>
+ * <li>flushes still-pending tasks as failed;</li>
+ * <li>rolls stages up into jobs and the app (or counts them as skipped);</li>
+ * <li>fills app/executor resource settings from the captured Spark config or the
+ * spark.defaultVal.* defaults;</li>
+ * <li>flushes stages, jobs, executors and finally the app.</li>
+ * </ul>
+ *
+ * @throws Exception propagated from entity flushing or config lookups
+ */
+ public void clearReader() throws Exception {
+ // Tasks still pending never produced usable end metrics; report them as failed.
+ for (SparkTask task : tasks.values()) {
+ LOG.info("Task {} does not have result or no task metrics.", task.getTaskId());
+ task.setFailed(true);
+ aggregateToStage(task);
+ aggregateToExecutor(task);
+ this.flushEntities(task, false);
+ }
+
+ // A stage missing a submit or complete time is counted as skipped on its job; the others
+ // are rolled up into their job and the app and then stored.
+ List<SparkStage> needStoreStages = new ArrayList<>();
+ for (SparkStage stage : this.stages.values()) {
+ int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
+ if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) {
+ SparkJob job = this.jobs.get(jobId);
+ job.setNumSkippedStages(job.getNumSkippedStages() + 1);
+ job.setNumSkippedTasks(job.getNumSkippedTasks() + stage.getNumTasks());
+ } else {
+ this.aggregateToJob(stage);
+ this.aggregateStageToApp(stage);
+ needStoreStages.add(stage);
+ }
+ String stageId = stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
+ String stageAttemptId = stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
+ this.jobStageMap.get(jobId).remove(this.generateStageKey(stageId, stageAttemptId));
+ }
+
+ this.flushEntities(needStoreStages, false);
+ for (SparkJob job : jobs.values()) {
+ this.aggregateJobToApp(job);
+ }
+ this.flushEntities(jobs.values(), false);
+
+ app.setExecutors(executors.size());
+
+ // In yarn-client mode the driver-side figures come from the spark.yarn.am.* settings.
+ long executorMemory = Utils.parseMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName()));
+ long driverMemory = Utils.parseMemory(this.isClientMode(app.getConfig())
+ ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName())
+ : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName()));
+
+ int executorCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName());
+ int driverCore = this.isClientMode(app.getConfig())
+ ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName())
+ : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName());
+
+ long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead");
+ long driverMemoryOverhead = this.isClientMode(app.getConfig())
+ ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead")
+ : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead");
+
+ app.setExecMemoryBytes(executorMemory);
+ app.setDriveMemoryBytes(driverMemory);
+ app.setExecutorCores(executorCore);
+ app.setDriverCores(driverCore);
+ app.setExecutorMemoryOverhead(executorMemoryOverhead);
+ app.setDriverMemoryOverhead(driverMemoryOverhead);
+
+ // Settle the app end time once, before the loop, so it is also set when no executor was recorded.
+ if (app.getEndTime() <= 0L) {
+ app.setEndTime(this.lastEventTime);
+ }
+ for (SparkExecutor executor : executors.values()) {
+ String executorID = executor.getTags().get(SparkJobTagName.SPARK_EXECUTOR_ID.toString());
+ if (executorID.equalsIgnoreCase("driver")) {
+ executor.setExecMemoryBytes(driverMemory);
+ executor.setCores(driverCore);
+ executor.setMemoryOverhead(driverMemoryOverhead);
+ } else {
+ executor.setExecMemoryBytes(executorMemory);
+ executor.setCores(executorCore);
+ executor.setMemoryOverhead(executorMemoryOverhead);
+ }
+ if (executor.getEndTime() <= 0L) {
+ executor.setEndTime(app.getEndTime());
+ }
+ this.aggregateExecutorToApp(executor);
+ }
+ this.flushEntities(executors.values(), false);
+ // NOTE(review): skippedTasks is overwritten with completeTasks here (original comment:
+ // "spark code...tricky"); intent is unclear — confirm against the Spark UI semantics.
+ app.setSkippedTasks(app.getCompleteTasks());
+ this.flushEntities(app, true);
+ }
+
+ /**
+ * Resolves a YARN memory-overhead setting. A captured value is parsed first with an "m" suffix
+ * appended (presumably megabytes), then as-is. If neither yields a non-zero value, the result is
+ * max(spark.defaultVal.spark.yarn.overhead.min, memory * spark.defaultVal.&lt;fieldName&gt;.factor / 100).
+ *
+ * @param config job config captured from the event log
+ * @param executorMemory memory size the proportional default is computed from
+ * @param fieldName overhead property name, e.g. "spark.yarn.executor.memoryOverhead"
+ * @return the resolved overhead, as returned by Utils.parseMemory
+ */
+ private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) {
+ String configured = config.getConfig().get(fieldName);
+ long overhead = 0L;
+ if (configured != null) {
+ overhead = Utils.parseMemory(configured + "m");
+ if (overhead == 0L) {
+ overhead = Utils.parseMemory(configured);
+ }
+ }
+ if (overhead != 0L) {
+ return overhead;
+ }
+
+ long minimum = Utils.parseMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min"));
+ long proportional = executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100;
+ return Math.max(minimum, proportional);
+ }
+
+ /**
+ * Adds the executor's lifetime (end time minus start time) to the app's total executor time.
+ *
+ * @param executor executor whose start and end times are already set
+ */
+ private void aggregateExecutorToApp(SparkExecutor executor) {
+ // Clamp this executor's own lifetime at zero. Clamping only the running total let an
+ // executor with end < start subtract time already accumulated for other executors.
+ long executorTime = Math.max(0L, executor.getEndTime() - executor.getStartTime());
+ app.setTotalExecutorTime(app.getTotalExecutorTime() + executorTime);
+ }
+
+ /** Adds one job's task and stage counters to the app-level counters. */
+ private void aggregateJobToApp(SparkJob job) {
+ SparkApp target = this.app;
+ target.setNumJobs(target.getNumJobs() + 1);
+ // task counters
+ target.setTotalTasks(target.getTotalTasks() + job.getNumTask());
+ target.setCompleteTasks(target.getCompleteTasks() + job.getNumCompletedTasks());
+ target.setSkippedTasks(target.getSkippedTasks() + job.getNumSkippedTasks());
+ target.setFailedTasks(target.getFailedTasks() + job.getNumFailedTasks());
+ // stage counters
+ target.setTotalStages(target.getTotalStages() + job.getNumStages());
+ target.setFailedStages(target.getFailedStages() + job.getNumFailedStages());
+ target.setSkippedStages(target.getSkippedStages() + job.getNumSkippedStages());
+ }
+
+ /** Adds one stage's task-level metrics (spills, times, sizes, I/O and shuffle volume) to the app totals. */
+ private void aggregateStageToApp(SparkStage stage) {
+ SparkApp target = this.app;
+ // spills
+ target.setDiskBytesSpilled(target.getDiskBytesSpilled() + stage.getDiskBytesSpilled());
+ target.setMemoryBytesSpilled(target.getMemoryBytesSpilled() + stage.getMemoryBytesSpilled());
+ // timings and result size
+ target.setExecutorRunTime(target.getExecutorRunTime() + stage.getExecutorRunTime());
+ target.setJvmGcTime(target.getJvmGcTime() + stage.getJvmGcTime());
+ target.setExecutorDeserializeTime(target.getExecutorDeserializeTime() + stage.getExecutorDeserializeTime());
+ target.setResultSerializationTime(target.getResultSerializationTime() + stage.getResultSerializationTime());
+ target.setResultSize(target.getResultSize() + stage.getResultSize());
+ // input / output
+ target.setInputRecords(target.getInputRecords() + stage.getInputRecords());
+ target.setInputBytes(target.getInputBytes() + stage.getInputBytes());
+ target.setOutputRecords(target.getOutputRecords() + stage.getOutputRecords());
+ target.setOutputBytes(target.getOutputBytes() + stage.getOutputBytes());
+ // shuffle
+ target.setShuffleWriteRecords(target.getShuffleWriteRecords() + stage.getShuffleWriteRecords());
+ target.setShuffleWriteBytes(target.getShuffleWriteBytes() + stage.getShuffleWriteBytes());
+ target.setShuffleReadRecords(target.getShuffleReadRecords() + stage.getShuffleReadRecords());
+ target.setShuffleReadBytes(target.getShuffleReadBytes() + stage.getShuffleReadBytes());
+ }
+
+ private void aggregateToStage(SparkTask task) {
+ String stageId = task.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
+ String stageAttemptId = task.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
+ String key = this.generateStageKey(stageId, stageAttemptId);
+ SparkStage stage = stages.get(key);
+
+ stage.setDiskBytesSpilled(stage.getDiskBytesSpilled() + task.getDiskBytesSpilled());
+ stage.setMemoryBytesSpilled(stage.getMemoryBytesSpilled() + task.getMemoryBytesSpilled());
+ stage.setExecutorRunTime(stage.getExecutorRunTime() + task.getExecutorRunTime());
+ stage.setJvmGcTime(stage.getJvmGcTime() + task.getJvmGcTime());
+ stage.setExecutorDeserializeTime(stage.getExecutorDeserializeTime() + task.getExecutorDeserializeTime());
+ stage.setResultSerializationTime(stage.getResultSerializationTime() + task.getResultSerializationTime());
+ stage.setResultSize(stage.getResultSize() + task.getResultSize());
+ stage.setInputRecords(stage.getInputRecords() + task.getInputRecords());
+ stage.setInputBytes(stage.getInputBytes() + task.getInputBytes());
+ stage.setOutputRecords(stage.getOutputRecords() + task.getOutputRecords());
+ stage.setOutputBytes(stage.getOutputBytes() + task.getOutputBytes());
+ stage.setShuffleWriteRecords(stage.getShuffleWriteRecords() + task.getShuffleWriteRecords());
+ stage.setShuffleWriteBytes(stage.getShuffleWriteBytes() + task.getShuffleWriteBytes());
+ stage.setShuffleReadRecords(stage.getShuffleReadRecords() + task.getShuffleReadRecords());
+ long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
+ stage.setShuffleReadBytes(stage.getShuffleReadBytes() + taskShuffleReadBytes);
+
+ boolean success = !task.isFailed();
+
+ Integer taskIndex = Integer.parseInt(task.getTags().get(SparkJobTagName.SPARK_TASK_INDEX.toString()));
+ if (stageTaskStatusMap.get(key).containsKey(taskIndex)) {
+ //has previous task attempt, retrieved from task index in one stage
+ boolean previousResult = stageTaskStatusMap.get(key).get(taskIndex);
+ success = previousResult || success;
+ if (previousResult != success) {
+ stage.setNumFailedTasks(stage.getNumFailedTasks() - 1);
+ stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
+ stageTaskStatusMap.get(key).put(taskIndex, success);
+ }
+ } else {
+ if (success) {
+ stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
+ } else {
+ stage.setNumFailedTasks(stage.getNumFailedTasks() + 1);
+ }
+ stageTaskStatusMap.get(key).put(taskIndex, success);
+ }
+
+ }
+
+ private void aggregateToExecutor(SparkTask task) {
+ String executorId = task.getExecutorId();
+ SparkExecutor executor = executors.get(executorId);
+
+ if (null != executor) {
+ executor.setTotalTasks(executor.getTotalTasks() + 1);
+ if (task.isFailed()) {
+ executor.setFailedTasks(executor.getFailedTasks() + 1);
+ } else {
+ executor.setCompletedTasks(executor.getCompletedTasks() + 1);
+ }
+ long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
+ executor.setTotalShuffleRead(executor.getTotalShuffleRead() + taskShuffleReadBytes);
+ executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime());
+ executor.setTotalInputBytes(executor.getTotalInputBytes() + task.getInputBytes());
+ executor.setTotalShuffleWrite(executor.getTotalShuffleWrite() + task.getShuffleWriteBytes());
+ executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime());
+ }
+
+ }
+
+ private void aggregateToJob(SparkStage stage) {
+ int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
+ SparkJob job = jobs.get(jobId);
+ job.setNumCompletedTasks(job.getNumCompletedTasks() + stage.getNumCompletedTasks());
+ job.setNumFailedTasks(job.getNumFailedTasks() + stage.getNumFailedTasks());
+ job.setNumTask(job.getNumTask() + stage.getNumTasks());
+
+
+ if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
+ //if multiple attempts succeed, just count one
+ if (!hasStagePriorAttemptSuccess(stage)) {
+ job.setNumCompletedStages(job.getNumCompletedStages() + 1);
+ }
+ } else {
+ job.setNumFailedStages(job.getNumFailedStages() + 1);
+ }
+ }
+
+ private boolean hasStagePriorAttemptSuccess(SparkStage stage) {
+ int stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()));
+ for (int i = 0; i < stageAttemptId; i++) {
+ SparkStage previousStage = stages.get(this.generateStageKey(
+ stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), Integer.toString(i)));
+ if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
+ private String generateStageKey(String stageId, String stageAttemptId) {
+ return stageId + "-" + stageAttemptId;
+ }
+
+ private void initiateStage(int jobId, int stageId, int stageAttemptId, String name, int numTasks) {
+ SparkStage stage = new SparkStage();
+ stage.setTags(new HashMap<>(this.app.getTags()));
+ stage.setTimestamp(app.getTimestamp());
+ stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
+ stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Integer.toString(stageId));
+ stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Integer.toString(stageAttemptId));
+ stage.setName(name);
+ stage.setNumActiveTasks(0);
+ stage.setNumTasks(numTasks);
+ stage.setSchedulingPool(this.app.getConfig().getConfig().get("spark.scheduler.pool") == null ?
+ "default" : this.app.getConfig().getConfig().get("spark.scheduler.pool"));
+
+ String stageKey = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
+ stages.put(stageKey, stage);
+ this.jobStageMap.get(jobId).add(stageKey);
+ }
+
+
+ private SparkExecutor initiateExecutor(String executorID, long startTime) {
+ if (!executors.containsKey(executorID)) {
+ SparkExecutor executor = new SparkExecutor();
+ executor.setTags(new HashMap<>(this.app.getTags()));
+ executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executorID);
+ executor.setStartTime(startTime);
+ executor.setTimestamp(app.getTimestamp());
+
+ this.executors.put(executorID, executor);
+ }
+
+ return this.executors.get(executorID);
+ }
+
+ private String getNormalizedName(String jobName, String assignedName) {
+ if (null != assignedName) {
+ return assignedName;
+ } else {
+ return JobNameNormalization.getInstance().normalize(jobName);
+ }
+ }
+
+ private void flushEntities(Object entity, boolean forceFlush) {
+ this.flushEntities(Collections.singletonList(entity), forceFlush);
+ }
+
+ private void flushEntities(Collection entities, boolean forceFlush) {
+ this.createEntities.addAll(entities);
+
+ if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) {
+ try {
+ this.doFlush(this.createEntities);
+ this.createEntities.clear();
+ } catch (Exception e) {
+ LOG.error("Fail to flush entities", e);
+ }
+
+ }
+ }
+
+ private EagleServiceBaseClient initiateClient() {
+ String host = conf.getString("eagleProps.eagle.service.host");
+ int port = conf.getInt("eagleProps.eagle.service.port");
+ String userName = conf.getString("eagleProps.eagle.service.username");
+ String pwd = conf.getString("eagleProps.eagle.service.password");
+ client = new EagleServiceClientImpl(host, port, userName, pwd);
+ int timeout = conf.getInt("eagleProps.eagle.service.read.timeout");
+ client.getJerseyClient().setReadTimeout(timeout * 1000);
+
+ return client;
+ }
+
+ private void doFlush(List entities) throws IOException, EagleServiceClientException {
+ client.create(entities);
+ int size = (entities == null ? 0 : entities.size());
+ LOG.info("finish flushing entities of total number " + size);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
new file mode 100644
index 0000000..b1dd09c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+public class JHFSparkParser implements JHFParserBase {
+
+ private static final Logger logger = LoggerFactory.getLogger(JHFSparkParser.class);
+
+ private boolean isValidJson;
+
+ private JHFSparkEventReader eventReader;
+
+ public JHFSparkParser(JHFSparkEventReader reader) {
+ this.eventReader = reader;
+ }
+
+ @Override
+ public void parse(InputStream is) throws Exception {
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
+ for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+ isValidJson = true;
+ JSONObject eventObj = parseAndValidateJSON(line);
+ if (isValidJson) {
+ try {
+ this.eventReader.read(eventObj);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ this.eventReader.clearReader();
+ }
+ }
+
+ private JSONObject parseAndValidateJSON(String line) {
+ JSONObject eventObj = null;
+ JSONParser parser = new JSONParser();
+ try {
+ eventObj = (JSONObject) parser.parse(line);
+ } catch (ParseException ex) {
+ isValidJson = false;
+ logger.error(String.format("Invalid json string. Fail to parse %s.", line), ex);
+ }
+ return eventObj;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java
new file mode 100644
index 0000000..c206b71
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
/**
 * Plain mutable holder for a Spark application's state, final status, queue, name and user.
 *
 * <p>Every property defaults to {@code null} until its setter is called. No validation is done.
 */
public class SparkApplicationInfo {

    private String state;
    private String finalStatus;
    private String queue;
    private String name;
    private String user;

    // ---- accessors --------------------------------------------------------------------------

    public String getState() {
        return this.state;
    }

    public String getFinalStatus() {
        return this.finalStatus;
    }

    public String getQueue() {
        return this.queue;
    }

    public String getName() {
        return this.name;
    }

    public String getUser() {
        return this.user;
    }

    // ---- mutators ---------------------------------------------------------------------------

    public void setState(String newState) {
        this.state = newState;
    }

    public void setFinalStatus(String newFinalStatus) {
        this.finalStatus = newFinalStatus;
    }

    public void setQueue(String newQueue) {
        this.queue = newQueue;
    }

    public void setName(String newName) {
        this.name = newName;
    }

    public void setUser(String newUser) {
        this.user = newUser;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
new file mode 100644
index 0000000..0144410
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import org.apache.eagle.jpm.util.SparkJobTagName;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReader {
+
+ private String site;
+ private SparkApplicationInfo app;
+
+
+ public SparkFilesystemInputStreamReaderImpl(String site, SparkApplicationInfo app) {
+ this.site = site;
+ this.app = app;
+ }
+
+ @Override
+ public void read(InputStream is) throws Exception {
+ Map<String, String> baseTags = new HashMap<>();
+ baseTags.put(SparkJobTagName.SITE.toString(), site);
+ baseTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue());
+ JHFParserBase parser = new JHFSparkParser(new JHFSparkEventReader(baseTags, this.app));
+ parser.parse(is);
+ }
+
+ public static void main(String[] args) throws Exception {
+ SparkFilesystemInputStreamReaderImpl impl = new SparkFilesystemInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo());
+ impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1")));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
index 9fafc1f..0bb65df 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
@@ -19,7 +19,7 @@
package org.apache.eagle.jpm.spark.history.status;
-import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
+import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo;
import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -233,10 +233,11 @@ public class JobHistoryZKStateManager {
curator.setData().forPath(path, status.toString().getBytes("UTF-8"));
}
} else {
- LOG.error("Failed to update for application with path: " + path);
+ LOG.warn("failed to update with status {} due to path {} not existing ", status, path);
+ //throw new RuntimeException("Failed to update for application with path: " + path);
}
} catch (Exception e) {
- LOG.error("fail to update application status", e);
+ LOG.error("fail to update application status as {}", status, e);
throw new RuntimeException(e);
} finally {
try {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
index e88c62f..0351de3 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
@@ -19,9 +19,9 @@
package org.apache.eagle.jpm.spark.history.storm;
-import org.apache.eagle.jpm.spark.crawl.JHFInputStreamReader;
-import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
-import org.apache.eagle.jpm.spark.crawl.SparkFilesystemInputStreamReaderImpl;
+import org.apache.eagle.jpm.spark.history.crawl.JHFInputStreamReader;
+import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo;
+import org.apache.eagle.jpm.spark.history.crawl.SparkFilesystemInputStreamReaderImpl;
import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
@@ -106,9 +106,12 @@ public class SparkHistoryJobParseBolt extends BaseRichBolt {
zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
LOG.info("Successfully parse application {}", appId);
collector.ack(tuple);
+ } catch (RuntimeException e) {
+ LOG.warn("fail to process application {} due to RuntimeException, ignore it", appId, e);
+ zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
+ collector.ack(tuple);
} catch (Exception e) {
- LOG.error("Fail to process application {}", appId, e);
- zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FAILED);
+ LOG.error("Fail to process application {}, and retry", appId, e);
collector.fail(tuple);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
index 5602b4c..4c50607 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
@@ -91,10 +91,11 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
LOG.info("emit " + appId);
zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
}
- LOG.info("{} apps sent.", appIds.size());
if (appIds.isEmpty()) {
- this.takeRest(60);
+ this.takeRest(10);
+ } else {
+ LOG.info("{} apps sent.", appIds.size());
}
} catch (Exception e) {
LOG.error("Fail to run next tuple", e);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
index 26842b8..b94c603 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
@@ -153,24 +153,6 @@
<value>http://sandbox.hortonworks.com:8088</value>
</property>
<property>
- <name>storm.mode</name>
- <displayName>mode</displayName>
- <description>Storm Mode: local or cluster</description>
- <value>local</value>
- </property>
- <property>
- <name>storm.worker.num</name>
- <displayName>worker.num</displayName>
- <description>The number of workers</description>
- <value>2</value>
- </property>
- <property>
- <name>name</name>
- <displayName>name</displayName>
- <description>Name of the topology</description>
- <value>sparkHistoryJob</value>
- </property>
- <property>
<name>storm.messageTimeoutSec</name>
<displayName>messageTimeoutSec</displayName>
<description>Message timeout (in seconds)</description>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 58dd552..4c22b15 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -15,6 +15,9 @@
{
+ "appId": "sparkHistoryJob",
+ "mode": "CLUSTER",
+ "workers" : 3,
"basic":{
"cluster":"sandbox",
"dataCenter":"sandbox",
@@ -45,8 +48,6 @@
}
},
"storm":{
- worker.num: 2,
- "name":"sparkHistoryJob",
"messageTimeoutSec": 3000,
"pendingSpout": 1000,
"spoutCrawlInterval": 10000,#in ms
@@ -72,7 +73,5 @@
spark.yarn.am.memoryOverhead.factor: 10,
spark.yarn.overhead.min: "384m"
}
- },
- "appId": "sparkHistoryJob",
- "mode": "CLUSTER"
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
deleted file mode 100644
index 5d1cfaa..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import org.apache.eagle.log.entity.repo.EntityRepository;
-
-public class JPMEntityRepository extends EntityRepository {
- public JPMEntityRepository() {
- entitySet.add(SparkAppEntity.class);
- entitySet.add(SparkJobEntity.class);
- entitySet.add(SparkStageEntity.class);
- entitySet.add(SparkTaskEntity.class);
- entitySet.add(SparkExecutorEntity.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
deleted file mode 100644
index e18f1e7..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import java.io.Serializable;
-import java.util.HashMap;
-
-public class JobConfig extends HashMap<String, String> implements Serializable {
-}
-
[3/3] incubator-eagle git commit: Update spark history job feeder
config & refactor the code
Posted by qi...@apache.org.
Update spark history job feeder config & refactor the code
Author: Qingwen Zhao <qi...@gmail.com>
Closes #416 from qingwen220/sparkHist.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3110c72e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3110c72e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3110c72e
Branch: refs/heads/develop
Commit: 3110c72e47f697f5c59b5f7d6559d527cf25db3a
Parents: 8774b85
Author: Qingwen Zhao <qi...@gmail.com>
Authored: Wed Sep 7 10:36:24 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Wed Sep 7 10:36:24 2016 +0800
----------------------------------------------------------------------
.../environment/impl/StormExecutionRuntime.java | 32 +-
.../apache/eagle/jpm/spark/crawl/EventType.java | 24 -
.../jpm/spark/crawl/JHFInputStreamReader.java | 24 -
.../eagle/jpm/spark/crawl/JHFParserBase.java | 29 -
.../jpm/spark/crawl/JHFSparkEventReader.java | 713 -------------------
.../eagle/jpm/spark/crawl/JHFSparkParser.java | 73 --
.../jpm/spark/crawl/SparkApplicationInfo.java | 69 --
.../SparkFilesystemInputStreamReaderImpl.java | 53 --
.../running/entities/JPMEntityRepository.java | 33 +
.../jpm/spark/running/entities/JobConfig.java | 26 +
.../spark/running/entities/SparkAppEntity.java | 476 +++++++++++++
.../running/entities/SparkExecutorEntity.java | 233 ++++++
.../spark/running/entities/SparkJobEntity.java | 191 +++++
.../running/entities/SparkStageEntity.java | 299 ++++++++
.../spark/running/entities/SparkTaskEntity.java | 290 ++++++++
.../spark/history/SparkHistoryJobAppConfig.java | 4 -
.../history/crawl/JHFInputStreamReader.java | 24 +
.../jpm/spark/history/crawl/JHFParserBase.java | 29 +
.../history/crawl/JHFSparkEventReader.java | 713 +++++++++++++++++++
.../jpm/spark/history/crawl/JHFSparkParser.java | 73 ++
.../history/crawl/SparkApplicationInfo.java | 69 ++
.../SparkFilesystemInputStreamReaderImpl.java | 53 ++
.../status/JobHistoryZKStateManager.java | 7 +-
.../history/storm/SparkHistoryJobParseBolt.java | 13 +-
.../history/storm/SparkHistoryJobSpout.java | 5 +-
...spark.history.SparkHistoryJobAppProvider.xml | 18 -
.../src/main/resources/application.conf | 9 +-
.../running/entities/JPMEntityRepository.java | 30 -
.../jpm/spark/running/entities/JobConfig.java | 25 -
.../spark/running/entities/SparkAppEntity.java | 475 ------------
.../running/entities/SparkExecutorEntity.java | 232 ------
.../spark/running/entities/SparkJobEntity.java | 190 -----
.../running/entities/SparkStageEntity.java | 298 --------
.../spark/running/entities/SparkTaskEntity.java | 289 --------
.../running/parser/SparkApplicationParser.java | 8 +-
.../src/main/resources/application.conf | 6 +-
.../apache/eagle/jpm/util/SparkEventType.java | 25 +
.../util/resourcefetch/RMResourceFetcher.java | 2 +-
38 files changed, 2575 insertions(+), 2587 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 04cc19b..e37e8f2 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -34,13 +34,13 @@ import scala.Int;
import storm.trident.spout.RichSpoutBatchExecutor;
public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> {
- private final static Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
private static LocalCluster _localCluster;
private StormEnvironment environment;
- private static LocalCluster getLocalCluster(){
- if(_localCluster == null){
+ private static LocalCluster getLocalCluster() {
+ if (_localCluster == null) {
_localCluster = new LocalCluster();
}
return _localCluster;
@@ -56,13 +56,13 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
return this.environment;
}
- private final static String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
- private final static String STORM_NIMBUS_HOST_DEFAULT = "localhost";
- private final static Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
- private final static String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
+ private static final String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
+ private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost";
+ private static final Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
+ private static final String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
private static final String WORKERS = "workers";
- public backtype.storm.Config getStormConfig(){
+ public backtype.storm.Config getStormConfig() {
backtype.storm.Config conf = new backtype.storm.Config();
conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8));
@@ -71,14 +71,14 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
- if(environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
+ if (environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
} else {
LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
}
Integer nimbusThriftPort = STORM_NIMBUS_THRIFT_DEFAULT;
- if(environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
+ if (environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
} else {
@@ -94,15 +94,15 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
}
@Override
- public void start(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config){
+ public void start(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
String topologyName = config.getString("appId");
- Preconditions.checkNotNull(topologyName,"[appId] is required by null for "+executor.getClass().getCanonicalName());
+ Preconditions.checkNotNull(topologyName,"[appId] is required by null for " + executor.getClass().getCanonicalName());
StormTopology topology = executor.execute(config, environment);
- LOG.info("Starting {} ({}), mode: {}",topologyName,executor.getClass().getCanonicalName(), config.getString("mode"));
+ LOG.info("Starting {} ({}), mode: {}",topologyName, executor.getClass().getCanonicalName(), config.getString("mode"));
Config conf = getStormConfig();
- if(ApplicationEntity.Mode.CLUSTER.name().equalsIgnoreCase(config.getString("mode"))){
+ if (ApplicationEntity.Mode.CLUSTER.name().equalsIgnoreCase(config.getString("mode"))) {
String jarFile = config.hasPath("jarPath") ? config.getString("jarPath") : null;
- if(jarFile == null){
+ if (jarFile == null) {
jarFile = DynamicJarPathFinder.findPath(executor.getClass());
}
synchronized (StormExecutionRuntime.class) {
@@ -129,7 +129,7 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
public void stop(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
String appId = config.getString("appId");
LOG.info("Stopping topology {} ..." + appId);
- if(config.getString("mode") == ApplicationEntity.Mode.CLUSTER.name()){
+ if (config.getString("mode") == ApplicationEntity.Mode.CLUSTER.name()) {
Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig()).getClient();
try {
stormClient.killTopology(appId);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java
deleted file mode 100644
index 1ba15b7..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-public enum EventType {
- SparkListenerBlockManagerAdded, SparkListenerEnvironmentUpdate, SparkListenerApplicationStart,
- SparkListenerExecutorAdded, SparkListenerJobStart,SparkListenerStageSubmitted, SparkListenerTaskStart,SparkListenerBlockManagerRemoved,
- SparkListenerTaskEnd, SparkListenerStageCompleted, SparkListenerJobEnd, SparkListenerApplicationEnd,SparkListenerExecutorRemoved
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
deleted file mode 100644
index 8a8d0db..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import java.io.InputStream;
-
-public interface JHFInputStreamReader {
- void read(InputStream is) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
deleted file mode 100644
index 62ba7d9..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import java.io.InputStream;
-
-public interface JHFParserBase {
- /**
- * this method will ensure to close the inputStream.
- * @param is
- * @throws Exception
- */
- void parse(InputStream is) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
deleted file mode 100644
index 22b715a..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
+++ /dev/null
@@ -1,713 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.eagle.jpm.spark.entity.*;
-import org.apache.eagle.jpm.util.*;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.service.client.EagleServiceClientException;
-import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-public class JHFSparkEventReader {
- private static final Logger LOG = LoggerFactory.getLogger(JHFSparkEventReader.class);
-
- private static final int FLUSH_LIMIT = 500;
- private long firstTaskLaunchTime;
- private long lastEventTime;
-
- private Map<String, SparkExecutor> executors;
- private SparkApp app;
- private Map<Integer, SparkJob> jobs;
- private Map<String, SparkStage> stages;
- private Map<Integer, Set<String>> jobStageMap;
- private Map<Long, SparkTask> tasks;
- private EagleServiceClientImpl client;
- private Map<String, Map<Integer, Boolean>> stageTaskStatusMap;
-
- private List<TaggedLogAPIEntity> createEntities;
-
- private Config conf;
-
- public JHFSparkEventReader(Map<String, String> baseTags, SparkApplicationInfo info) {
- app = new SparkApp();
- app.setTags(new HashMap<String, String>(baseTags));
- app.setYarnState(info.getState());
- app.setYarnStatus(info.getFinalStatus());
- createEntities = new ArrayList<>();
- jobs = new HashMap<Integer, SparkJob>();
- stages = new HashMap<String, SparkStage>();
- jobStageMap = new HashMap<Integer, Set<String>>();
- tasks = new HashMap<Long, SparkTask>();
- executors = new HashMap<String, SparkExecutor>();
- stageTaskStatusMap = new HashMap<>();
- conf = ConfigFactory.load();
- this.initiateClient();
- }
-
- public SparkApp getApp() {
- return this.app;
- }
-
- public void read(JSONObject eventObj) {
- String eventType = (String) eventObj.get("Event");
- if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationStart.toString())) {
- handleAppStarted(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString())) {
- handleEnvironmentSet(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorAdded.toString())) {
- handleExecutorAdd(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerAdded.toString())) {
- handleBlockManagerAdd(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobStart.toString())) {
- handleJobStart(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageSubmitted.toString())) {
- handleStageSubmit(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskStart.toString())) {
- handleTaskStart(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskEnd.toString())) {
- handleTaskEnd(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageCompleted.toString())) {
- handleStageComplete(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobEnd.toString())) {
- handleJobEnd(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorRemoved.toString())) {
- handleExecutorRemoved(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationEnd.toString())) {
- handleAppEnd(eventObj);
- } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerRemoved.toString())) {
- //nothing to do now
- } else {
- LOG.info("Not registered event type:" + eventType);
- }
-
- }
-
- private void handleEnvironmentSet(JSONObject event) {
- app.setConfig(new JobConfig());
- JSONObject sparkProps = (JSONObject) event.get("Spark Properties");
-
- String[] additionalJobConf = conf.getString("basic.jobConf.additional.info").split(",\\s*");
- String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port",
- "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory",
- "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"};
- String[] jobConf = (String[])ArrayUtils.addAll(additionalJobConf, props);
- for (String prop : jobConf) {
- if (sparkProps.containsKey(prop)) {
- app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop));
- }
- }
- }
-
- private Object getConfigVal(JobConfig config, String configName, String type) {
- if (config.getConfig().containsKey(configName)) {
- Object val = config.getConfig().get(configName);
- if (type.equalsIgnoreCase(Integer.class.getName())) {
- return Integer.parseInt((String) val);
- } else {
- return val;
- }
- } else {
- if (type.equalsIgnoreCase(Integer.class.getName())) {
- return conf.getInt("spark.defaultVal." + configName);
- } else {
- return conf.getString("spark.defaultVal." + configName);
- }
- }
- }
-
- private boolean isClientMode(JobConfig config) {
- return config.getConfig().get("spark.master").equalsIgnoreCase("yarn-client");
- }
-
- private void handleAppStarted(JSONObject event) {
- //need update all entities tag before app start
- List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
- entities.addAll(this.executors.values());
- entities.add(this.app);
-
- long appStartTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
- for (TaggedLogAPIEntity entity : entities) {
- entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), JSONUtils.getString(event, "App ID"));
- entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), JSONUtils.getString(event, "App Name"));
- // In yarn-client mode, attemptId is not available in the log, so we set attemptId = 1.
- String attemptId = isClientMode(this.app.getConfig()) ? "1" : JSONUtils.getString(event, "App Attempt ID");
- entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), attemptId);
- // the second argument of getNormalizeName() is changed to null because the original code contains sensitive text
- // original second argument looks like: this.app.getConfig().getConfig().get("xxx"), "xxx" is the sensitive text
- entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), this.getNormalizedName(JSONUtils.getString(event, "App Name"), null));
- entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), JSONUtils.getString(event, "User"));
-
- entity.setTimestamp(appStartTime);
- }
-
- this.app.setStartTime(appStartTime);
- this.lastEventTime = appStartTime;
- }
-
- private void handleExecutorAdd(JSONObject event) {
- String executorID = (String) event.get("Executor ID");
- long executorAddTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
- this.lastEventTime = executorAddTime;
- SparkExecutor executor = this.initiateExecutor(executorID, executorAddTime);
-
- JSONObject executorInfo = JSONUtils.getJSONObject(event, "Executor Info");
-
- }
-
- private void handleBlockManagerAdd(JSONObject event) {
- long maxMemory = JSONUtils.getLong(event, "Maximum Memory");
- long timestamp = JSONUtils.getLong(event, "Timestamp", lastEventTime);
- this.lastEventTime = timestamp;
- JSONObject blockInfo = JSONUtils.getJSONObject(event, "Block Manager ID");
- String executorID = JSONUtils.getString(blockInfo, "Executor ID");
- String hostAndPort = JSONUtils.getString(blockInfo, "Host") + ":" + JSONUtils.getLong(blockInfo, "Port");
-
- SparkExecutor executor = this.initiateExecutor(executorID, timestamp);
- executor.setMaxMemory(maxMemory);
- executor.setHostPort(hostAndPort);
- }
-
- private void handleTaskStart(JSONObject event) {
- this.initializeTask(event);
- }
-
- private void handleTaskEnd(JSONObject event) {
- JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
- long taskId = JSONUtils.getLong(taskInfo, "Task ID");
- SparkTask task = tasks.get(taskId);
- if (task == null) {
- return;
- }
-
- task.setFailed(JSONUtils.getBoolean(taskInfo, "Failed"));
- JSONObject taskMetrics = JSONUtils.getJSONObject(event, "Task Metrics");
- if (null != taskMetrics) {
- task.setExecutorDeserializeTime(JSONUtils.getLong(taskMetrics, "Executor Deserialize Time", lastEventTime));
- task.setExecutorRunTime(JSONUtils.getLong(taskMetrics, "Executor Run Time", lastEventTime));
- task.setJvmGcTime(JSONUtils.getLong(taskMetrics, "JVM GC Time", lastEventTime));
- task.setResultSize(JSONUtils.getLong(taskMetrics, "Result Size"));
- task.setResultSerializationTime(JSONUtils.getLong(taskMetrics, "Result Serialization Time", lastEventTime));
- task.setMemoryBytesSpilled(JSONUtils.getLong(taskMetrics, "Memory Bytes Spilled"));
- task.setDiskBytesSpilled(JSONUtils.getLong(taskMetrics, "Disk Bytes Spilled"));
-
- JSONObject inputMetrics = JSONUtils.getJSONObject(taskMetrics, "Input Metrics");
- if (null != inputMetrics) {
- task.setInputBytes(JSONUtils.getLong(inputMetrics, "Bytes Read"));
- task.setInputRecords(JSONUtils.getLong(inputMetrics, "Records Read"));
- }
-
- JSONObject outputMetrics = JSONUtils.getJSONObject(taskMetrics, "Output Metrics");
- if (null != outputMetrics) {
- task.setOutputBytes(JSONUtils.getLong(outputMetrics, "Bytes Written"));
- task.setOutputRecords(JSONUtils.getLong(outputMetrics, "Records Written"));
- }
-
- JSONObject shuffleWriteMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Write Metrics");
- if (null != shuffleWriteMetrics) {
- task.setShuffleWriteBytes(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Bytes Written"));
- task.setShuffleWriteRecords(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Records Written"));
- }
-
- JSONObject shuffleReadMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Read Metrics");
- if (null != shuffleReadMetrics) {
- task.setShuffleReadLocalBytes(JSONUtils.getLong(shuffleReadMetrics, "Local Bytes Read"));
- task.setShuffleReadRemoteBytes(JSONUtils.getLong(shuffleReadMetrics, "Remote Bytes Read"));
- task.setShuffleReadRecords(JSONUtils.getLong(shuffleReadMetrics, "Total Records Read"));
- }
- } else {
- //for tasks success without task metrics, save in the end if no other information
- if (!task.isFailed()) {
- return;
- }
- }
-
- aggregateToStage(task);
- aggregateToExecutor(task);
- tasks.remove(taskId);
- this.flushEntities(task, false);
- }
-
-
- private SparkTask initializeTask(JSONObject event) {
- SparkTask task = new SparkTask();
- task.setTags(new HashMap<>(this.app.getTags()));
- task.setTimestamp(app.getTimestamp());
-
- task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage ID")));
- task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage Attempt ID")));
-
- JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
- long taskId = JSONUtils.getLong(taskInfo, "Task ID");
- task.setTaskId(taskId);
-
- task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), Long.toString(JSONUtils.getLong(taskInfo, "Index")));
- task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), Integer.toString(JSONUtils.getInt(taskInfo, "Attempt")));
- long launchTime = JSONUtils.getLong(taskInfo, "Launch Time", lastEventTime);
- this.lastEventTime = launchTime;
- if (taskId == 0) {
- this.setFirstTaskLaunchTime(launchTime);
- }
- task.setLaunchTime(launchTime);
- task.setExecutorId(JSONUtils.getString(taskInfo, "Executor ID"));
- task.setHost(JSONUtils.getString(taskInfo, "Host"));
- task.setTaskLocality(JSONUtils.getString(taskInfo, "Locality"));
- task.setSpeculative(JSONUtils.getBoolean(taskInfo, "Speculative"));
-
- tasks.put(task.getTaskId(), task);
- return task;
- }
-
- private void setFirstTaskLaunchTime(long launchTime) {
- this.firstTaskLaunchTime = launchTime;
- }
-
- private void handleJobStart(JSONObject event) {
- SparkJob job = new SparkJob();
- job.setTags(new HashMap<>(this.app.getTags()));
- job.setTimestamp(app.getTimestamp());
-
- int jobId = JSONUtils.getInt(event, "Job ID");
- job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
- long submissionTime = JSONUtils.getLong(event, "Submission Time", lastEventTime);
- job.setSubmissionTime(submissionTime);
- this.lastEventTime = submissionTime;
-
- //for complete application, no active stages/tasks
- job.setNumActiveStages(0);
- job.setNumActiveTasks(0);
-
- this.jobs.put(jobId, job);
- this.jobStageMap.put(jobId, new HashSet<String>());
-
- JSONArray stages = JSONUtils.getJSONArray(event, "Stage Infos");
- int stagesSize = (stages == null ? 0 : stages.size());
- job.setNumStages(stagesSize);
- for (int i = 0; i < stagesSize; i++) {
- JSONObject stageInfo = (JSONObject) stages.get(i);
- int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
- int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
- String stageName = JSONUtils.getString(stageInfo, "Stage Name");
- int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks");
- this.initiateStage(jobId, stageId, stageAttemptId, stageName, numTasks);
- }
- }
-
- private void handleStageSubmit(JSONObject event) {
- JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
- int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
- int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
- String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
- stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>());
-
- if (!stages.containsKey(key)) {
- //may be further attempt for one stage
- String baseAttempt = this.generateStageKey(Integer.toString(stageId), "0");
- if (stages.containsKey(baseAttempt)) {
- SparkStage stage = stages.get(baseAttempt);
- String jobId = stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString());
-
- String stageName = JSONUtils.getString(event, "Stage Name");
- int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks");
- this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks);
- }
- }
- }
-
- private void handleStageComplete(JSONObject event) {
- JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
- int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
- int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
- String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
- SparkStage stage = stages.get(key);
-
- // If "Submission Time" is not available, use the "Launch Time" of "Task ID" = 0.
- Long submissionTime = JSONUtils.getLong(stageInfo, "Submission Time", firstTaskLaunchTime);
-
- stage.setSubmitTime(submissionTime);
-
- long completeTime = JSONUtils.getLong(stageInfo, "Completion Time", lastEventTime);
- stage.setCompleteTime(completeTime);
- this.lastEventTime = completeTime;
-
- if (stageInfo != null && stageInfo.containsKey("Failure Reason")) {
- stage.setStatus(SparkEntityConstant.SparkStageStatus.FAILED.toString());
- } else {
- stage.setStatus(SparkEntityConstant.SparkStageStatus.COMPLETE.toString());
- }
- }
-
- private void handleExecutorRemoved(JSONObject event) {
- String executorID = JSONUtils.getString(event, "Executor ID");
- SparkExecutor executor = executors.get(executorID);
- long removedTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
- executor.setEndTime(removedTime);
- this.lastEventTime = removedTime;
- }
-
- private void handleJobEnd(JSONObject event) {
- int jobId = JSONUtils.getInt(event, "Job ID");
- SparkJob job = jobs.get(jobId);
-
- long completionTime = JSONUtils.getLong(event, "Completion Time", lastEventTime);
- job.setCompletionTime(completionTime);
- this.lastEventTime = completionTime;
-
- JSONObject jobResult = JSONUtils.getJSONObject(event, "Job Result");
- String result = JSONUtils.getString(jobResult, "Result");
- if (result.equalsIgnoreCase("JobSucceeded")) {
- job.setStatus(SparkEntityConstant.SparkJobStatus.SUCCEEDED.toString());
- } else {
- job.setStatus(SparkEntityConstant.SparkJobStatus.FAILED.toString());
- }
- }
-
- private void handleAppEnd(JSONObject event) {
- long endTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
- app.setEndTime(endTime);
- this.lastEventTime = endTime;
- }
-
- public void clearReader() throws Exception {
- //clear tasks
- for (SparkTask task : tasks.values()) {
- LOG.info("Task {} does not have result or no task metrics.", task.getTaskId());
- task.setFailed(true);
- aggregateToStage(task);
- aggregateToExecutor(task);
- this.flushEntities(task, false);
- }
-
- List<SparkStage> needStoreStages = new ArrayList<>();
- for (SparkStage stage : this.stages.values()) {
- int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
- if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) {
- SparkJob job = this.jobs.get(jobId);
- job.setNumSkippedStages(job.getNumSkippedStages() + 1);
- job.setNumSkippedTasks(job.getNumSkippedTasks() + stage.getNumTasks());
- } else {
- this.aggregateToJob(stage);
- this.aggregateStageToApp(stage);
- needStoreStages.add(stage);
- }
- String stageId = stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
- String stageAttemptId = stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
- this.jobStageMap.get(jobId).remove(this.generateStageKey(stageId, stageAttemptId));
- }
-
- this.flushEntities(needStoreStages, false);
- for (SparkJob job : jobs.values()) {
- this.aggregateJobToApp(job);
- }
- this.flushEntities(jobs.values(), false);
-
- app.setExecutors(executors.values().size());
-
- long executorMemory = Utils.parseMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName()));
- long driverMemory = Utils.parseMemory(this.isClientMode(app.getConfig())
- ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName())
- : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName()));
-
- int executorCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName());
- int driverCore = this.isClientMode(app.getConfig())
- ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName())
- : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName());
-
- long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead");
- long driverMemoryOverhead = this.isClientMode(app.getConfig())
- ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead")
- : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead");
-
- app.setExecMemoryBytes(executorMemory);
- app.setDriveMemoryBytes(driverMemory);
- app.setExecutorCores(executorCore);
- app.setDriverCores(driverCore);
- app.setExecutorMemoryOverhead(executorMemoryOverhead);
- app.setDriverMemoryOverhead(driverMemoryOverhead);
-
- for (SparkExecutor executor : executors.values()) {
- String executorID = executor.getTags().get(SparkJobTagName.SPARK_EXECUTOR_ID.toString());
- if (executorID.equalsIgnoreCase("driver")) {
- executor.setExecMemoryBytes(driverMemory);
- executor.setCores(driverCore);
- executor.setMemoryOverhead(driverMemoryOverhead);
- } else {
- executor.setExecMemoryBytes(executorMemory);
- executor.setCores(executorCore);
- executor.setMemoryOverhead(executorMemoryOverhead);
- }
- if (app.getEndTime() <= 0L) {
- app.setEndTime(this.lastEventTime);
- }
- if (executor.getEndTime() <= 0L) {
- executor.setEndTime(app.getEndTime());
- }
- this.aggregateExecutorToApp(executor);
- }
- this.flushEntities(executors.values(), false);
- //spark code...tricky
- app.setSkippedTasks(app.getCompleteTasks());
- this.flushEntities(app, true);
- }
-
- private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) {
- long result = 0L;
- String fieldValue = config.getConfig().get(fieldName);
- if (fieldValue != null) {
- result = Utils.parseMemory(fieldValue + "m");
- if (result == 0L) {
- result = Utils.parseMemory(fieldValue);
- }
- }
-
- if (result == 0L) {
- result = Math.max(
- Utils.parseMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")),
- executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100);
- }
- return result;
- }
-
- private void aggregateExecutorToApp(SparkExecutor executor) {
- long totalExecutorTime = app.getTotalExecutorTime() + executor.getEndTime() - executor.getStartTime();
- if (totalExecutorTime < 0L) {
- totalExecutorTime = 0L;
- }
- app.setTotalExecutorTime(totalExecutorTime);
- }
-
- private void aggregateJobToApp(SparkJob job) {
- //aggregate job level metrics
- app.setNumJobs(app.getNumJobs() + 1);
- app.setTotalTasks(app.getTotalTasks() + job.getNumTask());
- app.setCompleteTasks(app.getCompleteTasks() + job.getNumCompletedTasks());
- app.setSkippedTasks(app.getSkippedTasks() + job.getNumSkippedTasks());
- app.setFailedTasks(app.getFailedTasks() + job.getNumFailedTasks());
- app.setTotalStages(app.getTotalStages() + job.getNumStages());
- app.setFailedStages(app.getFailedStages() + job.getNumFailedStages());
- app.setSkippedStages(app.getSkippedStages() + job.getNumSkippedStages());
- }
-
- private void aggregateStageToApp(SparkStage stage) {
- //aggregate task level metrics
- app.setDiskBytesSpilled(app.getDiskBytesSpilled() + stage.getDiskBytesSpilled());
- app.setMemoryBytesSpilled(app.getMemoryBytesSpilled() + stage.getMemoryBytesSpilled());
- app.setExecutorRunTime(app.getExecutorRunTime() + stage.getExecutorRunTime());
- app.setJvmGcTime(app.getJvmGcTime() + stage.getJvmGcTime());
- app.setExecutorDeserializeTime(app.getExecutorDeserializeTime() + stage.getExecutorDeserializeTime());
- app.setResultSerializationTime(app.getResultSerializationTime() + stage.getResultSerializationTime());
- app.setResultSize(app.getResultSize() + stage.getResultSize());
- app.setInputRecords(app.getInputRecords() + stage.getInputRecords());
- app.setInputBytes(app.getInputBytes() + stage.getInputBytes());
- app.setOutputRecords(app.getOutputRecords() + stage.getOutputRecords());
- app.setOutputBytes(app.getOutputBytes() + stage.getOutputBytes());
- app.setShuffleWriteRecords(app.getShuffleWriteRecords() + stage.getShuffleWriteRecords());
- app.setShuffleWriteBytes(app.getShuffleWriteBytes() + stage.getShuffleWriteBytes());
- app.setShuffleReadRecords(app.getShuffleReadRecords() + stage.getShuffleReadRecords());
- app.setShuffleReadBytes(app.getShuffleReadBytes() + stage.getShuffleReadBytes());
- }
-
/**
 * Adds one task's metrics to the stage attempt it ran in and updates that stage's
 * completed/failed task counters.
 *
 * <p>Attempts are de-duplicated by task index through {@code stageTaskStatusMap}. The status
 * recorded for an index only ever changes from failed to succeeded, never back. A task that
 * failed and was later retried successfully therefore ends up counted once as completed and
 * not as failed.
 *
 * @param task finished task; its tags are expected to carry stage id, stage attempt id and task index
 */
private void aggregateToStage(SparkTask task) {
    // Locate the stage attempt this task belongs to.
    String stageId = task.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
    String stageAttemptId = task.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
    String key = this.generateStageKey(stageId, stageAttemptId);
    // NOTE(review): assumes the stage attempt was registered earlier; a missing key NPEs below.
    SparkStage stage = stages.get(key);

    // Sum task-level spill, timing, result, I/O and shuffle metrics into the stage.
    stage.setDiskBytesSpilled(stage.getDiskBytesSpilled() + task.getDiskBytesSpilled());
    stage.setMemoryBytesSpilled(stage.getMemoryBytesSpilled() + task.getMemoryBytesSpilled());
    stage.setExecutorRunTime(stage.getExecutorRunTime() + task.getExecutorRunTime());
    stage.setJvmGcTime(stage.getJvmGcTime() + task.getJvmGcTime());
    stage.setExecutorDeserializeTime(stage.getExecutorDeserializeTime() + task.getExecutorDeserializeTime());
    stage.setResultSerializationTime(stage.getResultSerializationTime() + task.getResultSerializationTime());
    stage.setResultSize(stage.getResultSize() + task.getResultSize());
    stage.setInputRecords(stage.getInputRecords() + task.getInputRecords());
    stage.setInputBytes(stage.getInputBytes() + task.getInputBytes());
    stage.setOutputRecords(stage.getOutputRecords() + task.getOutputRecords());
    stage.setOutputBytes(stage.getOutputBytes() + task.getOutputBytes());
    stage.setShuffleWriteRecords(stage.getShuffleWriteRecords() + task.getShuffleWriteRecords());
    stage.setShuffleWriteBytes(stage.getShuffleWriteBytes() + task.getShuffleWriteBytes());
    stage.setShuffleReadRecords(stage.getShuffleReadRecords() + task.getShuffleReadRecords());
    // The task reports local and remote shuffle reads separately; the stage keeps their sum.
    long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
    stage.setShuffleReadBytes(stage.getShuffleReadBytes() + taskShuffleReadBytes);

    boolean success = !task.isFailed();

    Integer taskIndex = Integer.parseInt(task.getTags().get(SparkJobTagName.SPARK_TASK_INDEX.toString()));
    if (stageTaskStatusMap.get(key).containsKey(taskIndex)) {
        //has previous task attempt, retrieved from task index in one stage
        boolean previousResult = stageTaskStatusMap.get(key).get(taskIndex);
        // Once any attempt of this index succeeded, the index stays successful.
        success = previousResult || success;
        if (previousResult != success) {
            // Only reachable when the previous attempt failed and this one succeeded:
            // move the index from the failed count to the completed count.
            stage.setNumFailedTasks(stage.getNumFailedTasks() - 1);
            stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
            stageTaskStatusMap.get(key).put(taskIndex, success);
        }
    } else {
        // First attempt seen for this task index.
        if (success) {
            stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
        } else {
            stage.setNumFailedTasks(stage.getNumFailedTasks() + 1);
        }
        stageTaskStatusMap.get(key).put(taskIndex, success);
    }

}
-
- private void aggregateToExecutor(SparkTask task) {
- String executorId = task.getExecutorId();
- SparkExecutor executor = executors.get(executorId);
-
- if (null != executor) {
- executor.setTotalTasks(executor.getTotalTasks() + 1);
- if (task.isFailed()) {
- executor.setFailedTasks(executor.getFailedTasks() + 1);
- } else {
- executor.setCompletedTasks(executor.getCompletedTasks() + 1);
- }
- long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
- executor.setTotalShuffleRead(executor.getTotalShuffleRead() + taskShuffleReadBytes);
- executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime());
- executor.setTotalInputBytes(executor.getTotalInputBytes() + task.getInputBytes());
- executor.setTotalShuffleWrite(executor.getTotalShuffleWrite() + task.getShuffleWriteBytes());
- executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime());
- }
-
- }
-
- private void aggregateToJob(SparkStage stage) {
- int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
- SparkJob job = jobs.get(jobId);
- job.setNumCompletedTasks(job.getNumCompletedTasks() + stage.getNumCompletedTasks());
- job.setNumFailedTasks(job.getNumFailedTasks() + stage.getNumFailedTasks());
- job.setNumTask(job.getNumTask() + stage.getNumTasks());
-
-
- if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
- //if multiple attempts succeed, just count one
- if (!hasStagePriorAttemptSuccess(stage)) {
- job.setNumCompletedStages(job.getNumCompletedStages() + 1);
- }
- } else {
- job.setNumFailedStages(job.getNumFailedStages() + 1);
- }
- }
-
- private boolean hasStagePriorAttemptSuccess(SparkStage stage) {
- int stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()));
- for (int i = 0; i < stageAttemptId; i++) {
- SparkStage previousStage = stages.get(this.generateStageKey(
- stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), Integer.toString(i)));
- if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
- return true;
- }
- }
- return false;
- }
-
-
- private String generateStageKey(String stageId, String stageAttemptId) {
- return stageId + "-" + stageAttemptId;
- }
-
- private void initiateStage(int jobId, int stageId, int stageAttemptId, String name, int numTasks) {
- SparkStage stage = new SparkStage();
- stage.setTags(new HashMap<>(this.app.getTags()));
- stage.setTimestamp(app.getTimestamp());
- stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
- stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Integer.toString(stageId));
- stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Integer.toString(stageAttemptId));
- stage.setName(name);
- stage.setNumActiveTasks(0);
- stage.setNumTasks(numTasks);
- stage.setSchedulingPool(this.app.getConfig().getConfig().get("spark.scheduler.pool") == null ?
- "default" : this.app.getConfig().getConfig().get("spark.scheduler.pool"));
-
- String stageKey = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
- stages.put(stageKey, stage);
- this.jobStageMap.get(jobId).add(stageKey);
- }
-
-
- private SparkExecutor initiateExecutor(String executorID, long startTime) {
- if (!executors.containsKey(executorID)) {
- SparkExecutor executor = new SparkExecutor();
- executor.setTags(new HashMap<>(this.app.getTags()));
- executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executorID);
- executor.setStartTime(startTime);
- executor.setTimestamp(app.getTimestamp());
-
- this.executors.put(executorID, executor);
- }
-
- return this.executors.get(executorID);
- }
-
- private String getNormalizedName(String jobName, String assignedName) {
- if (null != assignedName) {
- return assignedName;
- } else {
- return JobNameNormalization.getInstance().normalize(jobName);
- }
- }
-
- private void flushEntities(Object entity, boolean forceFlush) {
- this.flushEntities(Collections.singletonList(entity), forceFlush);
- }
-
- private void flushEntities(Collection entities, boolean forceFlush) {
- this.createEntities.addAll(entities);
-
- if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) {
- try {
- this.doFlush(this.createEntities);
- this.createEntities.clear();
- } catch (Exception e) {
- LOG.error("Fail to flush entities", e);
- }
-
- }
- }
-
- private EagleServiceBaseClient initiateClient() {
- String host = conf.getString("eagleProps.eagle.service.host");
- int port = conf.getInt("eagleProps.eagle.service.port");
- String userName = conf.getString("eagleProps.eagle.service.username");
- String pwd = conf.getString("eagleProps.eagle.service.password");
- client = new EagleServiceClientImpl(host, port, userName, pwd);
- int timeout = conf.getInt("eagleProps.eagle.service.read.timeout");
- client.getJerseyClient().setReadTimeout(timeout * 1000);
-
- return client;
- }
-
- private void doFlush(List entities) throws IOException, EagleServiceClientException {
- client.create(entities);
- int size = (entities == null ? 0 : entities.size());
- LOG.info("finish flushing entities of total number " + size);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
deleted file mode 100644
index 02fc5cf..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-public class JHFSparkParser implements JHFParserBase {
-
- private static final Logger logger = LoggerFactory.getLogger(JHFSparkParser.class);
-
- private boolean isValidJson;
-
- private JHFSparkEventReader eventReader;
-
- public JHFSparkParser(JHFSparkEventReader reader) {
- this.eventReader = reader;
- }
-
- @Override
- public void parse(InputStream is) throws Exception {
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
- for (String line = reader.readLine(); line != null; line = reader.readLine()) {
- isValidJson = true;
- JSONObject eventObj = parseAndValidateJSON(line);
- if (isValidJson) {
- try {
- this.eventReader.read(eventObj);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- this.eventReader.clearReader();
- }
- }
-
- private JSONObject parseAndValidateJSON(String line) {
- JSONObject eventObj = null;
- JSONParser parser = new JSONParser();
- try {
- eventObj = (JSONObject) parser.parse(line);
- } catch (ParseException ex) {
- isValidJson = false;
- logger.error(String.format("Invalid json string. Fail to parse %s.", line), ex);
- }
- return eventObj;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java
deleted file mode 100644
index 423d045..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
/**
 * Mutable holder for basic attributes of a Spark application: state, final status, queue, name
 * and user. Every property starts out as {@code null} and is set independently.
 */
public class SparkApplicationInfo {

    private String state;
    private String finalStatus;
    private String queue;
    private String name;
    private String user;

    /** @param state new application state */
    public void setState(String state) {
        this.state = state;
    }

    /** @return application state, or {@code null} if never set */
    public String getState() {
        return this.state;
    }

    /** @param finalStatus new final status */
    public void setFinalStatus(String finalStatus) {
        this.finalStatus = finalStatus;
    }

    /** @return final status, or {@code null} if never set */
    public String getFinalStatus() {
        return this.finalStatus;
    }

    /** @param queue queue the application runs in */
    public void setQueue(String queue) {
        this.queue = queue;
    }

    /** @return queue name, or {@code null} if never set */
    public String getQueue() {
        return this.queue;
    }

    /** @param name application name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return application name, or {@code null} if never set */
    public String getName() {
        return this.name;
    }

    /** @param user user that owns the application */
    public void setUser(String user) {
        this.user = user;
    }

    /** @return owning user, or {@code null} if never set */
    public String getUser() {
        return this.user;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
deleted file mode 100644
index 3964454..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import org.apache.eagle.jpm.util.SparkJobTagName;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReader {
-
- private String site;
- private SparkApplicationInfo app;
-
-
- public SparkFilesystemInputStreamReaderImpl(String site, SparkApplicationInfo app) {
- this.site = site;
- this.app = app;
- }
-
- @Override
- public void read(InputStream is) throws Exception {
- Map<String, String> baseTags = new HashMap<>();
- baseTags.put(SparkJobTagName.SITE.toString(), site);
- baseTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue());
- JHFParserBase parser = new JHFSparkParser(new JHFSparkEventReader(baseTags, this.app));
- parser.parse(is);
- }
-
- public static void main(String[] args) throws Exception {
- SparkFilesystemInputStreamReaderImpl impl = new SparkFilesystemInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo());
- impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1")));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
new file mode 100644
index 0000000..81f266b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.mr.runningentity.JobConfigSerDeser;
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
/**
 * Entity repository for the Spark running-job entities.
 *
 * <p>Adds the Spark app, job, stage, task and executor entity classes to the inherited
 * {@code entitySet}. It also registers a serializer for the {@link JobConfig} column type in the
 * inherited {@code serDeserMap}.
 */
public class JPMEntityRepository extends EntityRepository {
    public JPMEntityRepository() {
        entitySet.add(SparkAppEntity.class);
        entitySet.add(SparkJobEntity.class);
        entitySet.add(SparkStageEntity.class);
        entitySet.add(SparkTaskEntity.class);
        entitySet.add(SparkExecutorEntity.class);
        // NOTE(review): JobConfig here is org.apache.eagle.jpm.spark.running.entities.JobConfig
        // (a HashMap<String, String>), while JobConfigSerDeser is imported from
        // org.apache.eagle.jpm.mr.runningentity. Confirm that serializer handles this JobConfig
        // type; if it is typed to the MR JobConfig, (de)serialization would fail at runtime.
        serDeserMap.put(JobConfig.class, new JobConfigSerDeser());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
new file mode 100644
index 0000000..0d3a86f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
/**
 * Configuration map (property name to value) stored in the config column of the Spark running
 * entities.
 *
 * <p>{@link HashMap} is already {@link Serializable}; the interface is repeated for clarity. The
 * explicit {@code serialVersionUID} pins the serialized form. Compatible changes to this class,
 * such as adding a constructor, therefore do not invalidate previously serialized instances, as
 * they would with a compiler-computed default id.
 */
public class JobConfig extends HashMap<String, String> implements Serializable {
    private static final long serialVersionUID = 1L;
}
+
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
new file mode 100644
index 0000000..51c8a50
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@Table("eagleSparkRunningApps")
+@ColumnFamily("f")
+@Prefix("sparkApp")
+@Service(Constants.RUNNING_SPARK_APP_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "user", "queue"})
+@Partition({"site"})
+public class SparkAppEntity extends TaggedLogAPIEntity {
+ @Column("a")
+ private long startTime;
+ @Column("b")
+ private long endTime;
+ @Column("c")
+ private String yarnState;
+ @Column("d")
+ private String yarnStatus;
+ @Column("e")
+ private JobConfig config;
+ @Column("f")
+ private int numJobs;
+ @Column("g")
+ private int totalStages;
+ @Column("h")
+ private int skippedStages;
+ @Column("i")
+ private int failedStages;
+ @Column("j")
+ private int totalTasks;
+ @Column("k")
+ private int skippedTasks;
+ @Column("l")
+ private int failedTasks;
+ @Column("m")
+ private int executors;
+ @Column("n")
+ private long inputBytes;
+ @Column("o")
+ private long inputRecords;
+ @Column("p")
+ private long outputBytes;
+ @Column("q")
+ private long outputRecords;
+ @Column("r")
+ private long shuffleReadBytes;
+ @Column("s")
+ private long shuffleReadRecords;
+ @Column("t")
+ private long shuffleWriteBytes;
+ @Column("u")
+ private long shuffleWriteRecords;
+ @Column("v")
+ private long executorDeserializeTime;
+ @Column("w")
+ private long executorRunTime;
+ @Column("x")
+ private long resultSize;
+ @Column("y")
+ private long jvmGcTime;
+ @Column("z")
+ private long resultSerializationTime;
+ @Column("ab")
+ private long memoryBytesSpilled;
+ @Column("ac")
+ private long diskBytesSpilled;
+ @Column("ad")
+ private long execMemoryBytes;
+ @Column("ae")
+ private long driveMemoryBytes;
+ @Column("af")
+ private int completeTasks;
+ @Column("ag")
+ private long totalExecutorTime;
+ @Column("ah")
+ private long executorMemoryOverhead;
+ @Column("ai")
+ private long driverMemoryOverhead;
+ @Column("aj")
+ private int executorCores;
+ @Column("ak")
+ private int driverCores;
+ @Column("al")
+ private AppInfo appInfo;
+ @Column("am")
+ private int activeStages;
+ @Column("an")
+ private int completeStages;
+ @Column("ba")
+ private int activeTasks;
+
+ public int getActiveTasks() {
+ return activeTasks;
+ }
+
+ public void setActiveTasks(int activeTasks) {
+ this.activeTasks = activeTasks;
+ valueChanged("activeTasks");
+ }
+
+ public int getCompleteStages() {
+ return completeStages;
+ }
+
+ public void setCompleteStages(int completeStages) {
+ this.completeStages = completeStages;
+ valueChanged("completeStages");
+ }
+
+ public int getActiveStages() {
+ return activeStages;
+ }
+
+ public void setActiveStages(int activeStages) {
+ this.activeStages = activeStages;
+ valueChanged("activeStages");
+ }
+
+ public AppInfo getAppInfo() {
+ return appInfo;
+ }
+
+ public void setAppInfo(AppInfo appInfo) {
+ this.appInfo = appInfo;
+ valueChanged("appInfo");
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public String getYarnState() {
+ return yarnState;
+ }
+
+ public String getYarnStatus() {
+ return yarnStatus;
+ }
+
+ public int getNumJobs() {
+ return numJobs;
+ }
+
+ public int getTotalStages() {
+ return totalStages;
+ }
+
+ public int getSkippedStages() {
+ return skippedStages;
+ }
+
+ public int getFailedStages() {
+ return failedStages;
+ }
+
+ public int getTotalTasks() {
+ return totalTasks;
+ }
+
+ public int getSkippedTasks() {
+ return skippedTasks;
+ }
+
+ public int getFailedTasks() {
+ return failedTasks;
+ }
+
+ public int getExecutors() {
+ return executors;
+ }
+
+ public long getInputBytes() {
+ return inputBytes;
+ }
+
+ public long getInputRecords() {
+ return inputRecords;
+ }
+
+ public long getOutputBytes() {
+ return outputBytes;
+ }
+
+ public long getOutputRecords() {
+ return outputRecords;
+ }
+
+ public long getShuffleReadBytes() {
+ return shuffleReadBytes;
+ }
+
+ public long getShuffleReadRecords() {
+ return shuffleReadRecords;
+ }
+
+ public long getShuffleWriteBytes() {
+ return shuffleWriteBytes;
+ }
+
+ public long getShuffleWriteRecords() {
+ return shuffleWriteRecords;
+ }
+
+ public long getExecutorDeserializeTime() {
+ return executorDeserializeTime;
+ }
+
+ public long getExecutorRunTime() {
+ return executorRunTime;
+ }
+
+ public long getResultSize() {
+ return resultSize;
+ }
+
+ public long getJvmGcTime() {
+ return jvmGcTime;
+ }
+
+ public long getResultSerializationTime() {
+ return resultSerializationTime;
+ }
+
+ public long getMemoryBytesSpilled() {
+ return memoryBytesSpilled;
+ }
+
+ public long getDiskBytesSpilled() {
+ return diskBytesSpilled;
+ }
+
+ public long getExecMemoryBytes() {
+ return execMemoryBytes;
+ }
+
+ public long getDriveMemoryBytes() {
+ return driveMemoryBytes;
+ }
+
+ public int getCompleteTasks() {
+ return completeTasks;
+ }
+
+ public JobConfig getConfig() {
+ return config;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ valueChanged("startTime");
+ }
+
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ valueChanged("endTime");
+ }
+
+ public void setYarnState(String yarnState) {
+ this.yarnState = yarnState;
+ valueChanged("yarnState");
+ }
+
+ public void setYarnStatus(String yarnStatus) {
+ this.yarnStatus = yarnStatus;
+ valueChanged("yarnStatus");
+ }
+
+ public void setConfig(JobConfig config) {
+ this.config = config;
+ valueChanged("config");
+ }
+
+ public void setNumJobs(int numJobs) {
+ this.numJobs = numJobs;
+ valueChanged("numJobs");
+ }
+
+ public void setTotalStages(int totalStages) {
+ this.totalStages = totalStages;
+ valueChanged("totalStages");
+ }
+
+ public void setSkippedStages(int skippedStages) {
+ this.skippedStages = skippedStages;
+ valueChanged("skippedStages");
+ }
+
+ public void setFailedStages(int failedStages) {
+ this.failedStages = failedStages;
+ valueChanged("failedStages");
+ }
+
+ public void setTotalTasks(int totalTasks) {
+ this.totalTasks = totalTasks;
+ valueChanged("totalTasks");
+ }
+
+ public void setSkippedTasks(int skippedTasks) {
+ this.skippedTasks = skippedTasks;
+ valueChanged("skippedTasks");
+ }
+
+ public void setFailedTasks(int failedTasks) {
+ this.failedTasks = failedTasks;
+ valueChanged("failedTasks");
+ }
+
+ public void setExecutors(int executors) {
+ this.executors = executors;
+ valueChanged("executors");
+ }
+
+ public void setInputBytes(long inputBytes) {
+ this.inputBytes = inputBytes;
+ valueChanged("inputBytes");
+ }
+
+ public void setInputRecords(long inputRecords) {
+ this.inputRecords = inputRecords;
+ valueChanged("inputRecords");
+ }
+
+ public void setOutputBytes(long outputBytes) {
+ this.outputBytes = outputBytes;
+ valueChanged("outputBytes");
+ }
+
+ public void setOutputRecords(long outputRecords) {
+ this.outputRecords = outputRecords;
+ valueChanged("outputRecords");
+ }
+
+ public void setShuffleReadBytes(long shuffleReadRemoteBytes) {
+ this.shuffleReadBytes = shuffleReadRemoteBytes;
+ valueChanged("shuffleReadBytes");
+ }
+
+ public void setShuffleReadRecords(long shuffleReadRecords) {
+ this.shuffleReadRecords = shuffleReadRecords;
+ valueChanged("shuffleReadRecords");
+ }
+
+ public void setShuffleWriteBytes(long shuffleWriteBytes) {
+ this.shuffleWriteBytes = shuffleWriteBytes;
+ valueChanged("shuffleWriteBytes");
+ }
+
+ public void setShuffleWriteRecords(long shuffleWriteRecords) {
+ this.shuffleWriteRecords = shuffleWriteRecords;
+ valueChanged("shuffleWriteRecords");
+ }
+
+ public void setExecutorDeserializeTime(long executorDeserializeTime) {
+ this.executorDeserializeTime = executorDeserializeTime;
+ valueChanged("executorDeserializeTime");
+ }
+
+ public void setExecutorRunTime(long executorRunTime) {
+ this.executorRunTime = executorRunTime;
+ valueChanged("executorRunTime");
+ }
+
+ public void setResultSize(long resultSize) {
+ this.resultSize = resultSize;
+ valueChanged("resultSize");
+ }
+
+ public void setJvmGcTime(long jvmGcTime) {
+ this.jvmGcTime = jvmGcTime;
+ valueChanged("jvmGcTime");
+ }
+
+ public void setResultSerializationTime(long resultSerializationTime) {
+ this.resultSerializationTime = resultSerializationTime;
+ valueChanged("resultSerializationTime");
+ }
+
+ public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+ this.memoryBytesSpilled = memoryBytesSpilled;
+ valueChanged("memoryBytesSpilled");
+ }
+
+ public void setDiskBytesSpilled(long diskBytesSpilled) {
+ this.diskBytesSpilled = diskBytesSpilled;
+ valueChanged("diskBytesSpilled");
+ }
+
+ public void setExecMemoryBytes(long execMemoryBytes) {
+ this.execMemoryBytes = execMemoryBytes;
+ valueChanged("execMemoryBytes");
+ }
+
+ public void setDriveMemoryBytes(long driveMemoryBytes) {
+ this.driveMemoryBytes = driveMemoryBytes;
+ valueChanged("driveMemoryBytes");
+ }
+
+ public void setCompleteTasks(int completeTasks) {
+ this.completeTasks = completeTasks;
+ valueChanged("completeTasks");
+ }
+
+ public long getTotalExecutorTime() {
+ return totalExecutorTime;
+ }
+
+ public void setTotalExecutorTime(long totalExecutorTime) {
+ this.totalExecutorTime = totalExecutorTime;
+ valueChanged("totalExecutorTime");
+ }
+
+ public long getExecutorMemoryOverhead() {
+ return executorMemoryOverhead;
+ }
+
+ public void setExecutorMemoryOverhead(long executorMemoryOverhead) {
+ this.executorMemoryOverhead = executorMemoryOverhead;
+ valueChanged("executorMemoryOverhead");
+ }
+
+ public long getDriverMemoryOverhead() {
+ return driverMemoryOverhead;
+ }
+
+ public void setDriverMemoryOverhead(long driverMemoryOverhead) {
+ this.driverMemoryOverhead = driverMemoryOverhead;
+ valueChanged("driverMemoryOverhead");
+ }
+
+ public int getExecutorCores() {
+ return executorCores;
+ }
+
+ public void setExecutorCores(int executorCores) {
+ this.executorCores = executorCores;
+ valueChanged("executorCores");
+ }
+
+ public int getDriverCores() {
+ return driverCores;
+ }
+
+ public void setDriverCores(int driverCores) {
+ this.driverCores = driverCores;
+ valueChanged("driverCores");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
new file mode 100644
index 0000000..6d0441c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+/**
+ * Per-executor metrics entity for a running Spark application.
+ *
+ * <p>Declared storage: table {@code eagleSparkRunningExecutors}, column family {@code f},
+ * prefix {@code sparkExecutor}, time-series enabled, partitioned by {@code site}.
+ * Tag names: site, sparkAppId, sparkAppAttemptId, sparkAppName, executorId, user, queue.
+ *
+ * <p>Every setter assigns its field and then passes the property name to
+ * {@code valueChanged} on the base entity.
+ *
+ * <p>NOTE(review): the single-letter {@code @Column} qualifiers are presumably the persisted
+ * column names; reassigning a letter to a different field would likely break reading
+ * existing rows -- confirm before changing any of them.
+ */
+@Table("eagleSparkRunningExecutors")
+@ColumnFamily("f")
+@Prefix("sparkExecutor")
+@Service(Constants.RUNNING_SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site", "sparkAppId", "sparkAppAttemptId", "sparkAppName", "executorId", "user", "queue"})
+@Partition({"site"})
+public class SparkExecutorEntity extends TaggedLogAPIEntity {
+    // Presumably the executor's "host:port" address -- confirm with the producer.
+    @Column("a")
+    private String hostPort;
+    @Column("b")
+    private int rddBlocks;
+    // Memory / disk figures; units are not visible in this class -- confirm.
+    @Column("c")
+    private long memoryUsed;
+    @Column("d")
+    private long diskUsed;
+    // Task counters, explicitly zero until set.
+    @Column("e")
+    private int activeTasks = 0;
+    @Column("f")
+    private int failedTasks = 0;
+    @Column("g")
+    private int completedTasks = 0;
+    @Column("h")
+    private int totalTasks = 0;
+    // Aggregated duration / IO totals, explicitly zero until set; units not visible -- confirm.
+    @Column("i")
+    private long totalDuration = 0;
+    @Column("j")
+    private long totalInputBytes = 0;
+    @Column("k")
+    private long totalShuffleRead = 0;
+    @Column("l")
+    private long totalShuffleWrite = 0;
+    @Column("m")
+    private long maxMemory;
+    // Timestamps; presumably epoch millis -- confirm. endTime stays 0 until set.
+    @Column("n")
+    private long startTime;
+    @Column("o")
+    private long endTime = 0;
+    // Resource sizing for this executor.
+    @Column("p")
+    private long execMemoryBytes;
+    @Column("q")
+    private int cores;
+    @Column("r")
+    private long memoryOverhead;
+
+    /** Returns the stored {@code hostPort} value. */
+    public String getHostPort() {
+        return this.hostPort;
+    }
+
+    /** Stores {@code hostPort} and reports the change to the base entity. */
+    public void setHostPort(String hostPort) {
+        this.hostPort = hostPort;
+        this.valueChanged("hostPort");
+    }
+
+    /** Returns the stored RDD block count. */
+    public int getRddBlocks() {
+        return this.rddBlocks;
+    }
+
+    /** Stores the RDD block count and reports the change to the base entity. */
+    public void setRddBlocks(int rddBlocks) {
+        this.rddBlocks = rddBlocks;
+        this.valueChanged("rddBlocks");
+    }
+
+    /** Returns the stored used-memory figure. */
+    public long getMemoryUsed() {
+        return this.memoryUsed;
+    }
+
+    /** Stores the used-memory figure and reports the change to the base entity. */
+    public void setMemoryUsed(long memoryUsed) {
+        this.memoryUsed = memoryUsed;
+        this.valueChanged("memoryUsed");
+    }
+
+    /** Returns the stored used-disk figure. */
+    public long getDiskUsed() {
+        return this.diskUsed;
+    }
+
+    /** Stores the used-disk figure and reports the change to the base entity. */
+    public void setDiskUsed(long diskUsed) {
+        this.diskUsed = diskUsed;
+        this.valueChanged("diskUsed");
+    }
+
+    /** Returns the stored active-task count. */
+    public int getActiveTasks() {
+        return this.activeTasks;
+    }
+
+    /** Stores the active-task count and reports the change to the base entity. */
+    public void setActiveTasks(int activeTasks) {
+        this.activeTasks = activeTasks;
+        this.valueChanged("activeTasks");
+    }
+
+    /** Returns the stored failed-task count. */
+    public int getFailedTasks() {
+        return this.failedTasks;
+    }
+
+    /** Stores the failed-task count and reports the change to the base entity. */
+    public void setFailedTasks(int failedTasks) {
+        this.failedTasks = failedTasks;
+        this.valueChanged("failedTasks");
+    }
+
+    /** Returns the stored completed-task count. */
+    public int getCompletedTasks() {
+        return this.completedTasks;
+    }
+
+    /** Stores the completed-task count and reports the change to the base entity. */
+    public void setCompletedTasks(int completedTasks) {
+        this.completedTasks = completedTasks;
+        this.valueChanged("completedTasks");
+    }
+
+    /** Returns the stored total-task count. */
+    public int getTotalTasks() {
+        return this.totalTasks;
+    }
+
+    /** Stores the total-task count and reports the change to the base entity. */
+    public void setTotalTasks(int totalTasks) {
+        this.totalTasks = totalTasks;
+        this.valueChanged("totalTasks");
+    }
+
+    /** Returns the stored total duration. */
+    public long getTotalDuration() {
+        return this.totalDuration;
+    }
+
+    /** Stores the total duration and reports the change to the base entity. */
+    public void setTotalDuration(long totalDuration) {
+        this.totalDuration = totalDuration;
+        this.valueChanged("totalDuration");
+    }
+
+    /** Returns the stored total input bytes. */
+    public long getTotalInputBytes() {
+        return this.totalInputBytes;
+    }
+
+    /** Stores the total input bytes and reports the change to the base entity. */
+    public void setTotalInputBytes(long totalInputBytes) {
+        this.totalInputBytes = totalInputBytes;
+        this.valueChanged("totalInputBytes");
+    }
+
+    /** Returns the stored total shuffle-read figure. */
+    public long getTotalShuffleRead() {
+        return this.totalShuffleRead;
+    }
+
+    /** Stores the total shuffle-read figure and reports the change to the base entity. */
+    public void setTotalShuffleRead(long totalShuffleRead) {
+        this.totalShuffleRead = totalShuffleRead;
+        this.valueChanged("totalShuffleRead");
+    }
+
+    /** Returns the stored total shuffle-write figure. */
+    public long getTotalShuffleWrite() {
+        return this.totalShuffleWrite;
+    }
+
+    /** Stores the total shuffle-write figure and reports the change to the base entity. */
+    public void setTotalShuffleWrite(long totalShuffleWrite) {
+        this.totalShuffleWrite = totalShuffleWrite;
+        this.valueChanged("totalShuffleWrite");
+    }
+
+    /** Returns the stored maximum-memory figure. */
+    public long getMaxMemory() {
+        return this.maxMemory;
+    }
+
+    /** Stores the maximum-memory figure and reports the change to the base entity. */
+    public void setMaxMemory(long maxMemory) {
+        this.maxMemory = maxMemory;
+        this.valueChanged("maxMemory");
+    }
+
+    /** Returns the stored start timestamp. */
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    /** Stores the start timestamp and reports the change to the base entity. */
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        this.valueChanged("startTime");
+    }
+
+    /** Returns the stored end timestamp (0 until set). */
+    public long getEndTime() {
+        return this.endTime;
+    }
+
+    /** Stores the end timestamp and reports the change to the base entity. */
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        this.valueChanged("endTime");
+    }
+
+    /** Returns the stored executor memory size. */
+    public long getExecMemoryBytes() {
+        return this.execMemoryBytes;
+    }
+
+    /** Stores the executor memory size and reports the change to the base entity. */
+    public void setExecMemoryBytes(long execMemoryBytes) {
+        this.execMemoryBytes = execMemoryBytes;
+        this.valueChanged("execMemoryBytes");
+    }
+
+    /** Returns the stored core count. */
+    public int getCores() {
+        return this.cores;
+    }
+
+    /** Stores the core count and reports the change to the base entity. */
+    public void setCores(int cores) {
+        this.cores = cores;
+        this.valueChanged("cores");
+    }
+
+    /** Returns the stored memory overhead. */
+    public long getMemoryOverhead() {
+        return this.memoryOverhead;
+    }
+
+    /** Stores the memory overhead and reports the change to the base entity. */
+    public void setMemoryOverhead(long memoryOverhead) {
+        this.memoryOverhead = memoryOverhead;
+        this.valueChanged("memoryOverhead");
+    }
+}