Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/07/29 07:57:21 UTC

[GitHub] [helix] narendly commented on a change in pull request #1169: [HELIX-1168] Dynamically Load Tasks in Task Framework

narendly commented on a change in pull request #1169:
URL: https://github.com/apache/helix/pull/1169#discussion_r461719235



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +294,63 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and register the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {

Review comment:
       This method is still doing too many things, and some parts of its inner logic could be refactored out for clarity, conciseness, and testability (reading the task config, locating the JAR, loading the task classes, and instantiating the TaskFactory are each separate steps). I also recommend looking at some publicly available guides on good programming practices to get a sense of what clean, maintainable code looks like.
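       For illustration only, here is a minimal sketch of one possible split. The class and method names (DynamicTaskLoaderSketch, resolveJarUrl, loadClasses, instantiate, loadFactory) are hypothetical and not part of the PR. Reading the ZNRecord is left to the caller so the sketch compiles without Helix, and the factory type is a generic parameter (it would presumably be TaskFactory.class in TaskStateModel).

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;

// Hypothetical helper: each step of loadNewTask() becomes a small method
// that can be unit-tested on its own. Names are illustrative only.
final class DynamicTaskLoaderSketch {
  private DynamicTaskLoaderSketch() {}

  // Step 1: validate the JAR path and turn it into a URL.
  static URL resolveJarUrl(String jarPath) {
    if (jarPath == null) {
      throw new IllegalStateException("No JAR path configured");
    }
    File jar = new File(jarPath);
    if (!jar.isFile()) { // false for missing paths and for directories
      throw new IllegalStateException("No JAR file at " + jarPath);
    }
    try {
      return jar.toURI().toURL();
    } catch (MalformedURLException e) {
      throw new IllegalStateException("Malformed JAR URL for " + jarPath, e);
    }
  }

  // Step 2: eagerly load the task classes so a missing one fails fast,
  // and report which class was missing.
  static void loadClasses(ClassLoader loader, List<String> classNames) {
    for (String name : classNames) {
      try {
        loader.loadClass(name);
      } catch (ClassNotFoundException e) {
        throw new IllegalStateException("Cannot load task class " + name, e);
      }
    }
  }

  // Step 3: instantiate the factory and check it has the expected type.
  static <T> T instantiate(ClassLoader loader, String className, Class<T> type) {
    try {
      Class<? extends T> cls = loader.loadClass(className).asSubclass(type);
      return cls.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalStateException("Cannot instantiate " + className, e);
    }
  }

  // Orchestration: reads like the Javadoc of loadNewTask().
  static <T> T loadFactory(String jarPath, List<String> taskClasses,
      String factoryClass, Class<T> factoryType) {
    URL jarUrl = resolveJarUrl(jarPath);
    // The loader is intentionally left open: the returned factory's classes
    // are defined by it and may need to load further classes later.
    URLClassLoader loader = URLClassLoader.newInstance(new URL[] {jarUrl});
    loadClasses(loader, taskClasses);
    return instantiate(loader, factoryClass, factoryType);
  }
}
```

Catching ReflectiveOperationException also covers the NoSuchMethodException and InvocationTargetException that getDeclaredConstructor().newInstance() can throw; that call replaces Class.newInstance(), which has been deprecated since Java 9.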
   

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -43,6 +49,11 @@
   private ScheduledFuture timeout_task;
   private TaskRunner _taskRunner;
   private final ScheduledExecutorService _timeoutTaskExecutor;
+  public static final String TASK_JAR_FILE = "JAR_FILE";

Review comment:
       Nit: Rename to something like TASK_JAR_FILE_KEY?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +294,63 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and register the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {
+    // Read ZNRecord containing task definition information.
+    ZNRecord taskConfig = _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .get("/" + _manager.getClusterName() + "/" + TASK_PATH + "_" + command,null, 0);
+    if(taskConfig == null) {
+      LOG.info("Failed to read ZNRecord for task " + command + " for instance " + _manager.getInstanceName()
+          + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("No ZNRecord for task " + command);
+    }
+
+    // Open the JAR file containing Task(s) and TaskFactory classes.
+    File taskJar;
+    URL taskJarUrl;
+    try {
+      taskJar = new File(taskConfig.getSimpleField(TASK_JAR_FILE));
+      if(taskJar.exists() && !taskJar.isDirectory()) {
+        taskJarUrl = taskJar.toURI().toURL();
+      } else {
+        throw new IllegalStateException("No JAR for task " + command);
+      }
+    } catch (MalformedURLException e) {
+      LOG.info("Failed to find/open JAR for new task " + command + " for instance " + _manager.getInstanceName()
+          + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("Malformed JAR URL for task " + command);
+    }
+
+    // Import Task(s) and TaskFactory classes.
+    URLClassLoader classLoader = URLClassLoader.newInstance(new URL[]{taskJarUrl});
+    for(String taskClass : taskConfig.getListField(TASK_CLASSES)) {
+      try {
+        classLoader.loadClass(taskClass);
+      } catch(ClassNotFoundException e) {
+        LOG.info("Failed to load class(es) for new task " + command + " for instance " + _manager.getInstanceName()
+            + " in cluster " + _manager.getClusterName() + ".");
+        throw new IllegalStateException("Null class(es) for task " + command);
+      }
+    }
+    Class cl;
+    TaskFactory tf;
+    try {
+      cl = classLoader.loadClass(taskConfig.getSimpleField(TASK_FACTORY));
+      tf = (TaskFactory)cl.newInstance();
+    } catch(ClassNotFoundException | InstantiationException | IllegalAccessException e) {

Review comment:
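       This code doesn't appear to have been formatted properly (for example, there is no space after `if`, `for`, and `catch`, or after the `(TaskFactory)` cast). Please format it according to the project's code style.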

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +294,63 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and register the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {
+    // Read ZNRecord containing task definition information.
+    ZNRecord taskConfig = _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .get("/" + _manager.getClusterName() + "/" + TASK_PATH + "_" + command,null, 0);
+    if(taskConfig == null) {
+      LOG.info("Failed to read ZNRecord for task " + command + " for instance " + _manager.getInstanceName()

Review comment:
       Should the log level be error instead of info, since you're treating this as an exception?
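       As a hedged sketch, a small hypothetical helper (TaskLoadErrors.fail, not part of the PR) could log at error level and return the exception to throw, so the "for instance X in cluster Y" context is built in one place instead of four. To stay dependency-free it uses java.util.logging, where error level is Level.SEVERE; assuming LOG in TaskStateModel is an SLF4J logger, the equivalent call would be LOG.error(msg, cause).

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: logs a task-loading failure at error level (SEVERE)
// and returns an exception carrying the same message for the caller to throw.
final class TaskLoadErrors {
  private static final Logger LOG = Logger.getLogger(TaskLoadErrors.class.getName());

  private TaskLoadErrors() {}

  static IllegalStateException fail(String what, String command,
      String instance, String cluster, Throwable cause) {
    String msg = String.format("Failed to %s for task %s on instance %s in cluster %s",
        what, command, instance, cluster);
    LOG.log(Level.SEVERE, msg, cause); // cause may be null
    return new IllegalStateException(msg, cause);
  }
}
```

In loadNewTask() this could then read `throw TaskLoadErrors.fail("read task config", command, _manager.getInstanceName(), _manager.getClusterName(), null);`.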

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -283,6 +294,63 @@ public void reset() {
     }
   }
 
+  /**
+   * Loads Task and TaskFactory classes for command input from
+   * a JAR file, and register the TaskFactory in _taskFactoryRegistry.
+   * @param command The command indicating what task to be loaded
+   */
+  private void loadNewTask(String command) {
+    // Read ZNRecord containing task definition information.
+    ZNRecord taskConfig = _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .get("/" + _manager.getClusterName() + "/" + TASK_PATH + "_" + command,null, 0);
+    if(taskConfig == null) {
+      LOG.info("Failed to read ZNRecord for task " + command + " for instance " + _manager.getInstanceName()
+          + " in cluster " + _manager.getClusterName() + ".");
+      throw new IllegalStateException("No ZNRecord for task " + command);
+    }
+
+    // Open the JAR file containing Task(s) and TaskFactory classes.
+    File taskJar;

Review comment:
       Since loading the JAR from a local directory is not a permanent solution and is of limited use on its own, consider defining a Java interface for the JAR loader to make it pluggable. That's a better design principle and will make the code easier to extend; as it's hard-coded right now, it will be difficult to swap out the underlying JAR-loading implementation.
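       A minimal sketch of what that could look like, assuming hypothetical names (TaskJarLoader, LocalFileTaskJarLoader, TaskClassResolver) that are not part of the PR or of Helix: the consumer depends only on the interface, and today's local-file behavior becomes one implementation among possibly several.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical abstraction: where task code comes from is hidden behind an
// interface, so a local-file loader can later be replaced (e.g. by one that
// fetches JARs from a remote store) without touching the caller.
interface TaskJarLoader {
  /** Returns a class loader that can see the classes in the given JAR. */
  ClassLoader openJar(String jarLocation);
}

// Current behavior: the JAR lives on the participant's local filesystem.
final class LocalFileTaskJarLoader implements TaskJarLoader {
  private final ClassLoader parent;

  LocalFileTaskJarLoader(ClassLoader parent) {
    this.parent = parent;
  }

  @Override
  public ClassLoader openJar(String jarLocation) {
    File jar = new File(jarLocation);
    if (!jar.isFile()) {
      throw new IllegalStateException("No JAR file at " + jarLocation);
    }
    try {
      return URLClassLoader.newInstance(new URL[] {jar.toURI().toURL()}, parent);
    } catch (MalformedURLException e) {
      throw new IllegalStateException("Malformed JAR URL: " + jarLocation, e);
    }
  }
}

// Consumer that depends only on the interface; tests can pass a fake loader.
final class TaskClassResolver {
  private final TaskJarLoader jarLoader;

  TaskClassResolver(TaskJarLoader jarLoader) {
    this.jarLoader = jarLoader;
  }

  Class<?> resolve(String jarLocation, String className) {
    try {
      return jarLoader.openJar(jarLocation).loadClass(className);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(
          "Class " + className + " not found in " + jarLocation, e);
    }
  }
}
```

Because TaskJarLoader has a single method, a test can pass a lambda such as `location -> ClassLoader.getSystemClassLoader()` instead of building a real JAR.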

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
##########
@@ -43,6 +49,11 @@
   private ScheduledFuture timeout_task;
   private TaskRunner _taskRunner;
   private final ScheduledExecutorService _timeoutTaskExecutor;
+  public static final String TASK_JAR_FILE = "JAR_FILE";
+  public static final String TASK_VERSION = "VERSION";
+  public static final String TASK_CLASSES = "TASK_CLASSES";
+  public static final String TASK_FACTORY = "TASKFACTORY";
+  public static final String TASK_PATH = "TASK_DEFINITION";

Review comment:
       Overall, the code still feels like a "script". To make it suitable for production use, let's work on making it maintainable and extensible :)

##########
File path: helix-core/src/test/java/org/apache/helix/task/TestDynamicTaskLoading.java
##########
@@ -0,0 +1,125 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.zookeeper.CreateMode;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestDynamicTaskLoading extends ZkTestBase {
+  private final String _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  private static final String _instanceName = "localhost_12913";
+  private HelixManager _manager;
+  private MockParticipantManager _participant;
+  private ClusterControllerManager _controller;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _gSetupTool.addCluster(_clusterName, true);
+
+    _manager = HelixManagerFactory.getZKHelixManager(_clusterName, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _gSetupTool.addInstanceToCluster(_clusterName, _instanceName);
+    _participant = new MockParticipantManager(ZK_ADDR, _clusterName, _instanceName);
+    StateModelFactory<StateModel> stateModelFactory = new MasterSlaveStateModelFactory(_instanceName, 0);
+    StateMachineEngine stateMach = _participant.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _participant.getStateMachineEngine().registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_participant, taskFactoryReg));
+    _participant.syncStart();
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName,null);
+    _controller.syncStart();
+
+    // Add task definition information as a ZNRecord.
+    ZNRecord configZnRecord = new ZNRecord(_participant.getInstanceName());

Review comment:
       One suggestion here is to make the dynamic task config a typed ZNRecord. Consider creating a record class (called something like DynamicTaskConfig) that extends ZNRecord and explicitly defines the fields needed to describe a dynamic task.

       HelixProperty, which wraps a ZNRecord with typed accessors, is one such example.
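       A minimal sketch of such a typed config, under stated assumptions: DynamicTaskConfig and its accessor names are hypothetical, the key values are the ones used in this PR (with a _KEY suffix, per the naming nit above), and two plain maps stand in for ZNRecord's simple fields and list fields so the sketch compiles without Helix.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a typed dynamic-task config. In Helix this would extend ZNRecord
// (or wrap one, as HelixProperty does); the two maps below stand in for
// ZNRecord's simple fields and list fields so the sketch is self-contained.
final class DynamicTaskConfig {
  // Field keys; the string values match the constants in the PR.
  static final String JAR_FILE_KEY = "JAR_FILE";
  static final String VERSION_KEY = "VERSION";
  static final String TASK_CLASSES_KEY = "TASK_CLASSES";
  static final String TASK_FACTORY_KEY = "TASKFACTORY";

  private final String id;
  private final Map<String, String> simpleFields = new HashMap<>();
  private final Map<String, List<String>> listFields = new HashMap<>();

  DynamicTaskConfig(String id) {
    this.id = id;
  }

  String getId() { return id; }

  String getJarFilePath() { return simpleFields.get(JAR_FILE_KEY); }
  void setJarFilePath(String path) { simpleFields.put(JAR_FILE_KEY, path); }

  String getVersion() { return simpleFields.get(VERSION_KEY); }
  void setVersion(String version) { simpleFields.put(VERSION_KEY, version); }

  String getTaskFactory() { return simpleFields.get(TASK_FACTORY_KEY); }
  void setTaskFactory(String className) { simpleFields.put(TASK_FACTORY_KEY, className); }

  // Never returns null, so callers can iterate without a null check.
  List<String> getTaskClasses() {
    List<String> classes = listFields.get(TASK_CLASSES_KEY);
    return classes == null
        ? Collections.<String>emptyList()
        : Collections.unmodifiableList(classes);
  }

  void setTaskClasses(List<String> classNames) {
    listFields.put(TASK_CLASSES_KEY, new ArrayList<>(classNames));
  }
}
```

With Helix on the classpath, the same accessors would delegate to ZNRecord's getSimpleField/setSimpleField and getListField/setListField instead of the local maps, and loadNewTask() would no longer need to know the raw key strings.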


Review comment:
       Also, it might be a good idea to introduce this new type of ZNRecord in a separate PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


