You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:25 UTC

[29/33] helix git commit: Populate Error message from running client's task and persist it into JobContext for better error reporting.

Populate Error message from running client's task and persist it into JobContext for better error reporting.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9508a1ac
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9508a1ac
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9508a1ac

Branch: refs/heads/helix-0.6.x
Commit: 9508a1acfae1d915148138daccb2abd5f9dce430
Parents: c3624e0
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu May 5 11:25:22 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 16:18:34 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobContext.java  |  51 ++++++--
 .../org/apache/helix/task/JobRebalancer.java    |  15 ++-
 .../java/org/apache/helix/task/TaskRunner.java  |   5 +-
 .../org/apache/helix/task/TaskStateModel.java   |   4 +-
 .../apache/helix/integration/task/MockTask.java |  23 +++-
 .../task/TestTaskErrorReporting.java            | 117 +++++++++++++++++++
 6 files changed, 194 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/main/java/org/apache/helix/task/JobContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index 2057f27..328fcc0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -45,7 +45,8 @@ public class JobContext extends HelixProperty {
     TARGET,
     TASK_ID,
     ASSIGNED_PARTICIPANT,
-    NEXT_RETRY_TIME
+    NEXT_RETRY_TIME,
+    INFO
   }
 
   public JobContext(ZNRecord record) {
@@ -76,8 +77,18 @@ public class JobContext extends HelixProperty {
     return Long.parseLong(tStr);
   }
 
+  public void setInfo(String info) {
+    if (info != null) {
+      _record.setSimpleField(ContextProperties.INFO.toString(), info);
+    }
+  }
+
+  public String getInfo() {
+    return _record.getSimpleField(ContextProperties.INFO.toString());
+  }
+
   public void setPartitionState(int p, TaskPartitionState s) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.STATE.toString(), s.name());
   }
 
@@ -95,7 +106,7 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionNumAttempts(int p, int n) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.NUM_ATTEMPTS.toString(), String.valueOf(n));
   }
 
@@ -122,7 +133,7 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionStartTime(int p, long t) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.START_TIME.toString(), String.valueOf(t));
   }
 
@@ -139,7 +150,7 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionFinishTime(int p, long t) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
   }
 
@@ -156,7 +167,7 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionTarget(int p, String targetPName) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.TARGET.toString(), targetPName);
   }
 
@@ -165,6 +176,16 @@ public class JobContext extends HelixProperty {
     return (map != null) ? map.get(ContextProperties.TARGET.toString()) : null;
   }
 
+  public void setPartitionInfo(int p, String info) {
+    Map<String, String> map = getMapField(p, true);
+    map.put(ContextProperties.INFO.toString(), info);
+  }
+
+  public String getPartitionInfo(int p) {
+    Map<String, String> map = getMapField(p);
+    return (map != null) ? map.get(ContextProperties.INFO.toString()) : null;
+  }
+
   public Map<String, List<Integer>> getPartitionsByTarget() {
     Map<String, List<Integer>> result = Maps.newHashMap();
     for (Map.Entry<String, Map<String, String>> mapField : _record.getMapFields().entrySet()) {
@@ -194,7 +215,7 @@ public class JobContext extends HelixProperty {
   }
 
   public void setTaskIdForPartition(int p, String taskId) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.TASK_ID.toString(), taskId);
   }
 
@@ -216,7 +237,7 @@ public class JobContext extends HelixProperty {
   }
 
   public void setAssignedParticipant(int p, String participantName) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.ASSIGNED_PARTICIPANT.toString(), participantName);
   }
 
@@ -226,7 +247,7 @@ public class JobContext extends HelixProperty {
   }
 
   public void setNextRetryTime(int p, long t) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.NEXT_RETRY_TIME.toString(), String.valueOf(t));
   }
 
@@ -242,10 +263,20 @@ public class JobContext extends HelixProperty {
     return Long.parseLong(tStr);
   }
 
