You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/11/01 00:24:30 UTC

[1/3] helix git commit: [HELIX-776] REST2.0: Add delete command to updateInstanceConfig

Repository: helix
Updated Branches:
  refs/heads/master b235c4ee5 -> ceba1a55a


[HELIX-776] REST2.0: Add delete command to updateInstanceConfig

For instance configs, REST2.0 did not expose an API for deleting individual fields. This RB adds "update" and "delete" commands to the updateInstanceConfig endpoint, along with integration tests for them.
Changelist:
1. Add delete command to updateInstanceConfig in InstanceAccessor
2. Add integration tests


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/6090732b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/6090732b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/6090732b

Branch: refs/heads/master
Commit: 6090732be6b88863017a93106fa692dc7350520b
Parents: b235c4e
Author: Hunter Lee <hu...@linkedin.com>
Authored: Wed Oct 31 14:20:18 2018 -0700
Committer: Hunter Lee <hu...@linkedin.com>
Committed: Wed Oct 31 14:20:18 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/ConfigAccessor.java   |   6 +
 .../resources/helix/InstanceAccessor.java       |  34 +++++-
 .../helix/rest/server/TestInstanceAccessor.java | 114 ++++++++++++++++++-
 3 files changed, 146 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/6090732b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index 2755113..53f42fb 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -765,6 +765,12 @@ public class ConfigAccessor {
             .forParticipant(instanceName).build();
     String zkPath = scope.getZkPath();
 
+    if (!zkClient.exists(zkPath)) {
+      throw new HelixException(
+          "updateInstanceConfig failed. Given InstanceConfig does not already exist. instance: "
+              + instanceName);
+    }
+
     if (overwrite) {
       ZKUtil.createOrReplace(zkClient, zkPath, instanceConfig.getRecord(), true);
     } else {

http://git-wip-us.apache.org/repos/asf/helix/blob/6090732b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java
index 98af0ee..38ee3b5 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java
@@ -41,10 +41,12 @@ import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Error;
 import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.ParticipantHistory;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.codehaus.jackson.JsonNode;
@@ -321,7 +323,19 @@ public class InstanceAccessor extends AbstractHelixResource {
   @POST
   @Path("{instanceName}/configs")
   public Response updateInstanceConfig(@PathParam("clusterId") String clusterId,
-      @PathParam("instanceName") String instanceName, String content) {
+      @PathParam("instanceName") String instanceName, @QueryParam("command") String commandStr,
+      String content) {
+    Command command;
+    if (commandStr == null || commandStr.isEmpty()) {
+      command = Command.update; // Default behavior to keep it backward-compatible
+    } else {
+      try {
+        command = getCommand(commandStr);
+      } catch (HelixException ex) {
+        return badRequest(ex.getMessage());
+      }
+    }
+
     ZNRecord record;
     try {
       record = toZNRecord(content);
@@ -332,11 +346,25 @@ public class InstanceAccessor extends AbstractHelixResource {
     InstanceConfig instanceConfig = new InstanceConfig(record);
     ConfigAccessor configAccessor = getConfigAccessor();
     try {
-      configAccessor.updateInstanceConfig(clusterId, instanceName, instanceConfig);
+      switch (command) {
+      case update:
+        configAccessor.updateInstanceConfig(clusterId, instanceName, instanceConfig);
+        break;
+      case delete: {
+        HelixConfigScope instanceScope =
+            new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT)
+                .forCluster(clusterId).forParticipant(instanceName).build();
+        configAccessor.remove(instanceScope, record);
+      }
+        break;
+      default:
+        return badRequest(String.format("Unsupported command: %s", command));
+      }
     } catch (HelixException ex) {
       return notFound(ex.getMessage());
     } catch (Exception ex) {
-      _logger.error(String.format("Error in update instance config for instance: %s", instanceName), ex);
+      _logger.error(String.format("Error in update instance config for instance: %s", instanceName),
+          ex);
       return serverError(ex);
     }
     return OK();

http://git-wip-us.apache.org/repos/asf/helix/blob/6090732b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java
index 24d2910..94c28b2 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -201,20 +202,56 @@ public class TestInstanceAccessor extends AbstractTestClass {
         new HashSet<>(Arrays.asList(CLUSTER_NAME + dbName + "0", CLUSTER_NAME + dbName + "3")));
   }
 
+  /**
+   * Test "update" command for updateInstanceConfig endpoint.
+   * @throws IOException
+   */
   @Test(dependsOnMethods = "updateInstance")
   public void updateInstanceConfig() throws IOException {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
     String instanceName = CLUSTER_NAME + "localhost_12918";
     InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName);
     ZNRecord record = instanceConfig.getRecord();
-    record.getSimpleFields().put("TestSimple", "value");
-    record.getMapFields().put("TestMap", ImmutableMap.of("key", "value"));
-    record.getListFields().put("TestList", Arrays.asList("e1", "e2", "e3"));
 
+    // Generate a record containing three keys (k0, k1, k2) for all fields
+    String value = "value";
+    for (int i = 0; i < 3; i++) {
+      String key = "k" + i;
+      record.getSimpleFields().put(key, value);
+      record.getMapFields().put(key, ImmutableMap.of(key, value));
+      record.getListFields().put(key, Arrays.asList(key, value));
+    }
+
+    // 1. Add these fields by way of "update"
     Entity entity =
         Entity.entity(OBJECT_MAPPER.writeValueAsString(record), MediaType.APPLICATION_JSON_TYPE);
-    post("clusters/" + CLUSTER_NAME + "/instances/" + instanceName + "/configs", null, entity,
-        Response.Status.OK.getStatusCode());
+    post("clusters/" + CLUSTER_NAME + "/instances/" + instanceName + "/configs",
+        Collections.singletonMap("command", "update"), entity, Response.Status.OK.getStatusCode());
+
+    // Check that the fields have been added
+    Assert.assertEquals(record.getSimpleFields(),
+        _configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName).getRecord()
+            .getSimpleFields());
+    Assert.assertEquals(record.getListFields(),
+        _configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName).getRecord().getListFields());
+    Assert.assertEquals(record.getMapFields(),
+        _configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName).getRecord().getMapFields());
+
+    String newValue = "newValue";
+    // 2. Modify the record and update
+    for (int i = 0; i < 3; i++) {
+      String key = "k" + i;
+      record.getSimpleFields().put(key, newValue);
+      record.getMapFields().put(key, ImmutableMap.of(key, newValue));
+      record.getListFields().put(key, Arrays.asList(key, newValue));
+    }
+
+    entity =
+        Entity.entity(OBJECT_MAPPER.writeValueAsString(record), MediaType.APPLICATION_JSON_TYPE);
+    post("clusters/" + CLUSTER_NAME + "/instances/" + instanceName + "/configs",
+        Collections.singletonMap("command", "update"), entity, Response.Status.OK.getStatusCode());
+
+    // Check that the fields have been modified
     Assert.assertEquals(record.getSimpleFields(),
         _configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName).getRecord()
             .getSimpleFields());
