You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/07/24 01:52:55 UTC

[GitHub] [helix] rabashizade opened a new pull request #1169: [HELIX-1168] Dynamically Load Tasks in Task Framework

rabashizade opened a new pull request #1169:
URL: https://github.com/apache/helix/pull/1169


   ### Issues
   
   - [ x] My PR addresses the following Helix issues and references them in the PR description:
   
   Fixes #1168 
   
   ### Description
   
   - [x ] Here are some details about my PR, including screenshots of any UI changes:
   
   In this PR, loadNewTask() method is added to TaskStateModel to dynamically load tasks that are not already defined in task factory registry. This prevents the need to implement task-specific Participants and thus, enables numerous use cases with minimal extra coding effort beyond implementing the task logic itself.
   
   ### Tests
   
   - [ x] The following tests are written for this issue:
   
   TestDynamicTaskLoading
   
   - [ x] The following is the result of the "mvn test" command on the appropriate module:
   
   [INFO] Results:
   [INFO] 
   [ERROR] Failures: 
   [ERROR]   TestEnableCompression.testEnableCompressionResource:117 expected:<true> but was:<false>
   [ERROR]   TestP2PNoDuplicatedMessage.testP2PStateTransitionEnabled:175 Only 415 out of 704 percent transitions to Master were triggered by expected host! expected:<true> but was:<false>
   [ERROR]   TestWagedRebalance.testChangeIdealState:303->validate:646 expected:<true> but was:<false>
   [ERROR]   TestJobFailureDependence.testWorkflowFailureJobThreshold » ThreadTimeout Metho...
   [ERROR]   TestClusterStateVerifier.afterMethod:98 » IllegalState ZkClient already closed...
   [ERROR]   TestClusterVerifier.testResourceSubset:225 expected:<false> but was:<true>
   [INFO] 
   [ERROR] Tests run: 1156, Failures: 6, Errors: 0, Skipped: 0
   [INFO] 
   [INFO] ------------------------------------------------------------------------
   [INFO] Reactor Summary for Apache Helix 1.0.2-SNAPSHOT:
   [INFO] 
   [INFO] Apache Helix ....................................... SUCCESS [  0.563 s]
   [INFO] Apache Helix :: Metrics Common ..................... SUCCESS [  3.421 s]
   [INFO] Apache Helix :: Metadata Store Directory Common .... SUCCESS [  2.242 s]
   [INFO] Apache Helix :: ZooKeeper API ...................... SUCCESS [ 53.690 s]
   [INFO] Apache Helix :: Helix Common ....................... SUCCESS [  1.298 s]
   [INFO] Apache Helix :: Core ............................... FAILURE [  01:17 h]
   [INFO] Apache Helix :: Admin Webapp ....................... SKIPPED
   [INFO] Apache Helix :: Restful Interface .................. SKIPPED
   [INFO] Apache Helix :: Distributed Lock ................... SKIPPED
   [INFO] Apache Helix :: HelixAgent ......................... SKIPPED
   [INFO] Apache Helix :: Recipes ............................ SKIPPED
   [INFO] Apache Helix :: Recipes :: Rabbitmq Consumer Group . SKIPPED
   [INFO] Apache Helix :: Recipes :: Rsync Replicated File Store SKIPPED
   [INFO] Apache Helix :: Recipes :: distributed lock manager  SKIPPED
   [INFO] Apache Helix :: Recipes :: distributed task execution SKIPPED
   [INFO] Apache Helix :: Recipes :: service discovery ....... SKIPPED
   [INFO] ------------------------------------------------------------------------
   [INFO] BUILD FAILURE
   [INFO] ------------------------------------------------------------------------
   [INFO] Total time:  01:18 h
   [INFO] Finished at: 2020-07-23T17:56:53-07:00
   [INFO] ------------------------------------------------------------------------
   
   ### Commits
   
   - [ x] My commits all reference appropriate Apache Helix GitHub issues in their subject lines. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Code Quality
   
   - [x ] My diff has been formatted using helix-style.xml 
   (helix-style-intellij.xml if IntelliJ IDE is used)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] rabashizade commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
