You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by al...@apache.org on 2020/08/10 20:47:02 UTC

[helix] branch master updated: Drop the task related current states for new session (#1221)

This is an automated email from the ASF dual-hosted git repository.

alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e17bd9  Drop the task related current states for new session (#1221)
1e17bd9 is described below

commit 1e17bd9abf8adcba0fb3f382a5416b66867f500e
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Mon Aug 10 13:46:51 2020 -0700

    Drop the task related current states for new session (#1221)
    
    Since the controller is not calculating quota based on current state,
    there is no need to set requested state to DROPPED and ask controller
    to drop the current state. Instead, in this commit, the participant
    can drop the current state right away.
---
 .../helix/manager/zk/CurStateCarryOverUpdater.java |   8 --
 .../helix/manager/zk/ParticipantManager.java       |   7 ++
 .../integration/task/TestTaskCurrentStateDrop.java | 130 +++++++++++++++++++++
 3 files changed, 137 insertions(+), 8 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
index 3fbc077..a00c337 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
@@ -61,19 +61,11 @@ class CurStateCarryOverUpdater implements DataUpdater<ZNRecord> {
     }
 
     for (String partitionName : _lastCurState.getPartitionStateMap().keySet()) {
-      // For tasks, we preserve previous session's CurrentStates and set RequestState to DROPPED so
-      // that they will be dropped by the Controller
-      if (_lastCurState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
-        curState.setState(partitionName, _lastCurState.getState(partitionName));
-        curState.setRequestedState(partitionName, TaskPartitionState.DROPPED.name());
-      } else {
         // carry-over only when current-state does not exist for regular Helix resource partitions
         if (curState.getState(partitionName) == null) {
           curState.setState(partitionName, _initState);
         }
-      }
     }
     return curState.getRecord();
   }
-
 }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 88e7f7b..b50617c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -52,6 +52,7 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
+import org.apache.helix.task.TaskConstants;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -350,6 +351,12 @@ public class ParticipantManager {
                   + lastCurState);
           continue;
         }
+
+        // If the current state is related to tasks, there is no need to carry it over to the new session.
+        if (stateModelDefRef.equals(TaskConstants.STATE_MODEL_NAME)) {
+          continue;
+        }
+
         StateModelDefinition stateModel =
             _dataAccessor.getProperty(_keyBuilder.stateModelDef(stateModelDefRef));
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java
new file mode 100644
index 0000000..627a7b9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java
@@ -0,0 +1,130 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+/**
+ * This test makes sure that the current states of the task are removed after the participant
+ * handles a new session.
+ */
+public class TestTaskCurrentStateDrop extends TaskTestBase {
+  private static final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
+  protected HelixDataAccessor _accessor;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numPartitions = 1;
+    _numNodes = 1;
+    super.beforeClass();
+  }
+
+  @AfterClass()
+  public void afterClass() throws Exception {
+    super.afterClass();
+  }
+
+  @Test
+  public void testCurrentStateDropAfterReconnecting() throws Exception {
+    String jobQueueName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder0 =
+        new JobConfig.Builder().setWorkflow(jobQueueName).setTargetResource(DATABASE)
+            .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+            .setCommand(MockTask.TASK_COMMAND)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100000"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("JOB0", jobBuilder0);
+
+    _driver.start(jobQueue.build());
+
+    String namespacedJobName = TaskUtil.getNamespacedJobName(jobQueueName, "JOB0");
+
+    _driver.pollForJobState(jobQueueName, namespacedJobName, TaskState.IN_PROGRESS);
+
+    // Make sure task is in running state
+    Assert.assertTrue(TestHelper.verify(
+        () -> (TaskPartitionState.RUNNING
+            .equals(_driver.getJobContext(namespacedJobName).getPartitionState(0))),
+        TestHelper.WAIT_DURATION));
+
+    // Get the current states of Participant0
+    String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
+    ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
+    String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
+    String taskCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
+        + "/CURRENTSTATES/" + sessionIdP0 + "/" + namespacedJobName;
+    String dataBaseCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
+        + "/CURRENTSTATES/" + sessionIdP0 + "/" + DATABASE;
+
+    // Read the current states of Participant0 and make sure they have been created
+    boolean isCurrentStateCreated = TestHelper.verify(() -> {
+      ZNRecord recordTask = _manager.getHelixDataAccessor().getBaseDataAccessor()
+          .get(taskCurrentStatePathP0, new Stat(), AccessOption.PERSISTENT);
+      ZNRecord recordDataBase = _manager.getHelixDataAccessor().getBaseDataAccessor()
+          .get(dataBaseCurrentStatePathP0, new Stat(), AccessOption.PERSISTENT);
+      return (recordTask != null && recordDataBase != null);
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(isCurrentStateCreated);
+
+    // Stop the controller to make sure it does not send any messages to participants in order
+    // to drop the current states
+    _controller.syncStop();
+
+    // restart the participant0 and make sure task related current state has not been carried over
+    stopParticipant(0);
+    startParticipant(0);
+
+    clientP0 = (ZkClient) _participants[0].getZkClient();
+    String newSessionIdP0 = ZkTestHelper.getSessionId(clientP0);
+    String newTaskCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
+        + "/CURRENTSTATES/" + newSessionIdP0 + "/" + namespacedJobName;
+    String newDataBaseCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
+        + "/CURRENTSTATES/" + newSessionIdP0 + "/" + DATABASE;
+
+    boolean isCurrentStateExpected = TestHelper.verify(() -> {
+      ZNRecord taskRecord = _manager.getHelixDataAccessor().getBaseDataAccessor()
+          .get(newTaskCurrentStatePathP0, new Stat(), AccessOption.PERSISTENT);
+      ZNRecord dataBase = _manager.getHelixDataAccessor().getBaseDataAccessor()
+          .get(newDataBaseCurrentStatePathP0, new Stat(), AccessOption.PERSISTENT);
+      return (taskRecord == null && dataBase != null);
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(isCurrentStateExpected);
+  }
+}