You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/12/14 06:57:03 UTC
incubator-eagle git commit: [EAGLE-835] add task failure category
Repository: incubator-eagle
Updated Branches:
refs/heads/master d6987af2b -> 9ca2cebae
[EAGLE-835] add task failure category
Author: wujinhu <wu...@126.com>
Closes #740 from wujinhu/EAGLE-836.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/9ca2ceba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/9ca2ceba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/9ca2ceba
Branch: refs/heads/master
Commit: 9ca2cebaed3dd91a9504326f753cb36a97f150d5
Parents: d6987af
Author: wujinhu <wu...@126.com>
Authored: Wed Dec 14 14:56:55 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Wed Dec 14 14:56:55 2016 +0800
----------------------------------------------------------------------
.../TaskAttemptErrorCategoryEntity.java | 60 ++++++++++++++++++++
.../TaskAttemptExecutionAPIEntity.java | 11 ----
.../mr/history/parser/JHFEventReaderBase.java | 15 ++++-
.../JobEntityCreationEagleServiceListener.java | 9 +++
.../mr/history/parser/TaskFailureListener.java | 2 +-
.../org/apache/eagle/jpm/util/Constants.java | 1 +
6 files changed, 85 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ca2ceba/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptErrorCategoryEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptErrorCategoryEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptErrorCategoryEntity.java
new file mode 100644
index 0000000..51254c8
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptErrorCategoryEntity.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.eagle.jpm.mr.historyentity;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("eaglejpa_task")
+@ColumnFamily("f")
+@Prefix("taexece")
+@Service(Constants.JPA_TASK_ATTEMPT_ERROR_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+@Indexes({
+ @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = false)
+ })
+public class TaskAttemptErrorCategoryEntity extends JobBaseAPIEntity {
+ @Column("a")
+ private long startTime;
+ @Column("b")
+ private long endTime;
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ valueChanged("startTime");
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ valueChanged("endTime");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ca2ceba/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
index d1210b9..c284341 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
@@ -46,8 +46,6 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
private String error;
@Column("f")
private JobCounters jobCounters;
- @Column("g")
- private String taskAttemptID;
public String getTaskStatus() {
return taskStatus;
@@ -102,13 +100,4 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
this.jobCounters = jobCounters;
pcs.firePropertyChange("jobCounters", null, null);
}
-
- public String getTaskAttemptID() {
- return taskAttemptID;
- }
-
- public void setTaskAttemptID(String taskAttemptID) {
- this.taskAttemptID = taskAttemptID;
- pcs.firePropertyChange("taskAttemptID", null, null);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ca2ceba/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index 3a9e147..fb05bae 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -426,6 +426,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
String rack = values.get(Keys.RACK);
taskAttemptExecutionTags.put(MRJobTagName.HOSTNAME.toString(), hostname);
taskAttemptExecutionTags.put(MRJobTagName.RACK.toString(), rack);
+ taskAttemptExecutionTags.put(MRJobTagName.TASK_ATTEMPT_ID.toString(), taskAttemptID);
// put last attempt's hostname to task level
taskRunningHosts.put(taskID, hostname);
// it is very likely that an attempt ID could be both succeeded and failed due to M/R system
@@ -444,7 +445,6 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
//entity.setJobCounters(parseCounters(values.get(Keys.COUNTERS)));
entity.setJobCounters(parseCounters(counters));
}
- entity.setTaskAttemptID(taskAttemptID);
if (recType == RecordTypes.MapAttempt) {
jobExecutionEntity.setTotalMapAttempts(1 + jobExecutionEntity.getTotalMapAttempts());
@@ -462,6 +462,19 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
entityCreated(entity);
attempt2ErrorMsg.put(taskAttemptID, Pair.of(taskID, entity.getError()));
+ //generate TaskAttemptErrorCategoryEntity
+ TaskAttemptErrorCategoryEntity taskAttemptErrorCategoryEntity = new TaskAttemptErrorCategoryEntity();
+ Map<String, String> taskAttemptErrorCategoryEntityTags = new HashMap<>(entity.getTags());
+ taskAttemptErrorCategoryEntity.setTags(taskAttemptErrorCategoryEntityTags);
+ if (!taskAttemptErrorCategoryEntityTags.containsKey(MRJobTagName.ERROR_CATEGORY.toString())) {
+ taskAttemptErrorCategoryEntityTags.put(MRJobTagName.ERROR_CATEGORY.toString(), "");
+ }
+
+ taskAttemptErrorCategoryEntity.setStartTime(entity.getStartTime());
+ taskAttemptErrorCategoryEntity.setEndTime(entity.getEndTime());
+ taskAttemptErrorCategoryEntity.setTimestamp(entity.getTimestamp());
+ entityCreated(taskAttemptErrorCategoryEntity);
+
taskAttemptStartTime.remove(taskAttemptID);
} else {
// silently ignore
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ca2ceba/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index c802442..4eb58a7 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -47,6 +47,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
List<JobEventAPIEntity> jobEvents = new ArrayList<>();
List<TaskExecutionAPIEntity> taskExecs = new ArrayList<>();
List<TaskAttemptExecutionAPIEntity> taskAttemptExecs = new ArrayList<>();
+ List<TaskAttemptErrorCategoryEntity> taskAttemptErrors = new ArrayList<>();
private JobExecutionMetricsCreationListener jobExecutionMetricsCreationListener = new JobExecutionMetricsCreationListener();
private TimeZone timeZone;
private EagleOutputCollector collector;
@@ -117,6 +118,8 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
taskExecs.add((TaskExecutionAPIEntity) entity);
} else if (entity instanceof TaskAttemptExecutionAPIEntity) {
taskAttemptExecs.add((TaskAttemptExecutionAPIEntity) entity);
+ } else if (entity instanceof TaskAttemptErrorCategoryEntity) {
+ taskAttemptErrors.add((TaskAttemptErrorCategoryEntity) entity);
}
}
GenericServiceAPIResponseEntity result;
@@ -150,6 +153,12 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
checkResult(result);
taskAttemptExecs.clear();
}
+ if (taskAttemptErrors.size() > 0) {
+ logger.info("flush TaskAttemptErrorCategoryEntity of number " + taskAttemptErrors.size());
+ result = client.create(taskAttemptErrors);
+ checkResult(result);
+ taskAttemptErrors.clear();
+ }
logger.info("finish flushing entities of total number " + list.size());
list.clear();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ca2ceba/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index 46056c4..61be66f 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -91,7 +91,7 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
tags.put(MRJobTagName.RACK.toString(), e.getTags().get(MRJobTagName.RACK.toString()));
tags.put(MRJobTagName.HOSTNAME.toString(), e.getTags().get(MRJobTagName.HOSTNAME.toString()));
tags.put(MRJobTagName.JOB_ID.toString(), e.getTags().get(MRJobTagName.JOB_ID.toString()));
- tags.put(MRJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID());
+ tags.put(MRJobTagName.TASK_ATTEMPT_ID.toString(), e.getTags().get(MRJobTagName.TASK_ATTEMPT_ID.toString()));
tags.put(MRJobTagName.TASK_TYPE.toString(), e.getTags().get(MRJobTagName.TASK_TYPE.toString()));
//TODO need optimize, match and then capture the data
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ca2ceba/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index 038618f..0ba6521 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -110,6 +110,7 @@ public class Constants {
public static final String JPA_JOB_COUNT_SERVICE_NAME = "JobCountService";
public static final String JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME = "RunningJobExecutionService";
public static final String JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "TaskAttemptExecutionService";
+ public static final String JPA_TASK_ATTEMPT_ERROR_SERVICE_NAME = "TaskAttemptErrorCategoryService";
public static final String JPA_TASK_FAILURE_COUNT_SERVICE_NAME = "TaskFailureCountService";
public static final String JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME = "TaskAttemptCounterService";
public static final String JPA_TASK_EXECUTION_SERVICE_NAME = "TaskExecutionService";