You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/08/14 21:40:36 UTC

[helix] branch dynamically-loaded-task updated: Demo for Dynamically Loading Tasks in Task Framework (#1270)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch dynamically-loaded-task
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/dynamically-loaded-task by this push:
     new 83d54de  Demo for Dynamically Loading Tasks in Task Framework (#1270)
83d54de is described below

commit 83d54de871522e3240a67fc6c31a886252188461
Author: rabashizade <67...@users.noreply.github.com>
AuthorDate: Fri Aug 14 17:40:24 2020 -0400

    Demo for Dynamically Loading Tasks in Task Framework (#1270)
    
    Adds a demo script and the JAR file it uses for dynamically loading
    tasks.
---
 demo/dynamic-task-loading/README                   |  27 ++++
 demo/dynamic-task-loading/demo-task/pom.xml        |  86 ++++++++++++
 .../main/java/com/mycompany/demotask/DemoTask.java |  45 +++++++
 .../com/mycompany/demotask/DemoTaskFactory.java    |  39 ++++++
 .../helix/task/TestDemoDynamicTaskLoading.java     | 144 +++++++++++++++++++++
 helix-core/src/test/resources/demo-task-1.0.jar    | Bin 0 -> 3699 bytes
 6 files changed, 341 insertions(+)

diff --git a/demo/dynamic-task-loading/README b/demo/dynamic-task-loading/README
new file mode 100644
index 0000000..918a2e6
--- /dev/null
+++ b/demo/dynamic-task-loading/README
@@ -0,0 +1,27 @@
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+
+This directory contains the demo-task that is used in
+org.apache.helix.task.TestDemoDynamicTaskLoading. The
+resulting JAR file is copied to "src/test/resources".
+You can either change the path in the test script to point to
+the JAR file in this directory, or copy the JAR file to
+"src/test/resources" if you want to rebuild the project. After
+this step, simply run the test in
+org.apache.helix.task.TestDemoDynamicTaskLoading for the demo.
diff --git a/demo/dynamic-task-loading/demo-task/pom.xml b/demo/dynamic-task-loading/demo-task/pom.xml
new file mode 100644
index 0000000..02c4a40
--- /dev/null
+++ b/demo/dynamic-task-loading/demo-task/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.mycompany.demotask</groupId>
+  <artifactId>demo-task</artifactId>
+  <version>1.0</version>
+
+  <name>demo-task</name>
+  <!-- FIXME change it to the project's website -->
+  <url>http://www.example.com</url>
+
+  <properties>
+    <osgi.import>
+      org.apache.helix.task*;version="[0.9,4)",
+      org.apache.helix*;version="[0.9,4)",
+      org.codehaus.jackson*;version="[1.8,2)",
+      *
+    </osgi.import>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>1.8.5</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.helix</groupId>
+      <artifactId>helix-core</artifactId>
+      <version>0.9.4</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
+      <plugins>
+        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
+        <plugin>
+          <artifactId>maven-clean-plugin</artifactId>
+          <version>3.1.0</version>
+        </plugin>
+        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
+        <plugin>
+          <artifactId>maven-resources-plugin</artifactId>
+          <version>3.0.2</version>
+        </plugin>
+        <plugin>
+          <artifactId>maven-compiler-plugin</artifactId>
+          <version>3.8.0</version>
+          <configuration>
+            <source>1.8</source>
+            <target>1.8</target>
+          </configuration>
+        </plugin>
+        <plugin>
+          <artifactId>maven-surefire-plugin</artifactId>
+          <version>2.22.1</version>
+        </plugin>
+        <plugin>
+          <artifactId>maven-jar-plugin</artifactId>
+          <version>3.0.2</version>
+        </plugin>
+        <plugin>
+          <artifactId>maven-install-plugin</artifactId>
+          <version>2.5.2</version>
+        </plugin>
+        <plugin>
+          <artifactId>maven-deploy-plugin</artifactId>
+          <version>2.8.2</version>
+        </plugin>
+        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
+        <plugin>
+          <artifactId>maven-site-plugin</artifactId>
+          <version>3.7.1</version>
+        </plugin>
+        <plugin>
+          <artifactId>maven-project-info-reports-plugin</artifactId>
+          <version>3.0.0</version>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+</project>
diff --git a/demo/dynamic-task-loading/demo-task/src/main/java/com/mycompany/demotask/DemoTask.java b/demo/dynamic-task-loading/demo-task/src/main/java/com/mycompany/demotask/DemoTask.java
new file mode 100644
index 0000000..08fc9c6
--- /dev/null
+++ b/demo/dynamic-task-loading/demo-task/src/main/java/com/mycompany/demotask/DemoTask.java
@@ -0,0 +1,45 @@
+package com.mycompany.demotask;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.task.*;
+
+
+public class DemoTask extends UserContentStore implements Task {
+  public DemoTask(TaskCallbackContext context) {
+
+  }
+
+  @Override
+  public TaskResult run() {
+    System.err.println("\n\n\n");
+    System.err.println("**********************");
+    System.err.println("* DemoTask executed! *");
+    System.err.println("**********************");
+    System.err.println("\n\n\n");
+    return new TaskResult(TaskResult.Status.COMPLETED,
+        "Successfully finished task!");
+  }
+
+  @Override
+  public void cancel() {
+
+  }
+}
diff --git a/demo/dynamic-task-loading/demo-task/src/main/java/com/mycompany/demotask/DemoTaskFactory.java b/demo/dynamic-task-loading/demo-task/src/main/java/com/mycompany/demotask/DemoTaskFactory.java
new file mode 100644
index 0000000..a68b604
--- /dev/null
+++ b/demo/dynamic-task-loading/demo-task/src/main/java/com/mycompany/demotask/DemoTaskFactory.java
@@ -0,0 +1,39 @@
+package com.mycompany.demotask;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+
+/**
+ * A factory for {@link Task} objects.
+ */
+public class DemoTaskFactory implements TaskFactory {
+  /**
+   * Returns a {@link Task} instance.
+   * @param context Contextual information for the task, including task and job configurations
+   * @return A {@link Task} instance.
+   */
+  @Override
+  public Task createNewTask(TaskCallbackContext context) {
+    return new DemoTask(context);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestDemoDynamicTaskLoading.java b/helix-core/src/test/java/org/apache/helix/task/TestDemoDynamicTaskLoading.java
new file mode 100644
index 0000000..66100ad
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestDemoDynamicTaskLoading.java
@@ -0,0 +1,144 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Demo script for dynamically loading tasks.
+ */
+public class TestDemoDynamicTaskLoading extends ZkTestBase {
+  private static final String CLUSTER_NAME = CLUSTER_PREFIX + "_TestDynamicTaskLoading";
+  private static final String INSTANCE_NAME = "localhost_12913";
+  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final String TASK_COMMAND = "DemoTask";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, INSTANCE_NAME);
+    _participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, INSTANCE_NAME);
+    StateModelFactory<StateModel> stateModelFactory =
+        new MasterSlaveStateModelFactory(INSTANCE_NAME);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory(MASTER_SLAVE_STATE_MODEL, stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, null);
+    _controller.syncStart();
+  }
+
+  /**
+   * Submit a workflow consisting of a job with a DemoTask task.
+   * @param workflowName name of the workflow
+   * @param driver {@link TaskDriver} to submit workflowName to
+   */
+  private void submitWorkflow(String workflowName, TaskDriver driver) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  @Test
+  public void testDynamicTaskLoading() throws Exception {
+    String taskCommand = TASK_COMMAND;
+    String taskJarPath = "src/test/resources/demo-task-1.0.jar";
+    String taskVersion = "1.0.0";
+    String fullyQualifiedTaskClassName = "com.mycompany.demotask.DemoTask";
+    String fullyQualifiedTaskFactoryClassName = "com.mycompany.demotask.DemoTaskFactory";
+
+    // Add task definition information as a DynamicTaskConfig.
+    List<String> taskClasses = new ArrayList();
+    taskClasses.add(fullyQualifiedTaskClassName);
+    DynamicTaskConfig taskConfig =
+        new DynamicTaskConfig(taskCommand, taskJarPath, taskVersion, taskClasses,
+            fullyQualifiedTaskFactoryClassName);
+    String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/" + taskCommand;
+    _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+    // Submit workflow
+    TaskDriver driver = new TaskDriver(_manager);
+    String workflowName = TestHelper.getTestMethodName();
+    submitWorkflow(workflowName, driver);
+
+    // Wait for the workflow to either complete or fail.
+    Thread.sleep(100000);
+    TaskState finalState =
+        driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
+    Assert.assertEquals(finalState, TaskState.COMPLETED);
+  }
+
+  @AfterMethod
+  public void afterMethod() {
+    _controller.syncStop();
+    _manager.disconnect();
+    _participant.syncStop();
+
+    // Shutdown the state model factories to close all threads.
+    StateMachineEngine stateMachine = _participant.getStateMachineEngine();
+    if (stateMachine != null) {
+      StateModelFactory stateModelFactory =
+          stateMachine.getStateModelFactory(TaskConstants.STATE_MODEL_NAME);
+      if (stateModelFactory != null && stateModelFactory instanceof TaskStateModelFactory) {
+        ((TaskStateModelFactory) stateModelFactory).shutdownNow();
+      }
+    }
+    deleteCluster(CLUSTER_NAME);
+  }
+}
diff --git a/helix-core/src/test/resources/demo-task-1.0.jar b/helix-core/src/test/resources/demo-task-1.0.jar
new file mode 100644
index 0000000..f81a632
Binary files /dev/null and b/helix-core/src/test/resources/demo-task-1.0.jar differ