You are viewing a plain text version of this content. (The canonical link is not included in this plain-text copy.)
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/11/11 17:26:54 UTC

airavata git commit: Use datamovement remote cluster in datamovement tasks

Repository: airavata
Updated Branches:
  refs/heads/develop dd32d065e -> 832b14680


Use datamovement remote cluster in datamovement tasks


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/832b1468
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/832b1468
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/832b1468

Branch: refs/heads/develop
Commit: 832b14680730bc64c346a739a6491ffc6f4c13b5
Parents: dd32d06
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Nov 11 11:26:48 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Nov 11 11:26:48 2015 -0500

----------------------------------------------------------------------
 .../apache/airavata/gfac/core/GFacUtils.java    |  6 ++--
 .../gfac/core/context/ProcessContext.java       | 19 +++++++++----
 .../org/apache/airavata/gfac/impl/Factory.java  | 29 +++++++++++++++++---
 .../airavata/gfac/impl/GFacEngineImpl.java      |  3 +-
 .../airavata/gfac/impl/task/DataStageTask.java  |  4 +--
 .../impl/task/DefaultJobSubmissionTask.java     |  4 +--
 .../gfac/impl/task/EnvironmentSetupTask.java    |  2 +-
 .../gfac/impl/task/ForkJobSubmissionTask.java   |  2 +-
 .../gfac/impl/task/SCPDataStageTask.java        |  6 ++--
 .../gfac/impl/task/utils/StreamData.java        |  4 +--
 10 files changed, 54 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index c3de6a2..4d6595e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -44,7 +44,6 @@ import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel
 import org.apache.airavata.model.status.*;
 import org.apache.airavata.model.task.JobSubmissionTaskModel;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.core.experiment.catalog.model.ProcessOutput;
 import org.apache.airavata.registry.cpi.*;
 import org.apache.airavata.registry.cpi.utils.Constants;
 import org.apache.commons.io.FileUtils;
@@ -52,7 +51,6 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
@@ -518,10 +516,10 @@ public class GFacUtils {
         }
 
         jobDescriptor.setInputValues(inputValues);
-        jobDescriptor.setUserName(processContext.getRemoteCluster().getServerInfo().getUserName());
+        jobDescriptor.setUserName(processContext.getJobSubmissionRemoteCluster().getServerInfo().getUserName());
         jobDescriptor.setShellName("/bin/bash");
         jobDescriptor.setAllEnvExport(true);