rabashizade commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470271299



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingJar() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Random.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskConfig() throws Exception {
+    // Remove task config ZNRecord if it exists.
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskClass() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();

Review comment:
       Thank you for your comment. I agree this might make sense (plus your other comment regarding path building). But I guess it's more appropriate to be addresses in another PR, as you mentioned.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470080833



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +287,68 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads className using classLoader
+   * @param classLoader
+   * @param className
+   * @return Class className loaded by classLoader
+   */
+  private Class loadClass(URLClassLoader classLoader, String className) {
+    try {
+      return classLoader.loadClass(className);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Failed to load Task class " + className + " for new task in instance " + _manager
+          .getInstanceName() + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("Class not found for task");
+    }
+  }
+
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and registers the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {
+    // If the path for dynamic tasks doesn't exist, skip loading the task
+    if (!_manager.getHelixDataAccessor().getBaseDataAccessor()
+        .exists(TaskConstants.DYNAMICALLY_LOADED_TASK_PATH, 0)) {
+      return;
+    }
+
+    // Read DynamicTaskConfig containing task definition information.
+    DynamicTaskConfig taskConfig = new DynamicTaskConfig(
+        _manager.getHelixDataAccessor().getBaseDataAccessor()
+            .get(TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/" + command, null, 0));
+    if (taskConfig.getTaskConfigZNRecord() == null) {
+      LOG.error("Failed to read ZNRecord for task " + command + " for instance " + _manager
+          .getInstanceName() + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalArgumentException("No ZNRecord for task " + command);
+    }
+
+    // Open the JAR file containing Task(s) and TaskFactory classes.
+    JarLoader jarLoader = new LocalJarLoader();
+    URL taskJarUrl = jarLoader.loadJar(taskConfig.getJarFilePath());
+
+    // Import Task(s) class(es).
+    URLClassLoader classLoader = URLClassLoader.newInstance(new URL[]{taskJarUrl});
+    for (String taskClass : taskConfig.getTaskClassesFqns()) {
+      loadClass(classLoader, taskClass);
+    }
+
+    try {
+      // Import and instantiate TaskFactory class.
+      TaskFactory taskFactory =
+          (TaskFactory) loadClass(classLoader, taskConfig.getTaskFactoryFqn()).newInstance();
+
+      // Register the TaskFactory.
+      _taskFactoryRegistry.put(command, taskFactory);

Review comment:
       nit: this piece of code could sit outside the try-catch?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly merged pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly merged pull request #1169:
URL: https://github.com/apache/helix/pull/1169


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470086122



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingJar() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Random.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskConfig() throws Exception {
+    // Remove task config ZNRecord if it exists.
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }

Review comment:
       Revise (see above)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] rabashizade commented on pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
rabashizade commented on pull request #1169:
URL: https://github.com/apache/helix/pull/1169#issuecomment-674214950


   This PR is ready to be merged.
   
   Final commit message:
   **Dynamically Load Tasks in Task Framework**
   
   Adds loadNewTask() to TaskStateModel to support dynamically
   loading tasks from JAR files by reading the dynamic task configs
   from ZooKeeper at the runtime.
   
   Appropriate tests are added to TestDynamicTaskLoading.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470083999



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");

Review comment:
       Pull out your strings and name them as a local variable - for example,
   
   ```
   String fullyQualifiedTaskClassName = "com.mycompany.mocktask.MockTask"; 
   String taskVersion = "1.0.0";
   ```
   ...
   
   for readability.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470086565



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingJar() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Random.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskConfig() throws Exception {
+    // Remove task config ZNRecord if it exists.
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskClass() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.RandomTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskFactory() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.RandomTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());

Review comment:
       Style, structure, AssertJUnit.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470086871



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingJar() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Random.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskConfig() throws Exception {
+    // Remove task config ZNRecord if it exists.
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskClass() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.RandomTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskFactory() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.RandomTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @AfterMethod
+  public void afterMethod() {

Review comment:
       Is it possible to use AfterClass instead to avoid redundant work of starting and shutting down threads?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470081725



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {

Review comment:
       Please add a JavaDoc - a short paragraph about why and what kind of testing will be performed in this test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470084612



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());

Review comment:
       `Assert.fail`? Also it's throwing an exception anyway, so why don't we just have it throw an exception instead of doing `Assert.fail`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470085477



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingJar() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();

Review comment:
       <`String`> is not necessary in `new ArrayList<String>();` Are you sure you're coding in Java 8? Please fix up your environment.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470086201



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingJar() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Random.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskConfig() throws Exception {
+    // Remove task config ZNRecord if it exists.
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskClass() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();

Review comment:
       Style?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470084481



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);

Review comment:
       Use `Assert.assertEquals` instead of AssertJUnit? Please remove all usages & import of AssertJUnit.

##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());

Review comment:
       `Assert.fail`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: [HELIX-1168] Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r461719235



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +294,63 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and register the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {

Review comment:
       This method is still doing too many things, and some parts of the inner logic could be refactored out for clarity, conciseness, and testability. I recommend you take a look at some publicly available good programming practices to get a sense of what good, maintainable code looks like as well.
   

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -43,6 +49,11 @@
   private ScheduledFuture timeout_task;
   private TaskRunner _taskRunner;
   private final ScheduledExecutorService _timeoutTaskExecutor;
+  public static final String TASK_JAR_FILE = "JAR_FILE";

Review comment:
       Nit: Rename to something like TASK_JAR_FILE_KEY?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +294,63 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and register the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {
+    // Read ZNRecord containing task definition information.
+    ZNRecord taskConfig = _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .get("/" + _manager.getClusterName() + "/" + TASK_PATH + "_" + command,null, 0);
+    if(taskConfig == null) {
+      LOG.info("Failed to read ZNRecord for task " + command + " for instance " + _manager.getInstanceName()
+          + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("No ZNRecord for task " + command);
+    }
+
+    // Open the JAR file containing Task(s) and TaskFactory classes.
+    File taskJar;
+    URL taskJarUrl;
+    try {
+      taskJar = new File(taskConfig.getSimpleField(TASK_JAR_FILE));
+      if(taskJar.exists() && !taskJar.isDirectory()) {
+        taskJarUrl = taskJar.toURI().toURL();
+      } else {
+        throw new IllegalStateException("No JAR for task " + command);
+      }
+    } catch (MalformedURLException e) {
+      LOG.info("Failed to find/open JAR for new task " + command + " for instance " + _manager.getInstanceName()
+          + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("Malformed JAR URL for task " + command);
+    }
+
+    // Import Task(s) and TaskFactory classes.
+    URLClassLoader classLoader = URLClassLoader.newInstance(new URL[]{taskJarUrl});
+    for(String taskClass : taskConfig.getListField(TASK_CLASSES)) {
+      try {
+        classLoader.loadClass(taskClass);
+      } catch(ClassNotFoundException e) {
+        LOG.info("Failed to load class(es) for new task " + command + " for instance " + _manager.getInstanceName()
+            + " in cluster " + _manager.getClusterName() + ".");
+        throw new IllegalStateException("Null class(es) for task " + command);
+      }
+    }
+    Class cl;
+    TaskFactory tf;
+    try {
+      cl = classLoader.loadClass(taskConfig.getSimpleField(TASK_FACTORY));
+      tf = (TaskFactory)cl.newInstance();
+    } catch(ClassNotFoundException | InstantiationException | IllegalAccessException e) {

Review comment:
       This code doesn't seem like it's been formatted properly.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +294,63 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and register the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {
+    // Read ZNRecord containing task definition information.
+    ZNRecord taskConfig = _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .get("/" + _manager.getClusterName() + "/" + TASK_PATH + "_" + command,null, 0);
+    if(taskConfig == null) {
+      LOG.info("Failed to read ZNRecord for task " + command + " for instance " + _manager.getInstanceName()

Review comment:
       LOG level should be error instead of info because you're treating this as an exception?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +294,63 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and register the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {
+    // Read ZNRecord containing task definition information.
+    ZNRecord taskConfig = _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .get("/" + _manager.getClusterName() + "/" + TASK_PATH + "_" + command,null, 0);
+    if(taskConfig == null) {
+      LOG.info("Failed to read ZNRecord for task " + command + " for instance " + _manager.getInstanceName()
+          + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("No ZNRecord for task " + command);
+    }
+
+    // Open the JAR file containing Task(s) and TaskFactory classes.
+    File taskJar;

Review comment:
       Since loading the JAR from the local directory is not a permanent solution and rather not very useful, consider using Java Interface to make this pluggable. That's a better design principle and will allow you to extend your code more easily. The way it's hard-coded right now, it will be difficult to change the underlying implementation for the jar loader.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -43,6 +49,11 @@
   private ScheduledFuture timeout_task;
   private TaskRunner _taskRunner;
   private final ScheduledExecutorService _timeoutTaskExecutor;
+  public static final String TASK_JAR_FILE = "JAR_FILE";
+  public static final String TASK_VERSION = "VERSION";
+  public static final String TASK_CLASSES = "TASK_CLASSES";
+  public static final String TASK_FACTORY = "TASKFACTORY";
+  public static final String TASK_PATH = "TASK_DEFINITION";

Review comment:
       Overall, the code still feels like a "script" - to make this more amenable for use in production, let's work on making it maintainable and extendable :)

##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,125 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.zookeeper.CreateMode;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory.getZKHelixManager(_clusterName, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory = new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName,null);
+    _controller.syncStart();
+
+    // Add task definition information as a ZNRecord.
+    ZNRecord configZnRecord = new ZNRecord(_participant.getInstanceName());

Review comment:
       One suggestion here is to make the dynamic Task config a typed ZNRecord. Consider creating a record class that extends ZNRecord (and call it DynamicTaskConfig) that explicitly defines the fields you need to describe a dynamic task.
   
   HelixProperty is one such example.

##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,125 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.zookeeper.CreateMode;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory.getZKHelixManager(_clusterName, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory = new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName,null);
+    _controller.syncStart();
+
+    // Add task definition information as a ZNRecord.
+    ZNRecord configZnRecord = new ZNRecord(_participant.getInstanceName());

Review comment:
       Also to add, it might be a good idea to have a separate PR for this new type of ZNRecord.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] mgao0 commented on a change in pull request #1169: [HELIX-1168] Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
mgao0 commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r460228586



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +289,36 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads Task and TaskFactory classes for command input
+   */
+  private boolean loadNewTask(String command) {
+    try {
+      // Read ZNRecord containing task definition information.
+      ZNRecord taskConfig = _manager.getHelixDataAccessor().getBaseDataAccessor()

Review comment:
       If this path doesn't exist, does it throw exception here or return null? If exception, do we need to handle the exception? If null, we need to do a check in the next line of code to prevent NPE.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470082309



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";

Review comment:
       Please follow the code style guidelines (google how to define Java constants).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470080413



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +287,68 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads className using classLoader
+   * @param classLoader
+   * @param className
+   * @return Class className loaded by classLoader
+   */
+  private Class loadClass(URLClassLoader classLoader, String className) {
+    try {
+      return classLoader.loadClass(className);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Failed to load Task class " + className + " for new task in instance " + _manager
+          .getInstanceName() + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("Class not found for task");
+    }
+  }
+
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and registers the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {
+    // If the path for dynamic tasks doesn't exist, skip loading the task
+    if (!_manager.getHelixDataAccessor().getBaseDataAccessor()
+        .exists(TaskConstants.DYNAMICALLY_LOADED_TASK_PATH, 0)) {
+      return;
+    }
+
+    // Read DynamicTaskConfig containing task definition information.
+    DynamicTaskConfig taskConfig = new DynamicTaskConfig(
+        _manager.getHelixDataAccessor().getBaseDataAccessor()
+            .get(TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/" + command, null, 0));
+    if (taskConfig.getTaskConfigZNRecord() == null) {
+      LOG.error("Failed to read ZNRecord for task " + command + " for instance " + _manager
+          .getInstanceName() + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalArgumentException("No ZNRecord for task " + command);

Review comment:
       Make the error message the same for the log and the exception?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] mgao0 commented on pull request #1169: [HELIX-1168] Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
mgao0 commented on pull request #1169:
URL: https://github.com/apache/helix/pull/1169#issuecomment-663679182


   Great job Ramin! Thank you for your hard work. 
   I think your change shouldn't affect our tests, but 6 tests failed is a bit concerning. @kaisun2000 @jiajunwang here to see if it's expected.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470079769



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +287,68 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads className using classLoader
+   * @param classLoader
+   * @param className
+   * @return Class className loaded by classLoader
+   */
+  private Class loadClass(URLClassLoader classLoader, String className) {
+    try {
+      return classLoader.loadClass(className);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Failed to load Task class " + className + " for new task in instance " + _manager
+          .getInstanceName() + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("Class not found for task");
+    }
+  }
+
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and registers the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {
+    // If the path for dynamic tasks doesn't exist, skip loading the task
+    if (!_manager.getHelixDataAccessor().getBaseDataAccessor()
+        .exists(TaskConstants.DYNAMICALLY_LOADED_TASK_PATH, 0)) {
+      return;

Review comment:
       Should you either log here or throw an exception? If you just return, you would be silently failing, which is bad.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470295137



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +288,75 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads className using classLoader
+   * @param classLoader
+   * @param className
+   * @return Class className loaded by classLoader
+   */
+  private Class loadClass(URLClassLoader classLoader, String className) {
+    try {
+      return classLoader.loadClass(className);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Failed to load Task class " + className + " for new task in instance " + _manager
+          .getInstanceName() + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("Class not found for task");
+    }
+  }
+
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and registers the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {
+    // If the path for dynamic tasks doesn't exist, skip loading the task
+    if (!_manager.getHelixDataAccessor().getBaseDataAccessor()
+        .exists(TaskConstants.DYNAMICALLY_LOADED_TASK_PATH, 0)) {
+      LOG.error("Path for dynamic tasks doesn't exist!");
+      throw new IllegalStateException("Path for dynamic tasks doesn't exist!");

Review comment:
       Duplicate strings. Please factor it out to a local String variable throughout the test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] manick02 commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
manick02 commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470129394



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingJar() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Random.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskConfig() throws Exception {
+    // Remove task config ZNRecord if it exists.
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskClass() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();

Review comment:
       May not be related to this PR, I see a lot of Path building, like this"TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex"  happening though out, it provides an opportunity to reduce chances of error and on the opportunity to standardize path building by defining a ZNPathBuilder utility ( This comment may be irrelevant - Sorry, am just new to the helix and trying to understand)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] rabashizade commented on a change in pull request #1169: [HELIX-1168] Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
rabashizade commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r461210203



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,123 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.zookeeper.CreateMode;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  HelixManager _manager;
+  MockParticipantManager _participant;
+  ClusterControllerManager _controller;
+  String _clusterName;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory.getZKHelixManager(_clusterName, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    String instanceName = "localhost_12913";
+    _gSetupTool.addInstanceToCluster(_clusterName, instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
+    StateModelFactory<StateModel> stateModelFactory = new MasterSlaveStateModelFactory(instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+
+    // Add task definition information as a ZNRecord.
+    ZNRecord configZnRecord = new ZNRecord(_participant.getInstanceName());
+    configZnRecord.setSimpleField("JAR_FILE", "src/test/resources/Reindex.jar");
+    configZnRecord.setSimpleField("VERSION", "1.0.0");
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    configZnRecord.setListField("TASK_CLASSES", taskClasses);
+    configZnRecord.setSimpleField("TASKFACTORY", "com.mycompany.mocktask.MockTaskFactory");
+    String path = String.format("/%s/%s", _clusterName, "TASK_DEFINITION");
+    _participant.getZkClient().create(path, configZnRecord, CreateMode.PERSISTENT);
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    TaskDriver driver = new TaskDriver(_manager);
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+
+    Thread.sleep(2000);

Review comment:
       Thank you, Ali. Great point, will fix it in the next commit.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +292,38 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads Task and TaskFactory classes for command input
+   */
+  private boolean loadNewTask(String command) {
+    try {
+      // Read ZNRecord containing task definition information.
+      ZNRecord taskConfig = _manager.getHelixDataAccessor().getBaseDataAccessor()
+          .get("/" + _manager.getClusterName() + "/TASK_DEFINITION",
+              null, AccessOption.THROW_EXCEPTION_IFNOTEXIST);

Review comment:
       Thank you for the comments, Hunter. I'll use them all in the next commit.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] rabashizade commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
rabashizade commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r463885621



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +297,63 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads className using classLoader
+   * @param classLoader
+   * @param className
+   * @return Class className loaded by classLoader
+   */
+  private Class loadClass(URLClassLoader classLoader, String className) {
+    try {
+      return classLoader.loadClass(className);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Failed to load Task class " + className + " for new task in instance " + _manager
+          .getInstanceName() + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("Null TaskFactory for task");
+    }
+  }
+
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and registers the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {
+    // Read ZNRecord containing task definition information.
+    ZNRecord taskConfig = _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .get(TASK_PATH + "/" + command, null, 0);

Review comment:
       Thank you for your comments, Junkai. So do you think something like this at the beginning of the method would do?
   
   ```suggestion
       // If the path for dynamic tasks doesn't exist, skip loading the task
       if (!_manager.getHelixDataAccessor().getBaseDataAccessor().exists(TaskConstants.TASK_PATH, 0)) {
         return;
       }
       ZNRecord taskConfig = _manager.getHelixDataAccessor().getBaseDataAccessor()
           .get(TASK_PATH + "/" + command, null, 0);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] manick02 commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
manick02 commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470118308



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +287,68 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads className using classLoader
+   * @param classLoader
+   * @param className
+   * @return Class className loaded by classLoader
+   */
+  private Class loadClass(URLClassLoader classLoader, String className) {
+    try {
+      return classLoader.loadClass(className);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Failed to load Task class " + className + " for new task in instance " + _manager
+          .getInstanceName() + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("Class not found for task");
+    }
+  }
+
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and registers the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {
+    // If the path for dynamic tasks doesn't exist, skip loading the task
+    if (!_manager.getHelixDataAccessor().getBaseDataAccessor()
+        .exists(TaskConstants.DYNAMICALLY_LOADED_TASK_PATH, 0)) {
+      return;
+    }
+
+    // Read DynamicTaskConfig containing task definition information.
+    DynamicTaskConfig taskConfig = new DynamicTaskConfig(
+        _manager.getHelixDataAccessor().getBaseDataAccessor()
+            .get(TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/" + command, null, 0));

Review comment:
       Does it make sense to define constant for Path separator "/"?  Not sure if it is defined anywhere else in helix




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470086306



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingJar() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Random.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskConfig() throws Exception {
+    // Remove task config ZNRecord if it exists.
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskClass() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.RandomTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";

Review comment:
       Pull out strings.

##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingJar() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Random.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskConfig() throws Exception {
+    // Remove task config ZNRecord if it exists.
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskClass() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.RandomTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());

Review comment:
       revise structure




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470085695



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingJar() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");

Review comment:
       Pull out your strings into local variables with more descriptive names.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470294913



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingJar() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Random.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskConfig() throws Exception {
+    // Remove task config ZNRecord if it exists.
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskClass() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.RandomTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskFactory() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.RandomTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @AfterMethod
+  public void afterMethod() {

Review comment:
       That's fair. Could you make sure you document the reason in your before method/after method ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] rabashizade commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
rabashizade commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470312649



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +288,75 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads className using classLoader
+   * @param classLoader
+   * @param className
+   * @return Class className loaded by classLoader
+   */
+  private Class loadClass(URLClassLoader classLoader, String className) {
+    try {
+      return classLoader.loadClass(className);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Failed to load Task class " + className + " for new task in instance " + _manager
+          .getInstanceName() + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("Class not found for task");

Review comment:
       Sorry, missed this one in the previous fix.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470082841



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);

Review comment:
       Is 0 a magic number?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1169: [HELIX-1168] Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r461167311



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,123 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.zookeeper.CreateMode;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  HelixManager _manager;
+  MockParticipantManager _participant;
+  ClusterControllerManager _controller;
+  String _clusterName;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory.getZKHelixManager(_clusterName, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    String instanceName = "localhost_12913";
+    _gSetupTool.addInstanceToCluster(_clusterName, instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
+    StateModelFactory<StateModel> stateModelFactory = new MasterSlaveStateModelFactory(instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+
+    // Add task definition information as a ZNRecord.
+    ZNRecord configZnRecord = new ZNRecord(_participant.getInstanceName());
+    configZnRecord.setSimpleField("JAR_FILE", "src/test/resources/Reindex.jar");
+    configZnRecord.setSimpleField("VERSION", "1.0.0");
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    configZnRecord.setListField("TASK_CLASSES", taskClasses);
+    configZnRecord.setSimpleField("TASKFACTORY", "com.mycompany.mocktask.MockTaskFactory");
+    String path = String.format("/%s/%s", _clusterName, "TASK_DEFINITION");
+    _participant.getZkClient().create(path, configZnRecord, CreateMode.PERSISTENT);
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    TaskDriver driver = new TaskDriver(_manager);
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+
+    Thread.sleep(2000);

Review comment:
       Thanks for the PR. Good job ;-)
   Is there any way to make sure the functionality of this PR? Do we need to have some sort of assert/statement to make sure it works?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470080120



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +287,68 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads className using classLoader
+   * @param classLoader
+   * @param className
+   * @return Class className loaded by classLoader
+   */
+  private Class loadClass(URLClassLoader classLoader, String className) {
+    try {
+      return classLoader.loadClass(className);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Failed to load Task class " + className + " for new task in instance " + _manager
+          .getInstanceName() + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("Class not found for task");
+    }
+  }
+
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and registers the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {
+    // If the path for dynamic tasks doesn't exist, skip loading the task
+    if (!_manager.getHelixDataAccessor().getBaseDataAccessor()
+        .exists(TaskConstants.DYNAMICALLY_LOADED_TASK_PATH, 0)) {
+      return;
+    }
+
+    // Read DynamicTaskConfig containing task definition information.
+    DynamicTaskConfig taskConfig = new DynamicTaskConfig(
+        _manager.getHelixDataAccessor().getBaseDataAccessor()
+            .get(TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/" + command, null, 0));

Review comment:
       Instead of 0 (a magic number), please use a static constant that's already defined in the library.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] rabashizade commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
rabashizade commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470269287



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingJar() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Random.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskConfig() throws Exception {
+    // Remove task config ZNRecord if it exists.
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskClass() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.RandomTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingTaskFactory() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.RandomTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @AfterMethod
+  public void afterMethod() {

Review comment:
       Regarding this and BeforeMethod vs. BeforeClass, I wanted to make sure no dynamically loaded class remains loaded between the tests. I tried with BeforeClass and AfterClass, and all tests would result in TaskState.COMPLETED whereas only one should complete and the others should fail.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470085945



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.COMPLETED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicTaskLoadingNonexistingJar() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Random.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/Reindex";
+    removePathIfExists(path);
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    try {
+      // Wait for the workflow to either complete or fail.
+      TaskState finalState =
+          driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+      AssertJUnit.assertEquals(finalState, TaskState.FAILED);
+    } catch (HelixException e) {
+      AssertJUnit.fail(e.getMessage());

Review comment:
       Revise the structure as commented above, and replace AssertJUnit with Assert?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470083017



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);

Review comment:
       Don't we already have a static constant defined for "MasterSlave"?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470083999



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, null);
+    _controller.syncStart();
+  }
+
+  private void removePathIfExists(String path) {
+    if (_manager.getHelixDataAccessor().getBaseDataAccessor().exists(path, 0)) {
+      _manager.getHelixDataAccessor().getBaseDataAccessor().remove(path, 0);
+    }
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a MockTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100"));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList<String>();
+    taskClasses.add("com.mycompany.mocktask.MockTask");
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig("Reindex", "src/test/resources/Reindex.jar", "1.0.0", taskClasses,
+            "com.mycompany.mocktask.MockTaskFactory");

Review comment:
       Pull out your strings and name them as a local variable - for example,
   
   String fullyQualifiedTaskClassName = "com.mycompany.mocktask.MockTask"; 
   ...
   
   for readability.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470081129



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +287,68 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads className using classLoader
+   * @param classLoader
+   * @param className
+   * @return Class className loaded by classLoader
+   */
+  private Class loadClass(URLClassLoader classLoader, String className) {
+    try {
+      return classLoader.loadClass(className);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Failed to load Task class " + className + " for new task in instance " + _manager
+          .getInstanceName() + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("Class not found for task");
+    }
+  }
+
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and registers the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {
+    // If the path for dynamic tasks doesn't exist, skip loading the task
+    if (!_manager.getHelixDataAccessor().getBaseDataAccessor()
+        .exists(TaskConstants.DYNAMICALLY_LOADED_TASK_PATH, 0)) {
+      return;
+    }
+
+    // Read DynamicTaskConfig containing task definition information.
+    DynamicTaskConfig taskConfig = new DynamicTaskConfig(
+        _manager.getHelixDataAccessor().getBaseDataAccessor()
+            .get(TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/" + command, null, 0));
+    if (taskConfig.getTaskConfigZNRecord() == null) {
+      LOG.error("Failed to read ZNRecord for task " + command + " for instance " + _manager
+          .getInstanceName() + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalArgumentException("No ZNRecord for task " + command);
+    }
+
+    // Open the JAR file containing Task(s) and TaskFactory classes.
+    JarLoader jarLoader = new LocalJarLoader();
+    URL taskJarUrl = jarLoader.loadJar(taskConfig.getJarFilePath());
+
+    // Import Task(s) class(es).
+    URLClassLoader classLoader = URLClassLoader.newInstance(new URL[]{taskJarUrl});
+    for (String taskClass : taskConfig.getTaskClassesFqns()) {
+      loadClass(classLoader, taskClass);
+    }
+
+    try {
+      // Import and instantiate TaskFactory class.
+      TaskFactory taskFactory =
+          (TaskFactory) loadClass(classLoader, taskConfig.getTaskFactoryFqn()).newInstance();
+
+      // Register the TaskFactory.
+      _taskFactoryRegistry.put(command, taskFactory);
+    } catch (InstantiationException | IllegalAccessException e) {
+      LOG.error("Failed to instantiate TaskFactory class for new task in instance " + _manager
+          .getInstanceName() + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("Failed to instantiate TaskFactory for task");

Review comment:
       Make the error message uniform for logging and the exception for consistency?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] rabashizade commented on a change in pull request #1169: [HELIX-1168] Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
rabashizade commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r460284544



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +289,36 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads Task and TaskFactory classes for command input
+   */
+  private boolean loadNewTask(String command) {
+    try {
+      // Read ZNRecord containing task definition information.
+      ZNRecord taskConfig = _manager.getHelixDataAccessor().getBaseDataAccessor()

Review comment:
       Thank you, Molly. Good catch! I think I should add the appropriate option to throw exception and then handle it.
   ```suggestion
         ZNRecord taskConfig = _manager.getHelixDataAccessor().getBaseDataAccessor()
             .get("/" + _manager.getClusterName() + "/TASK_DEFINITION",
                 null, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470295243



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +288,75 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads className using classLoader
+   * @param classLoader
+   * @param className
+   * @return Class className loaded by classLoader
+   */
+  private Class loadClass(URLClassLoader classLoader, String className) {
+    try {
+      return classLoader.loadClass(className);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Failed to load Task class " + className + " for new task in instance " + _manager
+          .getInstanceName() + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("Class not found for task");

Review comment:
       Let's make sure error messages are consistent.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470082010



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();

Review comment:
       For constants, use private static final, and the variable name would be all in caps.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] dasahcc commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
dasahcc commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r463818279



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +297,63 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads className using classLoader
+   * @param classLoader
+   * @param className
+   * @return Class className loaded by classLoader
+   */
+  private Class loadClass(URLClassLoader classLoader, String className) {
+    try {
+      return classLoader.loadClass(className);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Failed to load Task class " + className + " for new task in instance " + _manager
+          .getInstanceName() + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("Null TaskFactory for task");
+    }
+  }
+
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and registers the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {
+    // Read ZNRecord containing task definition information.
+    ZNRecord taskConfig = _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .get(TASK_PATH + "/" + command, null, 0);

Review comment:
       I would suggestion to make this feature optional. If there is no folder created for "TASK_DEFINITON", let's skip the steps. Otherwise, it could bring potential risks.

##########
File path: helix-core/src/main/java/org/apache/helix/task/JarLoader.java
##########
@@ -0,0 +1,34 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.net.URL;
+
+/**
+ * The interface that is to be implemented by a specific JAR loader.
+ */
+public interface JarLoader {

Review comment:
       Let's start with this PR. You can create an API folder under task. Move the interface to API module.

##########
File path: helix-core/src/main/java/org/apache/helix/task/LocalJarLoader.java
##########
@@ -0,0 +1,48 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LocalJarLoader implements JarLoader {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalJarLoader.class);
+
+  @Override
+  public URL openJar(String jar) {
+    File taskJar;
+    try {
+      taskJar = new File(jar);
+      if (taskJar.exists() && !taskJar.isDirectory()) {
+        return taskJar.toURI().toURL();
+      } else {

Review comment:
       else clause is not necessary. You can directly put 
   
   LOG.error("Failed to find JAR " + jar + " for new task.");
           throw new IllegalStateException("No JAR for task");
   
   Because if the return is not happening, it will reaches the exception part.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -43,6 +46,11 @@
   private ScheduledFuture timeout_task;
   private TaskRunner _taskRunner;
   private final ScheduledExecutorService _timeoutTaskExecutor;
+  public static final String TASK_JAR_FILE_KEY = "JAR_FILE";
+  public static final String TASK_VERSION_KEY = "VERSION";
+  public static final String TASK_CLASSES_KEY = "TASK_CLASSES";
+  public static final String TASK_FACTORY_KEY = "TASKFACTORY";
+  public static final String TASK_PATH = "/TASK_DEFINITION";

Review comment:
       Let's move these two TaskConstants class.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r470082631



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,251 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {

Review comment:
       Do you really want a `beforeMethod` or a `beforeClass`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on pull request #1169: Dynamically Load Tasks in Task Framework

Posted by GitBox <gi...@apache.org>.
narendly commented on pull request #1169:
URL: https://github.com/apache/helix/pull/1169#issuecomment-673091202


   @rabashizade Please update your description with the up-to-date results? Your change seems to be making a lot of tests fail.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org