You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ra...@apache.org on 2014/03/03 21:42:48 UTC
[1/4] git commit: Save JobDetails and use task details. AIRAVATA-1043,
AIRAVATA-1042
Repository: airavata
Updated Branches:
refs/heads/master 119f7541c -> 1dc04ce7c
Save JobDetails and use task details. AIRAVATA-1043, AIRAVATA-1042
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/8a615ec0
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/8a615ec0
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/8a615ec0
Branch: refs/heads/master
Commit: 8a615ec049d4c4f8421cf8ac20046e3a6744da9d
Parents: 22cd1a0
Author: raminder <ra...@apache.org>
Authored: Mon Mar 3 14:07:34 2014 -0500
Committer: raminder <ra...@apache.org>
Committed: Mon Mar 3 14:07:34 2014 -0500
----------------------------------------------------------------------
modules/gfac/gfac-core/pom.xml | 5 +
.../org/apache/airavata/gfac/cpi/GFacImpl.java | 10 +-
.../airavata/gfac/handler/AbstractHandler.java | 35 +++++
.../gfac/handler/GramDirectorySetupHandler.java | 30 +++--
.../gfac/handler/GridFTPInputHandler.java | 36 ++++-
.../gfac/handler/GridFTPOutputHandler.java | 43 +++++-
.../gfac/handler/SCPDirectorySetupHandler.java | 31 ++++-
.../airavata/gfac/handler/SCPInputHandler.java | 36 +++--
.../airavata/gfac/handler/SCPOutputHandler.java | 49 +++++--
.../gfac/provider/impl/AbstractProvider.java | 131 ++++++-------------
.../gfac/provider/impl/BESProvider.java | 50 ++++---
.../gfac/provider/impl/GSISSHProvider.java | 33 ++++-
.../gfac/provider/impl/GramProvider.java | 53 ++++----
.../gfac/provider/impl/HadoopProvider.java | 2 +-
.../gfac/provider/impl/LocalProvider.java | 59 +++++----
.../gfac/provider/impl/SSHProvider.java | 31 ++---
.../apache/airavata/gfac/utils/GFacUtils.java | 54 +++++---
.../gfac/utils/GramJobSubmissionListener.java | 8 +-
.../apache/airavata/gfac/ec2/EC2Provider.java | 37 +++---
19 files changed, 455 insertions(+), 278 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index e75cf62..5e0965d 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -83,6 +83,11 @@
<artifactId>airavata-registry-cpi</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-jpa-registry</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Workflow Tracking -->
<dependency>
<groupId>org.apache.airavata</groupId>
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
index a34c9b5..a60b200 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
@@ -144,8 +144,14 @@ public class GFacImpl implements GFac {
HostDescription hostDescription = hostScheduler.schedule(registeredHosts);
ServiceDescription serviceDescription = airavataRegistry2.getServiceDescriptor(serviceName);
-
- ApplicationDescription applicationDescription = airavataRegistry2.getApplicationDescriptors(serviceName, hostDescription.getType().getHostName());
+ String hostName;
+ if(taskData.getTaskScheduling().getResourceHostId() != null){
+ hostName = taskData.getTaskScheduling().getResourceHostId();
+ }else{
+ hostName = hostDescription.getType().getHostName();
+ }
+
+ ApplicationDescription applicationDescription = airavataRegistry2.getApplicationDescriptors(serviceName, hostName);
URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
Properties configurationProperties = ServerSettings.getProperties();
GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), airavataAPI, configurationProperties);
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AbstractHandler.java
new file mode 100644
index 0000000..e3db75a
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AbstractHandler.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.Registry;
+
+public abstract class AbstractHandler implements GFacHandler {
+ protected Registry registry = null;
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
+ registry = RegistryFactory.getDefaultRegistry();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GramDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GramDirectorySetupHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GramDirectorySetupHandler.java
index eaafa7f..1964ddd 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GramDirectorySetupHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GramDirectorySetupHandler.java
@@ -26,12 +26,15 @@ import java.util.Map;
import org.apache.airavata.commons.gfac.type.ApplicationDescription;
import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.ToolsException;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.security.GSISecurityContext;
import org.apache.airavata.gfac.external.GridFtp;
import org.apache.airavata.gfac.utils.GFacUtils;
import org.apache.airavata.gfac.utils.GramJobSubmissionListener;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.GlobusHostType;
import org.apache.airavata.schemas.gfac.HostDescriptionType;
@@ -40,13 +43,12 @@ import org.ietf.jgss.GSSCredential;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class GramDirectorySetupHandler implements GFacHandler {
+public class GramDirectorySetupHandler extends AbstractHandler {
private static final Logger log = LoggerFactory.getLogger(GramJobSubmissionListener.class);
-
+
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException,GFacException {
log.info("Invoking GramDirectorySetupHandler ...");
-
-
+ super.invoke(jobExecutionContext);
String[] gridFTPEndpointArray = null;
//TODO: why it is tightly coupled with gridftp
@@ -63,9 +65,7 @@ public class GramDirectorySetupHandler implements GFacHandler {
else if (hostType instanceof UnicoreHostType){
gridFTPEndpointArray = ((UnicoreHostType) hostType).getGridFTPEndPointArray();
}
- else {
- //TODO
- }
+
ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
@@ -74,9 +74,6 @@ public class GramDirectorySetupHandler implements GFacHandler {
try {
-
-
-
GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.
getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
@@ -98,18 +95,23 @@ public class GramDirectorySetupHandler implements GFacHandler {
log.info("Working directory = " + workingDirURI);
log.info("Input directory = " + inputURI);
log.info("Output directory = " + outputURI);
-
ftp.makeDir(tmpdirURI, gssCred);
ftp.makeDir(workingDirURI, gssCred);
ftp.makeDir(inputURI, gssCred);
ftp.makeDir(outputURI, gssCred);
-
success = true;
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+ status.setTransferState(TransferState.DIRECTORY_SETUP);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Working directory = " + workingDirURI);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
break;
} catch (URISyntaxException e) {
pe = new GFacHandlerException("URI is malformatted:" + e.getMessage(), e);
- } catch (ToolsException e) {
+ } catch (Exception e) {
pe = new GFacHandlerException(e.getMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GridFTPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GridFTPInputHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GridFTPInputHandler.java
index b959ce6..29d5609 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GridFTPInputHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GridFTPInputHandler.java
@@ -26,7 +26,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.airavata.common.utils.StringUtil;
import org.apache.airavata.commons.gfac.type.ActualParameter;
@@ -38,6 +42,10 @@ import org.apache.airavata.gfac.context.MessageContext;
import org.apache.airavata.gfac.context.security.GSISecurityContext;
import org.apache.airavata.gfac.external.GridFtp;
import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.GlobusHostType;
import org.apache.airavata.schemas.gfac.HostDescriptionType;
@@ -48,11 +56,15 @@ import org.ietf.jgss.GSSCredential;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class GridFTPInputHandler implements GFacHandler {
+public class GridFTPInputHandler extends AbstractHandler {
private static final Logger log = LoggerFactory.getLogger(AppDescriptorCheckHandler.class);
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException,GFacException {
log.info("Invoking GridFTPInputHandler ...");
+ super.invoke(jobExecutionContext);
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+
MessageContext inputNew = new MessageContext();
try {
MessageContext input = jobExecutionContext.getInMessageContext();
@@ -67,13 +79,27 @@ public class GridFTPInputHandler implements GFacHandler {
List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
List<String> newFiles = new ArrayList<String>();
for (String paramValueEach : split) {
- newFiles.add(stageInputFiles(jobExecutionContext, paramValueEach));
+ String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
+ detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
+ status.setTransferState(TransferState.UPLOAD);
+ detail.setTransferStatus(status);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
+ newFiles.add(stageInputFiles);
}
((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
}
inputNew.getParameters().put(paramName, actualParameter);
+
}
} catch (Exception e) {
+ try {
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
log.error(e.getMessage());
throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GridFTPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GridFTPOutputHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GridFTPOutputHandler.java
index 0f8eed5..dee76a3 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GridFTPOutputHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GridFTPOutputHandler.java
@@ -48,7 +48,12 @@ import org.apache.airavata.gfac.external.GridFtp;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.utils.GFacUtils;
import org.apache.airavata.gfac.utils.OutputUtils;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.GlobusHostType;
import org.apache.airavata.schemas.gfac.HostDescriptionType;
@@ -61,12 +66,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class GridFTPOutputHandler implements GFacHandler {
+public class GridFTPOutputHandler extends AbstractHandler {
private static final Logger log = LoggerFactory.getLogger(GridFTPOutputHandler.class);
+ private Registry registry;
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- log.info("Invoking GridFTPOutputHandler ...");
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException,GFacException {
+ log.info("Invoking GridFTPOutputHandler ...");
+ super.invoke(jobExecutionContext);
+
ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
HostDescriptionType hostType = jobExecutionContext.getApplicationContext().getHostDescription().getType();
@@ -89,6 +97,9 @@ public class GridFTPOutputHandler implements GFacHandler {
GridFtp ftp = new GridFtp();
File localStdErrFile = null;
Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+
try {
GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
String[] hostgridFTP = gridFTPEndpointArray;
@@ -100,9 +111,17 @@ public class GridFTPOutputHandler implements GFacHandler {
/*
* Read Stdout and Stderror
*/
- URI stdoutURI = GFacUtils.createGsiftpURI(endpoint, app.getStandardOutput());
+ URI stdoutURI = GFacUtils.createGsiftpURI(endpoint, app.getStandardOutput());
URI stderrURI = GFacUtils.createGsiftpURI(endpoint, app.getStandardError());
-
+ status.setTransferState(TransferState.COMPLETE);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("STDOUT:" + stdoutURI.toString());
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ status.setTransferState(TransferState.COMPLETE);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("STDERR:" + stderrURI.toString());
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
log.info("STDOUT:" + stdoutURI.toString());
log.info("STDERR:" + stderrURI.toString());
@@ -170,6 +189,11 @@ public class GridFTPOutputHandler implements GFacHandler {
// This is to handle exception during the output parsing.
stringMap = OutputUtils.fillOutputFromStdout(output, stdout, stderr);
}
+ status.setTransferState(TransferState.DOWNLOAD);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Output: " + stringMap.get(paramName).toString());
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
}
if (stringMap == null || stringMap.isEmpty()) {
throw new GFacHandlerException("Empty Output returned from the Application, Double check the application" +
@@ -192,7 +216,14 @@ public class GridFTPOutputHandler implements GFacHandler {
}
}
} catch (Exception e) {
- log.error(e.getMessage());
+ try {
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
+ log.error(e.getMessage());
throw new GFacHandlerException(e.getMessage(), e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPDirectorySetupHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPDirectorySetupHandler.java
index d06221e..121d2b7 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPDirectorySetupHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPDirectorySetupHandler.java
@@ -28,15 +28,20 @@ import org.apache.airavata.gfac.context.security.GSISecurityContext;
import org.apache.airavata.gfac.context.security.SSHSecurityContext;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SCPDirectorySetupHandler implements GFacHandler{
+public class SCPDirectorySetupHandler extends AbstractHandler{
private static final Logger log = LoggerFactory.getLogger(SCPDirectorySetupHandler.class);
public void invoke(JobExecutionContext jobExecutionContext) throws GFacException {
log.info("Setup SSH job directorties");
+ super.invoke(jobExecutionContext);
makeDirectory(jobExecutionContext);
}
@@ -54,11 +59,31 @@ public class SCPDirectorySetupHandler implements GFacHandler{
}
ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
try {
- cluster.makeDirectory(app.getScratchWorkingDirectory());
+ String workingDirectory = app.getScratchWorkingDirectory();
+ cluster.makeDirectory(workingDirectory);
cluster.makeDirectory(app.getInputDataDirectory());
cluster.makeDirectory(app.getOutputDataDirectory());
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+ status.setTransferState(TransferState.DIRECTORY_SETUP);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Working directory = " + workingDirectory);
+
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
} catch (SSHApiException e) {
- throw new GFacHandlerException("Error executing the Handler: " + SCPDirectorySetupHandler.class,e); //To change body of catch statement use File | Settings | File Templates.
+ throw new GFacHandlerException("Error executing the Handler: " + SCPDirectorySetupHandler.class,e);
+ }catch (Exception e){
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ try {
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error executing the Handler: " + SCPDirectorySetupHandler.class,e);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java
index 0712d51..9d7167d 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java
@@ -22,9 +22,11 @@ package org.apache.airavata.gfac.handler;
import java.io.File;
import java.io.IOException;
-import java.util.*;
-
-import net.schmizz.sshj.xfer.scp.SCPFileTransfer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.airavata.common.utils.StringUtil;
import org.apache.airavata.commons.gfac.type.ActualParameter;
@@ -34,16 +36,19 @@ import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.MessageContext;
import org.apache.airavata.gfac.context.security.GSISecurityContext;
import org.apache.airavata.gfac.context.security.SSHSecurityContext;
-import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.URIArrayType;
import org.apache.airavata.schemas.gfac.URIParameterType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SCPInputHandler implements GFacHandler {
+public class SCPInputHandler extends AbstractHandler {
private static final Logger log = LoggerFactory.getLogger(SCPInputHandler.class);
@@ -51,8 +56,11 @@ public class SCPInputHandler implements GFacHandler {
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
log.info("Invoking SCPInputHandler");
-
-
+ super.invoke(jobExecutionContext);
+
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+
MessageContext inputNew = new MessageContext();
try {
MessageContext input = jobExecutionContext.getInMessageContext();
@@ -67,7 +75,12 @@ public class SCPInputHandler implements GFacHandler {
List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
List<String> newFiles = new ArrayList<String>();
for (String paramValueEach : split) {
- newFiles.add(stageInputFiles(jobExecutionContext, paramValueEach));
+ String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
+ status.setTransferState(TransferState.UPLOAD);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ newFiles.add(stageInputFiles);
}
((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
}
@@ -75,6 +88,13 @@ public class SCPInputHandler implements GFacHandler {
}
} catch (Exception e) {
log.error(e.getMessage());
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ try {
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
}
jobExecutionContext.setInMessageContext(inputNew);
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
index bc4633f..eca0daf 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
@@ -22,40 +22,41 @@ package org.apache.airavata.gfac.handler;
import java.io.File;
import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
import net.schmizz.sshj.connection.ConnectionException;
import net.schmizz.sshj.transport.TransportException;
-import org.apache.airavata.common.utils.StringUtil;
import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.MappingFactory;
import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.ToolsException;
import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.MessageContext;
import org.apache.airavata.gfac.context.security.GSISecurityContext;
import org.apache.airavata.gfac.context.security.SSHSecurityContext;
-import org.apache.airavata.gfac.external.GridFtp;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.utils.GFacUtils;
import org.apache.airavata.gfac.utils.OutputUtils;
import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.persistance.registry.jpa.model.DataTransferDetail;
+import org.apache.airavata.registry.cpi.ChildDataType;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.apache.airavata.schemas.gfac.URIArrayType;
-import org.apache.airavata.schemas.gfac.URIParameterType;
import org.apache.xmlbeans.XmlException;
-import org.ietf.jgss.GSSCredential;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SCPOutputHandler implements GFacHandler{
+public class SCPOutputHandler extends AbstractHandler{
private static final Logger log = LoggerFactory.getLogger(SCPOutputHandler.class);
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
+ super.invoke(jobExecutionContext);
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
.getApplicationDeploymentDescription().getType();
try {
Cluster cluster = null;
@@ -82,6 +83,16 @@ public class SCPOutputHandler implements GFacHandler{
String stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
+ status.setTransferState(TransferState.COMPLETE);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("STDOUT:" + stdOutStr);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
+ status.setTransferState(TransferState.COMPLETE);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("STDERR:" + stdErrStr);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
@@ -91,6 +102,11 @@ public class SCPOutputHandler implements GFacHandler{
"Empty Output returned from the Application, Double check the application"
+ "and ApplicationDescriptor output Parameter Names");
}
+ status.setTransferState(TransferState.DOWNLOAD);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Output: " + stringMap.get(output.keySet()).toString());
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
} catch (XmlException e) {
throw new GFacHandlerException("Cannot read output:" + e.getMessage(), e);
} catch (ConnectionException e) {
@@ -100,6 +116,13 @@ public class SCPOutputHandler implements GFacHandler{
} catch (IOException e) {
throw new GFacHandlerException(e.getMessage(), e);
} catch (Exception e) {
+ try {
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
throw new GFacHandlerException("Error in retrieving results", e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
index 6d1d397..e49c5dd 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
@@ -1,89 +1,42 @@
-///*
-// *
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied. See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// *
-// */
-//
-//package org.apache.airavata.gfac.provider.impl;
-//
-//import java.util.Map;
-//
-//import org.apache.airavata.common.workflow.execution.context.WorkflowContextHeaderBuilder;
-//import org.apache.airavata.core.gfac.context.invocation.InvocationContext;
-//import org.apache.airavata.core.gfac.exception.*;
-//import org.apache.airavata.core.gfac.notification.GFacNotifier;
-//import org.apache.airavata.core.gfac.provider.Provider;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-///**
-// * AbstractProvider wraps up steps of execution for Provider. <br/>
-// * The steps in execution are <br/>
-// * - makeDirectory <br/>
-// * - setupEnvironment <br/>
-// * - executeApplication <br/>
-// * - retrieveOutput <br/>
-// */
-//public abstract class AbstractProvider implements Provider {
-// protected final Logger log = LoggerFactory.getLogger(this.getClass() +
-// "." + WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowMonitoringContext().getExperimentId());
-// public void initialize(InvocationContext invocationContext) throws ProviderException {
-// /*
-// * Make a directory on the host
-// */
-// makeDirectory(invocationContext);
-// }
-//
-// public void dispose(InvocationContext invocationContext) throws GfacException {
-// }
-//
-// public Map<String, ?> execute(InvocationContext invocationContext) throws ProviderException {
-//
-// processInput(invocationContext);
-// /*
-// * Setup necessary environment
-// */
-// setupEnvironment(invocationContext);
-//
-// GFacNotifier notifier = invocationContext.getExecutionContext().getNotifier();
-//
-// notifier.startExecution(invocationContext);
-//
-// /*
-// * Execution application
-// */
-// executeApplication(invocationContext);
-//
-// notifier.finishExecution(invocationContext);
-//
-// /*
-// * Process output information
-// */
-// return processOutput(invocationContext);
-// }
-//
-// protected abstract void makeDirectory(InvocationContext invocationContext) throws ProviderException;
-//
-// protected abstract void setupEnvironment(InvocationContext invocationContext) throws ProviderException;
-//
-// protected abstract void executeApplication(InvocationContext invocationContext) throws ProviderException;
-//
-// protected abstract Map<String, ?> processOutput(InvocationContext invocationContext) throws ProviderException;
-//
-// protected abstract Map<String, ?> processInput(InvocationContext invocationContext) throws ProviderException;
-//}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.provider.impl;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobStatus;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.Registry;
+
+public abstract class AbstractProvider{
+ protected Registry registry = null;
+ protected JobDetails details;
+ protected JobStatus status;
+
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ registry = RegistryFactory.getDefaultRegistry();
+ details = new JobDetails();
+ status = new JobStatus();
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
index 3339beb..03dbf42 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
@@ -54,6 +54,7 @@ import org.apache.airavata.gfac.provider.utils.DataTransferrer;
import org.apache.airavata.gfac.provider.utils.JSDLGenerator;
import org.apache.airavata.gfac.provider.utils.StorageCreator;
import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.registry.api.workflow.ApplicationJob;
import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
import org.apache.airavata.schemas.gfac.UnicoreHostType;
@@ -91,7 +92,7 @@ import eu.unicore.util.httpclient.DefaultClientConfiguration;
-public class BESProvider implements GFacProvider {
+public class BESProvider extends AbstractProvider implements GFacProvider{
protected final Logger log = LoggerFactory.getLogger(this.getClass());
private DefaultClientConfiguration secProperties;
@@ -103,6 +104,7 @@ public class BESProvider implements GFacProvider {
public void initialize(JobExecutionContext jobExecutionContext)
throws GFacProviderException, GFacException {
log.info("Initializing UNICORE Provider");
+ super.initialize(jobExecutionContext);
initSecurityProperties(jobExecutionContext);
log.debug("initialized security properties");
}
@@ -194,12 +196,15 @@ public class BESProvider implements GFacProvider {
ActivityStatusType activityStatus = null;
try {
activityStatus = getStatus(factory, activityEpr);
- ApplicationJobStatus applicationJobStatus = getApplicationJobStatus(activityStatus);
- String jobStatusMessage = "Status of job " + jobId + "is " + applicationJobStatus;
+ JobState jobStatus = getApplicationJobStatus(activityStatus);
+ String jobStatusMessage = "Status of job " + jobId + "is " + jobStatus;
jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage));
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, applicationJobStatus);
+ details.setJobID(jobId);
+ GFacUtils.updateJobStatus(details, jobStatus);
} catch (UnknownActivityIdentifierFault e) {
throw new GFacProviderException(e.getMessage(), e.getCause());
+ }catch (GFacException e) {
+ throw new GFacProviderException(e.getMessage(), e.getCause());
}
try {
@@ -230,10 +235,15 @@ public class BESProvider implements GFacProvider {
dt.downloadStdOuts();
} else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
String experimentID = (String) jobExecutionContext.getProperty(Constants.PROP_TOPIC);
- ApplicationJobStatus applicationJobStatus = ApplicationJobStatus.CANCELED;
- String jobStatusMessage = "Status of job " + jobId + "is " + applicationJobStatus;
+ JobState jobStatus = JobState.CANCELED;
+ String jobStatusMessage = "Status of job " + jobId + "is " + jobStatus;
jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage));
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, applicationJobStatus);
+ details.setJobID(jobId);
+ try {
+ GFacUtils.saveJobStatus(details, jobStatus, jobExecutionContext.getTaskData().getTaskID());
+ } catch (GFacException e) {
+ throw new GFacProviderException(e.getLocalizedMessage(),e);
+ }
throw new GFacProviderException(experimentID + "Job Canceled");
}
@@ -263,9 +273,9 @@ public class BESProvider implements GFacProvider {
}
}
- private ApplicationJobStatus getApplicationJobStatus(ActivityStatusType activityStatus){
+ private JobState getApplicationJobStatus(ActivityStatusType activityStatus){
if (activityStatus == null) {
- return ApplicationJobStatus.UNKNOWN;
+ return JobState.UNKNOWN;
}
Enum state = activityStatus.getState();
String status = null;
@@ -279,34 +289,34 @@ public class BESProvider implements GFacProvider {
if (status != null) {
if (status.equalsIgnoreCase("Queued") || status.equalsIgnoreCase("Starting")
|| status.equalsIgnoreCase("Ready")) {
- return ApplicationJobStatus.PENDING;
+ return JobState.QUEUED;
} else if (status.equalsIgnoreCase("Staging-In")) {
- return ApplicationJobStatus.STAGING;
+ return JobState.SUBMITTED;
} else if (status.equalsIgnoreCase("Staging-Out") || status.equalsIgnoreCase("FINISHED")) {
- return ApplicationJobStatus.FINISHED;
+ return JobState.COMPLETE;
} else if (status.equalsIgnoreCase("Executing")) {
- return ApplicationJobStatus.ACTIVE;
+ return JobState.ACTIVE;
} else if (status.equalsIgnoreCase("FAILED")) {
- return ApplicationJobStatus.FAILED;
+ return JobState.FAILED;
} else if (status.equalsIgnoreCase("CANCELLED")) {
- return ApplicationJobStatus.CANCELED;
+ return JobState.CANCELED;
}
} else {
if (ActivityStateEnumeration.CANCELLED.equals(state)) {
- return ApplicationJobStatus.CANCELED;
+ return JobState.CANCELED;
} else if (ActivityStateEnumeration.FAILED.equals(state)) {
- return ApplicationJobStatus.FAILED;
+ return JobState.FAILED;
} else if (ActivityStateEnumeration.FINISHED.equals(state)) {
- return ApplicationJobStatus.FINISHED;
+ return JobState.COMPLETE;
} else if (ActivityStateEnumeration.RUNNING.equals(state)) {
- return ApplicationJobStatus.ACTIVE;
+ return JobState.ACTIVE;
}
}
} finally {
if (acursor != null)
acursor.dispose();
}
- return ApplicationJobStatus.UNKNOWN;
+ return JobState.UNKNOWN;
}
private void saveApplicationJob(JobExecutionContext jobExecutionContext, JobDefinitionType jobDefinition,
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
index 1e80877..b7bba72 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
@@ -34,18 +34,18 @@ import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.MessageContext;
import org.apache.airavata.gfac.context.security.GSISecurityContext;
import org.apache.airavata.gfac.context.security.SSHSecurityContext;
-import org.apache.airavata.gfac.notification.events.JobIDEvent;
import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.notification.listeners.GSISSHJobSubmissionListener;
import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.utils.GFacUtils;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.gsi.ssh.impl.JobStatus;
import org.apache.airavata.gsi.ssh.impl.PBSCluster;
-import org.apache.airavata.gsi.ssh.listener.JobSubmissionListener;
+import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.schemas.gfac.FileArrayType;
import org.apache.airavata.schemas.gfac.HostDescriptionType;
import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
@@ -54,7 +54,7 @@ import org.apache.airavata.schemas.gfac.URIArrayType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class GSISSHProvider implements GFacProvider {
+public class GSISSHProvider extends AbstractProvider implements GFacProvider{
private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class);
public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
@@ -62,7 +62,7 @@ public class GSISSHProvider implements GFacProvider {
}
public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- //To change body of implemented methods use File | Settings | File Templates.
+ super.initialize(jobExecutionContext);
}
public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
@@ -107,6 +107,26 @@ public class GSISSHProvider implements GFacProvider {
jobDescriptor.setQueueName(app.getQueue().getQueueName());
}
jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName());
+
+ TaskDetails taskData = jobExecutionContext.getTaskData();
+ if(taskData != null && taskData.isSetTaskScheduling()){
+ ComputationalResourceScheduling computionnalResource = taskData.getTaskScheduling();
+ if(computionnalResource.getNodeCount() > 0){
+ jobDescriptor.setNodes(computionnalResource.getNodeCount());
+ }
+ if(computionnalResource.getComputationalProjectAccount() != null){
+ jobDescriptor.setAcountString(computionnalResource.getComputationalProjectAccount());
+ }
+ if(computionnalResource.getQueueName() != null){
+ jobDescriptor.setQueueName(computionnalResource.getQueueName());
+ }
+ if(computionnalResource.getTotalCPUCount() > 0){
+ jobDescriptor.setProcessesPerNode(computionnalResource.getTotalCPUCount());
+ }
+ if(computionnalResource.getWallTimeLimit() > 0){
+ jobDescriptor.setMaxWallTime(String.valueOf(computionnalResource.getWallTimeLimit()));
+ }
+ }
List<String> inputValues = new ArrayList<String>();
MessageContext input = jobExecutionContext.getInMessageContext();
Map<String, Object> inputs = input.getParameters();
@@ -137,6 +157,7 @@ public class GSISSHProvider implements GFacProvider {
JobDetails jobDetails = new JobDetails();
jobDetails.setJobID(jobID);
jobExecutionContext.setJobDetails(jobDetails);
+ GFacUtils.saveJobStatus(jobDetails,JobState.QUEUED,jobExecutionContext.getTaskData().getTaskID());
} catch (SSHApiException e) {
String error = "Error submitting the job to host " + host.getHostAddress() + e.getMessage();
log.error(error);
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
index 85241d5..bb0cf97 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
@@ -24,7 +24,10 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
-import java.util.*;
+import java.util.Calendar;
+import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -40,8 +43,12 @@ import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.utils.GFacUtils;
import org.apache.airavata.gfac.utils.GramJobSubmissionListener;
import org.apache.airavata.gfac.utils.GramProviderUtils;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.JobStatus;
import org.apache.airavata.registry.api.workflow.ApplicationJob;
import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.GlobusHostType;
import org.globus.gram.GramException;
@@ -54,7 +61,7 @@ import org.ietf.jgss.GSSException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class GramProvider implements GFacProvider {
+public class GramProvider extends AbstractProvider implements GFacProvider{
private static final Logger log = LoggerFactory.getLogger(GramJobSubmissionListener.class);
private GramJob job;
@@ -109,9 +116,10 @@ public class GramProvider implements GFacProvider {
// This method prepare the environment before the application invocation.
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
try {
+ super.initialize(jobExecutionContext);
String strTwoPhase = ServerSettings.getSetting("TwoPhase");
if (strTwoPhase != null) {
twoPhase = Boolean.parseBoolean(strTwoPhase);
@@ -170,7 +178,7 @@ public class GramProvider implements GFacProvider {
JobExecutionContext jobExecutionContext,
GlobusHostType globusHostType) throws GFacException, GFacProviderException {
boolean applicationSaved=false;
- if (twoPhase) {
+ if (twoPhase) {
try {
/*
* The first boolean is to force communication through SSLv3
@@ -185,17 +193,19 @@ public class GramProvider implements GFacProvider {
renewCredentialsAttempt = false;
} catch (WaitingForCommitException e) {
-
- saveApplicationJob(jobExecutionContext, ApplicationJobStatus.UN_SUBMITTED);
+ String jobID = job.getIDAsString();
+
+ details.setJobID(jobID);
+ details.setJobDescription(job.getRSL());
+ jobExecutionContext.setJobDetails(details);
+ GFacUtils.saveJobStatus(details, JobState.UN_SUBMITTED, jobExecutionContext.getTaskData().getTaskID());
+
applicationSaved=true;
- String jobStatusMessage = "Un-submitted JobID= " + job.getIDAsString();
+ String jobStatusMessage = "Un-submitted JobID= " + jobID;
log.info(jobStatusMessage);
jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
- log.info("JobID = " + job.getIDAsString());
-
-
- log.info("Two phase commit: sending COMMIT_REQUEST signal; Job id - " + job.getIDAsString());
+ log.info("Two phase commit: sending COMMIT_REQUEST signal; Job id - " + jobID);
try {
job.signal(GramJob.SIGNAL_COMMIT_REQUEST);
@@ -222,7 +232,7 @@ public class GramProvider implements GFacProvider {
+ job.getIDAsString() + ". Credentials provided invalid", e1);
}
}
-
+ GFacUtils.updateJobStatus(details, JobState.SUBMITTED);
jobStatusMessage = "Submitted JobID= " + job.getIDAsString();
log.info(jobStatusMessage);
jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
@@ -262,15 +272,6 @@ public class GramProvider implements GFacProvider {
}
currentlyExecutingJobCache.put(job.getIDAsString(), job);
-
- /* these will be removed and used new status structure
- if (applicationSaved){
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, job.getIDAsString(), ApplicationJobStatus.SUBMITTED);
- }else{
- saveApplicationJob(jobExecutionContext, ApplicationJobStatus.SUBMITTED);
- applicationSaved=true;
- }*/
-
/*
* Wait until job is done
*/
@@ -350,15 +351,7 @@ public class GramProvider implements GFacProvider {
}
- private void saveApplicationJob(JobExecutionContext jobExecutionContext, ApplicationJobStatus jobStatus) {
- ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
- appJob.setJobId(job.getIDAsString());
- appJob.setJobData(job.getRSL());
- appJob.setSubmittedTime(Calendar.getInstance().getTime());
- appJob.setStatus(jobStatus);
- appJob.setStatusUpdateTime(appJob.getSubmittedTime());
- GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
- }
+
public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
index 956b7b2..a5762a1 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
@@ -55,7 +55,7 @@ import sun.reflect.generics.reflectiveObjects.NotImplementedException;
* Executes hadoop job using the cluster configuration provided by handlers in
* in-flow.
*/
-public class HadoopProvider implements GFacProvider{
+public class HadoopProvider extends AbstractProvider implements GFacProvider{
private static final Logger logger = LoggerFactory.getLogger(HadoopProvider.class);
private boolean isWhirrBasedDeployment = false;
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
index 9d1f204..d9a8383 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
@@ -41,6 +41,7 @@ import org.apache.airavata.gfac.utils.GFacUtils;
import org.apache.airavata.gfac.utils.InputStreamToFileWriter;
import org.apache.airavata.gfac.utils.InputUtils;
import org.apache.airavata.gfac.utils.OutputUtils;
+import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.registry.api.workflow.ApplicationJob;
import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
@@ -48,9 +49,10 @@ import org.apache.airavata.schemas.gfac.NameValuePairType;
import org.apache.xmlbeans.XmlException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-public class LocalProvider implements GFacProvider {
+public class LocalProvider extends AbstractProvider implements GFacProvider{
private static final Logger log = LoggerFactory.getLogger(LocalProvider.class);
private ProcessBuilder builder;
private List<String> cmdList;
@@ -97,7 +99,8 @@ public class LocalProvider implements GFacProvider {
cmdList = new ArrayList<String>();
}
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
+ super.initialize(jobExecutionContext);
ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().
getApplicationDeploymentDescription().getType();
@@ -125,9 +128,12 @@ public class LocalProvider implements GFacProvider {
getApplicationContext().getApplicationDeploymentDescription().getType();
try {
- // running cmd
+ jobId= jobExecutionContext.getTaskData().getTaskID();
+ details.setJobID(jobId);
+ GFacUtils.saveJobStatus(details, JobState.SETUP, jobExecutionContext.getTaskData().getTaskID());
+ // running cmd
Process process = builder.start();
- jobId= jobExecutionContext.getExperimentID();
+
//todo fix how to incoperate orchestrator with gfac
// if(jobExecutionContext.getGFacConfiguration().getAiravataAPI() != null){
// saveApplicationJob(jobExecutionContext);
@@ -168,36 +174,39 @@ public class LocalProvider implements GFacProvider {
.append(" on the localHost, working directory = ").append(app.getStaticWorkingDirectory())
.append(" tempDirectory = ").append(app.getScratchWorkingDirectory()).append(" With the status ")
.append(String.valueOf(returnValue));
-
+ details.setJobDescription(buf.toString());
+ GFacUtils.updateJobStatus(details, JobState.COMPLETE);
log.info(buf.toString());
} catch (IOException io) {
throw new GFacProviderException(io.getMessage(), io);
} catch (InterruptedException e) {
throw new GFacProviderException(e.getMessage(), e);
+ }catch (GFacException e) {
+ throw new GFacProviderException(e.getMessage(), e);
}
}
- private void saveApplicationJob(JobExecutionContext jobExecutionContext)
- throws GFacProviderException {
- ApplicationDeploymentDescriptionType app = jobExecutionContext.
- getApplicationContext().getApplicationDeploymentDescription().getType();
- ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
- appJob.setJobId(jobId);
- LocalProviderJobData data = new LocalProviderJobData();
- data.setApplicationName(app.getExecutableLocation());
- data.setInputDir(app.getInputDataDirectory());
- data.setOutputDir(app.getOutputDataDirectory());
- data.setWorkingDir(builder.directory().toString());
- data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext));
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- JAXB.marshal(data, stream);
- appJob.setJobData(stream.toString());
- appJob.setSubmittedTime(Calendar.getInstance().getTime());
- appJob.setStatus(ApplicationJobStatus.SUBMITTED);
- appJob.setStatusUpdateTime(appJob.getSubmittedTime());
- GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
- }
+// private void saveApplicationJob(JobExecutionContext jobExecutionContext)
+// throws GFacProviderException {
+// ApplicationDeploymentDescriptionType app = jobExecutionContext.
+// getApplicationContext().getApplicationDeploymentDescription().getType();
+// ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
+// appJob.setJobId(jobId);
+// LocalProviderJobData data = new LocalProviderJobData();
+// data.setApplicationName(app.getExecutableLocation());
+// data.setInputDir(app.getInputDataDirectory());
+// data.setOutputDir(app.getOutputDataDirectory());
+// data.setWorkingDir(builder.directory().toString());
+// data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext));
+// ByteArrayOutputStream stream = new ByteArrayOutputStream();
+// JAXB.marshal(data, stream);
+// appJob.setJobData(stream.toString());
+// appJob.setSubmittedTime(Calendar.getInstance().getTime());
+// appJob.setStatus(ApplicationJobStatus.SUBMITTED);
+// appJob.setStatusUpdateTime(appJob.getSubmittedTime());
+// GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
+// }
public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
index 48826a8..1b44c1b 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
@@ -49,39 +49,42 @@ import org.apache.airavata.gfac.context.security.SSHSecurityContext;
import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.registry.api.workflow.ApplicationJob;
-import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
+import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.NameValuePairType;
import org.apache.airavata.schemas.gfac.SSHHostType;
import org.apache.airavata.schemas.gfac.URIArrayType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
/**
* Execute application using remote SSH
*/
-public class SSHProvider implements GFacProvider {
+public class SSHProvider extends AbstractProvider implements GFacProvider{
private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
private SSHSecurityContext securityContext;
private String jobID = null;
+ private String taskID = null;
// we keep gsisshprovider to support qsub submission incase of hpc scenario with ssh
private GSISSHProvider gsiSshProvider = null;
public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) {
+ super.initialize(jobExecutionContext);
+ taskID = jobExecutionContext.getTaskData().getTaskID();
+ if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) {
jobID = "SSH_" + jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis();
securityContext = (SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT);
ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
- saveApplicationJob(jobExecutionContext, remoteFile);
+ details.setJobDescription(remoteFile);
+ GFacUtils.saveJobStatus(details, JobState.SETUP, taskID);
log.info(remoteFile);
try {
File runscript = createShellScript(jobExecutionContext);
SCPFileTransfer fileTransfer = securityContext.getSSHClient().newSCPFileTransfer();
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.STAGING);
fileTransfer.upload(runscript.getAbsolutePath(), remoteFile);
} catch (IOException e) {
throw new GFacProviderException(e.getLocalizedMessage(), e);
@@ -91,15 +94,6 @@ public class SSHProvider implements GFacProvider {
}
}
- private void saveApplicationJob(JobExecutionContext jobExecutionContext, String executableName) {
- ApplicationJob job = GFacUtils.createApplicationJob(jobExecutionContext);
- job.setJobId(jobID);
- job.setStatus(ApplicationJobStatus.INITIALIZE);
- job.setSubmittedTime(Calendar.getInstance().getTime());
- job.setStatusUpdateTime(job.getSubmittedTime());
- job.setJobData(executableName);
- GFacUtils.recordApplicationJob(jobExecutionContext, job);
- }
public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
if (gsiSshProvider == null) {
@@ -111,9 +105,9 @@ public class SSHProvider implements GFacProvider {
* Execute
*/
String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.SUBMITTED);
+ details.setJobDescription(execuable);
+ GFacUtils.updateJobStatus(details, JobState.SUBMITTED);
Command cmd = session.exec("/bin/chmod 755 " + execuable + "; " + execuable);
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.RESULTS_RETRIEVE);
log.info("stdout=" + GFacUtils.readFromStream(session.getInputStream()));
cmd.join(Constants.COMMAND_EXECUTION_TIMEOUT, TimeUnit.SECONDS);
@@ -127,8 +121,7 @@ public class SSHProvider implements GFacProvider {
} else {
log.info("Process finished with return value of zero.");
}
-
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.FINISHED);
+ GFacUtils.updateJobStatus(details, JobState.COMPLETE);
} catch (ConnectionException e) {
throw new GFacProviderException(e.getMessage(), e);
} catch (TransportException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java
index 04353b6..2457528 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java
@@ -39,8 +39,14 @@ import org.apache.airavata.gfac.Constants;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.model.workspace.experiment.DataObjectType;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.JobStatus;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.api.workflow.ApplicationJob;
import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.schemas.gfac.*;
import org.apache.axiom.om.OMElement;
import org.globus.gram.GramJob;
@@ -695,31 +701,49 @@ public class GFacUtils {
log.error("Error in persisting application job data for application job " + job.getJobId() + "!!!", e);
}
}
-
- public static void updateApplicationJobStatus(JobExecutionContext context, String jobId, ApplicationJobStatus status) {
- updateApplicationJobStatus(context, jobId, status, Calendar.getInstance().getTime());
- }
-
- public static ApplicationJobStatus getApplicationJobStatus(int gramStatus) {
+ public static void saveJobStatus(JobDetails details, JobState state, String taskID) throws GFacException {
+ try {
+ Registry registry = RegistryFactory.getDefaultRegistry();
+ JobStatus status = new JobStatus();
+ status.setJobState(state);
+ details.setJobStatus(status);
+ registry.add(ChildDataType.JOB_DETAIL,details, taskID);
+ } catch (Exception e) {
+ throw new GFacException("Error persisting job status" + e.getLocalizedMessage(),e);
+ }
+ }
+ public static void updateJobStatus(JobDetails details, JobState state) throws GFacException {
+ try {
+ Registry registry = RegistryFactory.getDefaultRegistry();
+ JobStatus status = new JobStatus();
+ status.setJobState(state);
+ status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+ details.setJobStatus(status);
+ registry.update(org.apache.airavata.registry.cpi.DataType.JOB_DETAIL, details, details.getJobID());
+ } catch (Exception e) {
+ throw new GFacException("Error persisting job status" + e.getLocalizedMessage(),e);
+ }
+ }
+ public static JobState getApplicationJobStatus(int gramStatus) {
switch (gramStatus) {
case GramJob.STATUS_UNSUBMITTED:
- return ApplicationJobStatus.UN_SUBMITTED;
+ return JobState.HELD;
case GramJob.STATUS_ACTIVE:
- return ApplicationJobStatus.EXECUTING;
+ return JobState.ACTIVE;
case GramJob.STATUS_DONE:
- return ApplicationJobStatus.FINISHED;
+ return JobState.COMPLETE;
case GramJob.STATUS_FAILED:
- return ApplicationJobStatus.FAILED;
+ return JobState.FAILED;
case GramJob.STATUS_PENDING:
- return ApplicationJobStatus.PENDING;
+ return JobState.QUEUED;
case GramJob.STATUS_STAGE_IN:
- return ApplicationJobStatus.INITIALIZE;
+ return JobState.QUEUED;
case GramJob.STATUS_STAGE_OUT:
- return ApplicationJobStatus.FINALIZE;
+ return JobState.COMPLETE;
case GramJob.STATUS_SUSPENDED:
- return ApplicationJobStatus.SUSPENDED;
+ return JobState.SUSPENDED;
default:
- return ApplicationJobStatus.UNKNOWN;
+ return JobState.UNKNOWN;
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramJobSubmissionListener.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramJobSubmissionListener.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramJobSubmissionListener.java
index c682ded..9fc1989 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramJobSubmissionListener.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramJobSubmissionListener.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.gfac.utils;
+import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.security.GSISecurityContext;
import org.apache.airavata.gfac.notification.events.StatusChangeEvent;
@@ -67,8 +68,11 @@ public class GramJobSubmissionListener implements GramJobListener{
}
private void setStatus(int status, int error) {
- GFacUtils.updateApplicationJobStatus(context,job.getIDAsString(),
- GFacUtils.getApplicationJobStatus(status));
+ try {
+ GFacUtils.saveJobStatus(context.getJobDetails(), GFacUtils.getApplicationJobStatus(status), context.getTaskData().getTaskID());
+ } catch (GFacException e) {
+ log.error("Error persisting status" + e.getLocalizedMessage(), e);
+ }
this.currentStatus = status;
this.error = error;
http://git-wip-us.apache.org/repos/asf/airavata/blob/8a615ec0/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java b/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
index 4f606b1..30c8c89 100644
--- a/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
+++ b/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
@@ -48,8 +48,10 @@ import org.apache.airavata.gfac.ec2.util.AmazonEC2Util;
import org.apache.airavata.gfac.ec2.util.EC2ProviderUtil;
import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.provider.impl.AbstractProvider;
import org.apache.airavata.gfac.provider.utils.ProviderUtils;
import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.registry.api.workflow.ApplicationJob;
import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
@@ -58,6 +60,7 @@ import org.apache.airavata.schemas.gfac.OutputParameterType;
import org.apache.airavata.schemas.gfac.StringParameterType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import java.io.File;
@@ -67,7 +70,7 @@ import java.util.Calendar;
import java.util.List;
import java.util.Map;
-public class EC2Provider implements GFacProvider {
+public class EC2Provider extends AbstractProvider implements GFacProvider {
private static final Logger log = LoggerFactory.getLogger(EC2Provider.class);
@@ -82,6 +85,8 @@ public class EC2Provider implements GFacProvider {
private AmazonSecurityContext amazonSecurityContext;
private String jobId;
+
+ private String taskID;
public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException{
if (jobExecutionContext != null) {
@@ -105,8 +110,8 @@ public class EC2Provider implements GFacProvider {
log.debug("INS_TYPE:" + amazonSecurityContext.getInstanceType());
log.debug("USERNAME:" + amazonSecurityContext.getUserName());
}
- saveApplicationJob(jobExecutionContext);
// job
+ details.setJobID(jobId);
/* Validation */
if (amazonSecurityContext.getAccessKey() == null || amazonSecurityContext.getAccessKey().isEmpty())
throw new GFacProviderException("EC2 Access Key is empty");
@@ -123,22 +128,15 @@ public class EC2Provider implements GFacProvider {
AWSCredentials credential =
new BasicAWSCredentials(amazonSecurityContext.getAccessKey(), amazonSecurityContext.getSecretKey());
AmazonEC2Client ec2client = new AmazonEC2Client(credential);
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.AUTHENTICATE);
+ taskID = jobExecutionContext.getTaskData().getTaskID();
+ GFacUtils.saveJobStatus(details, JobState.SETUP, taskID);
initEc2Environment(jobExecutionContext, ec2client);
checkConnection(instance, ec2client);
}
- private void saveApplicationJob(JobExecutionContext jobExecutionContext) {
- ApplicationJob job = GFacUtils.createApplicationJob(jobExecutionContext);
- job.setJobId(jobId);
- job.setStatus(ApplicationJobStatus.VALIDATE_INPUT);
- job.setSubmittedTime(Calendar.getInstance().getTime());
- job.setStatusUpdateTime(job.getSubmittedTime());
- GFacUtils.recordApplicationJob(jobExecutionContext, job);
- }
-
+
public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.INITIALIZE);
+
String shellCmd = createShellCmd(jobExecutionContext);
AiravataAPI airavataAPI = jobExecutionContext.getGFacConfiguration().getAiravataAPI();
if (airavataAPI!=null){
@@ -173,7 +171,6 @@ public class EC2Provider implements GFacProvider {
return true;
}
});
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.AUTHENTICATE);
// Initialize the authentication data.
PublicKeyAuthenticationClient publicKeyAuth = new PublicKeyAuthenticationClient();
publicKeyAuth.setUsername(amazonSecurityContext.getUserName());
@@ -185,6 +182,7 @@ public class EC2Provider implements GFacProvider {
// Authenticate
int result = sshClient.authenticate(publicKeyAuth);
if(result== AuthenticationProtocolState.FAILED) {
+ GFacUtils.saveJobStatus(details, JobState.FAILED, taskID);
throw new GFacProviderException("The authentication failed");
} else if(result==AuthenticationProtocolState.PARTIAL) {
throw new GFacProviderException("The authentication succeeded but another"
@@ -192,13 +190,13 @@ public class EC2Provider implements GFacProvider {
} else if(result==AuthenticationProtocolState.COMPLETE) {
log.info("ssh client authentication is complete...");
}
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.SUBMITTED);
+ GFacUtils.saveJobStatus(details, JobState.SUBMITTED, taskID);
SessionChannelClient session = sshClient.openSessionChannel();
log.info("ssh session successfully opened...");
session.requestPseudoTerminal("vt100", 80, 25, 0, 0, "");
session.startShell();
-
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.EXECUTING);
+ GFacUtils.saveJobStatus(details, JobState.ACTIVE, taskID);
+
session.getOutputStream().write(shellCmd.getBytes());
InputStream in = session.getInputStream();
@@ -215,8 +213,7 @@ public class EC2Provider implements GFacProvider {
break;
}
}
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.RESULTS_RETRIEVE);
-
+
executionResult = executionResult.replace("\r","").replace("\n","");
log.info("Result of the job : " + executionResult);
@@ -229,7 +226,7 @@ public class EC2Provider implements GFacProvider {
((StringParameterType) outParam.getType()).setValue(executionResult);
jobExecutionContext.getOutMessageContext().addParameter(paramName, outParam);
}
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.FINISHED);
+ GFacUtils.saveJobStatus(details, JobState.COMPLETE, taskID);
} catch (InvalidSshKeyException e) {
throw new GFacProviderException("Invalid SSH key", e);
} catch (IOException e) {
[2/4] git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/airavata
Posted by ra...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/5fb684b6
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/5fb684b6
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/5fb684b6
Branch: refs/heads/master
Commit: 5fb684b621202489eb62829569651bad41268853
Parents: 8a615ec 47afa39
Author: raminder <ra...@apache.org>
Authored: Mon Mar 3 14:07:48 2014 -0500
Committer: raminder <ra...@apache.org>
Committed: Mon Mar 3 14:07:48 2014 -0500
----------------------------------------------------------------------
.../monitor/impl/pull/qstat/QstatMonitor.java | 26 +++++++++++---------
.../impl/pull/qstat/ResourceConnection.java | 2 ++
.../apache/airavata/gfac/utils/GFacUtils.java | 7 +-----
.../client/OrchestratorClientFactoryTest.java | 3 ++-
.../registry/jpa/impl/ExperimentRegistry.java | 6 ++---
5 files changed, 23 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/5fb684b6/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java
----------------------------------------------------------------------
[3/4] git commit: Fixed AIRAVATA-1043.
Posted by ra...@apache.org.
Fixed AIRAVATA-1043.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2736d75c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2736d75c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2736d75c
Branch: refs/heads/master
Commit: 2736d75c7cc04b1486190f86117cf1e545afc07b
Parents: 5fb684b
Author: raminder <ra...@apache.org>
Authored: Mon Mar 3 15:28:01 2014 -0500
Committer: raminder <ra...@apache.org>
Committed: Mon Mar 3 15:28:01 2014 -0500
----------------------------------------------------------------------
.../job/monitor/AiravataJobStatusUpdator.java | 19 +++++++++++++++++++
.../airavata/gfac/handler/SCPOutputHandler.java | 1 -
.../apache/airavata/gfac/utils/GFacUtils.java | 3 ++-
3 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/2736d75c/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
index 0a0fde5..a6d6c31 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
@@ -21,12 +21,16 @@
package org.apache.airavata.job.monitor;
import com.google.common.eventbus.Subscribe;
+
import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.Registry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Calendar;
import java.util.concurrent.BlockingQueue;
public class AiravataJobStatusUpdator{
@@ -64,6 +68,11 @@ public class AiravataJobStatusUpdator{
the registry accordingly, for now we are just printing to standard Out
*/
JobState state = jobStatus.getState();
+ try {
+ updateJobStatus(jobStatus.getMonitorID().getJobID(),state);
+ } catch (Exception e) {
+ logger.error("Error persisting data" + e.getLocalizedMessage(),e);
+ }
switch (state) {
case COMPLETE:
logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is DONE");
@@ -97,4 +106,14 @@ public class AiravataJobStatusUpdator{
break;
}
}
+ public static void updateJobStatus(String jobID, JobState state) throws Exception {
+ Registry registry = RegistryFactory.getDefaultRegistry();
+ JobDetails details = new JobDetails();
+ org.apache.airavata.model.workspace.experiment.JobStatus status = new org.apache.airavata.model.workspace.experiment.JobStatus();
+ status.setJobState(state);
+ status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+ details.setJobStatus(status);
+ details.setJobID(jobID);
+ registry.update(org.apache.airavata.registry.cpi.DataType.JOB_DETAIL, details, jobID);
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2736d75c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
index eca0daf..555a889 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
@@ -104,7 +104,6 @@ public class SCPOutputHandler extends AbstractHandler{
}
status.setTransferState(TransferState.DOWNLOAD);
detail.setTransferStatus(status);
- detail.setTransferDescription("Output: " + stringMap.get(output.keySet()).toString());
registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
} catch (XmlException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/2736d75c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java
index e6b70e7..ad741b6 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java
@@ -46,6 +46,7 @@ import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.api.workflow.ApplicationJob;
import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.CompositeIdentifier;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.schemas.gfac.*;
import org.apache.axiom.om.OMElement;
@@ -707,7 +708,7 @@ public class GFacUtils {
JobStatus status = new JobStatus();
status.setJobState(state);
details.setJobStatus(status);
- registry.add(ChildDataType.JOB_DETAIL,details, taskID);
+ registry.add(ChildDataType.JOB_DETAIL,details, new CompositeIdentifier(taskID, details.getJobID()));
} catch (Exception e) {
throw new GFacException("Error persisting job status" + e.getLocalizedMessage(),e);
}
[4/4] git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/airavata
Posted by ra...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1dc04ce7
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1dc04ce7
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1dc04ce7
Branch: refs/heads/master
Commit: 1dc04ce7c4a2ac591c5168a26815f74d6f788bec
Parents: 2736d75 119f754
Author: raminder <ra...@apache.org>
Authored: Mon Mar 3 15:29:33 2014 -0500
Committer: raminder <ra...@apache.org>
Committed: Mon Mar 3 15:29:33 2014 -0500
----------------------------------------------------------------------
modules/airavata-job-monitor/pom.xml | 10 ++--------
.../airavata/job/monitor/util/X509Helper.java | 17 ++++++++---------
2 files changed, 10 insertions(+), 17 deletions(-)
----------------------------------------------------------------------