-        jobDescriptor.setOwner(processContext.getRemoteCluster().getServerInfo().getUserName());
+        jobDescriptor.setOwner(processContext.getJobSubmissionRemoteCluster().getServerInfo().getUserName());
         // get walltime
         try {
             JobSubmissionTaskModel jobSubmissionTaskModel = ((JobSubmissionTaskModel) taskContext.getSubTaskModel());

http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 1a3a236..f470461 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -69,7 +69,8 @@ public class ProcessContext {
 	private ComputeResourceDescription computeResourceDescription;
 	private ApplicationDeploymentDescription applicationDeploymentDescription;
 	private ApplicationInterfaceDescription applicationInterfaceDescription;
-	private RemoteCluster remoteCluster;
+	private RemoteCluster jobSubmissionRemoteCluster;
+	private RemoteCluster dataMovementRemoteCluster;
 	private Map<String, String> sshProperties;
 	private String stdoutLocation;
 	private String stderrLocation;
@@ -178,12 +179,20 @@ public class ProcessContext {
 		this.gatewayResourceProfile = gatewayResourceProfile;
 	}
 
-	public RemoteCluster getRemoteCluster() {
-		return remoteCluster;
+	public RemoteCluster getJobSubmissionRemoteCluster() {
+		return jobSubmissionRemoteCluster;
 	}
 
-	public void setRemoteCluster(RemoteCluster remoteCluster) {
-		this.remoteCluster = remoteCluster;
+	public void setJobSubmissionRemoteCluster(RemoteCluster jobSubmissoinRemoteCluster) {
+		this.jobSubmissionRemoteCluster = jobSubmissoinRemoteCluster;
+	}
+
+	public RemoteCluster getDataMovementRemoteCluster() {
+		return dataMovementRemoteCluster;
+	}
+
+	public void setDataMovementRemoteCluster(RemoteCluster dataMovementRemoteCluster) {
+		this.dataMovementRemoteCluster = dataMovementRemoteCluster;
 	}
 
 	public Map<String, String> getSshProperties() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index ff4ca9d..527ee4e 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -201,13 +201,13 @@ public abstract class Factory {
 	 * @throws AppCatalogException
 	 * @throws AiravataException
 	 */
-	public static RemoteCluster getRemoteCluster(ProcessContext processContext) throws GFacException,
-			AppCatalogException, AiravataException {
+	public static RemoteCluster getJobSubmissionRemoteCluster(ProcessContext processContext)
+            throws GFacException, AppCatalogException, AiravataException {
 
         String computeResourceId = processContext.getComputeResourceId();
-        String key = processContext.getJobSubmissionProtocol().toString() + ":" + computeResourceId;
+        JobSubmissionProtocol jobSubmissionProtocol = processContext.getJobSubmissionProtocol();
+        String key = jobSubmissionProtocol.name() + ":" + computeResourceId;
 		RemoteCluster remoteCluster = remoteClusterMap.get(key);
-		JobSubmissionProtocol jobSubmissionProtocol = processContext.getJobSubmissionProtocol();
         if (remoteCluster == null) {
             JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager());
             if (jobSubmissionProtocol == JobSubmissionProtocol.LOCAL ||
@@ -223,6 +223,27 @@ public abstract class Factory {
 		return remoteCluster;
 	}
 
+    public static RemoteCluster getDataMovementRemoteCluster(ProcessContext processContext)
+            throws GFacException, AiravataException {
+
+        String computeResourceId = processContext.getComputeResourceId();
+        DataMovementProtocol dataMovementProtocol = processContext.getDataMovementProtocol();
+        String key = dataMovementProtocol.name() + ":" + computeResourceId;
+        RemoteCluster remoteCluster = remoteClusterMap.get(key);
+        if (remoteCluster == null) {
+            JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager());
+            if (dataMovementProtocol == DataMovementProtocol.LOCAL) {
+                remoteCluster = new LocalRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, null);
+            } else if (dataMovementProtocol == DataMovementProtocol.SCP) {
+                AuthenticationInfo authenticationInfo = getSSHKeyAuthentication();
+                remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, authenticationInfo);
+            }
+
+            remoteClusterMap.put(key, remoteCluster);
+        }
+        return remoteCluster;
+    }
+
 	private static SSHKeyAuthentication getSSHKeyAuthentication() throws ApplicationSettingsException {
 		SSHKeyAuthentication sshKA = new SSHKeyAuthentication();
 		sshKA.setUserName(ServerSettings.getSetting("ssh.username"));

http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index d386f66..752b37c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -128,7 +128,8 @@ public class GFacEngineImpl implements GFacEngine {
             expCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processId);
             processModel.setProcessOutputs(applicationOutputs);
             processContext.setResourceJobManager(getResourceJobManager(processContext));
-            processContext.setRemoteCluster(Factory.getRemoteCluster(processContext));
+            processContext.setJobSubmissionRemoteCluster(Factory.getJobSubmissionRemoteCluster(processContext));
+            processContext.setDataMovementRemoteCluster(Factory.getDataMovementRemoteCluster(processContext));
 
             String inputPath = ServerSettings.getLocalDataLocation();
             if (inputPath != null) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
index ab9d562..ec3ffd7 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
@@ -66,13 +66,13 @@ public class DataStageTask implements Task {
 					/**
 					 * copy local file to compute resource.
 					 */
-					taskContext.getParentProcessContext().getRemoteCluster().copyTo(sourceURI.getPath(), destinationURI
+					taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyTo(sourceURI.getPath(), destinationURI
 							.getPath());
 				} else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
 					/**
 					 * copy remote file from compute resource.
 					 */
-					taskContext.getParentProcessContext().getRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI
+					taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI
 							.getPath());
 				}
 				status.setReason("Successfully staged data");

http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
index 020880d..9ccaa94 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
@@ -62,7 +62,7 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 		    ProcessContext processContext = taskContext.getParentProcessContext();
 		    JobModel jobModel = processContext.getJobModel();
 		    jobModel.setTaskId(taskContext.getTaskId());
-		    RemoteCluster remoteCluster = processContext.getRemoteCluster();
+		    RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
 		    JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext,taskContext);
 		    jobModel.setJobName(jobDescriptor.getJobName());
 		    ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
@@ -258,7 +258,7 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 	@Override
 	public JobStatus cancel(TaskContext taskcontext) throws TaskException {
 		ProcessContext processContext = taskcontext.getParentProcessContext();
-		RemoteCluster remoteCluster = processContext.getRemoteCluster();
+		RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
 		JobModel jobModel = processContext.getJobModel();
 		int retryCount = 0;
 		if (jobModel != null) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
index fff130c..1256e48 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
@@ -46,7 +46,7 @@ public class EnvironmentSetupTask implements Task {
 	public TaskStatus execute(TaskContext taskContext) {
 		TaskStatus status = new TaskStatus(TaskState.COMPLETED);
 		try {
-			RemoteCluster remoteCluster = taskContext.getParentProcessContext().getRemoteCluster();
+			RemoteCluster remoteCluster = taskContext.getParentProcessContext().getJobSubmissionRemoteCluster();
 			remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir());
 			status.setReason("Successfully created environment");
 		} catch (SSHApiException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
index ed75fef..59a36e1 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
@@ -62,7 +62,7 @@ public class ForkJobSubmissionTask implements JobSubmissionTask {
             ProcessContext processContext = taskContext.getParentProcessContext();
             JobModel jobModel = processContext.getJobModel();
             jobModel.setTaskId(taskContext.getTaskId());
-            RemoteCluster remoteCluster = processContext.getRemoteCluster();
+            RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
             JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext, taskContext);
             jobModel.setJobName(jobDescriptor.getJobName());
             ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);

http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 678ded1..7403c29 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -262,7 +262,7 @@ public class SCPDataStageTask implements Task {
         StringBuilder sb = new StringBuilder("rsync -cr ");
         sb.append(sourceURI.getPath()).append(" ").append(destinationURI.getPath());
         CommandInfo commandInfo = new RawCommandInfo(sb.toString());
-        taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo);
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster().execute(commandInfo);
     }
 
     private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
@@ -270,7 +270,7 @@ public class SCPDataStageTask implements Task {
         /**
          * scp third party file transfer 'to' compute resource.
          */
-        taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
                 destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.TO);
     }
 
@@ -280,7 +280,7 @@ public class SCPDataStageTask implements Task {
         /**
          * scp third party file transfer 'from' comute resource.
          */
-        taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
                 destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM);
         // update output locations
         GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());

http://git-wip-us.apache.org/repos/asf/airavata/blob/832b1468/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
index fe5f8b7..54903ac 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
@@ -149,7 +149,7 @@ public class StreamData extends TimerTask  {
         StringBuilder sb = new StringBuilder("rsync -cr ");
         sb.append(sourceURI.getPath()).append(" ").append(destinationURI.getPath());
         CommandInfo commandInfo = new RawCommandInfo(sb.toString());
-        taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo);
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster().execute(commandInfo);
     }
 
 
@@ -166,7 +166,7 @@ public class StreamData extends TimerTask  {
         /**
          * scp third party file transfer 'from' comute resource.
          */
-        taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
                 destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM);
         // update output locations
         GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());