@@ -223,4 +260,71 @@ public class TestInstanceAccessor extends AbstractTestClass {
     Assert.assertEquals(record.getMapFields(),
         _configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName).getRecord().getMapFields());
   }
+
+  /**
+   * Test the "delete" command of updateInstanceConfig.
+   * @throws IOException
+   */
+  @Test(dependsOnMethods = "updateInstanceConfig")
+  public void deleteInstanceConfig() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String instanceName = CLUSTER_NAME + "localhost_12918";
+    ZNRecord record = new ZNRecord(instanceName);
+
+    // Generate a record containing three keys (k1, k2, k3) for all fields for deletion
+    String value = "value";
+    for (int i = 1; i < 4; i++) {
+      String key = "k" + i;
+      record.getSimpleFields().put(key, value);
+      record.getMapFields().put(key, ImmutableMap.of(key, value));
+      record.getListFields().put(key, Arrays.asList(key, value));
+    }
+
+    // First, add these fields by way of "update"
+    Entity entity =
+        Entity.entity(OBJECT_MAPPER.writeValueAsString(record), MediaType.APPLICATION_JSON_TYPE);
+    post("clusters/" + CLUSTER_NAME + "/instances/" + instanceName + "/configs",
+        Collections.singletonMap("command", "delete"), entity, Response.Status.OK.getStatusCode());
+
+    // Check that the keys k1 and k2 have been deleted, and k0 remains
+    for (int i = 0; i < 4; i++) {
+      String key = "k" + i;
+      if (i == 0) {
+        Assert.assertTrue(_configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName).getRecord()
+            .getSimpleFields().containsKey(key));
+        Assert.assertTrue(_configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName).getRecord()
+            .getListFields().containsKey(key));
+        Assert.assertTrue(_configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName).getRecord()
+            .getMapFields().containsKey(key));
+        continue;
+      }
+      Assert.assertFalse(_configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName).getRecord()
+          .getSimpleFields().containsKey(key));
+      Assert.assertFalse(_configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName).getRecord()
+          .getListFields().containsKey(key));
+      Assert.assertFalse(_configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName).getRecord()
+          .getMapFields().containsKey(key));
+    }
+  }
+
+  /**
+   * Check that updateInstanceConfig fails when there is no pre-existing InstanceConfig ZNode. This
+   * is because InstanceConfig should have been created when the instance was added, and this REST
+   * endpoint is not meant for creation.
+   */
+  @Test(dependsOnMethods = "deleteInstanceConfig")
+  public void checkUpdateFails() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String instanceName = CLUSTER_NAME + "non_existent_instance";
+    InstanceConfig instanceConfig = new InstanceConfig(INSTANCE_NAME + "TEST");
+    ZNRecord record = instanceConfig.getRecord();
+    record.getSimpleFields().put("TestSimple", "value");
+    record.getMapFields().put("TestMap", ImmutableMap.of("key", "value"));
+    record.getListFields().put("TestList", Arrays.asList("e1", "e2", "e3"));
+
+    Entity entity =
+        Entity.entity(OBJECT_MAPPER.writeValueAsString(record), MediaType.APPLICATION_JSON_TYPE);
+    post("clusters/" + CLUSTER_NAME + "/instances/" + instanceName + "/configs", null, entity,
+        Response.Status.NOT_FOUND.getStatusCode());
+  }
 }


