You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/06/30 20:24:59 UTC
git commit: Making AdvancedSCPInputHandler recoverable
Repository: airavata
Updated Branches:
refs/heads/master 31c84b6fe -> 2fe5d9e3d
Making AdvancedSCPInputHandler recoverable
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2fe5d9e3
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2fe5d9e3
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2fe5d9e3
Branch: refs/heads/master
Commit: 2fe5d9e3d09bba6d8b3dd169399603eedb5d190c
Parents: 31c84b6
Author: lahiru <la...@apache.org>
Authored: Mon Jun 30 14:24:45 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Mon Jun 30 14:24:45 2014 -0400
----------------------------------------------------------------------
.../gfac/gsissh/handler/GSISSHInputHandler.java | 3 +-
.../ssh/handler/AdvancedSCPInputHandler.java | 112 ++++++++++++++-----
2 files changed, 82 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/2fe5d9e3/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
index 2ed4889..6b84fb4 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
@@ -113,11 +113,10 @@ public class GSISSHInputHandler extends AbstractRecoverableHandler {
for (String paramValueEach : split) {
if (index < oldIndex) {
log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
- ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
+ newFiles.add(oldFiles.get(index));
data.append(oldFiles.get(index++)).append(",");
} else {
String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
- GFacUtils.savePluginData(jobExecutionContext, new StringBuffer(String.valueOf(index++)), this.getClass().getName());
status.setTransferState(TransferState.UPLOAD);
detail.setTransferStatus(status);
detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
http://git-wip-us.apache.org/repos/asf/airavata/blob/2fe5d9e3/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
index e102dd0..9fe2819 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
@@ -28,7 +28,9 @@ import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.context.MessageContext;
import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
import org.apache.airavata.gsi.ssh.api.Cluster;
@@ -52,13 +54,13 @@ import java.util.*;
* This handler will copy input data from gateway machine to airavata
* installed machine, later running handlers can copy the input files to computing resource
* <Handler class="AdvancedSCPOutputHandler">
- <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
- <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
- <property name="userName" value="airavata"/>
- <property name="hostName" value="gw98.iu.xsede.org"/>
- <property name="inputPath" value="/home/airavata/outputData"/>
+ * <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+ * <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+ * <property name="userName" value="airavata"/>
+ * <property name="hostName" value="gw98.iu.xsede.org"/>
+ * <property name="inputPath" value="/home/airavata/outputData"/>
*/
-public class AdvancedSCPInputHandler extends AbstractHandler {
+public class AdvancedSCPInputHandler extends AbstractRecoverableHandler {
private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
private String password = null;
@@ -76,36 +78,56 @@ public class AdvancedSCPInputHandler extends AbstractHandler {
private String inputPath;
public void initProperties(Properties properties) throws GFacHandlerException {
- password = (String)properties.get("password");
- passPhrase = (String)properties.get("passPhrase");
- privateKeyPath = (String)properties.get("privateKeyPath");
- publicKeyPath = (String)properties.get("publicKeyPath");
- userName = (String)properties.get("userName");
- hostName = (String)properties.get("hostName");
- inputPath = (String)properties.get("inputPath");
+ password = (String) properties.get("password");
+ passPhrase = (String) properties.get("passPhrase");
+ privateKeyPath = (String) properties.get("privateKeyPath");
+ publicKeyPath = (String) properties.get("publicKeyPath");
+ userName = (String) properties.get("userName");
+ hostName = (String) properties.get("hostName");
+ inputPath = (String) properties.get("inputPath");
}
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ super.invoke(jobExecutionContext);
+ int index = 0;
+ int oldIndex = 0;
+ List<String> oldFiles = new ArrayList<String>();
MessageContext inputNew = new MessageContext();
- try{
- if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
- try {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ StringBuffer data = new StringBuffer("|");
+ try {
+ String pluginData = GFacUtils.getPluginData(jobExecutionContext, this.getClass().getName());
+ if (pluginData != null) {
+ try {
+ oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim());
+ oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(","));
+ if (oldIndex == oldFiles.size()) {
+ log.info("Old data looks good !!!!");
+ } else {
+ oldIndex = 0;
+ oldFiles.clear();
+ }
+ } catch (NumberFormatException e) {
+ log.error("Previously stored data " + pluginData + " is wrong so we continue the operations");
+ }
+ }
+ if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
+ try {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
}
- }
ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
- .getApplicationDeploymentDescription().getType();
+ .getApplicationDeploymentDescription().getType();
AuthenticationInfo authenticationInfo = null;
if (password != null) {
- authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
- } else {
- authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
- this.passPhrase);
- }
+ authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
+ } else {
+ authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+ this.passPhrase);
+ }
// Server info
ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
Cluster pbsCluster = null;
@@ -113,7 +135,14 @@ public class AdvancedSCPInputHandler extends AbstractHandler {
// not really dealing with monitoring or job submission, so we pa
pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID();
- (new File(parentPath)).mkdirs();
+ if (index < oldIndex) {
+ parentPath = oldFiles.get(index);
+ data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
+ } else {
+ (new File(parentPath)).mkdirs();
+ StringBuffer temp = new StringBuffer(data.append(parentPath).append(",").toString());
+ GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+ }
MessageContext input = jobExecutionContext.getInMessageContext();
Set<String> parameters = input.getParameters().keySet();
for (String paramName : parameters) {
@@ -121,13 +150,30 @@ public class AdvancedSCPInputHandler extends AbstractHandler {
String paramValue = MappingFactory.toString(actualParameter);
//TODO: Review this with type
if ("URI".equals(actualParameter.getType().getType().toString())) {
- ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(pbsCluster, paramValue, parentPath));
+ if (index < oldIndex) {
+ log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+ ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
+ data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
+ } else {
+ String s = stageInputFiles(pbsCluster, paramValue, parentPath);
+ ((URIParameterType) actualParameter.getType()).setValue(s);
+ StringBuffer temp = new StringBuffer(data.append(s).append(",").toString());
+ GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+ }
} else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
List<String> newFiles = new ArrayList<String>();
for (String paramValueEach : split) {
- String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath);
- newFiles.add(stageInputFiles);
+ if (index < oldIndex) {
+ log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+ newFiles.add(oldFiles.get(index));
+ data.append(oldFiles.get(index++)).append(",");
+ } else {
+ String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath);
+ StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
+ GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+ newFiles.add(stageInputFiles);
+ }
}
((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
}
@@ -140,6 +186,10 @@ public class AdvancedSCPInputHandler extends AbstractHandler {
jobExecutionContext.setInMessageContext(inputNew);
}
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ this.invoke(jobExecutionContext);
+ }
+
private String stageInputFiles(Cluster cluster, String paramValue, String parentPath) throws GFacException {
try {
cluster.scpFrom(paramValue, parentPath);