You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/11/11 17:26:54 UTC
airavata git commit: Use datamovement remote cluster in datamovement
tasks
Repository: airavata
Updated Branches:
refs/heads/develop dd32d065e -> 832b14680
Use datamovement remote cluster in datamovement tasks
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/832b1468
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/832b1468
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/832b1468
Branch: refs/heads/develop
Commit: 832b14680730bc64c346a739a6491ffc6f4c13b5
Parents: dd32d06
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Nov 11 11:26:48 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Nov 11 11:26:48 2015 -0500
----------------------------------------------------------------------
.../apache/airavata/gfac/core/GFacUtils.java | 6 ++--
.../gfac/core/context/ProcessContext.java | 19 +++++++++----
.../org/apache/airavata/gfac/impl/Factory.java | 29 +++++++++++++++++---
.../airavata/gfac/impl/GFacEngineImpl.java | 3 +-
.../airavata/gfac/impl/task/DataStageTask.java | 4 +--
.../impl/task/DefaultJobSubmissionTask.java | 4 +--
.../gfac/impl/task/EnvironmentSetupTask.java | 2 +-
.../gfac/impl/task/ForkJobSubmissionTask.java | 2 +-
.../gfac/impl/task/SCPDataStageTask.java | 6 ++--
.../gfac/impl/task/utils/StreamData.java | 4 +--
10 files changed, 54 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index c3de6a2..4d6595e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -44,7 +44,6 @@ import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel
import org.apache.airavata.model.status.*;
import org.apache.airavata.model.task.JobSubmissionTaskModel;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.core.experiment.catalog.model.ProcessOutput;
import org.apache.airavata.registry.cpi.*;
import org.apache.airavata.registry.cpi.utils.Constants;
import org.apache.commons.io.FileUtils;
@@ -52,7 +51,6 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.thrift.TException;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
@@ -518,10 +516,10 @@ public class GFacUtils {
}
jobDescriptor.setInputValues(inputValues);
- jobDescriptor.setUserName(processContext.getRemoteCluster().getServerInfo().getUserName());
+ jobDescriptor.setUserName(processContext.getJobSubmissionRemoteCluster().getServerInfo().getUserName());
jobDescriptor.setShellName("/bin/bash");
jobDescriptor.setAllEnvExport(true);
- jobDescriptor.setOwner(processContext.getRemoteCluster().getServerInfo().getUserName());
+ jobDescriptor.setOwner(processContext.getJobSubmissionRemoteCluster().getServerInfo().getUserName());
// get walltime
try {
JobSubmissionTaskModel jobSubmissionTaskModel = ((JobSubmissionTaskModel) taskContext.getSubTaskModel());
http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 1a3a236..f470461 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -69,7 +69,8 @@ public class ProcessContext {
private ComputeResourceDescription computeResourceDescription;
private ApplicationDeploymentDescription applicationDeploymentDescription;
private ApplicationInterfaceDescription applicationInterfaceDescription;
- private RemoteCluster remoteCluster;
+ private RemoteCluster jobSubmissionRemoteCluster;
+ private RemoteCluster dataMovementRemoteCluster;
private Map<String, String> sshProperties;
private String stdoutLocation;
private String stderrLocation;
@@ -178,12 +179,20 @@ public class ProcessContext {
this.gatewayResourceProfile = gatewayResourceProfile;
}
- public RemoteCluster getRemoteCluster() {
- return remoteCluster;
+ public RemoteCluster getJobSubmissionRemoteCluster() {
+ return jobSubmissionRemoteCluster;
}
- public void setRemoteCluster(RemoteCluster remoteCluster) {
- this.remoteCluster = remoteCluster;
+ public void setJobSubmissionRemoteCluster(RemoteCluster jobSubmissoinRemoteCluster) {
+ this.jobSubmissionRemoteCluster = jobSubmissoinRemoteCluster;
+ }
+
+ public RemoteCluster getDataMovementRemoteCluster() {
+ return dataMovementRemoteCluster;
+ }
+
+ public void setDataMovementRemoteCluster(RemoteCluster dataMovementRemoteCluster) {
+ this.dataMovementRemoteCluster = dataMovementRemoteCluster;
}
public Map<String, String> getSshProperties() {
http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index ff4ca9d..527ee4e 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -201,13 +201,13 @@ public abstract class Factory {
* @throws AppCatalogException
* @throws AiravataException
*/
- public static RemoteCluster getRemoteCluster(ProcessContext processContext) throws GFacException,
- AppCatalogException, AiravataException {
+ public static RemoteCluster getJobSubmissionRemoteCluster(ProcessContext processContext)
+ throws GFacException, AppCatalogException, AiravataException {
String computeResourceId = processContext.getComputeResourceId();
- String key = processContext.getJobSubmissionProtocol().toString() + ":" + computeResourceId;
+ JobSubmissionProtocol jobSubmissionProtocol = processContext.getJobSubmissionProtocol();
+ String key = jobSubmissionProtocol.name() + ":" + computeResourceId;
RemoteCluster remoteCluster = remoteClusterMap.get(key);
- JobSubmissionProtocol jobSubmissionProtocol = processContext.getJobSubmissionProtocol();
if (remoteCluster == null) {
JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager());
if (jobSubmissionProtocol == JobSubmissionProtocol.LOCAL ||
@@ -223,6 +223,27 @@ public abstract class Factory {
return remoteCluster;
}
+ public static RemoteCluster getDataMovementRemoteCluster(ProcessContext processContext)
+ throws GFacException, AiravataException {
+
+ String computeResourceId = processContext.getComputeResourceId();
+ DataMovementProtocol dataMovementProtocol = processContext.getDataMovementProtocol();
+ String key = dataMovementProtocol.name() + ":" + computeResourceId;
+ RemoteCluster remoteCluster = remoteClusterMap.get(key);
+ if (remoteCluster == null) {
+ JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager());
+ if (dataMovementProtocol == DataMovementProtocol.LOCAL) {
+ remoteCluster = new LocalRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, null);
+ } else if (dataMovementProtocol == DataMovementProtocol.SCP) {
+ AuthenticationInfo authenticationInfo = getSSHKeyAuthentication();
+ remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, authenticationInfo);
+ }
+
+ remoteClusterMap.put(key, remoteCluster);
+ }
+ return remoteCluster;
+ }
+
private static SSHKeyAuthentication getSSHKeyAuthentication() throws ApplicationSettingsException {
SSHKeyAuthentication sshKA = new SSHKeyAuthentication();
sshKA.setUserName(ServerSettings.getSetting("ssh.username"));
http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index d386f66..752b37c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -128,7 +128,8 @@ public class GFacEngineImpl implements GFacEngine {
expCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processId);
processModel.setProcessOutputs(applicationOutputs);
processContext.setResourceJobManager(getResourceJobManager(processContext));
- processContext.setRemoteCluster(Factory.getRemoteCluster(processContext));
+ processContext.setJobSubmissionRemoteCluster(Factory.getJobSubmissionRemoteCluster(processContext));
+ processContext.setDataMovementRemoteCluster(Factory.getDataMovementRemoteCluster(processContext));
String inputPath = ServerSettings.getLocalDataLocation();
if (inputPath != null) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
index ab9d562..ec3ffd7 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
@@ -66,13 +66,13 @@ public class DataStageTask implements Task {
/**
* copy local file to compute resource.
*/
- taskContext.getParentProcessContext().getRemoteCluster().copyTo(sourceURI.getPath(), destinationURI
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyTo(sourceURI.getPath(), destinationURI
.getPath());
} else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
/**
* copy remote file from compute resource.
*/
- taskContext.getParentProcessContext().getRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI
.getPath());
}
status.setReason("Successfully staged data");
http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
index 020880d..9ccaa94 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
@@ -62,7 +62,7 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
ProcessContext processContext = taskContext.getParentProcessContext();
JobModel jobModel = processContext.getJobModel();
jobModel.setTaskId(taskContext.getTaskId());
- RemoteCluster remoteCluster = processContext.getRemoteCluster();
+ RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext,taskContext);
jobModel.setJobName(jobDescriptor.getJobName());
ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
@@ -258,7 +258,7 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
@Override
public JobStatus cancel(TaskContext taskcontext) throws TaskException {
ProcessContext processContext = taskcontext.getParentProcessContext();
- RemoteCluster remoteCluster = processContext.getRemoteCluster();
+ RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
JobModel jobModel = processContext.getJobModel();
int retryCount = 0;
if (jobModel != null) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
index fff130c..1256e48 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
@@ -46,7 +46,7 @@ public class EnvironmentSetupTask implements Task {
public TaskStatus execute(TaskContext taskContext) {
TaskStatus status = new TaskStatus(TaskState.COMPLETED);
try {
- RemoteCluster remoteCluster = taskContext.getParentProcessContext().getRemoteCluster();
+ RemoteCluster remoteCluster = taskContext.getParentProcessContext().getJobSubmissionRemoteCluster();
remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir());
status.setReason("Successfully created environment");
} catch (SSHApiException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
index ed75fef..59a36e1 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
@@ -62,7 +62,7 @@ public class ForkJobSubmissionTask implements JobSubmissionTask {
ProcessContext processContext = taskContext.getParentProcessContext();
JobModel jobModel = processContext.getJobModel();
jobModel.setTaskId(taskContext.getTaskId());
- RemoteCluster remoteCluster = processContext.getRemoteCluster();
+ RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext, taskContext);
jobModel.setJobName(jobDescriptor.getJobName());
ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 678ded1..7403c29 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -262,7 +262,7 @@ public class SCPDataStageTask implements Task {
StringBuilder sb = new StringBuilder("rsync -cr ");
sb.append(sourceURI.getPath()).append(" ").append(destinationURI.getPath());
CommandInfo commandInfo = new RawCommandInfo(sb.toString());
- taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo);
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().execute(commandInfo);
}
private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
@@ -270,7 +270,7 @@ public class SCPDataStageTask implements Task {
/**
* scp third party file transfer 'to' compute resource.
*/
- taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.TO);
}
@@ -280,7 +280,7 @@ public class SCPDataStageTask implements Task {
/**
* scp third party file transfer 'from' comute resource.
*/
- taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM);
// update output locations
GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
index fe5f8b7..54903ac 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
@@ -149,7 +149,7 @@ public class StreamData extends TimerTask {
StringBuilder sb = new StringBuilder("rsync -cr ");
sb.append(sourceURI.getPath()).append(" ").append(destinationURI.getPath());
CommandInfo commandInfo = new RawCommandInfo(sb.toString());
- taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo);
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().execute(commandInfo);
}
@@ -166,7 +166,7 @@ public class StreamData extends TimerTask {
/**
* scp third party file transfer 'from' comute resource.
*/
- taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM);
// update output locations
GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());