[2/3] helix git commit: [HELIX-777] TASK: Handle null currentState for unscheduled tasks

Posted by jx...@apache.org.
[HELIX-777] TASK: Handle null currentState for unscheduled tasks

It was observed that when a workflow is submitted and the Controller attempts to schedule its tasks, the ZK read of the job's context can fail, leaving the job stuck in an unscheduled state. The job remained unscheduled because its tasks had no currentStates and its job context contained no assignment/state information. This RB fixes such stuck states by detecting null currentStates.
Changelist:
1. If a task's currentState is null and its JobContext has no recorded state for it, treat the task as being in the INIT state


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5d24ed54
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5d24ed54
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5d24ed54

Branch: refs/heads/master
Commit: 5d24ed544898ff69f289f54be71a04413735d118
Parents: 6090732
Author: Hunter Lee <hu...@linkedin.com>
Authored: Wed Oct 31 14:21:49 2018 -0700
Committer: Hunter Lee <hu...@linkedin.com>
Committed: Wed Oct 31 17:17:16 2018 -0700

----------------------------------------------------------------------
 .../helix/task/AbstractTaskDispatcher.java      | 213 +++++++++----------
 1 file changed, 106 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/5d24ed54/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 617263b..aa72f2d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -23,7 +23,6 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.task.assigner.AssignableInstance;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +72,6 @@ public abstract class AbstractTaskDispatcher {
         // this pending state transition, essentially "waiting" until this pending message clears
         Message pendingMessage =
             currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
-
         if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) {
           // If there is a pending message whose destination state is different from the current
           // state, just make the same assignment as the pending message. This is essentially
@@ -125,26 +123,26 @@ public abstract class AbstractTaskDispatcher {
         }
 
         switch (currState) {
-          case RUNNING: {
-            TaskPartitionState nextState = TaskPartitionState.RUNNING;
-            if (jobState == TaskState.TIMING_OUT) {
-              nextState = TaskPartitionState.TASK_ABORTED;
-            } else if (jobTgtState == TargetState.STOP) {
-              nextState = TaskPartitionState.STOPPED;
-            } else if (jobState == TaskState.ABORTED || jobState == TaskState.FAILED
-                || jobState == TaskState.FAILING || jobState == TaskState.TIMED_OUT) {
-              // Drop tasks if parent job is not in progress
-              paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
-              break;
-            }
+        case RUNNING: {
+          TaskPartitionState nextState = TaskPartitionState.RUNNING;
+          if (jobState == TaskState.TIMING_OUT) {
+            nextState = TaskPartitionState.TASK_ABORTED;
+          } else if (jobTgtState == TargetState.STOP) {
+            nextState = TaskPartitionState.STOPPED;
+          } else if (jobState == TaskState.ABORTED || jobState == TaskState.FAILED
+              || jobState == TaskState.FAILING || jobState == TaskState.TIMED_OUT) {
+            // Drop tasks if parent job is not in progress
+            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
+            break;
+          }
 
-            paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
-            assignedPartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-                  nextState, instance));
-            }
+          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
+          assignedPartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+                nextState, instance));
           }
