You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/07 21:10:14 UTC
[airavata] 17/17: Refactoring
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 71b294ed957cdddf6a08bf32f690c8204fedf3af
Author: dimuthu <di...@gmail.com>
AuthorDate: Wed Mar 7 16:09:50 2018 -0500
Refactoring
---
.../airavata/agents/api/JobSubmissionOutput.java | 9 +
.../helix/impl/controller/HelixController.java | 54 +----
.../helix/impl/participant/GlobalParticipant.java | 68 +++---
.../airavata/helix/impl/task/AiravataTask.java | 69 +++---
.../airavata/helix/impl/task/TaskContext.java | 22 +-
.../airavata/helix/impl/task/env/EnvSetupTask.java | 12 -
.../helix/impl/task/staging/DataStagingTask.java | 6 +
.../impl/task/staging/InputDataStagingTask.java | 4 -
.../impl/task/staging/OutputDataStagingTask.java | 37 +--
.../task/submission/DefaultJobSubmissionTask.java | 260 +++++++++------------
.../task/submission/ForkJobSubmissionTask.java | 78 ++++---
.../impl/task/submission/JobSubmissionTask.java | 38 ++-
.../helix/impl/workflow/PostWorkflowManager.java | 29 +--
.../helix/impl/workflow/PreWorkflowManager.java | 4 +-
.../src/main/resources/airavata-server.properties | 79 +------
.../helix/core/participant/HelixParticipant.java | 17 +-
16 files changed, 314 insertions(+), 472 deletions(-)
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/JobSubmissionOutput.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/JobSubmissionOutput.java
index 1858826..e1d0a80 100644
--- a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/JobSubmissionOutput.java
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/JobSubmissionOutput.java
@@ -8,6 +8,7 @@ public class JobSubmissionOutput {
private String jobId;
private boolean isJobSubmissionFailed;
private String failureReason;
+ private String description;
public int getExitCode() {
return exitCode;
@@ -71,4 +72,12 @@ public class JobSubmissionOutput {
this.failureReason = failureReason;
return this;
}
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java
index 11d7129..f5e2137 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java
@@ -1,12 +1,11 @@
package org.apache.airavata.helix.impl.controller;
-import org.apache.airavata.helix.core.util.PropertyResolver;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import java.io.File;
-import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
@@ -27,18 +26,11 @@ public class HelixController implements Runnable {
private CountDownLatch startLatch = new CountDownLatch(1);
private CountDownLatch stopLatch = new CountDownLatch(1);
- public HelixController(String propertyFile, boolean readPropertyFromFile) throws IOException {
-
- PropertyResolver propertyResolver = new PropertyResolver();
- if (readPropertyFromFile) {
- propertyResolver.loadFromFile(new File(propertyFile));
- } else {
- propertyResolver.loadInputStream(this.getClass().getClassLoader().getResourceAsStream(propertyFile));
- }
-
- this.clusterName = propertyResolver.get("helix.cluster.name");
- this.controllerName = propertyResolver.get("helix.controller.name");
- this.zkAddress = propertyResolver.get("zookeeper.connection.url");
+ @SuppressWarnings("WeakerAccess")
+ public HelixController() throws ApplicationSettingsException {
+ this.clusterName = ServerSettings.getSetting("helix.cluster.name");
+ this.controllerName = ServerSettings.getSetting("helix.controller.name");
+ this.zkAddress = ServerSettings.getZookeeperConnection();
}
public void run() {
@@ -64,12 +56,7 @@ public class HelixController implements Runnable {
logger.info("Controller: " + controllerName + ", has connected to cluster: " + clusterName);
Runtime.getRuntime().addShutdownHook(
- new Thread() {
- @Override
- public void run() {
- disconnect();
- }
- }
+ new Thread(this::disconnect)
);
} catch (InterruptedException ex) {
@@ -77,6 +64,7 @@ public class HelixController implements Runnable {
}
}
+ @SuppressWarnings({"WeakerAccess", "unused"})
public void stop() {
stopLatch.countDown();
}
@@ -92,29 +80,11 @@ public class HelixController implements Runnable {
try {
logger.info("Starting helix controller");
- String confDir = null;
- if (args != null) {
- for (String arg : args) {
- if (arg.startsWith("--confDir=")) {
- confDir = arg.substring("--confDir=".length());
- }
- }
- }
-
- String propertiesFile = "application.properties";
- boolean readPropertyFromFile = false;
-
- if (confDir != null && !confDir.isEmpty()) {
- propertiesFile = confDir.endsWith(File.separator)? confDir + propertiesFile : confDir + File.separator + propertiesFile;
- readPropertyFromFile = true;
- }
-
- logger.info("Using configuration file " + propertiesFile);
-
- HelixController helixController = new HelixController(propertiesFile, readPropertyFromFile);
+
+ HelixController helixController = new HelixController();
helixController.start();
- } catch (IOException e) {
+ } catch (Exception e) {
logger.error("Failed to start the helix controller", e);
}
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index 7dd5c99..7c86f42 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -1,17 +1,14 @@
package org.apache.airavata.helix.impl.participant;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.helix.core.AbstractTask;
import org.apache.airavata.helix.core.participant.HelixParticipant;
import org.apache.airavata.helix.core.support.TaskHelperImpl;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import java.io.File;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -30,26 +27,24 @@ public class GlobalParticipant extends HelixParticipant {
};
public Map<String, TaskFactory> getTaskFactory() {
- Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+ Map<String, TaskFactory> taskRegistry = new HashMap<>();
for (String taskClass : taskClasses) {
- TaskFactory taskFac = new TaskFactory() {
- public Task createNewTask(TaskCallbackContext context) {
- try {
- return AbstractTask.class.cast(Class.forName(taskClass).newInstance())
- .setCallbackContext(context)
- .setTaskHelper(new TaskHelperImpl());
- } catch (InstantiationException | IllegalAccessException e) {
- e.printStackTrace();
- return null;
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- return null;
- }
+ TaskFactory taskFac = context -> {
+ try {
+ return AbstractTask.class.cast(Class.forName(taskClass).newInstance())
+ .setCallbackContext(context)
+ .setTaskHelper(new TaskHelperImpl());
+ } catch (InstantiationException | IllegalAccessException e) {
+ logger.error("Failed to initialize the task", e);
+ return null;
+ } catch (ClassNotFoundException e) {
+ logger.error("Task class can not be found in the class path", e);
+ return null;
}
};
- TaskDef taskDef = null;
+ TaskDef taskDef;
try {
taskDef = Class.forName(taskClass).getAnnotation(TaskDef.class);
taskRegistry.put(taskDef.name(), taskFac);
@@ -60,34 +55,23 @@ public class GlobalParticipant extends HelixParticipant {
return taskRegistry;
}
- public GlobalParticipant(String propertyFile, Class taskClass, String taskTypeName, boolean readPropertyFromFile) throws IOException {
- super(propertyFile, taskClass, taskTypeName, readPropertyFromFile);
+ @SuppressWarnings("WeakerAccess")
+ public GlobalParticipant(Class taskClass, String taskTypeName) throws ApplicationSettingsException {
+ super(taskClass, taskTypeName);
}
- public static void main(String args[]) throws IOException {
+ public static void main(String args[]) {
+ logger.info("Starting global participant");
- String confDir = null;
- if (args != null) {
- for (String arg : args) {
- if (arg.startsWith("--confDir=")) {
- confDir = arg.substring("--confDir=".length());
- }
- }
- }
-
- String propertiesFile = "application.properties";
- boolean readPropertyFromFile = false;
-
- if (confDir != null && !confDir.isEmpty()) {
- propertiesFile = confDir.endsWith(File.separator)? confDir + propertiesFile : confDir + File.separator + propertiesFile;
- readPropertyFromFile = true;
+ GlobalParticipant participant;
+ try {
+ participant = new GlobalParticipant(null, null);
+ Thread t = new Thread(participant);
+ t.start();
+ } catch (Exception e) {
+ logger.error("Failed to start global participant", e);
}
- logger.info("Using configuration file " + propertiesFile);
-
- GlobalParticipant participant = new GlobalParticipant(propertiesFile, null, null, readPropertyFromFile);
- Thread t = new Thread(participant);
- t.start();
}
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 289cfc5..4f6d6ec 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -53,6 +53,7 @@ public abstract class AiravataTask extends AbstractTask {
private OutPort nextTask;
protected TaskResult onSuccess(String message) {
+ publishTaskState(TaskState.COMPLETED);
String successMessage = "Task " + getTaskId() + " completed." + (message != null ? " Message : " + message : "");
logger.info(successMessage);
return nextTask.invoke(new TaskResult(TaskResult.Status.COMPLETED, message));
@@ -80,13 +81,15 @@ public abstract class AiravataTask extends AbstractTask {
getTaskContext().setProcessStatus(status);
ErrorModel errorModel = new ErrorModel();
- errorModel.setUserFriendlyMessage("GFac Worker throws an exception");
+ errorModel.setUserFriendlyMessage(reason);
errorModel.setActualErrorMessage(errors.toString());
errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ publishTaskState(TaskState.FAILED);
saveAndPublishProcessStatus();
saveExperimentError(errorModel);
saveProcessError(errorModel);
+ saveTaskError(errorModel);
return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED : TaskResult.Status.FAILED, errorMessage);
}
@@ -97,6 +100,7 @@ public abstract class AiravataTask extends AbstractTask {
saveAndPublishProcessStatus();
}
+ @SuppressWarnings("WeakerAccess")
protected void saveAndPublishProcessStatus() {
try {
ProcessStatus status = taskContext.getProcessStatus();
@@ -117,6 +121,7 @@ public abstract class AiravataTask extends AbstractTask {
}
}
+ @SuppressWarnings("WeakerAccess")
protected void saveAndPublishTaskStatus() {
try {
TaskState state = getTaskContext().getTaskState();
@@ -140,6 +145,7 @@ public abstract class AiravataTask extends AbstractTask {
}
}
+ @SuppressWarnings("WeakerAccess")
protected void saveExperimentError(ErrorModel errorModel) {
try {
errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR"));
@@ -150,6 +156,7 @@ public abstract class AiravataTask extends AbstractTask {
}
}
+ @SuppressWarnings("WeakerAccess")
protected void saveProcessError(ErrorModel errorModel) {
try {
errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR"));
@@ -161,14 +168,15 @@ public abstract class AiravataTask extends AbstractTask {
}
}
- protected void saveTaskError(ErrorModel errorModel) throws Exception {
+ @SuppressWarnings("WeakerAccess")
+ protected void saveTaskError(ErrorModel errorModel) {
try {
errorModel.setErrorId(AiravataUtils.getId("TASK_ERROR"));
getExperimentCatalog().add(ExpCatChildDataType.TASK_ERROR, errorModel, getTaskId());
} catch (RegistryException e) {
String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " taskId: " + getTaskId()
+ " : - Error while updating task errors";
- throw new Exception(msg, e);
+ logger.error(msg, e);
}
}
@@ -191,6 +199,7 @@ public abstract class AiravataTask extends AbstractTask {
MDC.put("process", getProcessId());
MDC.put("gateway", getGatewayId());
MDC.put("task", getTaskId());
+ publishTaskState(TaskState.EXECUTING);
return onRun(helper, getTaskContext());
} finally {
MDC.clear();
@@ -206,6 +215,7 @@ public abstract class AiravataTask extends AbstractTask {
MDC.put("process", getProcessId());
MDC.put("gateway", getGatewayId());
MDC.put("task", getTaskId());
+ publishTaskState(TaskState.CANCELED);
onCancel(getTaskContext());
} finally {
MDC.clear();
@@ -231,22 +241,21 @@ public abstract class AiravataTask extends AbstractTask {
this.computeResourceDescription = getAppCatalog().getComputeResource().getComputeResource(getProcessModel()
.getComputeResourceId());
- TaskContext.TaskContextBuilder taskContextBuilder = new TaskContext.TaskContextBuilder(getProcessId(), getGatewayId(), getTaskId());
- taskContextBuilder.setAppCatalog(getAppCatalog());
- taskContextBuilder.setExperimentCatalog(getExperimentCatalog());
- taskContextBuilder.setProcessModel(getProcessModel());
- taskContextBuilder.setStatusPublisher(getStatusPublisher());
-
- taskContextBuilder.setGatewayResourceProfile(appCatalog.getGatewayProfile().getGatewayProfile(gatewayId));
- taskContextBuilder.setGatewayComputeResourcePreference(
+ TaskContext.TaskContextBuilder taskContextBuilder = new TaskContext.TaskContextBuilder(getProcessId(), getGatewayId(), getTaskId())
+ .setAppCatalog(getAppCatalog())
+ .setExperimentCatalog(getExperimentCatalog())
+ .setProcessModel(getProcessModel())
+ .setStatusPublisher(getStatusPublisher())
+ .setGatewayResourceProfile(appCatalog.getGatewayProfile().getGatewayProfile(gatewayId))
+ .setGatewayComputeResourcePreference(
appCatalog.getGatewayProfile()
- .getComputeResourcePreference(gatewayId, processModel.getComputeResourceId()));
- taskContextBuilder.setGatewayStorageResourcePreference(
+ .getComputeResourcePreference(gatewayId, processModel.getComputeResourceId()))
+ .setGatewayStorageResourcePreference(
appCatalog.getGatewayProfile()
.getStoragePreference(gatewayId, processModel.getStorageResourceId()));
this.taskContext = taskContextBuilder.build();
- logger.info("Task " + taskName + " intitialized");
+ logger.info("Task " + taskName + " initialized");
} catch (Exception e) {
logger.error("Error occurred while initializing the task " + getTaskId() + " of experiment " + getExperimentId(), e);
throw new RuntimeException("Error occurred while initializing the task " + getTaskId() + " of experiment " + getExperimentId(), e);
@@ -259,19 +268,25 @@ public abstract class AiravataTask extends AbstractTask {
return appCatalog;
}
- protected void publishTaskState(TaskState ts) throws RegistryException {
-
- TaskStatus taskStatus = new TaskStatus();
- taskStatus.setState(ts);
- taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, taskStatus, getTaskId());
- TaskIdentifier identifier = new TaskIdentifier(getTaskId(),
- getProcessId(), getExperimentId(), getGatewayId());
- TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(ts,
- identifier);
- MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId
- (MessageType.TASK.name()), getGatewayId());
- msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ @SuppressWarnings("WeakerAccess")
+ protected void publishTaskState(TaskState ts) {
+
+ try {
+ TaskStatus taskStatus = new TaskStatus();
+ taskStatus.setState(ts);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, taskStatus, getTaskId());
+ TaskIdentifier identifier = new TaskIdentifier(getTaskId(),
+ getProcessId(), getExperimentId(), getGatewayId());
+ TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(ts,
+ identifier);
+ MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId
+ (MessageType.TASK.name()), getGatewayId());
+ msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ statusPublisher.publish(msgCtx);
+ } catch (Exception e) {
+ logger.error("Failed to publish task status " + (ts != null ? ts.name(): "null") +" of task " + getTaskId());
+ }
}
protected ComputeResourceDescription getComputeResourceDescription() {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 6be1d36..0e6a3cc 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -716,21 +716,11 @@ public class TaskContext {
}
}
}else {
- Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
- @Override
- public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
- return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
- }
- });
+ jobSubmissionInterfaces.sort(Comparator.comparingInt(JobSubmissionInterface::getPriorityOrder));
}
}
interfaces = orderedInterfaces.get(preferredJobSubmissionProtocol);
- Collections.sort(interfaces, new Comparator<JobSubmissionInterface>() {
- @Override
- public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
- return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
- }
- });
+ interfaces.sort(Comparator.comparingInt(JobSubmissionInterface::getPriorityOrder));
} else {
throw new AppCatalogException("Compute resource should have at least one job submission interface defined...");
}
@@ -740,6 +730,7 @@ public class TaskContext {
}
}
+ @SuppressWarnings("WeakerAccess")
public TaskModel getCurrentTaskModel() {
return getTaskMap().get(taskId);
}
@@ -763,6 +754,7 @@ public class TaskContext {
private StoragePreference gatewayStorageResourcePreference;
private ProcessModel processModel;
+ @SuppressWarnings("WeakerAccess")
public TaskContextBuilder(String processId, String gatewayId, String taskId) throws Exception {
if (notValid(processId) || notValid(gatewayId) || notValid(taskId)) {
throwError("Process Id, Gateway Id and Task Id must be not null");
@@ -826,9 +818,9 @@ public class TaskContext {
if (notValid(experimentCatalog)) {
throwError("Invalid Experiment catalog");
}
- //if (notValid(statusPublisher)) {
- // throwError("Invalid Status Publisher");
- //}
+ if (notValid(statusPublisher)) {
+ throwError("Invalid Status Publisher");
+ }
TaskContext ctx = new TaskContext(processId, gatewayId, taskId);
ctx.setAppCatalog(appCatalog);
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
index 6eb1722..84adbcd 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
@@ -6,8 +6,6 @@ import org.apache.airavata.helix.impl.task.TaskContext;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
import org.apache.airavata.model.status.ProcessState;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.helix.task.TaskResult;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -20,9 +18,7 @@ public class EnvSetupTask extends AiravataTask {
@Override
public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) {
try {
-
saveAndPublishProcessStatus(ProcessState.CONFIGURING_WORKSPACE);
- publishTaskState(TaskState.EXECUTING);
AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
getTaskContext().getGatewayId(),
getTaskContext().getComputeResourceId(),
@@ -32,17 +28,9 @@ public class EnvSetupTask extends AiravataTask {
logger.info("Creating directory " + getTaskContext().getWorkingDir() + " on compute resource " + getTaskContext().getComputeResourceId());
adaptor.createDirectory(getTaskContext().getWorkingDir());
- publishTaskState(TaskState.COMPLETED);
return onSuccess("Envi setup task successfully completed " + getTaskId());
} catch (Exception e) {
- try {
- publishTaskState(TaskState.FAILED);
- } catch (RegistryException e1) {
- logger.error("Task failed to publish task status", e1);
-
- // ignore silently
- }
return onFail("Failed to setup environment of task " + getTaskId(), true, e);
}
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
index 76b4cb3..3220064 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
@@ -15,8 +15,10 @@ import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
+@SuppressWarnings("WeakerAccess")
public abstract class DataStagingTask extends AiravataTask {
+ @SuppressWarnings("WeakerAccess")
protected DataStagingTaskModel getDataStagingTaskModel() throws TaskOnFailException {
try {
Object subTaskModel = getTaskContext().getSubTaskModel();
@@ -30,6 +32,7 @@ public abstract class DataStagingTask extends AiravataTask {
}
}
+ @SuppressWarnings("WeakerAccess")
protected StorageResourceDescription getStorageResource() throws TaskOnFailException {
try {
StorageResourceDescription storageResource = getTaskContext().getStorageResource();
@@ -42,6 +45,7 @@ public abstract class DataStagingTask extends AiravataTask {
}
}
+ @SuppressWarnings("WeakerAccess")
protected StorageResourceAdaptor getStorageAdaptor(AdaptorSupport adaptorSupport) throws TaskOnFailException {
try {
StorageResourceAdaptor storageResourceAdaptor = adaptorSupport.fetchStorageAdaptor(
@@ -61,6 +65,7 @@ public abstract class DataStagingTask extends AiravataTask {
}
}
+ @SuppressWarnings("WeakerAccess")
protected AgentAdaptor getComputeResourceAdaptor(AdaptorSupport adaptorSupport) throws TaskOnFailException {
try {
return adaptorSupport.fetchAdaptor(
@@ -75,6 +80,7 @@ public abstract class DataStagingTask extends AiravataTask {
}
}
+ @SuppressWarnings("WeakerAccess")
protected String getLocalDataPath(String fileName) throws TaskOnFailException {
String localDataPath = ServerSettings.getLocalDataLocation();
localDataPath = (localDataPath.endsWith(File.separator) ? localDataPath : localDataPath + File.separator);
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
index de2aeac..f8d98cf 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
@@ -7,7 +7,6 @@ import org.apache.airavata.helix.impl.task.TaskContext;
import org.apache.airavata.helix.impl.task.TaskOnFailException;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
-import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.task.DataStagingTaskModel;
@@ -49,9 +48,6 @@ public class InputDataStagingTask extends DataStagingTask {
throw new TaskOnFailException(message, true, null);
}
- // Fetch and validate storage resource
- StorageResourceDescription storageResource = getStorageResource();
-
// Fetch and validate source and destination URLS
URI sourceURI;
URI destinationURI;
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index 7d657cb..88698c0 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -20,7 +20,7 @@ import org.apache.log4j.Logger;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
@TaskDef(name = "Output Data Staging Task")
@@ -65,6 +65,7 @@ public class OutputDataStagingTask extends DataStagingTask {
sourceURI.getPath().length());
if (dataStagingTaskModel.getDestination().startsWith("dummy")) {
+
String inputPath = getTaskContext().getStorageFileSystemRootLocation();
inputPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator);
String experimentDataDir = getProcessModel().getExperimentDataDir();
@@ -110,7 +111,7 @@ public class OutputDataStagingTask extends DataStagingTask {
String sourceParentPath = (new File(sourceURI.getPath())).getParentFile().getPath();
logger.debug("Destination parent path " + destParentPath + ", source parent path " + sourceParentPath);
- List<String> fileNames = null;
+ List<String> fileNames;
try {
fileNames = adaptor.getFileNameFromExtension(sourceFileName, sourceParentPath);
@@ -133,11 +134,14 @@ public class OutputDataStagingTask extends DataStagingTask {
}
//Wildcard support is only enabled for output data staging
+ assert processOutput != null;
processOutput.setName(sourceFileName);
try {
- getTaskContext().getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_OUTPUT, Arrays.asList(processOutput), getExperimentId());
- getTaskContext().getExperimentCatalog().add(ExpCatChildDataType.PROCESS_OUTPUT, Arrays.asList(processOutput), getProcessId());
+ getTaskContext().getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_OUTPUT,
+ Collections.singletonList(processOutput), getExperimentId());
+ getTaskContext().getExperimentCatalog().add(ExpCatChildDataType.PROCESS_OUTPUT,
+ Collections.singletonList(processOutput), getProcessId());
} catch (RegistryException e) {
throw new TaskOnFailException("Failed to update experiment or process outputs for task " + getTaskId(), true, e);
}
@@ -145,11 +149,12 @@ public class OutputDataStagingTask extends DataStagingTask {
logger.info("Transferring file " + sourceFileName);
transferFile(sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor);
}
+ return onSuccess("Output data staging task " + getTaskId() + " successfully completed");
} else {
// Downloading input file from the storage resource
transferFile(sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor);
- return onSuccess("Input data staging task " + getTaskId() + " successfully completed");
+ return onSuccess("Output data staging task " + getTaskId() + " successfully completed");
}
} catch (TaskOnFailException e) {
@@ -164,8 +169,6 @@ public class OutputDataStagingTask extends DataStagingTask {
logger.error("Unknown error while executing output data staging task " + getTaskId(), e);
return onFail("Unknown error while executing output data staging task " + getTaskId(), false, e);
}
-
- return null;
}
private void transferFile(URI sourceURI, URI destinationURI, String fileName, AgentAdaptor adaptor,
@@ -192,26 +195,6 @@ public class OutputDataStagingTask extends DataStagingTask {
}
}
- public URI getDestinationURIFromDummy(String hostName, String inputPath, String fileName) throws URISyntaxException {
- String experimentDataDir = getProcessModel().getExperimentDataDir();
- String filePath;
- if(experimentDataDir != null && !experimentDataDir.isEmpty()) {
- if(!experimentDataDir.endsWith(File.separator)){
- experimentDataDir += File.separator;
- }
- if (experimentDataDir.startsWith(File.separator)) {
- filePath = experimentDataDir + fileName;
- } else {
- filePath = inputPath + experimentDataDir + fileName;
- }
- } else {
- filePath = inputPath + getProcessId() + File.separator + fileName;
- }
- //FIXME
- return new URI("file", getTaskContext().getStorageResourceLoginUserName(), hostName, 22, filePath, null, null);
-
- }
-
@Override
public void onCancel(TaskContext taskContext) {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
index 82316f0..6d64273 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
@@ -19,7 +19,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
@TaskDef(name = "Default Job Submission")
@@ -27,7 +27,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
private static final Logger logger = LogManager.getLogger(DefaultJobSubmissionTask.class);
- public static final String DEFAULT_JOB_ID = "DEFAULT_JOB_ID";
+ private static final String DEFAULT_JOB_ID = "DEFAULT_JOB_ID";
@Override
public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) {
@@ -45,171 +45,137 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
jobModel.setJobName(mapData.getJobName());
jobModel.setJobDescription("Sample description");
- if (mapData != null) {
- //jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
- AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
- getTaskContext().getGatewayId(),
- getTaskContext().getComputeResourceId(),
- getTaskContext().getJobSubmissionProtocol().name(),
- getTaskContext().getComputeResourceCredentialToken(),
- getTaskContext().getComputeResourceLoginUserName());
-
- JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory());
-
- jobModel.setExitCode(submissionOutput.getExitCode());
- jobModel.setStdErr(submissionOutput.getStdErr());
- jobModel.setStdOut(submissionOutput.getStdOut());
-
- String jobId = submissionOutput.getJobId();
-
- if (submissionOutput.getExitCode() != 0 || submissionOutput.isJobSubmissionFailed()) {
- jobModel.setJobId(DEFAULT_JOB_ID);
- if (submissionOutput.isJobSubmissionFailed()) {
- List<JobStatus> statusList = new ArrayList<>();
- statusList.add(new JobStatus(JobState.FAILED));
- statusList.get(0).setReason(submissionOutput.getFailureReason());
- jobModel.setJobStatuses(statusList);
- saveJobModel(jobModel);
- logger.error("expId: " + getExperimentId() + ", processid: " + getProcessId()+ ", taskId: " +
- getTaskId() + " :- Job submission failed for job name " + jobModel.getJobName()
- + ". Exit code : " + submissionOutput.getExitCode() + ", Submission failed : "
- + submissionOutput.isJobSubmissionFailed());
+ AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+ getTaskContext().getGatewayId(),
+ getTaskContext().getComputeResourceId(),
+ getTaskContext().getJobSubmissionProtocol().name(),
+ getTaskContext().getComputeResourceCredentialToken(),
+ getTaskContext().getComputeResourceLoginUserName());
- ErrorModel errorModel = new ErrorModel();
- errorModel.setUserFriendlyMessage(submissionOutput.getFailureReason());
- errorModel.setActualErrorMessage(submissionOutput.getFailureReason());
- saveExperimentError(errorModel);
- saveProcessError(errorModel);
- saveTaskError(errorModel);
- //taskStatus.setState(TaskState.FAILED);
- //taskStatus.setReason("Job submission command didn't return a jobId");
- //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- //taskContext.setTaskStatus(taskStatus);
- logger.error("Standard error message : " + submissionOutput.getStdErr());
- logger.error("Standard out message : " + submissionOutput.getStdOut());
- return onFail("Job submission command didn't return a jobId", false, null);
+ JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory());
+
+ jobModel.setJobDescription(submissionOutput.getDescription());
+ jobModel.setExitCode(submissionOutput.getExitCode());
+ jobModel.setStdErr(submissionOutput.getStdErr());
+ jobModel.setStdOut(submissionOutput.getStdOut());
+
+ String jobId = submissionOutput.getJobId();
+
+ if (submissionOutput.getExitCode() != 0 || submissionOutput.isJobSubmissionFailed()) {
+
+ jobModel.setJobId(DEFAULT_JOB_ID);
+ if (submissionOutput.isJobSubmissionFailed()) {
+ List<JobStatus> statusList = new ArrayList<>();
+ statusList.add(new JobStatus(JobState.FAILED));
+ statusList.get(0).setReason(submissionOutput.getFailureReason());
+ jobModel.setJobStatuses(statusList);
+ saveJobModel(jobModel);
+ logger.error("expId: " + getExperimentId() + ", processid: " + getProcessId()+ ", taskId: " +
+ getTaskId() + " :- Job submission failed for job name " + jobModel.getJobName()
+ + ". Exit code : " + submissionOutput.getExitCode() + ", Submission failed : "
+ + submissionOutput.isJobSubmissionFailed());
+
+ logger.error("Standard error message : " + submissionOutput.getStdErr());
+ logger.error("Standard out message : " + submissionOutput.getStdOut());
+ return onFail("Job submission command didn't return a jobId. Reason " + submissionOutput.getFailureReason(),
+ false, null);
+
+ } else {
+ String msg;
+ saveJobModel(jobModel);
+ ErrorModel errorModel = new ErrorModel();
+ if (submissionOutput.getExitCode() != Integer.MIN_VALUE) {
+ msg = "expId:" + getExperimentId() + ", processId:" + getProcessId() + ", taskId: " + getTaskId() +
+ " return non zero exit code:" + submissionOutput.getExitCode() + " for JobName:" + jobModel.getJobName() +
+ ", with failure reason : " + submissionOutput.getFailureReason()
+ + " Hence changing job state to Failed." ;
+ errorModel.setActualErrorMessage(submissionOutput.getFailureReason());
} else {
- String msg;
- saveJobModel(jobModel);
- ErrorModel errorModel = new ErrorModel();
- if (submissionOutput.getExitCode() != Integer.MIN_VALUE) {
- msg = "expId:" + getExperimentId() + ", processId:" + getProcessId() + ", taskId: " + getTaskId() +
- " return non zero exit code:" + submissionOutput.getExitCode() + " for JobName:" + jobModel.getJobName() +
- ", with failure reason : " + submissionOutput.getFailureReason()
- + " Hence changing job state to Failed." ;
- errorModel.setActualErrorMessage(submissionOutput.getFailureReason());
- } else {
- msg = "expId:" + getExperimentId() + ", processId:" + getProcessId() + ", taskId: " + getTaskId() +
- " doesn't return valid job submission exit code for JobName:" + jobModel.getJobName() +
- ", with failure reason : stdout ->" + submissionOutput.getStdOut() +
- " stderr -> " + submissionOutput.getStdErr() + " Hence changing job state to Failed." ;
- errorModel.setActualErrorMessage(msg);
- }
- logger.error(msg);
- errorModel.setUserFriendlyMessage(msg);
- saveExperimentError(errorModel);
- saveProcessError(errorModel);
- saveTaskError(errorModel);
- //taskStatus.setState(TaskState.FAILED);
- //taskStatus.setReason(msg);
- //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- //taskContext.setTaskStatus(taskStatus);
- return onFail(msg, false, null);
+ msg = "expId:" + getExperimentId() + ", processId:" + getProcessId() + ", taskId: " + getTaskId() +
+ " doesn't return valid job submission exit code for JobName:" + jobModel.getJobName() +
+ ", with failure reason : stdout ->" + submissionOutput.getStdOut() +
+ " stderr -> " + submissionOutput.getStdErr() + " Hence changing job state to Failed." ;
+ errorModel.setActualErrorMessage(msg);
}
+ logger.error(msg);
+ return onFail(msg, false, null);
- //TODO save task status??
- } else if (jobId != null && !jobId.isEmpty()) {
- logger.info("Received job id " + jobId + " from compute resource");
- jobModel.setJobId(jobId);
- saveJobModel(jobModel);
+ }
+
+ } else if (jobId != null && !jobId.isEmpty()) {
+
+ logger.info("Received job id " + jobId + " from compute resource");
+ jobModel.setJobId(jobId);
+ saveJobModel(jobModel);
+
+ JobStatus jobStatus = new JobStatus();
+ jobStatus.setJobState(JobState.SUBMITTED);
+ jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatuses(Collections.singletonList(jobStatus));
+ saveAndPublishJobStatus(jobModel);
- JobStatus jobStatus = new JobStatus();
- jobStatus.setJobState(JobState.SUBMITTED);
- jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
+ if (verifyJobSubmissionByJobId(adaptor, jobId)) {
+ jobStatus.setJobState(JobState.QUEUED);
+ jobStatus.setReason("Verification step succeeded");
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- jobModel.setJobStatuses(Arrays.asList(jobStatus));
+ jobModel.setJobStatuses(Collections.singletonList(jobStatus));
saveAndPublishJobStatus(jobModel);
+ createMonitoringNode(jobId);
+ }
+
+ if (getComputeResourceDescription().isGatewayUsageReporting()){
+ String loadCommand = getComputeResourceDescription().getGatewayUsageModuleLoadCommand();
+ String usageExecutable = getComputeResourceDescription().getGatewayUsageExecutable();
+ ExperimentModel experiment = (ExperimentModel)getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, getExperimentId());
+ String username = experiment.getUserName() + "@" + getTaskContext().getGatewayComputeResourcePreference().getUsageReportingGatewayId();
+ RawCommandInfo rawCommandInfo = new RawCommandInfo(loadCommand + " && " + usageExecutable + " -gateway_user " + username +
+ " -submit_time \"`date '+%F %T %:z'`\" -jobid " + jobId );
+ adaptor.executeCommand(rawCommandInfo.getRawCommand(), null);
+ }
+
+ return onSuccess("Submitted job to compute resource");
+
+ } else {
- if (verifyJobSubmissionByJobId(adaptor, jobId)) {
+ int verificationTryCount = 0;
+ while (verificationTryCount++ < 3) {
+ String verifyJobId = verifyJobSubmission(adaptor, jobModel.getJobName(), getTaskContext().getComputeResourceLoginUserName());
+ if (verifyJobId != null && !verifyJobId.isEmpty()) {
+ // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
+ jobId = verifyJobId;
+ jobModel.setJobId(jobId);
+ saveJobModel(jobModel);
+ JobStatus jobStatus = new JobStatus();
jobStatus.setJobState(JobState.QUEUED);
jobStatus.setReason("Verification step succeeded");
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- jobModel.setJobStatuses(Arrays.asList(jobStatus));
+ jobModel.setJobStatuses(Collections.singletonList(jobStatus));
saveAndPublishJobStatus(jobModel);
- createMonitoringNode(jobId);
- }
-
- if (getComputeResourceDescription().isGatewayUsageReporting()){
- String loadCommand = getComputeResourceDescription().getGatewayUsageModuleLoadCommand();
- String usageExecutable = getComputeResourceDescription().getGatewayUsageExecutable();
- ExperimentModel experiment = (ExperimentModel)getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, getExperimentId());
- String username = experiment.getUserName() + "@" + getTaskContext().getGatewayComputeResourcePreference().getUsageReportingGatewayId();
- RawCommandInfo rawCommandInfo = new RawCommandInfo(loadCommand + " && " + usageExecutable + " -gateway_user " + username +
- " -submit_time \"`date '+%F %T %:z'`\" -jobid " + jobId );
- adaptor.executeCommand(rawCommandInfo.getRawCommand(), null);
- }
- //taskStatus = new TaskStatus(TaskState.COMPLETED);
- //taskStatus.setReason("Submitted job to compute resource");
- //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-
- return onSuccess("Submitted job to compute resource");
- } else {
- int verificationTryCount = 0;
- while (verificationTryCount++ < 3) {
- String verifyJobId = verifyJobSubmission(adaptor, jobModel.getJobName(), getTaskContext().getComputeResourceLoginUserName());
- if (verifyJobId != null && !verifyJobId.isEmpty()) {
- // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
- jobId = verifyJobId;
- jobModel.setJobId(jobId);
- saveJobModel(jobModel);
- JobStatus jobStatus = new JobStatus();
- jobStatus.setJobState(JobState.QUEUED);
- jobStatus.setReason("Verification step succeeded");
- jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- jobModel.setJobStatuses(Arrays.asList(jobStatus));
- saveAndPublishJobStatus(jobModel);
- //taskStatus.setState(TaskState.COMPLETED);
- //taskStatus.setReason("Submitted job to compute resource");
- //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- break;
- }
- logger.info("Verify step return invalid jobId, retry verification step in " + (verificationTryCount * 10) + " secs");
- Thread.sleep(verificationTryCount * 10000);
+ logger.info("Job id " + verifyJobId + " verification succeeded");
+ break;
}
+ logger.info("Verify step return invalid jobId, retry verification step in " + (verificationTryCount * 10) + " secs");
+ Thread.sleep(verificationTryCount * 10000);
}
+ }
- if (jobId == null || jobId.isEmpty()) {
- jobModel.setJobId(DEFAULT_JOB_ID);
- saveJobModel(jobModel);
- String msg = "expId:" + getExperimentId() + " Couldn't find " +
- "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
- "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
- logger.error(msg);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setUserFriendlyMessage(msg);
- errorModel.setActualErrorMessage(msg);
- saveExperimentError(errorModel);
- saveProcessError(errorModel);
- saveTaskError(errorModel);
- //taskStatus.setState(TaskState.FAILED);
- //taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
- //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- return onFail("Couldn't find job id in both submitted and verified steps", false, null);
- } else {
- //GFacUtils.saveJobModel(processContext, jobModel);
- }
-
- } else {
- return onFail("Job data is null", true, null);
- // taskStatus.setReason("JobFile is null");
- //taskStatus.setState(TaskState.FAILED);
+ if (jobId == null || jobId.isEmpty()) {
+ jobModel.setJobId(DEFAULT_JOB_ID);
+ saveJobModel(jobModel);
+ String msg = "expId:" + getExperimentId() + " Couldn't find " +
+ "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
+ "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
+ logger.error(msg);
+ return onFail("Couldn't find job id in both submitted and verified steps. " + msg, false, null);
+ } else {
+ return onSuccess("Submitted job to compute resource after retry");
}
+
} catch (Exception e) {
return onFail("Task failed due to unexpected issue", false, e);
}
- // TODO get rid of this
- return onFail("Task moved to an unknown state", false, null);
}
private boolean verifyJobSubmissionByJobId(AgentAdaptor agentAdaptor, String jobID) throws Exception {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/ForkJobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/ForkJobSubmissionTask.java
index 06ce0ea..d9415ac 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/ForkJobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/ForkJobSubmissionTask.java
@@ -12,12 +12,17 @@ import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.JobState;
import org.apache.airavata.model.status.JobStatus;
import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
-import java.util.Arrays;
+import java.util.Collections;
@TaskDef(name = "Fork Job Submission")
+@SuppressWarnings("unused")
public class ForkJobSubmissionTask extends JobSubmissionTask {
+ private static final Logger logger = LogManager.getLogger(ForkJobSubmissionTask.class);
+
@Override
public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) {
@@ -31,45 +36,44 @@ public class ForkJobSubmissionTask extends JobSubmissionTask {
jobModel.setTaskId(getTaskId());
jobModel.setJobName(mapData.getJobName());
- if (mapData != null) {
- //jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
- AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
- getTaskContext().getGatewayId(),
- getTaskContext().getComputeResourceId(),
- getTaskContext().getJobSubmissionProtocol().name(),
- getTaskContext().getComputeResourceCredentialToken(),
- getTaskContext().getComputeResourceLoginUserName());
-
- JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory());
-
- jobModel.setExitCode(submissionOutput.getExitCode());
- jobModel.setStdErr(submissionOutput.getStdErr());
- jobModel.setStdOut(submissionOutput.getStdOut());
-
- String jobId = submissionOutput.getJobId();
-
- if (jobId != null && !jobId.isEmpty()) {
- jobModel.setJobId(jobId);
- saveJobModel(jobModel);
- JobStatus jobStatus = new JobStatus();
- jobStatus.setJobState(JobState.SUBMITTED);
- jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
- jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- jobModel.setJobStatuses(Arrays.asList(jobStatus));
- saveAndPublishJobStatus(jobModel);
-
- return null;
- } else {
- String msg = "expId:" + getExperimentId() + " Couldn't find remote jobId for JobName:" +
- jobModel.getJobName() + ", both submit and verify steps doesn't return a valid JobId. " +
- "Hence changing experiment state to Failed";
- }
-
+ AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+ getTaskContext().getGatewayId(),
+ getTaskContext().getComputeResourceId(),
+ getTaskContext().getJobSubmissionProtocol().name(),
+ getTaskContext().getComputeResourceCredentialToken(),
+ getTaskContext().getComputeResourceLoginUserName());
+
+ JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory());
+
+ jobModel.setJobDescription(submissionOutput.getDescription());
+ jobModel.setExitCode(submissionOutput.getExitCode());
+ jobModel.setStdErr(submissionOutput.getStdErr());
+ jobModel.setStdOut(submissionOutput.getStdOut());
+
+ String jobId = submissionOutput.getJobId();
+
+ if (jobId != null && !jobId.isEmpty()) {
+ jobModel.setJobId(jobId);
+ saveJobModel(jobModel);
+ JobStatus jobStatus = new JobStatus();
+ jobStatus.setJobState(JobState.SUBMITTED);
+ jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatuses(Collections.singletonList(jobStatus));
+ saveAndPublishJobStatus(jobModel);
+
+ return onSuccess("Job submitted successfully");
+ } else {
+ String msg = "expId:" + getExperimentId() + " Couldn't find remote jobId for JobName:" +
+ jobModel.getJobName() + ", both submit and verify steps doesn't return a valid JobId. " +
+ "Hence changing experiment state to Failed";
+
+ return onFail(msg, true, null);
}
- return null;
} catch (Exception e) {
- return null;
+ logger.error("Unknown error while submitting job", e);
+ return onFail("Unknown error while submitting job", true, e);
}
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
index 7bf5034..a204ee1 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
@@ -52,23 +52,32 @@ public abstract class JobSubmissionTask extends AiravataTask {
}
}
+ @SuppressWarnings("WeakerAccess")
public CuratorFramework getCuratorClient() {
return curatorClient;
}
// TODO perform exception handling
+ @SuppressWarnings("WeakerAccess")
protected void createMonitoringNode(String jobId) throws Exception {
logger.info("Creating zookeeper paths for job monitoring for job id : " + jobId + ", process : "
+ getProcessId() + ", gateway : " + getGatewayId());
- this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/lock", new byte[0]);
- this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/gateway", getGatewayId().getBytes());
- this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/process", getProcessId().getBytes());
- this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/task", getTaskId().getBytes());
- this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/experiment", getExperimentId().getBytes());
- this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/status", "pending".getBytes());
+ getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+ "/monitoring/" + jobId + "/lock", new byte[0]);
+ getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+ "/monitoring/" + jobId + "/gateway", getGatewayId().getBytes());
+ getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+ "/monitoring/" + jobId + "/process", getProcessId().getBytes());
+ getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+ "/monitoring/" + jobId + "/task", getTaskId().getBytes());
+ getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+ "/monitoring/" + jobId + "/experiment", getExperimentId().getBytes());
+ getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+ "/monitoring/" + jobId + "/status", "pending".getBytes());
}
//////////////////////
+ @SuppressWarnings("WeakerAccess")
protected JobSubmissionOutput submitBatchJob(AgentAdaptor agentAdaptor, GroovyMapData groovyMapData, String workingDirectory) throws Exception {
JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
getAppCatalog(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface()));
@@ -94,6 +103,7 @@ public abstract class JobSubmissionTask extends AiravataTask {
CommandOutput commandOutput = agentAdaptor.executeCommand(submitCommand.getRawCommand(), workingDirectory);
JobSubmissionOutput jsoutput = new JobSubmissionOutput();
+ jsoutput.setDescription(scriptAsString);
jsoutput.setJobId(jobManagerConfiguration.getParser().parseJobSubmission(commandOutput.getStdOut()));
if (jsoutput.getJobId() == null) {
@@ -114,12 +124,14 @@ public abstract class JobSubmissionTask extends AiravataTask {
return jsoutput;
}
+ @SuppressWarnings("WeakerAccess")
public File getLocalDataDir() {
String outputPath = ServerSettings.getLocalDataLocation();
outputPath = (outputPath.endsWith(File.separator) ? outputPath : outputPath + File.separator);
return new File(outputPath + getProcessId());
}
+ @SuppressWarnings("WeakerAccess")
public JobStatus getJobStatus(AgentAdaptor agentAdaptor, String jobID) throws Exception {
JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
getAppCatalog(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface()));
@@ -129,6 +141,7 @@ public abstract class JobSubmissionTask extends AiravataTask {
}
+ @SuppressWarnings("WeakerAccess")
public String getJobIdByJobName(AgentAdaptor agentAdaptor, String jobName, String userName) throws Exception {
JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
getAppCatalog(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface()));
@@ -138,16 +151,22 @@ public abstract class JobSubmissionTask extends AiravataTask {
return jobManagerConfiguration.getParser().parseJobId(jobName, commandOutput.getStdOut());
}
+ @SuppressWarnings("WeakerAccess")
public void saveJobModel(JobModel jobModel) throws RegistryException {
getExperimentCatalog().add(ExpCatChildDataType.JOB, jobModel, getProcessId());
}
+ @SuppressWarnings("WeakerAccess")
public void saveAndPublishJobStatus(JobModel jobModel) throws Exception {
try {
// first we save job jobModel to the registry for sa and then save the job status.
- JobStatus jobStatus = null;
- if(jobModel.getJobStatuses() != null)
+ JobStatus jobStatus;
+ if (jobModel.getJobStatuses() != null && jobModel.getJobStatuses().size() > 0) {
jobStatus = jobModel.getJobStatuses().get(0);
+ } else {
+ logger.error("Job statuses can not be empty");
+ return;
+ }
List<JobStatus> statuses = new ArrayList<>();
statuses.add(jobStatus);
@@ -173,7 +192,4 @@ public abstract class JobSubmissionTask extends AiravataTask {
throw new Exception("Error persisting job status " + e.getLocalizedMessage(), e);
}
}
-
- ///////////// required for groovy map
-
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 225f81d..e2af339 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -41,15 +41,11 @@ import org.apache.log4j.Logger;
import org.apache.zookeeper.data.Stat;
import java.util.*;
-import java.util.stream.Collectors;
public class PostWorkflowManager {
private static final Logger logger = LogManager.getLogger(PostWorkflowManager.class);
- //private final String BOOTSTRAP_SERVERS = "localhost:9092";
- //private final String TOPIC = "parsed-data";
-
private CuratorFramework curatorClient = null;
private Publisher statusPublisher;
@@ -66,7 +62,7 @@ public class PostWorkflowManager {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JobStatusResultDeserializer.class.getName());
// Create the consumer using props.
- final Consumer<String, JobStatusResult> consumer = new KafkaConsumer<String, JobStatusResult>(props);
+ final Consumer<String, JobStatusResult> consumer = new KafkaConsumer<>(props);
// Subscribe to the topic.
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("kafka.broker.topic")));
return consumer;
@@ -74,32 +70,27 @@ public class PostWorkflowManager {
private String getExperimentIdByJobId(String jobId) throws Exception {
byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/experiment");
- String process = new String(processBytes);
- return process;
+ return new String(processBytes);
}
private String getTaskIdByJobId(String jobId) throws Exception {
byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/task");
- String process = new String(processBytes);
- return process;
+ return new String(processBytes);
}
private String getProcessIdByJobId(String jobId) throws Exception {
byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/process");
- String process = new String(processBytes);
- return process;
+ return new String(processBytes);
}
private String getGatewayByJobId(String jobId) throws Exception {
byte[] gatewayBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/gateway");
- String gateway = new String(gatewayBytes);
- return gateway;
+ return new String(gatewayBytes);
}
private String getStatusByJobId(String jobId) throws Exception {
byte[] statusBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/status");
- String status = new String(statusBytes);
- return status;
+ return new String(statusBytes);
}
private boolean hasMonitoringRegistered(String jobId) throws Exception {
@@ -128,7 +119,7 @@ public class PostWorkflowManager {
// TODO get cluster lock before that
if ("cancelled".equals(status)) {
-
+ // TODO to be implemented
} else {
saveAndPublishJobStatus(jobStatusResult.getJobId(), task, processId, experimentId, gateway, jobStatusResult.getState());
@@ -190,7 +181,7 @@ public class PostWorkflowManager {
ServerSettings.getZookeeperConnection());
workflowManager.launchWorkflow(processId + "-POST-" + UUID.randomUUID().toString(),
- allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true, false);
+ new ArrayList<>(allTasks), true, false);
} else if (jobStatusResult.getState() == JobState.CANCELED) {
logger.info("Job " + jobStatusResult.getJobId() + " was externally cancelled");
@@ -224,7 +215,7 @@ public class PostWorkflowManager {
}
}
- public void saveAndPublishJobStatus(String jobId, String taskId, String processId, String experimentId, String gateway,
+ private void saveAndPublishJobStatus(String jobId, String taskId, String processId, String experimentId, String gateway,
JobState jobState) throws Exception {
try {
@@ -255,7 +246,7 @@ public class PostWorkflowManager {
}
}
- public Publisher getStatusPublisher() throws AiravataException {
+ private Publisher getStatusPublisher() throws AiravataException {
if (statusPublisher == null) {
synchronized (RabbitMQPublisher.class) {
if (statusPublisher == null) {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index 18a6627..383e0b0 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.workflow;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.helix.core.AbstractTask;
import org.apache.airavata.helix.core.OutPort;
import org.apache.airavata.helix.impl.task.AiravataTask;
import org.apache.airavata.helix.impl.task.env.EnvSetupTask;
@@ -36,6 +37,7 @@ public class PreWorkflowManager {
private final Subscriber subscriber;
+ @SuppressWarnings("WeakerAccess")
public PreWorkflowManager() throws AiravataException {
List<String> routingKeys = new ArrayList<>();
routingKeys.add(ServerSettings.getRabbitmqProcessExchangeName());
@@ -94,7 +96,7 @@ public class PreWorkflowManager {
ServerSettings.getSetting("post.workflow.manager.name"),
ServerSettings.getZookeeperConnection());
String workflowName = workflowManager.launchWorkflow(processId + "-PRE-" + UUID.randomUUID().toString(),
- allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true, false);
+ new ArrayList<>(allTasks), true, false);
return workflowName;
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties b/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties
index 19b3b3d..e412896 100644
--- a/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties
+++ b/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties
@@ -45,18 +45,6 @@ jpa.cache.size=-1
#jpa.connection.properties=MaxActive=10,MaxIdle=5,MinIdle=2,MaxWait=60000,testWhileIdle=true,testOnBorrow=true
enable.sharing=true
-# Properties for default user mode
-default.registry.user=default-admin
-default.registry.password=123456
-default.registry.password.hash.method=SHA
-default.registry.gateway=default
-super.tenant.gatewayId=default
-
-# Properties for cluster status monitoring
-# cluster status monitoring job repeat time in seconds
-cluster.status.monitoring.enable=false
-cluster.status.monitoring.repeat.time=18000
-
###########################################################################
# Application Catalog DB Configuration
###########################################################################
@@ -84,19 +72,6 @@ replicacatalog.jdbc.password=eroma123456
replicacatalog.validationQuery=SELECT 1 from CONFIGURATION
###########################################################################
-# Workflow Catalog DB Configuration
-###########################################################################
-#for derby [AiravataJPARegistry]
-#workflowcatalog.jdbc.driver=org.apache.derby.jdbc.ClientDriver
-#workflowcatalog.jdbc.url=jdbc:derby://localhost:1527/workflow_catalog;create=true;user=airavata;password=airavata
-# MariaDB database configuration
-workflowcatalog.jdbc.driver=org.mariadb.jdbc.Driver
-workflowcatalog.jdbc.url=jdbc:mariadb://149.165.168.248:3306/replica_catalog
-workflowcatalog.jdbc.user=eroma
-workflowcatalog.jdbc.password=eroma123456
-workflowcatalog.validationQuery=SELECT 1 from CONFIGURATION
-
-###########################################################################
# Sharing Catalog DB Configuration
###########################################################################
#for derby [AiravataJPARegistry]
@@ -117,21 +92,6 @@ sharing.registry.server.host=192.168.99.102
sharing.registry.server.port=7878
###########################################################################
-# User Profile MongoDB Configuration
-###########################################################################
-userprofile.mongodb.host=localhost
-userprofile.mongodb.port=27017
-
-
-###########################################################################
-# Server module Configuration
-###########################################################################
-#credential store server should be started before API server
-#This is obsolete property with new script files.
-#servers=credentialstore,apiserver,orchestrator
-
-
-###########################################################################
# API Server Configurations
###########################################################################
apiserver=org.apache.airavata.api.server.AiravataAPIServer
@@ -141,21 +101,6 @@ apiserver.port=8930
apiserver.min.threads=50
###########################################################################
-# Orchestrator Server Configurations
-###########################################################################
-orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer
-orchestrator.server.name=orchestrator-node0
-orchestrator.server.host=192.168.99.102
-orchestrator.server.port=8940
-orchestrator.server.min.threads=50
-job.validators=org.apache.airavata.orchestrator.core.validator.impl.BatchQueueValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
-submitter.interval=10000
-threadpool.size=10
-start.submitter=true
-embedded.mode=true
-enable.validation=true
-
-###########################################################################
# Registry Server Configurations
###########################################################################
regserver=org.apache.airavata.registry.api.service.RegistryAPIServer
@@ -164,28 +109,6 @@ regserver.server.host=192.168.99.102
regserver.server.port=8970
regserver.server.min.threads=50
-###########################################################################
-# GFac Server Configurations
-###########################################################################
-gfac=org.apache.airavata.gfac.server.GfacServer
-gfac.server.name=gfac-node0
-gfac.server.host=10.0.2.15
-gfac.server.port=8950
-gfac.thread.pool.size=50
-host.scheduler=org.apache.airavata.gfac.impl.DefaultHostScheduler
-
-
-
-###########################################################################
-# Airavata Workflow Interpreter Configurations
-###########################################################################
-workflowserver=org.apache.airavata.api.server.WorkflowServer
-enactment.thread.pool.size=10
-
-#to define custom workflow parser user following property
-#workflow.parser=org.apache.airavata.workflow.core.parser.AiravataWorkflowBuilder
-
-
###########################################################################
# Job Scheduler can send informative email messages to you about the status of your job.
@@ -269,6 +192,8 @@ kafka.broker.consumer.group=MonitoringConsumer
helix.cluster.name=AiravataDemoCluster
pre.workflow.manager.name=prewm
post.workflow.manager.name=postwm
+helix.controller.name=helixcontroller
+helix.participant.name=helixparticipant
###########################################################################
# AMQP Notification Configuration
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
index a7e5a64..029da29 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
@@ -1,5 +1,7 @@
package org.apache.airavata.helix.core.participant;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.core.support.TaskHelperImpl;
import org.apache.airavata.helix.core.AbstractTask;
import org.apache.airavata.helix.core.util.PropertyResolver;
@@ -44,20 +46,13 @@ public class HelixParticipant <T extends AbstractTask> implements Runnable {
private PropertyResolver propertyResolver;
private Class<T> taskClass;
- public HelixParticipant(String propertyFile, Class<T> taskClass, String taskTypeName, boolean readPropertyFromFile) throws IOException {
+ public HelixParticipant(Class<T> taskClass, String taskTypeName) throws ApplicationSettingsException {
logger.info("Initializing Participant Node");
- this.propertyResolver = new PropertyResolver();
- if (readPropertyFromFile) {
- propertyResolver.loadFromFile(new File(propertyFile));
- } else {
- propertyResolver.loadInputStream(this.getClass().getClassLoader().getResourceAsStream(propertyFile));
- }
-
- this.zkAddress = propertyResolver.get("zookeeper.connection.url");
- this.clusterName = propertyResolver.get("helix.cluster.name");
- this.participantName = propertyResolver.get("participant.name");
+ this.zkAddress = ServerSettings.getZookeeperConnection();
+ this.clusterName = ServerSettings.getSetting("helix.cluster.name");
+ this.participantName = ServerSettings.getSetting("helix.participant.name");
this.taskTypeName = taskTypeName;
this.taskClass = taskClass;
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.