You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/08/14 18:48:37 UTC

[helix] branch dynamically-loaded-task updated: Dynamically Load Tasks in Task Framework (#1169)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch dynamically-loaded-task
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/dynamically-loaded-task by this push:
     new e8b2938  Dynamically Load Tasks in Task Framework (#1169)
e8b2938 is described below

commit e8b29387146a6ce6951b331735bd4ef53ca8a531
Author: rabashizade <67...@users.noreply.github.com>
AuthorDate: Fri Aug 14 14:48:30 2020 -0400

    Dynamically Load Tasks in Task Framework (#1169)
    
    Adds loadNewTask() to TaskStateModel to support dynamically
    loading tasks from JAR files by reading the dynamic task configs
    from ZooKeeper at runtime.
    
    Tests are added to TestDynamicTaskLoading.
---
 .../java/org/apache/helix/task/TaskStateModel.java |  86 ++++++-
 .../apache/helix/task/TestDynamicTaskLoading.java  | 274 +++++++++++++++++++++
 helix-core/src/test/resources/Reindex.jar          | Bin 0 -> 6293 bytes
 3 files changed, 357 insertions(+), 3 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 705f118..43814ce 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -19,18 +19,23 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.AccessOption;
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.task.api.JarLoader;
+import org.apache.helix.task.api.LocalJarLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -283,6 +288,78 @@ public class TaskStateModel extends StateModel {
     }
   }
 
+  /**
+   * Loads className using classLoader.
+   * @param classLoader class loader to resolve className with
+   * @param className fully-qualified name of the class to load
+   * @return Class className loaded by classLoader
+   * @throws IllegalStateException if className cannot be found; the cause is preserved
+   */
+  private Class<?> loadClass(URLClassLoader classLoader, String className) {
+    try {
+      return classLoader.loadClass(className);
+    } catch (ClassNotFoundException e) {
+      String errorMessage = "Failed to load Task class " + className + " for new task in instance "
+          + _manager.getInstanceName() + " in cluster " + _manager.getClusterName() + ".";
+      LOG.error(errorMessage, e);
+      throw new IllegalStateException(errorMessage, e);
+    }
+  }
+
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and registers the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   * @throws IllegalStateException if the task path is missing or a class fails to load/instantiate
+   * @throws IllegalArgumentException if no task config ZNRecord can be read for command
+   */
+  private void loadNewTask(String command) {
+    // If the path for dynamic tasks doesn't exist, the task cannot be loaded: fail fast.
+    if (!_manager.getHelixDataAccessor().getBaseDataAccessor()
+        .exists(TaskConstants.DYNAMICALLY_LOADED_TASK_PATH, 0)) {
+      String errorMessage = "Path for dynamic tasks doesn't exist!";
+      LOG.error(errorMessage);
+      throw new IllegalStateException(errorMessage);
+    }
+
+    // Read DynamicTaskConfig; the mask presumably yields null for a missing node (TODO confirm).
+    DynamicTaskConfig taskConfig = new DynamicTaskConfig(
+        _manager.getHelixDataAccessor().getBaseDataAccessor()
+            .get(TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/" + command, null,
+                ~AccessOption.THROW_EXCEPTION_IFNOTEXIST));
+    if (taskConfig.getTaskConfigZNRecord() == null) {
+      String errorMessage = "Failed to read ZNRecord for task " + command + " for instance "
+          + _manager.getInstanceName() + " in cluster " + _manager.getClusterName() + ".";
+      LOG.error(errorMessage);
+      throw new IllegalArgumentException(errorMessage);
+    }
+
+    // Open the JAR file containing Task(s) and TaskFactory classes.
+    JarLoader jarLoader = new LocalJarLoader();
+    URL taskJarUrl = jarLoader.loadJar(taskConfig.getJarFilePath());
+
+    // Import Task(s) class(es).
+    URLClassLoader classLoader = URLClassLoader.newInstance(new URL[]{taskJarUrl});
+    for (String taskClass : taskConfig.getTaskClassesFqns()) {
+      loadClass(classLoader, taskClass);
+    }
+
+    TaskFactory taskFactory;
+    try {
+      // Import and instantiate TaskFactory class.
+      taskFactory =
+          (TaskFactory) loadClass(classLoader, taskConfig.getTaskFactoryFqn()).newInstance();
+    } catch (InstantiationException | IllegalAccessException e) {
+      String errorMessage = "Failed to instantiate TaskFactory class for new task in instance "
+          + _manager.getInstanceName() + " in cluster " + _manager.getClusterName() + ".";
+      LOG.error(errorMessage, e);
+      throw new IllegalStateException(errorMessage, e);
+    }
+
+    // Register the TaskFactory.
+    _taskFactoryRegistry.put(command, taskFactory);
+  }
+
   private void startTask(Message msg, String taskPartition) {
     JobConfig cfg = TaskUtil.getJobConfig(_manager, msg.getResourceName());
     TaskConfig taskConfig = null;
@@ -313,9 +390,12 @@ public class TaskStateModel extends StateModel {
     callbackContext.setTaskConfig(taskConfig);
 
     // Create a task instance with this command
-    if (command == null || _taskFactoryRegistry == null
-        || !_taskFactoryRegistry.containsKey(command)) {
-      throw new IllegalStateException("No callback implemented(or not registered) for task " + command);
+    if (command == null || _taskFactoryRegistry == null) {
+      throw new IllegalStateException("Null command for task " + command);
+    }
+    // If the task isn't registered, load the appropriate Task and TaskFactory classes
+    if (!_taskFactoryRegistry.containsKey(command)) {
+      loadNewTask(command);
     }
     TaskFactory taskFactory = _taskFactoryRegistry.get(command);
     Task task = taskFactory.createNewTask(callbackContext);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java b/helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
new file mode 100644
index 0000000..cb184d2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
@@ -0,0 +1,274 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for dynamically-loaded tasks. Different scenarios
+ * for success and failure are tested. Failure scenarios include
+ * non-existing JAR file, non-existing DynamicTaskConfig in
+ * ZooKeeper, non-existing Task class, and non-existing
+ * TaskFactory class.
+ */
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private static final String CLUSTER_NAME = CLUSTER_PREFIX + "_TestDynamicTaskLoading";
+  private static final String INSTANCE_NAME = "localhost_12913";
+  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    // BeforeMethod is required (as opposed to BeforeClass)
+    // to make sure each test starts with a fresh state.
+    // This is important because we are testing dynamically
+    // loading classes, and the tests would break if the
+    // classes are already loaded.
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, INSTANCE_NAME);
+    _participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, INSTANCE_NAME);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(INSTANCE_NAME);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory(MASTER_SLAVE_STATE_MODEL, stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    String taskCommand = "Reindex";
+    String taskJarPath = "src/test/resources/Reindex.jar";
+    String taskVersion = "1.0.0";
+    String fullyQualifiedTaskClassName = "com.mycompany.mocktask.MockTask";
+    String fullyQualifiedTaskFactoryClassName = "com.mycompany.mocktask.MockTaskFactory";
+
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<>();
+    taskClasses.add(fullyQualifiedTaskClassName);
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig(taskCommand, taskJarPath, taskVersion, taskClasses,
+            fullyQualifiedTaskFactoryClassName);
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/" + taskCommand;
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    // Wait for the workflow to either complete or fail.
+    TaskState finalState =
+        driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+    Assert.assertEquals(finalState, TaskState.COMPLETED);
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingJar() throws Exception {
+    String taskCommand = "Reindex";
+    String taskJarPath = "src/test/resources/Random.jar";
+    String taskVersion = "1.0.0";
+    String fullyQualifiedTaskClassName = "com.mycompany.mocktask.MockTask";
+    String fullyQualifiedTaskFactoryClassName = "com.mycompany.mocktask.MockTaskFactory";
+
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<>();
+    taskClasses.add(fullyQualifiedTaskClassName);
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig(taskCommand, taskJarPath, taskVersion, taskClasses,
+            fullyQualifiedTaskFactoryClassName);
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/" + taskCommand;
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    // Wait for the workflow to either complete or fail.
+    TaskState finalState =
+        driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+    Assert.assertEquals(finalState, TaskState.FAILED);
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskConfig() throws Exception {
+    String taskCommand = "Reindex";
+
+    // Remove task config ZNRecord if it exists.
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/" + taskCommand;
+    removePathIfExists(path);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    // Wait for the workflow to either complete or fail.
+    TaskState finalState =
+        driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+    Assert.assertEquals(finalState, TaskState.FAILED);
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskClass() throws Exception {
+    String taskCommand = "Reindex";
+    String taskJarPath = "src/test/resources/Reindex.jar";
+    String taskVersion = "1.0.0";
+    String fullyQualifiedTaskClassName = "com.mycompany.mocktask.RandomTask";
+    String fullyQualifiedTaskFactoryClassName = "com.mycompany.mocktask.MockTaskFactory";
+
+    // Use the real JAR so the failure comes from the unknown Task class, not from the JAR path.
+    List<String> taskClasses = new ArrayList<>();
+    taskClasses.add(fullyQualifiedTaskClassName);
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig(taskCommand, taskJarPath, taskVersion, taskClasses,
+            fullyQualifiedTaskFactoryClassName);
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/" + taskCommand;
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    // Wait for the workflow to either complete or fail.
+    TaskState finalState =
+        driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+    Assert.assertEquals(finalState, TaskState.FAILED);
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskFactory() throws Exception {
+    String taskCommand = "Reindex";
+    String taskJarPath = "src/test/resources/Reindex.jar";
+    String taskVersion = "1.0.0";
+    String fullyQualifiedTaskClassName = "com.mycompany.mocktask.MockTask";
+    String fullyQualifiedTaskFactoryClassName = "com.mycompany.mocktask.RandomTaskFactory";
+
+    // Use the real JAR so the failure comes from the unknown TaskFactory, not from the JAR path.
+    List<String> taskClasses = new ArrayList<>();
+    taskClasses.add(fullyQualifiedTaskClassName);
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig(taskCommand, taskJarPath, taskVersion, taskClasses,
+            fullyQualifiedTaskFactoryClassName);
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/" + taskCommand;
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    // Wait for the workflow to either complete or fail.
+    TaskState finalState =
+        driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+    Assert.assertEquals(finalState, TaskState.FAILED);
+  }
+
+  @AfterMethod
+  public void afterMethod() {
+    // AfterMethod is required (as opposed to AfterClass)
+    // to make sure each test erases all of its state.
+    // This is important because we are testing dynamically
+    // loading classes, and the tests would break if the
+    // classes are already loaded.
+    _controller.syncStop();
+    _manager.disconnect();
+    _participant.syncStop();
+
+    // Shutdown the state model factories to close all threads.
+    StateMachineEngine stateMachine = _participant.getStateMachineEngine();
+    if (stateMachine != null) {
+      StateModelFactory<?> stateModelFactory =
+          stateMachine.getStateModelFactory(TaskConstants.STATE_MODEL_NAME);
+      if (stateModelFactory instanceof TaskStateModelFactory) {
+        ((TaskStateModelFactory) stateModelFactory).shutdownNow();
+      }
+    }
+    deleteCluster(CLUSTER_NAME);
+  }
+}
diff --git a/helix-core/src/test/resources/Reindex.jar b/helix-core/src/test/resources/Reindex.jar
new file mode 100644
index 0000000..0c417b3
Binary files /dev/null and b/helix-core/src/test/resources/Reindex.jar differ