+        }
           break;
         case STOPPED: {
           // TODO: This case statement might be unreachable code - Hunter
@@ -166,106 +164,107 @@ public abstract class AbstractTaskDispatcher {
           assignedPartitions.add(pId);
 
           if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, nextState, instance));
+            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+                nextState, instance));
           }
         }
           break;
-          case COMPLETED: {
-            // The task has completed on this partition. Mark as such in the context object.
-            donePartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format(
-                  "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
-                  pName, currState));
-            }
-            partitionsToDropFromIs.add(pId);
-            markPartitionCompleted(jobCtx, pId);
-
-            // This task is COMPLETED, so release this task
-            assignableInstance.release(taskConfig, quotaType);
+        case COMPLETED: {
+          // The task has completed on this partition. Mark as such in the context object.
+          donePartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
+                pName, currState));
           }
+          partitionsToDropFromIs.add(pId);
+          markPartitionCompleted(jobCtx, pId);
+
+          // This task is COMPLETED, so release this task
+          assignableInstance.release(taskConfig, quotaType);
+        }
           break;
-          case TIMED_OUT:
+        case TIMED_OUT:
 
-          case TASK_ERROR:
+        case TASK_ERROR:
 
-          case TASK_ABORTED:
+        case TASK_ABORTED:
 
