You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/06/26 18:47:58 UTC

git commit: Making GSISSH Input handler recoverable

Repository: airavata
Updated Branches:
  refs/heads/master 90c47ded4 -> 6da674fdd


Making GSISSH Input handler recoverable


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6da674fd
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6da674fd
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6da674fd

Branch: refs/heads/master
Commit: 6da674fdd7e415e07df88c3de0f106eec597542f
Parents: 90c47de
Author: lahiru <la...@apache.org>
Authored: Thu Jun 26 12:47:44 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Thu Jun 26 12:47:44 2014 -0400

----------------------------------------------------------------------
 .../core/monitor/GfacInternalStatusUpdator.java |  4 +-
 .../gfac/gsissh/handler/GSISSHInputHandler.java | 61 ++++++++++++++++----
 .../server/OrchestratorServerHandler.java       | 32 +++++-----
 3 files changed, 72 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/6da674fd/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 2047f20..97bb49d 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -81,10 +81,10 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
         }
         switch (statusChangeRequest.getState()) {
             case COMPLETED:
-                ZKUtil.deleteRecursive(zk,experimentPath);
+//                ZKUtil.deleteRecursive(zk,experimentPath);
                 break;
             case FAILED:
-                ZKUtil.deleteRecursive(zk,experimentPath);
+//                ZKUtil.deleteRecursive(zk,experimentPath);
                 break;
             default:
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/6da674fd/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
index 440009b..2ed4889 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
@@ -47,15 +47,39 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
+/**
+ * Recoverability for this handler assumes that the same input values arrive in the second
+ * run, and that nobody modifies the registry between the original submission and the re-submission.
+ */
 public class GSISSHInputHandler extends AbstractRecoverableHandler {
     private static final Logger log = LoggerFactory.getLogger(GSISSHInputHandler.class);
 
 
     public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        super.invoke(jobExecutionContext);
+        int index = 0;
+        int oldIndex = 0;
+        List<String> oldFiles = new ArrayList<String>();
         MessageContext inputNew = new MessageContext();
         DataTransferDetails detail = new DataTransferDetails();
         TransferStatus status = new TransferStatus();
+        StringBuffer data = new StringBuffer("|");
         try {
+            String pluginData = GFacUtils.getPluginData(jobExecutionContext, this.getClass().getName());
+            if (pluginData != null) {
+                try {
+                    oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim());
+                    oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(","));
+                    if (oldIndex == oldFiles.size()) {
+                        log.info("Old data looks good !!!!");
+                    } else {
+                        oldIndex = 0;
+                        oldFiles.clear();
+                    }
+                } catch (NumberFormatException e) {
+                    log.error("Previously stored data " + pluginData +" is wrong so we continue the operations");
+                }
+            }
             if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) == null) {
                 try {
                     GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
@@ -65,8 +89,6 @@ public class GSISSHInputHandler extends AbstractRecoverableHandler {
                 }
             }
             log.info("Invoking SCPInputHandler");
-            super.invoke(jobExecutionContext);
-
 
             MessageContext input = jobExecutionContext.getInMessageContext();
             Set<String> parameters = input.getParameters().keySet();
@@ -75,17 +97,36 @@ public class GSISSHInputHandler extends AbstractRecoverableHandler {
                 String paramValue = MappingFactory.toString(actualParameter);
                 //TODO: Review this with type
                 if ("URI".equals(actualParameter.getType().getType().toString())) {
-                    ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue));
+                    if (index < oldIndex) {
+                        log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+                        ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
+                        data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
+                    } else {
+                        String s = stageInputFiles(jobExecutionContext, paramValue);
+                        ((URIParameterType) actualParameter.getType()).setValue(s);
+                        StringBuffer temp = new StringBuffer(data.append(s).append(",").toString());
+                        GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+                    }
                 } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
                     List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
                     List<String> newFiles = new ArrayList<String>();
                     for (String paramValueEach : split) {
-                        String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
-                        status.setTransferState(TransferState.UPLOAD);
-                        detail.setTransferStatus(status);
-                        detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
-                        registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-                        newFiles.add(stageInputFiles);
+                        if (index < oldIndex) {
+                            log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+                            ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
+                            data.append(oldFiles.get(index++)).append(",");
+                        } else {
+                            String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
+                            GFacUtils.savePluginData(jobExecutionContext, new StringBuffer(String.valueOf(index++)), this.getClass().getName());
+                            status.setTransferState(TransferState.UPLOAD);
+                            detail.setTransferStatus(status);
+                            detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
+                            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+                            StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
+                            GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+                            newFiles.add(stageInputFiles);
+                        }
+
                     }
                     ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
                 }
@@ -139,6 +180,6 @@ public class GSISSHInputHandler extends AbstractRecoverableHandler {
     }
 
     public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
-
+        this.invoke(jobExecutionContext);
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/6da674fd/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index a3ed956..abf6a2c 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -123,8 +123,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat
     }
 
 
-
-
     /**
      * * After creating the experiment Data user have the
      * * experimentID as the handler to the experiment, during the launchExperiment
@@ -239,7 +237,9 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat
         return true;
     }
 
-    /** This method gracefully handler gfac node failures */
+    /**
+     * This method gracefully handles gfac node failures
+     */
     synchronized public void process(WatchedEvent watchedEvent) {
         synchronized (mutex) {
             try {
@@ -270,12 +270,18 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat
                             final OrchestratorServerHandler handler = this;
                             (new Thread() {
                                 public void run() {
-                                    try {
-                                        (new OrchestratorRecoveryHandler(handler, event.getPath())).recover();     // run this task in a separate thread
-                                    } catch (Exception e) {
-                                        e.printStackTrace();
-                                        log.error("error recovering the jobs for gfac-node: " + event.getPath());
+                                    int retry = 0;
+                                    while (retry < 3) {
+                                        try {
+                                            (new OrchestratorRecoveryHandler(handler, event.getPath())).recover();
+                                            break;
+                                        } catch (Exception e) {
+                                            e.printStackTrace();
+                                            log.error("error recovering the jobs for gfac-node: " + event.getPath());
+                                            log.error("Retrying again to recover jobs and retry attempt: " + ++retry);
+                                        }
                                     }
+
                                 }
                             }).start();
                             break;
@@ -290,9 +296,9 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat
     }
 
 
-	@Override
-	public boolean launchTask(String taskId) throws TException {
-		// TODO Auto-generated method stub
-		return false;
-	}
+    @Override
+    public boolean launchTask(String taskId) throws TException {
+        // TODO Auto-generated method stub
+        return false;
+    }
 }