+  /**
+   * Get MapField for the given partition.
+   *
+   * @param p the partition number
+   * @return mapField for the partition, or null if the partition has not been scheduled yet.
+   */
   public Map<String, String> getMapField(int p) {
+    return getMapField(p, false);
+  }
+
+  private Map<String, String> getMapField(int p, boolean createIfNotPresent) {
     String pStr = String.valueOf(p);
     Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
+    if (map == null && createIfNotPresent) {
       map = new TreeMap<String, String>();
       _record.setMapField(pStr, map);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 0f34178..fae7ac7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -214,8 +214,9 @@ public class JobRebalancer extends TaskRebalancer {
 
     if (allPartitions == null || allPartitions.isEmpty()) {
       // Empty target partitions, mark the job as FAILED.
-      LOG.warn(
-          "Missing task partition mapping for job " + jobResource + ", marked the job as FAILED!");
+      String failureMsg = "Empty task partition mapping for job " + jobResource + ", marked the job as FAILED!";
+      LOG.info(failureMsg);
+      jobCtx.setInfo(failureMsg);
       markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
       markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false);
       return new ResourceAssignment(jobResource);
@@ -266,6 +267,12 @@ public class JobRebalancer extends TaskRebalancer {
                 pName), instance));
         jobCtx.setPartitionState(pId, currState);
 
+        String taskMsg = currStateOutput.getInfo(jobResource, new Partition(
+            pName), instance);
+        if (taskMsg != null) {
+          jobCtx.setPartitionInfo(pId, taskMsg);
+        }
+
         // Process any requested state transitions.
         String requestedStateStr =
             currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
@@ -318,8 +325,8 @@ public class JobRebalancer extends TaskRebalancer {
         case ERROR: {
           donePartitions.add(pId); // The task may be rescheduled on a different instance.
           LOG.debug(String.format(
-              "Task partition %s has error state %s. Marking as such in rebalancer context.", pName,
-              currState));
+              "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.", pName,
+              currState, taskMsg));
           markPartitionError(jobCtx, pId, currState, true);
           // The error policy is to fail the task as soon a single partition fails for a specified
           // maximum number of attempts or task is in ABORTED state.

http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index c43d0ce..eabaf64 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -72,7 +72,7 @@ public class TaskRunner implements Runnable {
         throw death;
       } catch (Throwable t) {
         LOG.error("Problem running the task, report task as FAILED.", t);
-        _result = new TaskResult(Status.FAILED, null);
+        _result = new TaskResult(Status.FAILED, "Exception happened in running task: " + t.getMessage());
       }
 
       switch (_result.getStatus()) {
@@ -98,6 +98,9 @@ public class TaskRunner implements Runnable {
         throw new AssertionError("Unknown task result type: " + _result.getStatus().name());
       }
     } catch (Exception e) {
+      LOG.error("Problem running the task, report task as FAILED.", e);
+      _result =
+          new TaskResult(Status.FAILED, "Exception happened in running task: " + e.getMessage());
       requestStateTransition(TaskPartitionState.TASK_ERROR);
     } finally {
       synchronized (_doneSync) {

http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index ba68a78..fd07176 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -87,7 +87,7 @@ public class TaskStateModel extends StateModel {
   }
 
   @Transition(to = "COMPLETED", from = "RUNNING")
-  public void onBecomeCompletedFromRunning(Message msg, NotificationContext context) {
+  public String onBecomeCompletedFromRunning(Message msg, NotificationContext context) {
     String taskPartition = msg.getPartitionName();
     if (_taskRunner == null) {
       throw new IllegalStateException(String.format(
@@ -102,6 +102,8 @@ public class TaskStateModel extends StateModel {
     }
 
     timeout_task.cancel(false);
+
+    return r.getInfo();
   }
 
   @Transition(to = "TIMED_OUT", from = "RUNNING")

http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
index 3fe1d6f..db0c8f4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
@@ -32,12 +32,14 @@ public class MockTask implements Task {
   public static final String TIMEOUT_CONFIG = "Timeout";
   public static final String TASK_RESULT_STATUS = "TaskResultStatus";
   public static final String THROW_EXCEPTION = "ThrowException";
+  public static final String ERROR_MESSAGE = "ErrorMessage";
   public static final String FAILURE_COUNT_BEFORE_SUCCESS = "FailureCountBeforeSuccess";
   private final long _delay;
   private volatile boolean _canceled;
   private TaskResult.Status _taskResultStatus;
   private boolean _throwException;
   private int _expectedToSuccess;
+  private String _errorMsg;
 
   public MockTask(TaskCallbackContext context) {
     Map<String, String> cfg = context.getJobConfig().getJobCommandConfigMap();
@@ -59,8 +61,10 @@ public class MockTask implements Task {
         Boolean.valueOf(cfg.containsKey(THROW_EXCEPTION)) :
         false;
     _expectedToSuccess =
-        cfg.containsKey(FAILURE_COUNT_BEFORE_SUCCESS) ? Integer.parseInt(cfg.get(
-            FAILURE_COUNT_BEFORE_SUCCESS)) : 0;
+        cfg.containsKey(FAILURE_COUNT_BEFORE_SUCCESS) ? Integer.parseInt(
+            cfg.get(FAILURE_COUNT_BEFORE_SUCCESS)) : 0;
+
+    _errorMsg = cfg.containsKey(ERROR_MESSAGE) ? cfg.get(ERROR_MESSAGE) : null;
   }
 
   @Override
@@ -77,12 +81,21 @@ public class MockTask implements Task {
     }
     timeLeft = expiry - System.currentTimeMillis();
 
-    if (_throwException || _expectedToSuccess > 0) {
+    if (_throwException) {
+      _expectedToSuccess--;
+      if (_errorMsg == null) {
+        _errorMsg = "Test failed";
+      }
+      throw new RuntimeException(_errorMsg != null ? _errorMsg : "Test failed");
+    }
+
+    if (_expectedToSuccess > 0){
       _expectedToSuccess--;
-      throw new RuntimeException("Test failed");
+      throw new RuntimeException(_errorMsg != null ? _errorMsg : "Test failed");
     }
 
-    return new TaskResult(_taskResultStatus, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+    return new TaskResult(_taskResultStatus,
+        _errorMsg != null ? _errorMsg : String.valueOf(timeLeft < 0 ? 0 : timeLeft));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java
new file mode 100644
index 0000000..906dcff
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java
@@ -0,0 +1,117 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test Error reporting for failed tasks
+ */
+public class TestTaskErrorReporting extends TaskTestBase {
+
+  @Test
+  public void test() throws Exception {
+    int taskRetryCount = 1;
+    int num_tasks = 5;
+
+    String jobResource = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = new JobConfig.Builder();
+    jobBuilder.setCommand(MockTask.TASK_COMMAND).setTimeoutPerTask(10000)
+        .setMaxAttemptsPerTask(taskRetryCount).setFailureThreshold(Integer.MAX_VALUE);
+
+    // Create the config for each task.
+    final int abortedTask = 1;
+    final int failedTask = 2;
+    final int exceptionTask = 3;
+
+    final String abortedMsg = "This task aborted, some terrible things must happened.";
+    final String failedMsg = "This task failed, something may be wrong.";
+    final String exceptionMsg = "This task throws exception.";
+    final String successMsg = "Yes, we did it!";
+
+    List<TaskConfig> taskConfigs = new ArrayList<TaskConfig>();
+    for (int j = 0; j < num_tasks; j++) {
+      TaskConfig.Builder configBuilder = new TaskConfig.Builder().setTaskId("task_" + j);
+      switch (j) {
+      case abortedTask:
+        configBuilder.addConfig(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FATAL_FAILED.name())
+            .addConfig(MockTask.ERROR_MESSAGE, abortedMsg);
+        break;
+      case failedTask:
+        configBuilder.addConfig(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FAILED.name())
+            .addConfig(MockTask.ERROR_MESSAGE, failedMsg);
+        break;
+      case exceptionTask:
+        configBuilder.addConfig(MockTask.THROW_EXCEPTION, Boolean.TRUE.toString())
+            .addConfig(MockTask.ERROR_MESSAGE, exceptionMsg);
+        break;
+      default:
+        configBuilder.addConfig(MockTask.ERROR_MESSAGE, successMsg);
+        break;
+      }
+      configBuilder.setTargetPartition(String.valueOf(j));
+      taskConfigs.add(configBuilder.build());
+    }
+    jobBuilder.addTaskConfigs(taskConfigs);
+
+    Workflow flow =
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
+
+    _driver.start(flow);
+
+    // Wait until the job completes.
+    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED);
+
+    JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
+    for (int i = 0; i < num_tasks; i++) {
+      TaskPartitionState state = ctx.getPartitionState(i);
+      String taskId = ctx.getTaskIdForPartition(i);
+      String errMsg = ctx.getPartitionInfo(i);
+
+      if (taskId.equals("task_" + abortedTask)) {
+        Assert.assertEquals(state, TaskPartitionState.TASK_ABORTED);
+        Assert.assertEquals(errMsg, abortedMsg);
+      } else if (taskId.equals("task_" + failedTask)) {
+        Assert.assertEquals(state, TaskPartitionState.TASK_ERROR);
+        Assert.assertEquals(errMsg, failedMsg);
+      } else if (taskId.equals("task_" + exceptionTask)) {
+        Assert.assertEquals(state, TaskPartitionState.TASK_ERROR);
+        Assert.assertTrue(errMsg.contains(exceptionMsg));
+      } else {
+        Assert.assertEquals(state, TaskPartitionState.COMPLETED);
+        Assert.assertEquals(errMsg, successMsg);
+
+      }
+    }
+  }
+}