-          case ERROR: {
-            donePartitions.add(pId); // The task may be rescheduled on a different instance.
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format(
-                  "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.",
-                  pName, currState, jobCtx.getPartitionInfo(pId)));
-            }
-            markPartitionError(jobCtx, pId, currState, true);
-            // The error policy is to fail the task as soon a single partition fails for a specified
-            // maximum number of attempts or task is in ABORTED state.
-            // But notice that if job is TIMED_OUT, aborted task won't be treated as fail and won't
-            // cause job fail.
-            // After all tasks are aborted, they will be dropped, because of job timeout.
-            if (jobState != TaskState.TIMED_OUT && jobState != TaskState.TIMING_OUT) {
-              if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()
-                  || currState.equals(TaskPartitionState.TASK_ABORTED)
-                  || currState.equals(TaskPartitionState.ERROR)) {
-                skippedPartitions.add(pId);
-                partitionsToDropFromIs.add(pId);
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("skippedPartitions:" + skippedPartitions);
-                }
-              } else {
-                // Mark the task to be started at some later time (if enabled)
-                markPartitionDelayed(jobCfg, jobCtx, pId);
+        case ERROR: {
+          donePartitions.add(pId); // The task may be rescheduled on a different instance.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.",
+                pName, currState, jobCtx.getPartitionInfo(pId)));
+          }
+          markPartitionError(jobCtx, pId, currState, true);
+          // The error policy is to fail the task as soon a single partition fails for a specified
+          // maximum number of attempts or task is in ABORTED state.
+          // But notice that if job is TIMED_OUT, aborted task won't be treated as fail and won't
+          // cause job fail.
+          // After all tasks are aborted, they will be dropped, because of job timeout.
+          if (jobState != TaskState.TIMED_OUT && jobState != TaskState.TIMING_OUT) {
+            if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()
+                || currState.equals(TaskPartitionState.TASK_ABORTED)
+                || currState.equals(TaskPartitionState.ERROR)) {
+              skippedPartitions.add(pId);
+              partitionsToDropFromIs.add(pId);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("skippedPartitions:" + skippedPartitions);
               }
+            } else {
+              // Mark the task to be started at some later time (if enabled)
+              markPartitionDelayed(jobCfg, jobCtx, pId);
             }
-            // Release this task
-            assignableInstance.release(taskConfig, quotaType);
           }
+          // Release this task
+          assignableInstance.release(taskConfig, quotaType);
+        }
           break;
-          case INIT: {
-            // INIT is a temporary state for tasks
-            // Two possible scenarios for INIT:
-            // 1. Task is getting scheduled for the first time. In this case, Task's state will go
-            // from null->INIT->RUNNING, and this INIT state will be transient and very short-lived
-            // 2. Task is getting scheduled for the first time, but in this case, job is timed out or
-            // timing out. In this case, it will be sent back to INIT state to be removed. Here we
-            // ensure that this task then goes from INIT to DROPPED so that it will be released from
-            // AssignableInstance to prevent resource leak
-            if (jobState == TaskState.TIMED_OUT || jobState == TaskState.TIMING_OUT
-                || jobTgtState == TargetState.DELETE) {
-              // Job is timed out or timing out or targetState is to be deleted, so its tasks will be
-              // sent back to INIT
-              // In this case, tasks' IdealState will be removed, and they will be sent to DROPPED
-              partitionsToDropFromIs.add(pId);
+        case INIT: {
+          // INIT is a temporary state for tasks
+          // Two possible scenarios for INIT:
+          // 1. Task is getting scheduled for the first time. In this case, Task's state will go
+          // from null->INIT->RUNNING, and this INIT state will be transient and very short-lived
+          // 2. Task is getting scheduled for the first time, but in this case, job is timed out or
+          // timing out. In this case, it will be sent back to INIT state to be removed. Here we
+          // ensure that this task then goes from INIT to DROPPED so that it will be released from
+          // AssignableInstance to prevent resource leak
+          if (jobState == TaskState.TIMED_OUT || jobState == TaskState.TIMING_OUT
+              || jobTgtState == TargetState.DELETE) {
+            // Job is timed out or timing out or targetState is to be deleted, so its tasks will be
+            // sent back to INIT
+            // In this case, tasks' IdealState will be removed, and they will be sent to DROPPED
+            partitionsToDropFromIs.add(pId);
 
-              // Also release resources for these tasks
-              assignableInstance.release(taskConfig, quotaType);
+            // Also release resources for these tasks
+            assignableInstance.release(taskConfig, quotaType);
 
-            } else if (jobState == TaskState.IN_PROGRESS
-                && (jobTgtState != TargetState.STOP && jobTgtState != TargetState.DELETE)) {
-              // Job is in progress, implying that tasks are being re-tried, so set it to RUNNING
-              paMap.put(pId,
-                  new JobRebalancer.PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
-              assignedPartitions.add(pId);
-            }
+          } else if (jobState == TaskState.IN_PROGRESS
+              && (jobTgtState != TargetState.STOP && jobTgtState != TargetState.DELETE)) {
+            // Job is in progress, implying that tasks are being re-tried, so set it to RUNNING
+            paMap.put(pId,
+                new JobRebalancer.PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
+            assignedPartitions.add(pId);
           }
+        }
 
-          case DROPPED: {
-            // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
-            donePartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format(
-                  "Task partition %s has state %s. It will be dropped from the current ideal state.",
-                  pName, currState));
-            }
-            // If it's DROPPED, release this task. If INIT, do not release
-            if (currState == TaskPartitionState.DROPPED) {
-              assignableInstance.release(taskConfig, quotaType);
-            }
+        case DROPPED: {
+          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
+          donePartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has state %s. It will be dropped from the current ideal state.",
+                pName, currState));
+          }
+          // If it's DROPPED, release this task. If INIT, do not release
+          if (currState == TaskPartitionState.DROPPED) {
+            assignableInstance.release(taskConfig, quotaType);
           }
