You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/04/24 05:21:54 UTC
[2/2] git commit: fixing most of the error during job execution
fixing most of the error during job execution
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b6c7c41e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b6c7c41e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b6c7c41e
Branch: refs/heads/master
Commit: b6c7c41e3ea54b625c695b54d525fa5d11c5f2be
Parents: 34a8147
Author: lahiru <la...@apache.org>
Authored: Wed Apr 23 23:19:30 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Wed Apr 23 23:19:30 2014 -0400
----------------------------------------------------------------------
.../server/src/main/resources/gfac-config.xml | 12 +-
.../server/src/main/assembly/bin-assembly.xml | 2 -
.../org/apache/airavata/gfac/cpi/GFacImpl.java | 1 +
.../handler/GSISSHDirectorySetupHandler.java | 98 +++++++++
.../gfac/handler/GSISSHInputHandler.java | 137 ++++++++++++
.../gfac/handler/GSISSHOutputHandler.java | 216 ++++++++++++++++++
.../apache/airavata/gfac/monitor/MonitorID.java | 2 +
.../monitor/core/AiravataAbstractMonitor.java | 2 +-
.../monitor/impl/pull/qstat/QstatMonitor.java | 4 +-
.../monitor/state/JobStatusChangeRequest.java | 6 +
.../airavata/gfac/utils/GFACGSISSHUtils.java | 98 +++++++++
.../src/test/resources/gfac-config.xml | 12 +-
.../gfac/handler/SCPDirectorySetupHandler.java | 110 ----------
.../airavata/gfac/handler/SCPInputHandler.java | 147 -------------
.../airavata/gfac/handler/SCPOutputHandler.java | 218 -------------------
.../gfac/handler/SSHDirectorySetupHandler.java | 102 +++++++++
.../airavata/gfac/handler/SSHInputHandler.java | 146 +++++++++++++
.../airavata/gfac/handler/SSHOutputHandler.java | 218 +++++++++++++++++++
.../gfac-ssh/src/test/resources/gfac-config.xml | 6 +-
.../cpi/impl/SimpleOrchestratorImpl.java | 5 +-
20 files changed, 1046 insertions(+), 496 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/configuration/server/src/main/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/gfac-config.xml b/modules/configuration/server/src/main/resources/gfac-config.xml
index 0607e49..1d30eac 100644
--- a/modules/configuration/server/src/main/resources/gfac-config.xml
+++ b/modules/configuration/server/src/main/resources/gfac-config.xml
@@ -64,16 +64,16 @@
<Provider class="org.apache.airavata.gfac.provider.impl.SSHProvider" host="org.apache.airavata.schemas.gfac.impl.SSHHostTypeImpl">
<InHandlers>
- <Handler class="org.apache.airavata.gfac.handler.SCPDirectorySetupHandler"/>
- <Handler class="org.apache.airavata.gfac.handler.SCPInputHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.SSHDirectorySetupHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.SSHInputHandler"/>
</InHandlers>
<OutHandlers>
- <Handler class="org.apache.airavata.gfac.handler.SCPOutputHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.SSHOutputHandler"/>
</OutHandlers>
</Provider>
<Provider class="org.apache.airavata.gfac.provider.impl.GSISSHProvider" host="org.apache.airavata.schemas.gfac.impl.GsisshHostTypeImpl">
<InHandlers>
- <Handler class="org.apache.airavata.gfac.handler.SCPDirectorySetupHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.GSISSHDirectorySetupHandler"/>
<!--Handler class="org.apache.airavata.gfac.handler.AdvancedSCPInputHandler">
<property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
<property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
@@ -83,10 +83,10 @@
<property name="passPhrase" value="/home/airavata/outputData"/>
<property name="password" value="/home/airavata/outputData"/> <either we have to set password or keys, password has higher preference>
</Handler-->
- <Handler class="org.apache.airavata.gfac.handler.SCPInputHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.GSISSHInputHandler"/>
</InHandlers>
<OutHandlers>
- <Handler class="org.apache.airavata.gfac.handler.SCPOutputHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.SOutputHandler"/>
<!--Handler class="org.apache.airavata.gfac.handler.AdvancedSCPOutputHandler">
<property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
<property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/distribution/server/src/main/assembly/bin-assembly.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/src/main/assembly/bin-assembly.xml b/modules/distribution/server/src/main/assembly/bin-assembly.xml
index 4a76cd2..982e7d6 100644
--- a/modules/distribution/server/src/main/assembly/bin-assembly.xml
+++ b/modules/distribution/server/src/main/assembly/bin-assembly.xml
@@ -205,8 +205,6 @@
<include>org.apache.airavata:airavata-workflow-tracking:jar</include>
<include>org.apache.airavata:gsissh:jar</include>
<include>org.apache.airavata:airavata-model-utils:jar</include>
- <include>org.apache.airavata:job-monitor-tool:jar</include>
- <include>org.apache.airavata:gfac-monitor:jar</include>
<include>org.apache.airavata:airavata-api-server:jar</include>
<include>org.apache.airavata:airavata-api-stubs:jar</include>
<include>org.apache.openjpa:openjpa-all:jar</include>
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
index 6312292..ffd8af8 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
@@ -182,6 +182,7 @@ public class GFacImpl implements GFac, AbstractActivityListener {
GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), airavataAPI, configurationProperties);
jobExecutionContext = new JobExecutionContext(gFacConfiguration, serviceName);
+ jobExecutionContext.setRegistry(registry);
jobExecutionContext.setTaskData(taskData);
ApplicationContext applicationContext = new ApplicationContext();
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GSISSHDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GSISSHDirectorySetupHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GSISSHDirectorySetupHandler.java
new file mode 100644
index 0000000..06dbfc4
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GSISSHDirectorySetupHandler.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.utils.GFACGSISSHUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class GSISSHDirectorySetupHandler extends AbstractHandler{
+ private static final Logger log = LoggerFactory.getLogger(GSISSHDirectorySetupHandler.class);
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacException {
+ if(jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) == null){
+ try {
+ GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+ log.info("Setup SSH job directorties");
+ super.invoke(jobExecutionContext);
+ makeDirectory(jobExecutionContext);
+
+ }
+ private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacException {
+ Cluster cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
+ if (cluster == null) {
+ throw new GFacHandlerException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ try {
+ String workingDirectory = app.getScratchWorkingDirectory();
+ cluster.makeDirectory(workingDirectory);
+ cluster.makeDirectory(app.getScratchWorkingDirectory());
+ cluster.makeDirectory(app.getInputDataDirectory());
+ cluster.makeDirectory(app.getOutputDataDirectory());
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+ status.setTransferState(TransferState.DIRECTORY_SETUP);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Working directory = " + workingDirectory);
+
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+ } catch (SSHApiException e) {
+ throw new GFacHandlerException("Error executing the Handler: " + GSISSHDirectorySetupHandler.class, e);
+ } catch (Exception e) {
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ try {
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+ GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error executing the Handler: " + GSISSHDirectorySetupHandler.class, e);
+ }
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GSISSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GSISSHInputHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GSISSHInputHandler.java
new file mode 100644
index 0000000..fa8d248
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GSISSHInputHandler.java
@@ -0,0 +1,137 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.StringUtil;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.utils.GFACGSISSHUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+public class GSISSHInputHandler extends AbstractHandler {
+
+ private static final Logger log = LoggerFactory.getLogger(GSISSHInputHandler.class);
+
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
+ if(jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) == null){
+ try {
+ GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+ log.info("Invoking SCPInputHandler");
+ super.invoke(jobExecutionContext);
+
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+
+ MessageContext inputNew = new MessageContext();
+ try {
+ MessageContext input = jobExecutionContext.getInMessageContext();
+ Set<String> parameters = input.getParameters().keySet();
+ for (String paramName : parameters) {
+ ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
+ String paramValue = MappingFactory.toString(actualParameter);
+ //TODO: Review this with type
+ if ("URI".equals(actualParameter.getType().getType().toString())) {
+ ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue));
+ } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+ List<String> newFiles = new ArrayList<String>();
+ for (String paramValueEach : split) {
+ String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
+ status.setTransferState(TransferState.UPLOAD);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ newFiles.add(stageInputFiles);
+ }
+ ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+ }
+ inputNew.getParameters().put(paramName, actualParameter);
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ try {
+ GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+ }
+ jobExecutionContext.setInMessageContext(inputNew);
+ }
+
+ private static String stageInputFiles(JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException {
+ Cluster cluster = null;
+ if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
+ cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
+ } else {
+ cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
+ }
+ if (cluster == null) {
+ throw new GFacException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ int i = paramValue.lastIndexOf(File.separator);
+ String substring = paramValue.substring(i + 1);
+ try {
+ String targetFile = app.getInputDataDirectory() + File.separator + substring;
+ if(paramValue.startsWith("file")){
+ paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
+ }
+ cluster.scpTo(targetFile, paramValue);
+ return targetFile;
+ } catch (SSHApiException e) {
+ throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+ }
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GSISSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GSISSHOutputHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GSISSHOutputHandler.java
new file mode 100644
index 0000000..08ed886
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GSISSHOutputHandler.java
@@ -0,0 +1,216 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import net.schmizz.sshj.connection.ConnectionException;
+import net.schmizz.sshj.transport.TransportException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.utils.GFACGSISSHUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gfac.utils.OutputUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.apache.xmlbeans.XmlException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class GSISSHOutputHandler extends AbstractHandler{
+ private static final Logger log = LoggerFactory.getLogger(GSISSHOutputHandler.class);
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
+ if(jobExecutionContext.getApplicationContext().getHostDescription().getType() instanceof GsisshHostType) { // this is because we don't have the right jobexecution context
+ // so attempting to get it from the registry
+ if (Constants.PUSH.equals(((GsisshHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getMonitorMode())) {
+ log.warn("During the out handler chain jobExecution context came null, so trying to handler");
+ ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
+ TaskDetails taskData = null;
+ try {
+ taskData = (TaskDetails) jobExecutionContext.getRegistry().get(DataType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID());
+ } catch (RegistryException e) {
+ log.error("Error retrieving job details from Registry");
+ throw new GFacHandlerException("Error retrieving job details from Registry", e);
+ }
+ JobDetails jobDetails = taskData.getJobDetailsList().get(0);
+ String jobDescription = jobDetails.getJobDescription();
+ if (jobDescription != null) {
+ JobDescriptor jobDescriptor = null;
+ try {
+ jobDescriptor = JobDescriptor.fromXML(jobDescription);
+ } catch (XmlException e1) {
+ e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ applicationDeploymentDescription.getType().setScratchWorkingDirectory(
+ jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory());
+ applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory());
+ applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory());
+ applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile());
+ applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile());
+ }
+ }
+ }
+
+ if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) == null) {
+ try {
+ GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+ super.invoke(jobExecutionContext);
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
+ .getApplicationDeploymentDescription().getType();
+ try {
+ Cluster cluster = null;
+ if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
+ cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
+ } else {
+ cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
+ }
+ if (cluster == null) {
+ throw new GFacProviderException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+
+ // Get the Stdouts and StdErrs
+ String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext.getServiceName());
+
+ TaskDetails taskData = jobExecutionContext.getTaskData();
+ String outputDataDir = null;
+ File localStdOutFile;
+ File localStdErrFile;
+
+ if (taskData.getAdvancedOutputDataHandling() != null) {
+ outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
+ }
+ if (outputDataDir == null) {
+ outputDataDir = File.separator + "tmp";
+ }
+ outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID();
+ (new File(outputDataDir)).mkdirs();
+
+
+ localStdOutFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stdout");
+ localStdErrFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stderr");
+// cluster.makeDirectory(outputDataDir);
+ cluster.scpFrom(app.getStandardOutput(), localStdOutFile.getAbsolutePath());
+ Thread.sleep(1000);
+ cluster.scpFrom(app.getStandardError(), localStdErrFile.getAbsolutePath());
+ Thread.sleep(1000);
+
+ String stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
+ String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
+ status.setTransferState(TransferState.COMPLETE);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("STDOUT:" + stdOutStr);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+ status.setTransferState(TransferState.COMPLETE);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("STDERR:" + stdErrStr);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+
+ Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
+ Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+ Set<String> keys = output.keySet();
+ for (String paramName : keys) {
+ ActualParameter actualParameter = (ActualParameter) output.get(paramName);
+ if ("URI".equals(actualParameter.getType().getType().toString())) {
+
+ List<String> outputList = cluster.listDirectory(app.getOutputDataDirectory());
+ if (outputList.size() == 0 || outputList.get(0).isEmpty()) {
+ stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+ } else {
+ String valueList = outputList.get(0);
+ cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir);
+ jobExecutionContext.addOutputFile(outputDataDir + File.separator + valueList);
+ ((URIParameterType) actualParameter.getType()).setValue(valueList);
+ stringMap = new HashMap<String, ActualParameter>();
+ stringMap.put(paramName, actualParameter);
+ }
+ } else {
+ stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+ }
+ }
+ if (stringMap == null || stringMap.isEmpty()) {
+ throw new GFacHandlerException(
+ "Empty Output returned from the Application, Double check the application"
+ + "and ApplicationDescriptor output Parameter Names");
+ }
+ status.setTransferState(TransferState.DOWNLOAD);
+ detail.setTransferStatus(status);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+ app.setStandardError(localStdErrFile.getAbsolutePath());
+ app.setStandardOutput(localStdOutFile.getAbsolutePath());
+ app.setOutputDataDirectory(outputDataDir);
+ } catch (XmlException e) {
+ throw new GFacHandlerException("Cannot read output:" + e.getMessage(), e);
+ } catch (ConnectionException e) {
+ throw new GFacHandlerException(e.getMessage(), e);
+ } catch (TransportException e) {
+ throw new GFacHandlerException(e.getMessage(), e);
+ } catch (IOException e) {
+ throw new GFacHandlerException(e.getMessage(), e);
+ } catch (Exception e) {
+ try {
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+ GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error in retrieving results", e);
+ }
+
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
index a01dcba..a481564 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
@@ -72,6 +72,7 @@ public class MonitorID {
this.jobID = jobID;
this.taskID = taskID;
this.experimentID = experimentID;
+ this.workflowNodeID = workflowNodeID;
}
public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName,AuthenticationInfo authenticationInfo) {
@@ -85,6 +86,7 @@ public class MonitorID {
this.userName = ((MyProxyAuthenticationInfo)this.authenticationInfo).getUserName();
}
}
+ this.workflowNodeID = workflowNodeID;
this.jobID = jobID;
this.taskID = taskID;
this.experimentID = experimentID;
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
index 1f85921..da6baf8 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
@@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory;
/**
* This is the abstract Monitor which needs to be used by
- * any Monitoring implementation which expect to consume
+ * any Monitoring implementation which expect nto consume
* to store the status to registry. Because they have to
* use the MonitorPublisher to publish the monitoring statuses
* to the Event Bus. All the Monitor statuses publish to the eventbus
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/QstatMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/QstatMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/QstatMonitor.java
index 4e2e18e..020d9ae 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/QstatMonitor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/QstatMonitor.java
@@ -23,6 +23,7 @@ package org.apache.airavata.gfac.monitor.impl.pull.qstat;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.monitor.HostMonitorData;
+import org.apache.airavata.gfac.monitor.JobIdentity;
import org.apache.airavata.gfac.monitor.MonitorID;
import org.apache.airavata.gfac.monitor.UserMonitorData;
import org.apache.airavata.gfac.monitor.core.PullMonitor;
@@ -133,8 +134,7 @@ public class QstatMonitor extends PullMonitor {
for (MonitorID iMonitorID : monitorID) {
currentMonitorID = iMonitorID;
iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()));
- jobStatus.setMonitorID(iMonitorID);
- jobStatus.setState(iMonitorID.getStatus());
+ jobStatus = new JobStatusChangeRequest(iMonitorID);
// we have this JobStatus class to handle amqp monitoring
publisher.publish(jobStatus);
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java
index 5ea9eb5..da52656 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java
@@ -41,6 +41,12 @@ public class JobStatusChangeRequest extends AbstractStateChangeRequest {
public JobStatusChangeRequest() {
}
+ public JobStatusChangeRequest(MonitorID monitorID) {
+ setIdentity(new JobIdentity(monitorID.getExperimentID(),monitorID.getWorkflowNodeID(),
+ monitorID.getTaskID(),monitorID.getJobID()));
+ setMonitorID(monitorID);
+ this.state = monitorID.getStatus();
+ }
public JobStatusChangeRequest(MonitorID monitorID, JobIdentity jobId, JobState state) {
setIdentity(jobId);
setMonitorID(monitorID);
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFACGSISSHUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFACGSISSHUtils.java
new file mode 100644
index 0000000..81cf82f
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFACGSISSHUtils.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.utils;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.schemas.gfac.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class GFACGSISSHUtils {
+ private final static Logger logger = LoggerFactory.getLogger(GFACGSISSHUtils.class);
+
+ public static final String PBS_JOB_MANAGER = "pbs";
+ public static final String SLURM_JOB_MANAGER = "slurm";
+ public static final String SUN_GRID_ENGINE_JOB_MANAGER = "sge";
+
+ public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException {
+ RequestData requestData = new RequestData("default");
+ GSISecurityContext context = null;
+ try {
+ //todo fix this
+ context = new GSISecurityContext(null, requestData);
+ } catch (Exception e) {
+ throw new GFacException("An error occurred while creating GSI security context", e);
+ }
+ HostDescription registeredHost = jobExecutionContext.getApplicationContext().getHostDescription();
+ if (registeredHost.getType() instanceof GlobusHostType || registeredHost.getType() instanceof UnicoreHostType
+ || registeredHost.getType() instanceof SSHHostType) {
+ logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml");
+ } else if (registeredHost.getType() instanceof GsisshHostType) {
+ GSIAuthenticationInfo authenticationInfo
+ = new MyProxyAuthenticationInfo(requestData.getMyProxyUserName(), requestData.getMyProxyPassword(), requestData.getMyProxyServerUrl(),
+ requestData.getMyProxyPort(), requestData.getMyProxyLifeTime(), System.getProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY));
+ GsisshHostType gsisshHostType = (GsisshHostType) registeredHost.getType();
+ ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(), registeredHost.getType().getHostAddress(),
+ gsisshHostType.getPort());
+
+ Cluster pbsCluster = null;
+ try {
+ JobManagerConfiguration jConfig = null;
+ String installedParentPath = ((HpcApplicationDeploymentType)
+ jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType()).getInstalledParentPath();
+ String jobManager = ((GsisshHostType) registeredHost.getType()).getJobManager();
+ if (jobManager == null) {
+ logger.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+ jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+ } else {
+ if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+ jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+ } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+ jConfig = CommonUtils.getSLURMJobManager(installedParentPath);
+ } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+ jConfig = CommonUtils.getSGEJobManager(installedParentPath);
+ }
+ }
+ pbsCluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
+ } catch (SSHApiException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+
+ context.setPbsCluster(pbsCluster);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-core/src/test/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/test/resources/gfac-config.xml b/modules/gfac/gfac-core/src/test/resources/gfac-config.xml
index 61dca4f..7c7e704 100644
--- a/modules/gfac/gfac-core/src/test/resources/gfac-config.xml
+++ b/modules/gfac/gfac-core/src/test/resources/gfac-config.xml
@@ -71,20 +71,20 @@
<Provider class="org.apache.airavata.gfac.provider.impl.SSHProvider" host="org.apache.airavata.schemas.gfac.impl.SSHHostTypeImpl">
<InHandlers>
- <Handler class="org.apache.airavata.gfac.handler.SCPDirectorySetupHandler"/>
- <Handler class="org.apache.airavata.gfac.handler.SCPInputHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.SSHDirectorySetupHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.GSISSHInputHandler"/>
</InHandlers>
<OutHandlers>
- <Handler class="org.apache.airavata.gfac.handler.SCPOutputHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.GSISSHOutputHandler"/>
</OutHandlers>
</Provider>
<Provider class="org.apache.airavata.gfac.provider.impl.GSISSHProvider" host="org.apache.airavata.schemas.gfac.impl.GsisshHostTypeImpl">
<InHandlers>
- <Handler class="org.apache.airavata.gfac.handler.SCPDirectorySetupHandler"/>
- <Handler class="org.apache.airavata.gfac.handler.SCPInputHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.GSISSHDirectorySetupHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.GSISSHInputHandler"/>
</InHandlers>
<OutHandlers>
- <Handler class="org.apache.airavata.gfac.handler.SCPOutputHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.GSISSHOutputHandler"/>
</OutHandlers>
</Provider>
</GFac>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPDirectorySetupHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPDirectorySetupHandler.java
deleted file mode 100644
index 51a2054..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPDirectorySetupHandler.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.handler;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.security.GSISecurityContext;
-import org.apache.airavata.gfac.context.security.SSHSecurityContext;
-import org.apache.airavata.gfac.util.GFACSSHUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.TransferState;
-import org.apache.airavata.model.workspace.experiment.TransferStatus;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SCPDirectorySetupHandler extends AbstractHandler{
- private static final Logger log = LoggerFactory.getLogger(SCPDirectorySetupHandler.class);
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacException {
- if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
- try {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
- log.info("Setup SSH job directorties");
- super.invoke(jobExecutionContext);
- makeDirectory(jobExecutionContext);
-
- }
- private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacException {
- Cluster cluster = null;
- if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
- cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
- } else {
- cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
- }
- if (cluster == null) {
- throw new GFacHandlerException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- try {
- String workingDirectory = app.getScratchWorkingDirectory();
- cluster.makeDirectory(workingDirectory);
- cluster.makeDirectory(app.getScratchWorkingDirectory());
- cluster.makeDirectory(app.getInputDataDirectory());
- cluster.makeDirectory(app.getOutputDataDirectory());
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
- status.setTransferState(TransferState.DIRECTORY_SETUP);
- detail.setTransferStatus(status);
- detail.setTransferDescription("Working directory = " + workingDirectory);
-
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
-
- } catch (SSHApiException e) {
- throw new GFacHandlerException("Error executing the Handler: " + SCPDirectorySetupHandler.class,e);
- }catch (Exception e){
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
- status.setTransferState(TransferState.FAILED);
- detail.setTransferStatus(status);
- try {
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
- GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error executing the Handler: " + SCPDirectorySetupHandler.class,e);
- }
- }
-
- public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java
deleted file mode 100644
index bf22ad6..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.handler;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.StringUtil;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.MappingFactory;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.MessageContext;
-import org.apache.airavata.gfac.context.security.GSISecurityContext;
-import org.apache.airavata.gfac.context.security.SSHSecurityContext;
-import org.apache.airavata.gfac.util.GFACSSHUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.ErrorDetails;
-import org.apache.airavata.model.workspace.experiment.TransferState;
-import org.apache.airavata.model.workspace.experiment.TransferStatus;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.apache.airavata.schemas.gfac.URIArrayType;
-import org.apache.airavata.schemas.gfac.URIParameterType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SCPInputHandler extends AbstractHandler {
-
- private static final Logger log = LoggerFactory.getLogger(SCPInputHandler.class);
-
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
- if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
- try {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
- log.info("Invoking SCPInputHandler");
- super.invoke(jobExecutionContext);
-
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
-
- MessageContext inputNew = new MessageContext();
- try {
- MessageContext input = jobExecutionContext.getInMessageContext();
- Set<String> parameters = input.getParameters().keySet();
- for (String paramName : parameters) {
- ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
- String paramValue = MappingFactory.toString(actualParameter);
- //TODO: Review this with type
- if ("URI".equals(actualParameter.getType().getType().toString())) {
- ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue));
- } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
- List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
- List<String> newFiles = new ArrayList<String>();
- for (String paramValueEach : split) {
- String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
- status.setTransferState(TransferState.UPLOAD);
- detail.setTransferStatus(status);
- detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
- newFiles.add(stageInputFiles);
- }
- ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
- }
- inputNew.getParameters().put(paramName, actualParameter);
- }
- } catch (Exception e) {
- log.error(e.getMessage());
- status.setTransferState(TransferState.FAILED);
- detail.setTransferStatus(status);
- try {
- GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
- }
- jobExecutionContext.setInMessageContext(inputNew);
- }
-
- private static String stageInputFiles(JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException {
- Cluster cluster = null;
- if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
- cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
- } else {
- cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
- }
- if (cluster == null) {
- throw new GFacException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- int i = paramValue.lastIndexOf(File.separator);
- String substring = paramValue.substring(i + 1);
- try {
- String targetFile = app.getInputDataDirectory() + File.separator + substring;
- if(paramValue.startsWith("file")){
- paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
- }
- cluster.scpTo(targetFile, paramValue);
- return targetFile;
- } catch (SSHApiException e) {
- throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
- }
- }
-
- public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
deleted file mode 100644
index 789d188..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.handler;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import net.schmizz.sshj.connection.ConnectionException;
-import net.schmizz.sshj.transport.TransportException;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.ApplicationDescription;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.Scheduler;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.security.GSISecurityContext;
-import org.apache.airavata.gfac.context.security.SSHSecurityContext;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.util.GFACSSHUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gfac.utils.OutputUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.gsi.ssh.util.SSHUtils;
-import org.apache.airavata.model.workspace.experiment.*;
-import org.apache.airavata.persistance.registry.jpa.model.DataTransferDetail;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.apache.airavata.registry.cpi.DataType;
-import org.apache.airavata.registry.cpi.RegistryException;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.apache.airavata.schemas.gfac.URIParameterType;
-import org.apache.xmlbeans.XmlException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SCPOutputHandler extends AbstractHandler{
- private static final Logger log = LoggerFactory.getLogger(SCPOutputHandler.class);
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
- if(jobExecutionContext.getTaskData().getJobDetailsListSize() == 0) { // this is because we don't have the right jobexecution context
- // so attempting to get it from the registry
- log.warn("During the out handler chain jobExecution context came null, so trying to handler");
- ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
- TaskDetails taskData = null;
- try {
- taskData = (TaskDetails) registry.get(DataType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID());
- } catch (RegistryException e) {
- log.error("Error retrieving job details from Registry");
- throw new GFacHandlerException("Error retrieving job details from Registry", e);
- }
- JobDetails jobDetails = taskData.getJobDetailsList().get(0);
- String jobDescription = jobDetails.getJobDescription();
- if (jobDescription != null) {
- JobDescriptor jobDescriptor = null;
- try {
- jobDescriptor = JobDescriptor.fromXML(jobDescription);
- } catch (XmlException e1) {
- e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- applicationDeploymentDescription.getType().setScratchWorkingDirectory(
- jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory());
- applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory());
- applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory());
- applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile());
- applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile());
- }
- }
-
- if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
- try {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
- super.invoke(jobExecutionContext);
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
-
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
- .getApplicationDeploymentDescription().getType();
- try {
- Cluster cluster = null;
- if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
- cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
- } else {
- cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
- }
- if (cluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
-
- // Get the Stdouts and StdErrs
- String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext.getServiceName());
-
- TaskDetails taskData = jobExecutionContext.getTaskData();
- String outputDataDir = null;
- File localStdOutFile;
- File localStdErrFile;
-
- if (taskData.getAdvancedOutputDataHandling() != null) {
- outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
- }
- if (outputDataDir == null) {
- outputDataDir = File.separator + "tmp";
- }
- outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID();
- (new File(outputDataDir)).mkdirs();
-
-
- localStdOutFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stdout");
- localStdErrFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stderr");
-// cluster.makeDirectory(outputDataDir);
- cluster.scpFrom(app.getStandardOutput(), localStdOutFile.getAbsolutePath());
- Thread.sleep(1000);
- cluster.scpFrom(app.getStandardError(), localStdErrFile.getAbsolutePath());
- Thread.sleep(1000);
-
- String stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
- String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
- status.setTransferState(TransferState.COMPLETE);
- detail.setTransferStatus(status);
- detail.setTransferDescription("STDOUT:" + stdOutStr);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
- status.setTransferState(TransferState.COMPLETE);
- detail.setTransferStatus(status);
- detail.setTransferDescription("STDERR:" + stdErrStr);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
-
- Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
- Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
- Set<String> keys = output.keySet();
- for (String paramName : keys) {
- ActualParameter actualParameter = (ActualParameter) output.get(paramName);
- if ("URI".equals(actualParameter.getType().getType().toString())) {
-
- List<String> outputList = cluster.listDirectory(app.getOutputDataDirectory());
- if (outputList.size() == 0 || outputList.get(0).isEmpty()) {
- stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
- } else {
- String valueList = outputList.get(0);
- cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir);
- jobExecutionContext.addOutputFile(outputDataDir + File.separator + valueList);
- ((URIParameterType) actualParameter.getType()).setValue(valueList);
- stringMap = new HashMap<String, ActualParameter>();
- stringMap.put(paramName, actualParameter);
- }
- } else {
- stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
- }
- }
- if (stringMap == null || stringMap.isEmpty()) {
- throw new GFacHandlerException(
- "Empty Output returned from the Application, Double check the application"
- + "and ApplicationDescriptor output Parameter Names");
- }
- status.setTransferState(TransferState.DOWNLOAD);
- detail.setTransferStatus(status);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
- app.setStandardError(localStdErrFile.getAbsolutePath());
- app.setStandardOutput(localStdOutFile.getAbsolutePath());
- app.setOutputDataDirectory(outputDataDir);
- } catch (XmlException e) {
- throw new GFacHandlerException("Cannot read output:" + e.getMessage(), e);
- } catch (ConnectionException e) {
- throw new GFacHandlerException(e.getMessage(), e);
- } catch (TransportException e) {
- throw new GFacHandlerException(e.getMessage(), e);
- } catch (IOException e) {
- throw new GFacHandlerException(e.getMessage(), e);
- } catch (Exception e) {
- try {
- status.setTransferState(TransferState.FAILED);
- detail.setTransferStatus(status);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error in retrieving results", e);
- }
-
- }
-
- public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHDirectorySetupHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHDirectorySetupHandler.java
new file mode 100644
index 0000000..70358f9
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHDirectorySetupHandler.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import java.util.Map;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.security.SSHSecurityContext;
+import org.apache.airavata.gfac.util.GFACSSHUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SSHDirectorySetupHandler extends AbstractHandler{
+ private static final Logger log = LoggerFactory.getLogger(SSHDirectorySetupHandler.class);
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacException {
+ if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
+ try {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+ log.info("Setup SSH job directorties");
+ super.invoke(jobExecutionContext);
+ makeDirectory(jobExecutionContext);
+
+ }
+ private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacException {
+ Cluster cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+ if (cluster == null) {
+ throw new GFacHandlerException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ try {
+ String workingDirectory = app.getScratchWorkingDirectory();
+ cluster.makeDirectory(workingDirectory);
+ cluster.makeDirectory(app.getScratchWorkingDirectory());
+ cluster.makeDirectory(app.getInputDataDirectory());
+ cluster.makeDirectory(app.getOutputDataDirectory());
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+ status.setTransferState(TransferState.DIRECTORY_SETUP);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Working directory = " + workingDirectory);
+
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+ } catch (SSHApiException e) {
+ throw new GFacHandlerException("Error executing the Handler: " + SSHDirectorySetupHandler.class, e);
+ } catch (Exception e) {
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ try {
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+ GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error executing the Handler: " + SSHDirectorySetupHandler.class, e);
+ }
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHInputHandler.java
new file mode 100644
index 0000000..861f732
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHInputHandler.java
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.StringUtil;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.context.security.SSHSecurityContext;
+import org.apache.airavata.gfac.util.GFACSSHUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SSHInputHandler extends AbstractHandler {
+
+ private static final Logger log = LoggerFactory.getLogger(SSHInputHandler.class);
+
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
+ if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
+ try {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+ log.info("Invoking SCPInputHandler");
+ super.invoke(jobExecutionContext);
+
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+
+ MessageContext inputNew = new MessageContext();
+ try {
+ MessageContext input = jobExecutionContext.getInMessageContext();
+ Set<String> parameters = input.getParameters().keySet();
+ for (String paramName : parameters) {
+ ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
+ String paramValue = MappingFactory.toString(actualParameter);
+ //TODO: Review this with type
+ if ("URI".equals(actualParameter.getType().getType().toString())) {
+ ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue));
+ } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+ List<String> newFiles = new ArrayList<String>();
+ for (String paramValueEach : split) {
+ String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
+ status.setTransferState(TransferState.UPLOAD);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ newFiles.add(stageInputFiles);
+ }
+ ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+ }
+ inputNew.getParameters().put(paramName, actualParameter);
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ try {
+ GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+ }
+ jobExecutionContext.setInMessageContext(inputNew);
+ }
+
+ private static String stageInputFiles(JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException {
+ Cluster cluster = null;
+ if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
+ cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
+ } else {
+ cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+ }
+ if (cluster == null) {
+ throw new GFacException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ int i = paramValue.lastIndexOf(File.separator);
+ String substring = paramValue.substring(i + 1);
+ try {
+ String targetFile = app.getInputDataDirectory() + File.separator + substring;
+ if(paramValue.startsWith("file")){
+ paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
+ }
+ cluster.scpTo(targetFile, paramValue);
+ return targetFile;
+ } catch (SSHApiException e) {
+ throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+ }
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+
+ }
+}