You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/06/30 20:24:59 UTC

git commit: Making AdvancedInputHandle recoverable

Repository: airavata
Updated Branches:
  refs/heads/master 31c84b6fe -> 2fe5d9e3d


Making AdvancedInputHandle recoverable


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2fe5d9e3
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2fe5d9e3
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2fe5d9e3

Branch: refs/heads/master
Commit: 2fe5d9e3d09bba6d8b3dd169399603eedb5d190c
Parents: 31c84b6
Author: lahiru <la...@apache.org>
Authored: Mon Jun 30 14:24:45 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Mon Jun 30 14:24:45 2014 -0400

----------------------------------------------------------------------
 .../gfac/gsissh/handler/GSISSHInputHandler.java |   3 +-
 .../ssh/handler/AdvancedSCPInputHandler.java    | 112 ++++++++++++++-----
 2 files changed, 82 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/2fe5d9e3/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
index 2ed4889..6b84fb4 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
@@ -113,11 +113,10 @@ public class GSISSHInputHandler extends AbstractRecoverableHandler {
                     for (String paramValueEach : split) {
                         if (index < oldIndex) {
                             log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
-                            ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
+                            newFiles.add(oldFiles.get(index));
                             data.append(oldFiles.get(index++)).append(",");
                         } else {
                             String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
-                            GFacUtils.savePluginData(jobExecutionContext, new StringBuffer(String.valueOf(index++)), this.getClass().getName());
                             status.setTransferState(TransferState.UPLOAD);
                             detail.setTransferStatus(status);
                             detail.setTransferDescription("Input Data Staged: " + stageInputFiles);

http://git-wip-us.apache.org/repos/asf/airavata/blob/2fe5d9e3/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
index e102dd0..9fe2819 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
@@ -28,7 +28,9 @@ import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.context.MessageContext;
 import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
 import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
 import org.apache.airavata.gsi.ssh.api.Cluster;
@@ -52,13 +54,13 @@ import java.util.*;
  * This handler will copy input data from gateway machine to airavata
  * installed machine, later running handlers can copy the input files to computing resource
  * <Handler class="AdvancedSCPOutputHandler">
-                            <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
-                            <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
-                        <property name="userName" value="airavata"/>
-                        <property name="hostName" value="gw98.iu.xsede.org"/>
-                        <property name="inputPath" value="/home/airavata/outputData"/>
+ * <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+ * <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+ * <property name="userName" value="airavata"/>
+ * <property name="hostName" value="gw98.iu.xsede.org"/>
+ * <property name="inputPath" value="/home/airavata/outputData"/>
  */
-public class AdvancedSCPInputHandler extends AbstractHandler {
+public class AdvancedSCPInputHandler extends AbstractRecoverableHandler {
     private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
 
     private String password = null;
@@ -76,36 +78,56 @@ public class AdvancedSCPInputHandler extends AbstractHandler {
     private String inputPath;
 
     public void initProperties(Properties properties) throws GFacHandlerException {
-        password = (String)properties.get("password");
-        passPhrase = (String)properties.get("passPhrase");
-        privateKeyPath = (String)properties.get("privateKeyPath");
-        publicKeyPath = (String)properties.get("publicKeyPath");
-        userName = (String)properties.get("userName");
-        hostName = (String)properties.get("hostName");
-        inputPath = (String)properties.get("inputPath");
+        password = (String) properties.get("password");
+        passPhrase = (String) properties.get("passPhrase");
+        privateKeyPath = (String) properties.get("privateKeyPath");
+        publicKeyPath = (String) properties.get("publicKeyPath");
+        userName = (String) properties.get("userName");
+        hostName = (String) properties.get("hostName");
+        inputPath = (String) properties.get("inputPath");
     }
 
     public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        super.invoke(jobExecutionContext);
+        int index = 0;
+        int oldIndex = 0;
+        List<String> oldFiles = new ArrayList<String>();
         MessageContext inputNew = new MessageContext();
-        try{
-            if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
-            try {
-                GFACSSHUtils.addSecurityContext(jobExecutionContext);
-            } catch (ApplicationSettingsException e) {
-                log.error(e.getMessage());
-                throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+        StringBuffer data = new StringBuffer("|");
+        try {
+            String pluginData = GFacUtils.getPluginData(jobExecutionContext, this.getClass().getName());
+            if (pluginData != null) {
+                try {
+                    oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim());
+                    oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(","));
+                    if (oldIndex == oldFiles.size()) {
+                        log.info("Old data looks good !!!!");
+                    } else {
+                        oldIndex = 0;
+                        oldFiles.clear();
+                    }
+                } catch (NumberFormatException e) {
+                    log.error("Previously stored data " + pluginData + " is wrong so we continue the operations");
+                }
+            }
+            if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
+                try {
+                    GFACSSHUtils.addSecurityContext(jobExecutionContext);
+                } catch (ApplicationSettingsException e) {
+                    log.error(e.getMessage());
+                    throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+                }
             }
-        }
             ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
-                .getApplicationDeploymentDescription().getType();
+                    .getApplicationDeploymentDescription().getType();
 
             AuthenticationInfo authenticationInfo = null;
             if (password != null) {
-            authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
-        } else {
-            authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
-                    this.passPhrase);
-        }
+                authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
+            } else {
+                authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+                        this.passPhrase);
+            }
             // Server info
             ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
             Cluster pbsCluster = null;
@@ -113,7 +135,14 @@ public class AdvancedSCPInputHandler extends AbstractHandler {
             // not really dealing with monitoring or job submission, so we pa
             pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
             String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID();
-            (new File(parentPath)).mkdirs();
+            if (index < oldIndex) {
+                parentPath = oldFiles.get(index);
+                data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
+            } else {
+                (new File(parentPath)).mkdirs();
+                StringBuffer temp = new StringBuffer(data.append(parentPath).append(",").toString());
+                GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+            }
             MessageContext input = jobExecutionContext.getInMessageContext();
             Set<String> parameters = input.getParameters().keySet();
             for (String paramName : parameters) {
@@ -121,13 +150,30 @@ public class AdvancedSCPInputHandler extends AbstractHandler {
                 String paramValue = MappingFactory.toString(actualParameter);
                 //TODO: Review this with type
                 if ("URI".equals(actualParameter.getType().getType().toString())) {
-                    ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(pbsCluster, paramValue, parentPath));
+                    if (index < oldIndex) {
+                        log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+                        ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
+                        data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
+                    } else {
+                        String s = stageInputFiles(pbsCluster, paramValue, parentPath);
+                        ((URIParameterType) actualParameter.getType()).setValue(s);
+                        StringBuffer temp = new StringBuffer(data.append(s).append(",").toString());
+                        GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+                    }
                 } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
                     List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
                     List<String> newFiles = new ArrayList<String>();
                     for (String paramValueEach : split) {
-                        String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath);
-                        newFiles.add(stageInputFiles);
+                        if (index < oldIndex) {
+                            log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+                            newFiles.add(oldFiles.get(index));
+                            data.append(oldFiles.get(index++)).append(",");
+                        } else {
+                            String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath);
+                            StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
+                            GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+                            newFiles.add(stageInputFiles);
+                        }
                     }
                     ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
                 }
@@ -140,6 +186,10 @@ public class AdvancedSCPInputHandler extends AbstractHandler {
         jobExecutionContext.setInMessageContext(inputNew);
     }
 
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        this.invoke(jobExecutionContext);
+    }
+
     private String stageInputFiles(Cluster cluster, String paramValue, String parentPath) throws GFacException {
         try {
             cluster.scpFrom(paramValue, parentPath);