You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/06/26 18:47:58 UTC
git commit: Making GSISSH Input handler recoverable
Repository: airavata
Updated Branches:
refs/heads/master 90c47ded4 -> 6da674fdd
Making GSISSH Input handler recoverable
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6da674fd
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6da674fd
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6da674fd
Branch: refs/heads/master
Commit: 6da674fdd7e415e07df88c3de0f106eec597542f
Parents: 90c47de
Author: lahiru <la...@apache.org>
Authored: Thu Jun 26 12:47:44 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Thu Jun 26 12:47:44 2014 -0400
----------------------------------------------------------------------
.../core/monitor/GfacInternalStatusUpdator.java | 4 +-
.../gfac/gsissh/handler/GSISSHInputHandler.java | 61 ++++++++++++++++----
.../server/OrchestratorServerHandler.java | 32 +++++-----
3 files changed, 72 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/6da674fd/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 2047f20..97bb49d 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -81,10 +81,10 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
}
switch (statusChangeRequest.getState()) {
case COMPLETED:
- ZKUtil.deleteRecursive(zk,experimentPath);
+// ZKUtil.deleteRecursive(zk,experimentPath);
break;
case FAILED:
- ZKUtil.deleteRecursive(zk,experimentPath);
+// ZKUtil.deleteRecursive(zk,experimentPath);
break;
default:
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6da674fd/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
index 440009b..2ed4889 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
@@ -47,15 +47,39 @@ import java.io.File;
import java.io.IOException;
import java.util.*;
+/**
+ * Recoverability for this handler assumes the same input values arrive on the second
+ * run, and that nobody modifies the registry between the original submission and the re-submission.
+ */
public class GSISSHInputHandler extends AbstractRecoverableHandler {
private static final Logger log = LoggerFactory.getLogger(GSISSHInputHandler.class);
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ super.invoke(jobExecutionContext);
+ int index = 0;
+ int oldIndex = 0;
+ List<String> oldFiles = new ArrayList<String>();
MessageContext inputNew = new MessageContext();
DataTransferDetails detail = new DataTransferDetails();
TransferStatus status = new TransferStatus();
+ StringBuffer data = new StringBuffer("|");
try {
+ String pluginData = GFacUtils.getPluginData(jobExecutionContext, this.getClass().getName());
+ if (pluginData != null) {
+ try {
+ oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim());
+ oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(","));
+ if (oldIndex == oldFiles.size()) {
+ log.info("Old data looks good !!!!");
+ } else {
+ oldIndex = 0;
+ oldFiles.clear();
+ }
+ } catch (NumberFormatException e) {
+ log.error("Previously stored data " + pluginData +" is wrong so we continue the operations");
+ }
+ }
if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) == null) {
try {
GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
@@ -65,8 +89,6 @@ public class GSISSHInputHandler extends AbstractRecoverableHandler {
}
}
log.info("Invoking SCPInputHandler");
- super.invoke(jobExecutionContext);
-
MessageContext input = jobExecutionContext.getInMessageContext();
Set<String> parameters = input.getParameters().keySet();
@@ -75,17 +97,36 @@ public class GSISSHInputHandler extends AbstractRecoverableHandler {
String paramValue = MappingFactory.toString(actualParameter);
//TODO: Review this with type
if ("URI".equals(actualParameter.getType().getType().toString())) {
- ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue));
+ if (index < oldIndex) {
+ log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+ ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
+ data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
+ } else {
+ String s = stageInputFiles(jobExecutionContext, paramValue);
+ ((URIParameterType) actualParameter.getType()).setValue(s);
+ StringBuffer temp = new StringBuffer(data.append(s).append(",").toString());
+ GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+ }
} else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
List<String> newFiles = new ArrayList<String>();
for (String paramValueEach : split) {
- String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
- status.setTransferState(TransferState.UPLOAD);
- detail.setTransferStatus(status);
- detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- newFiles.add(stageInputFiles);
+ if (index < oldIndex) {
+ log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+ ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
+ data.append(oldFiles.get(index++)).append(",");
+ } else {
+ String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
+ GFacUtils.savePluginData(jobExecutionContext, new StringBuffer(String.valueOf(index++)), this.getClass().getName());
+ status.setTransferState(TransferState.UPLOAD);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+ StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
+ GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+ newFiles.add(stageInputFiles);
+ }
+
}
((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
}
@@ -139,6 +180,6 @@ public class GSISSHInputHandler extends AbstractRecoverableHandler {
}
public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
-
+ this.invoke(jobExecutionContext);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6da674fd/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index a3ed956..abf6a2c 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -123,8 +123,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat
}
-
-
/**
* * After creating the experiment Data user have the
* * experimentID as the handler to the experiment, during the launchExperiment
@@ -239,7 +237,9 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat
return true;
}
- /** This method gracefully handler gfac node failures */
+ /**
+ * This method gracefully handles gfac node failures.
+ */
synchronized public void process(WatchedEvent watchedEvent) {
synchronized (mutex) {
try {
@@ -270,12 +270,18 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat
final OrchestratorServerHandler handler = this;
(new Thread() {
public void run() {
- try {
- (new OrchestratorRecoveryHandler(handler, event.getPath())).recover(); // run this task in a separate thread
- } catch (Exception e) {
- e.printStackTrace();
- log.error("error recovering the jobs for gfac-node: " + event.getPath());
+ int retry = 0;
+ while (retry < 3) {
+ try {
+ (new OrchestratorRecoveryHandler(handler, event.getPath())).recover();
+ break;
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("error recovering the jobs for gfac-node: " + event.getPath());
+ log.error("Retrying again to recover jobs and retry attempt: " + ++retry);
+ }
}
+
}
}).start();
break;
@@ -290,9 +296,9 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat
}
- @Override
- public boolean launchTask(String taskId) throws TException {
- // TODO Auto-generated method stub
- return false;
- }
+ @Override
+ public boolean launchTask(String taskId) throws TException {
+ // TODO Auto-generated method stub
+ return false;
+ }
}