+        }
           break;
-          default:
-            throw new AssertionError("Unknown enum symbol: " + currState);
+        default:
+          throw new AssertionError("Unknown enum symbol: " + currState);
         }
       }
 
@@ -301,7 +300,8 @@ public abstract class AbstractTaskDispatcher {
         currentStateOutput.getCurrentState(jobResource, new Partition(pName), instance);
     if (currentStateString == null) {
       // Task state is either DROPPED or INIT
-      return jobCtx.getPartitionState(pId);
+      TaskPartitionState stateFromContext = jobCtx.getPartitionState(pId);
+      return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
     jobCtx.setPartitionState(pId, currentState);
@@ -859,7 +859,6 @@ public abstract class AbstractTaskDispatcher {
         incomplete = true;
       }
     }
-
     if (!incomplete && cfg.isTerminable()) {
       ctx.setWorkflowState(TaskState.COMPLETED);
       return true;
@@ -922,7 +921,7 @@ public abstract class AbstractTaskDispatcher {
     long nextRebalanceTime = _rebalanceScheduler.getRebalanceTime(resourceId);
     if (nextTimeout >= System.currentTimeMillis()
         && (nextRebalanceTime == TaskConstants.DEFAULT_NEVER_TIMEOUT
-        || nextTimeout < nextRebalanceTime)) {
+            || nextTimeout < nextRebalanceTime)) {
       _rebalanceScheduler.scheduleRebalance(_manager, resourceId, nextTimeout);
     }
   }
@@ -942,8 +941,8 @@ public abstract class AbstractTaskDispatcher {
   private long getTimeoutTime(long startTime, long timeoutPeriod) {
     return (timeoutPeriod == TaskConstants.DEFAULT_NEVER_TIMEOUT
         || timeoutPeriod > Long.MAX_VALUE - startTime) // check long overflow
-        ? TaskConstants.DEFAULT_NEVER_TIMEOUT
-        : startTime + timeoutPeriod;
+            ? TaskConstants.DEFAULT_NEVER_TIMEOUT
+            : startTime + timeoutPeriod;
   }
 
   /**


[3/3] helix git commit: [HELIX-778] TASK: Fix a race condition in updatePreviousAssignedTasksStatus

Posted by jx...@apache.org.
[HELIX-778] TASK: Fix a race condition in updatePreviousAssignedTasksStatus

It was observed that TestUnregisteredCommand is very unstable. The cause was identified as a race condition: when a task fails, a pending message for that task (INIT to RUNNING) is sometimes not cleaned up in time, so AbstractTaskDispatcher's updatePreviousAssignedTasksStatus processes that message and skips the task's status update (e.g., updating its state and the NUM_ATTEMPTS field in JobContext).

A short-term fix is to call markPartitionError() before checking for the pending message; in the long term, the design of the task status update should be revisited to avoid this type of race condition.

Changelist:
1. Move markPartitionError() up before checking for a pending message on the task
2. Fix TestUnregisteredCommand's instability


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ceba1a55
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ceba1a55
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ceba1a55

Branch: refs/heads/master
Commit: ceba1a55ae351090144c001324f908f2364212a4
Parents: 5d24ed5
Author: Hunter Lee <hu...@linkedin.com>
Authored: Wed Oct 31 17:20:37 2018 -0700
Committer: Hunter Lee <hu...@linkedin.com>
Committed: Wed Oct 31 17:20:37 2018 -0700

----------------------------------------------------------------------
 .../apache/helix/task/AbstractTaskDispatcher.java    | 15 ++++++++++++---
 .../integration/task/TestUnregisteredCommand.java    |  3 ++-
 2 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/ceba1a55/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index aa72f2d..cbf9fb8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -67,6 +67,16 @@ public abstract class AbstractTaskDispatcher {
         TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
             jobResource, pId, pName, instance, jobCtx);
 
+        // This avoids a race condition in the case that although currentState is in the following
+        // error condition, the pending message (INIT->RUNNNING) might still be present.
+        // This is undesirable because this prevents JobContext from getting the proper update of
+        // fields including task state and task's NUM_ATTEMPTS
+        if (currState == TaskPartitionState.ERROR || currState == TaskPartitionState.TASK_ERROR
+            || currState == TaskPartitionState.TIMED_OUT
+            || currState == TaskPartitionState.TASK_ABORTED) {
+          markPartitionError(jobCtx, pId, currState, true);
+        }
+
         // Check for pending state transitions on this (partition, instance). If there is a pending
         // state transition, we prioritize this pending state transition and set the assignment from
         // this pending state transition, essentially "waiting" until this pending message clears
@@ -197,7 +207,6 @@ public abstract class AbstractTaskDispatcher {
                 "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.",
                 pName, currState, jobCtx.getPartitionInfo(pId)));
           }
-          markPartitionError(jobCtx, pId, currState, true);
           // The error policy is to fail the task as soon a single partition fails for a specified
           // maximum number of attempts or task is in ABORTED state.
           // But notice that if job is TIMED_OUT, aborted task won't be treated as fail and won't
@@ -239,7 +248,6 @@ public abstract class AbstractTaskDispatcher {
 
             // Also release resources for these tasks
             assignableInstance.release(taskConfig, quotaType);
-
           } else if (jobState == TaskState.IN_PROGRESS
               && (jobTgtState != TargetState.STOP && jobTgtState != TargetState.DELETE)) {
             // Job is in progress, implying that tasks are being re-tried, so set it to RUNNING
@@ -940,7 +948,8 @@ public abstract class AbstractTaskDispatcher {
 
   private long getTimeoutTime(long startTime, long timeoutPeriod) {
     return (timeoutPeriod == TaskConstants.DEFAULT_NEVER_TIMEOUT
-        || timeoutPeriod > Long.MAX_VALUE - startTime) // check long overflow
+        || timeoutPeriod > Long.MAX_VALUE - startTime)
+            // check long overflow
             ? TaskConstants.DEFAULT_NEVER_TIMEOUT
             : startTime + timeoutPeriod;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/ceba1a55/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
index 95a9be4..6f78cc0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
@@ -37,7 +37,8 @@ public class TestUnregisteredCommand extends TaskTestBase {
     super.beforeClass();
   }
 
-  @Test public void testUnregisteredCommand() throws InterruptedException {
+  @Test
+  public void testUnregisteredCommand() throws InterruptedException {
     String workflowName = TestHelper.getTestMethodName();
     Workflow.Builder builder = new Workflow.Builder(workflowName);