You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/12/14 06:57:03 UTC

incubator-eagle git commit: [EAGLE-835] add task failure category

Repository: incubator-eagle
Updated Branches:
  refs/heads/master d6987af2b -> 9ca2cebae


[EAGLE-835] add task failure category

Author: wujinhu <wu...@126.com>

Closes #740 from wujinhu/EAGLE-836.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/9ca2ceba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/9ca2ceba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/9ca2ceba

Branch: refs/heads/master
Commit: 9ca2cebaed3dd91a9504326f753cb36a97f150d5
Parents: d6987af
Author: wujinhu <wu...@126.com>
Authored: Wed Dec 14 14:56:55 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Wed Dec 14 14:56:55 2016 +0800

----------------------------------------------------------------------
 .../TaskAttemptErrorCategoryEntity.java         | 60 ++++++++++++++++++++
 .../TaskAttemptExecutionAPIEntity.java          | 11 ----
 .../mr/history/parser/JHFEventReaderBase.java   | 15 ++++-
 .../JobEntityCreationEagleServiceListener.java  |  9 +++
 .../mr/history/parser/TaskFailureListener.java  |  2 +-
 .../org/apache/eagle/jpm/util/Constants.java    |  1 +
 6 files changed, 85 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ca2ceba/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptErrorCategoryEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptErrorCategoryEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptErrorCategoryEntity.java
new file mode 100644
index 0000000..51254c8
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptErrorCategoryEntity.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.eagle.jpm.mr.historyentity;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) // leave null-valued properties out of the JSON form
+@Table("eaglejpa_task")
+@ColumnFamily("f")
+@Prefix("taexece")
+@Service(Constants.JPA_TASK_ATTEMPT_ERROR_SERVICE_NAME) // "TaskAttemptErrorCategoryService" per Constants
+@TimeSeries(true)
+@Partition({"site"})
+@Indexes({
+        @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = false)
+    })
+public class TaskAttemptErrorCategoryEntity extends JobBaseAPIEntity { // per-attempt record: start/end times here; JHFEventReaderBase copies the attempt's tags and adds ERROR_CATEGORY ("" if absent)
+    @Column("a")
+    private long startTime; // copied from the task attempt entity's start time; presumably epoch millis - TODO confirm
+    @Column("b")
+    private long endTime; // copied from the task attempt entity's end time; same unit as startTime
+
+    public long getStartTime() { // plain accessor for startTime
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) { // stores the value, then reports the change by property name
+        this.startTime = startTime;
+        valueChanged("startTime"); // inherited helper, not visible here; presumably marks the field as modified - verify in base class
+    }
+
+    public long getEndTime() { // plain accessor for endTime
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) { // stores the value, then reports the change by property name
+        this.endTime = endTime;
+        valueChanged("endTime"); // inherited helper, not visible here; presumably marks the field as modified - verify in base class
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ca2ceba/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
index d1210b9..c284341 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
@@ -46,8 +46,6 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
     private String error;
     @Column("f")
     private JobCounters jobCounters;
-    @Column("g")
-    private String taskAttemptID;
 
     public String getTaskStatus() {
         return taskStatus;
@@ -102,13 +100,4 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
         this.jobCounters = jobCounters;
         pcs.firePropertyChange("jobCounters", null, null);
     }
-
-    public String getTaskAttemptID() {
-        return taskAttemptID;
-    }
-
-    public void setTaskAttemptID(String taskAttemptID) {
-        this.taskAttemptID = taskAttemptID;
-        pcs.firePropertyChange("taskAttemptID", null, null);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ca2ceba/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index 3a9e147..fb05bae 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -426,6 +426,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             String rack = values.get(Keys.RACK);
             taskAttemptExecutionTags.put(MRJobTagName.HOSTNAME.toString(), hostname);
             taskAttemptExecutionTags.put(MRJobTagName.RACK.toString(), rack);
+            taskAttemptExecutionTags.put(MRJobTagName.TASK_ATTEMPT_ID.toString(), taskAttemptID);
             // put last attempt's hostname to task level
             taskRunningHosts.put(taskID, hostname);
             // it is very likely that an attempt ID could be both succeeded and failed due to M/R system
@@ -444,7 +445,6 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
                 //entity.setJobCounters(parseCounters(values.get(Keys.COUNTERS)));
                 entity.setJobCounters(parseCounters(counters));
             }
-            entity.setTaskAttemptID(taskAttemptID);
 
             if (recType == RecordTypes.MapAttempt) {
                 jobExecutionEntity.setTotalMapAttempts(1 + jobExecutionEntity.getTotalMapAttempts());
@@ -462,6 +462,19 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
 
             entityCreated(entity);
             attempt2ErrorMsg.put(taskAttemptID, Pair.of(taskID, entity.getError()));
+            //generate TaskAttemptErrorCategoryEntity
+            TaskAttemptErrorCategoryEntity taskAttemptErrorCategoryEntity = new TaskAttemptErrorCategoryEntity();
+            Map<String, String> taskAttemptErrorCategoryEntityTags = new HashMap<>(entity.getTags());
+            taskAttemptErrorCategoryEntity.setTags(taskAttemptErrorCategoryEntityTags);
+            if (!taskAttemptErrorCategoryEntityTags.containsKey(MRJobTagName.ERROR_CATEGORY.toString())) {
+                taskAttemptErrorCategoryEntityTags.put(MRJobTagName.ERROR_CATEGORY.toString(), "");
+            }
+
+            taskAttemptErrorCategoryEntity.setStartTime(entity.getStartTime());
+            taskAttemptErrorCategoryEntity.setEndTime(entity.getEndTime());
+            taskAttemptErrorCategoryEntity.setTimestamp(entity.getTimestamp());
+            entityCreated(taskAttemptErrorCategoryEntity);
+
             taskAttemptStartTime.remove(taskAttemptID);
         } else {
             // silently ignore

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ca2ceba/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index c802442..4eb58a7 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -47,6 +47,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
     List<JobEventAPIEntity> jobEvents = new ArrayList<>();
     List<TaskExecutionAPIEntity> taskExecs = new ArrayList<>();
     List<TaskAttemptExecutionAPIEntity> taskAttemptExecs = new ArrayList<>();
+    List<TaskAttemptErrorCategoryEntity> taskAttemptErrors = new ArrayList<>();
     private JobExecutionMetricsCreationListener jobExecutionMetricsCreationListener = new JobExecutionMetricsCreationListener();
     private TimeZone timeZone;
     private EagleOutputCollector collector;
@@ -117,6 +118,8 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
                 taskExecs.add((TaskExecutionAPIEntity) entity);
             } else if (entity instanceof TaskAttemptExecutionAPIEntity) {
                 taskAttemptExecs.add((TaskAttemptExecutionAPIEntity) entity);
+            } else if (entity instanceof TaskAttemptErrorCategoryEntity) {
+                taskAttemptErrors.add((TaskAttemptErrorCategoryEntity) entity);
             }
         }
         GenericServiceAPIResponseEntity result;
@@ -150,6 +153,12 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
             checkResult(result);
             taskAttemptExecs.clear();
         }
+        if (taskAttemptErrors.size() > 0) {
+            logger.info("flush TaskAttemptErrorCategoryEntity of number " + taskAttemptErrors.size());
+            result = client.create(taskAttemptErrors);
+            checkResult(result);
+            taskAttemptErrors.clear();
+        }
 
         logger.info("finish flushing entities of total number " + list.size());
         list.clear();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ca2ceba/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index 46056c4..61be66f 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -91,7 +91,7 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
         tags.put(MRJobTagName.RACK.toString(), e.getTags().get(MRJobTagName.RACK.toString()));
         tags.put(MRJobTagName.HOSTNAME.toString(), e.getTags().get(MRJobTagName.HOSTNAME.toString()));
         tags.put(MRJobTagName.JOB_ID.toString(), e.getTags().get(MRJobTagName.JOB_ID.toString()));
-        tags.put(MRJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID());
+        tags.put(MRJobTagName.TASK_ATTEMPT_ID.toString(), e.getTags().get(MRJobTagName.TASK_ATTEMPT_ID.toString()));
         tags.put(MRJobTagName.TASK_TYPE.toString(), e.getTags().get(MRJobTagName.TASK_TYPE.toString()));
 
         //TODO need optimize, match and then capture the data

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ca2ceba/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index 038618f..0ba6521 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -110,6 +110,7 @@ public class Constants {
     public static final String JPA_JOB_COUNT_SERVICE_NAME = "JobCountService";
     public static final String JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME = "RunningJobExecutionService";
     public static final String JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "TaskAttemptExecutionService";
+    public static final String JPA_TASK_ATTEMPT_ERROR_SERVICE_NAME = "TaskAttemptErrorCategoryService";
     public static final String JPA_TASK_FAILURE_COUNT_SERVICE_NAME = "TaskFailureCountService";
     public static final String JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME = "TaskAttemptCounterService";
     public static final String JPA_TASK_EXECUTION_SERVICE_NAME = "TaskExecutionService";