You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/05/05 16:44:26 UTC
[2/8] changing package names of gfac implementations
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java
deleted file mode 100644
index 13c325d..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.handler;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.StringUtil;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.MappingFactory;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.MessageContext;
-import org.apache.airavata.gfac.context.security.SSHSecurityContext;
-import org.apache.airavata.gfac.util.GFACSSHUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.ServerInfo;
-import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.PBSCluster;
-import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
-import org.apache.airavata.gsi.ssh.util.CommonUtils;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.apache.airavata.schemas.gfac.URIArrayType;
-import org.apache.airavata.schemas.gfac.URIParameterType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.*;
-
-/**
- * This handler will copy input data from gateway machine to airavata
- * installed machine, later running handlers can copy the input files to computing resource
- * <Handler class="org.apache.airavata.gfac.handler.AdvancedSCPOutputHandler">
- <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
- <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
- <property name="userName" value="airavata"/>
- <property name="hostName" value="gw98.iu.xsede.org"/>
- <property name="inputPath" value="/home/airavata/outputData"/>
- */
-public class AdvancedSCPInputHandler extends AbstractHandler{
- private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
-
- private String password = null;
-
- private String publicKeyPath;
-
- private String passPhrase;
-
- private String privateKeyPath;
-
- private String userName;
-
- private String hostName;
-
- private String inputPath;
-
- public void initProperties(Map<String, String> properties) throws GFacHandlerException {
- password = properties.get("password");
- passPhrase = properties.get("passPhrase");
- privateKeyPath = properties.get("privateKeyPath");
- publicKeyPath = properties.get("publicKeyPath");
- userName = properties.get("userName");
- hostName = properties.get("hostName");
- inputPath = properties.get("inputPath");
- }
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- MessageContext inputNew = new MessageContext();
- try{
- if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
- try {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
- .getApplicationDeploymentDescription().getType();
-
- AuthenticationInfo authenticationInfo = null;
- if (password != null) {
- authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
- } else {
- authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
- this.passPhrase);
- }
- // Server info
- ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
- Cluster pbsCluster = null;
- // here doesn't matter what the job manager is because we are only doing some file handling
- // not really dealing with monitoring or job submission, so we pa
- pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
- String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID();
- (new File(parentPath)).mkdirs();
- MessageContext input = jobExecutionContext.getInMessageContext();
- Set<String> parameters = input.getParameters().keySet();
- for (String paramName : parameters) {
- ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
- String paramValue = MappingFactory.toString(actualParameter);
- //TODO: Review this with type
- if ("URI".equals(actualParameter.getType().getType().toString())) {
- ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(pbsCluster, paramValue, parentPath));
- } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
- List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
- List<String> newFiles = new ArrayList<String>();
- for (String paramValueEach : split) {
- String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath);
- newFiles.add(stageInputFiles);
- }
- ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
- }
- inputNew.getParameters().put(paramName, actualParameter);
- }
- } catch (Exception e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
- }
- jobExecutionContext.setInMessageContext(inputNew);
- }
-
- private String stageInputFiles(Cluster cluster, String paramValue, String parentPath) throws GFacException {
- try {
- cluster.scpFrom(paramValue, parentPath);
- return "file://" + parentPath + File.separator + (new File(paramValue)).getName();
- } catch (SSHApiException e) {
- log.error("Error tranfering remote file to local file, remote path: " + paramValue);
- throw new GFacException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java
deleted file mode 100644
index dba3e69..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.handler;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.security.SSHSecurityContext;
-import org.apache.airavata.gfac.util.GFACSSHUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.ServerInfo;
-import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.PBSCluster;
-import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
-import org.apache.airavata.gsi.ssh.util.CommonUtils;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Map;
-
-/**
- * This handler will copy outputs from airavata installed local directory
- * to a remote location, prior to this handler SCPOutputHandler should be invoked
- * Should add following configuration to gfac-config.xml and configure the keys properly
- * <Handler class="org.apache.airavata.gfac.handler.AdvancedSCPOutputHandler">
- <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
- <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
- <property name="userName" value="airavata"/>
- <property name="hostName" value="gw98.iu.xsede.org"/>
- <property name="outputPath" value="/home/airavata/outputData"/>
- <property name="passPhrase" value="/home/airavata/outputData"/>
- <property name="password" value="/home/airavata/outputData"/>
-
- */
-public class AdvancedSCPOutputHandler extends AbstractHandler {
- private static final Logger log = LoggerFactory.getLogger(AdvancedSCPOutputHandler.class);
-
- private String password = null;
-
- private String publicKeyPath;
-
- private String passPhrase;
-
- private String privateKeyPath;
-
- private String userName;
-
- private String hostName;
-
- private String outputPath;
-
- public void initProperties(Map<String, String> properties) throws GFacHandlerException {
- password = properties.get("password");
- passPhrase = properties.get("passPhrase");
- privateKeyPath = properties.get("privateKeyPath");
- publicKeyPath = properties.get("publicKeyPath");
- userName = properties.get("userName");
- hostName = properties.get("hostName");
- outputPath = properties.get("outputPath");
- }
-
- @Override
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- try {
- if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
- try {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
- .getApplicationDeploymentDescription().getType();
- String standardError = app.getStandardError();
- String standardOutput = app.getStandardOutput();
- String outputDataDirectory = app.getOutputDataDirectory();
-
- AuthenticationInfo authenticationInfo = null;
- if (password != null) {
- authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
- } else {
- authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
- this.passPhrase);
- }
- // Server info
- ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
-
- Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
- outputPath = outputPath + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID()
- + File.separator;
- pbsCluster.makeDirectory(outputPath);
- pbsCluster.scpTo(outputPath, standardError);
- pbsCluster.scpTo(outputPath, standardOutput);
- for (String files : jobExecutionContext.getOutputFiles()) {
- pbsCluster.scpTo(outputPath, files);
- }
- } catch (SSHApiException e) {
- log.error("Error transfering files to remote host : " + hostName + " with the user: " + userName);
- log.error(e.getMessage());
- throw new GFacHandlerException(e);
- } catch (GFacException e) {
- throw new GFacHandlerException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHDirectorySetupHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHDirectorySetupHandler.java
deleted file mode 100644
index 1f99a98..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHDirectorySetupHandler.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.handler;
-
-import java.util.Map;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.security.SSHSecurityContext;
-import org.apache.airavata.gfac.util.GFACSSHUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.TransferState;
-import org.apache.airavata.model.workspace.experiment.TransferStatus;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SSHDirectorySetupHandler extends AbstractHandler{
- private static final Logger log = LoggerFactory.getLogger(SSHDirectorySetupHandler.class);
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- try {
- if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- }
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- } catch (GFacException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
-
- log.info("Setup SSH job directorties");
- super.invoke(jobExecutionContext);
- makeDirectory(jobExecutionContext);
-
- }
- private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- try{
- Cluster cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
- if (cluster == null) {
- throw new GFacHandlerException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- String workingDirectory = app.getScratchWorkingDirectory();
- cluster.makeDirectory(workingDirectory);
- cluster.makeDirectory(app.getScratchWorkingDirectory());
- cluster.makeDirectory(app.getInputDataDirectory());
- cluster.makeDirectory(app.getOutputDataDirectory());
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
- status.setTransferState(TransferState.DIRECTORY_SETUP);
- detail.setTransferStatus(status);
- detail.setTransferDescription("Working directory = " + workingDirectory);
-
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
- } catch (SSHApiException e) {
- throw new GFacHandlerException("Error executing the Handler: " + SSHDirectorySetupHandler.class, e);
- } catch (Exception e) {
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
- status.setTransferState(TransferState.FAILED);
- detail.setTransferStatus(status);
- try {
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error executing the Handler: " + SSHDirectorySetupHandler.class, e);
- }
- }
-
- public void initProperties(Map<String, String> properties) throws GFacHandlerException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHInputHandler.java
deleted file mode 100644
index 5cc0ffb..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHInputHandler.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.handler;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.StringUtil;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.MappingFactory;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.MessageContext;
-import org.apache.airavata.gfac.context.security.SSHSecurityContext;
-import org.apache.airavata.gfac.util.GFACSSHUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.TransferState;
-import org.apache.airavata.model.workspace.experiment.TransferStatus;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.apache.airavata.schemas.gfac.URIArrayType;
-import org.apache.airavata.schemas.gfac.URIParameterType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SSHInputHandler extends AbstractHandler {
-
- private static final Logger log = LoggerFactory.getLogger(SSHInputHandler.class);
-
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
- MessageContext inputNew = new MessageContext();
- try {
-
- if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
- try {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
- log.info("Invoking SCPInputHandler");
- super.invoke(jobExecutionContext);
-
-
- MessageContext input = jobExecutionContext.getInMessageContext();
- Set<String> parameters = input.getParameters().keySet();
- for (String paramName : parameters) {
- ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
- String paramValue = MappingFactory.toString(actualParameter);
- //TODO: Review this with type
- if ("URI".equals(actualParameter.getType().getType().toString())) {
- ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue));
- } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
- List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
- List<String> newFiles = new ArrayList<String>();
- for (String paramValueEach : split) {
- String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
- status.setTransferState(TransferState.UPLOAD);
- detail.setTransferStatus(status);
- detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- newFiles.add(stageInputFiles);
- }
- ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
- }
- inputNew.getParameters().put(paramName, actualParameter);
- }
- } catch (Exception e) {
- log.error(e.getMessage());
- status.setTransferState(TransferState.FAILED);
- detail.setTransferStatus(status);
- try {
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
- }
- jobExecutionContext.setInMessageContext(inputNew);
- }
-
- private static String stageInputFiles(JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException {
- Cluster cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
- if (cluster == null) {
- throw new GFacException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- int i = paramValue.lastIndexOf(File.separator);
- String substring = paramValue.substring(i + 1);
- try {
- String targetFile = app.getInputDataDirectory() + File.separator + substring;
- if(paramValue.startsWith("file")){
- paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
- }
- cluster.scpTo(targetFile, paramValue);
- return targetFile;
- } catch (SSHApiException e) {
- throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
- }
- }
-
- public void initProperties(Map<String, String> properties) throws GFacHandlerException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java
deleted file mode 100644
index 9e1cfb2..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.handler;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import net.schmizz.sshj.connection.ConnectionException;
-import net.schmizz.sshj.transport.TransportException;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.ApplicationDescription;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.security.SSHSecurityContext;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.util.GFACSSHUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gfac.utils.OutputUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.model.workspace.experiment.*;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.apache.airavata.registry.cpi.DataType;
-import org.apache.airavata.registry.cpi.RegistryException;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.apache.airavata.schemas.gfac.URIParameterType;
-import org.apache.xmlbeans.XmlException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SSHOutputHandler extends AbstractHandler{
- private static final Logger log = LoggerFactory.getLogger(SSHOutputHandler.class);
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- if (jobExecutionContext.getApplicationContext().getHostDescription().getType() instanceof GsisshHostType) { // this is because we don't have the right jobexecution context
- // so attempting to get it from the registry
- if (Constants.PUSH.equals(((GsisshHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getMonitorMode())) { // this is because we don't have the right jobexecution context
- // so attempting to get it from the registry
- log.warn("During the out handler chain jobExecution context came null, so trying to handler");
- ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
- TaskDetails taskData = null;
- try {
- taskData = (TaskDetails) registry.get(DataType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID());
- } catch (RegistryException e) {
- log.error("Error retrieving job details from Registry");
- throw new GFacHandlerException("Error retrieving job details from Registry", e);
- }
- JobDetails jobDetails = taskData.getJobDetailsList().get(0);
- String jobDescription = jobDetails.getJobDescription();
- if (jobDescription != null) {
- JobDescriptor jobDescriptor = null;
- try {
- jobDescriptor = JobDescriptor.fromXML(jobDescription);
- } catch (XmlException e1) {
- e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- applicationDeploymentDescription.getType().setScratchWorkingDirectory(
- jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory());
- applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory());
- applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory());
- applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile());
- applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile());
- }
- }
- }
-
- try {
- if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
-
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- }
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- } catch (GFacException e1) {
- log.error(e1.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e1, e1.getLocalizedMessage());
- }
-
- super.invoke(jobExecutionContext);
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
-
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
- .getApplicationDeploymentDescription().getType();
- try {
- Cluster cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
- if (cluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
-
- // Get the Stdouts and StdErrs
- String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext.getServiceName());
-
- TaskDetails taskData = jobExecutionContext.getTaskData();
- String outputDataDir = null;
- File localStdOutFile;
- File localStdErrFile;
-
- if (taskData.getAdvancedOutputDataHandling() != null) {
- outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
- }
- if (outputDataDir == null) {
- outputDataDir = File.separator + "tmp";
- }
- outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID();
- (new File(outputDataDir)).mkdirs();
-
-
- localStdOutFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stdout");
- localStdErrFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stderr");
-// cluster.makeDirectory(outputDataDir);
- cluster.scpFrom(app.getStandardOutput(), localStdOutFile.getAbsolutePath());
- Thread.sleep(1000);
- cluster.scpFrom(app.getStandardError(), localStdErrFile.getAbsolutePath());
- Thread.sleep(1000);
-
- String stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
- String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
- status.setTransferState(TransferState.COMPLETE);
- detail.setTransferStatus(status);
- detail.setTransferDescription("STDOUT:" + stdOutStr);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
- status.setTransferState(TransferState.COMPLETE);
- detail.setTransferStatus(status);
- detail.setTransferDescription("STDERR:" + stdErrStr);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
-
- Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
- Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
- Set<String> keys = output.keySet();
- for (String paramName : keys) {
- ActualParameter actualParameter = (ActualParameter) output.get(paramName);
- if ("URI".equals(actualParameter.getType().getType().toString())) {
-
- List<String> outputList = cluster.listDirectory(app.getOutputDataDirectory());
- if (outputList.size() == 0 || outputList.get(0).isEmpty()) {
- stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
- } else {
- String valueList = outputList.get(0);
- cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir);
- jobExecutionContext.addOutputFile(outputDataDir + File.separator + valueList);
- ((URIParameterType) actualParameter.getType()).setValue(valueList);
- stringMap = new HashMap<String, ActualParameter>();
- stringMap.put(paramName, actualParameter);
- }
- } else {
- stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
- }
- }
- if (stringMap == null || stringMap.isEmpty()) {
- throw new GFacHandlerException(
- "Empty Output returned from the Application, Double check the application"
- + "and ApplicationDescriptor output Parameter Names");
- }
- status.setTransferState(TransferState.DOWNLOAD);
- detail.setTransferStatus(status);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
- app.setStandardError(localStdErrFile.getAbsolutePath());
- app.setStandardOutput(localStdOutFile.getAbsolutePath());
- app.setOutputDataDirectory(outputDataDir);
- } catch (XmlException e) {
- throw new GFacHandlerException("Cannot read output:" + e.getMessage(), e);
- } catch (ConnectionException e) {
- throw new GFacHandlerException(e.getMessage(), e);
- } catch (TransportException e) {
- throw new GFacHandlerException(e.getMessage(), e);
- } catch (IOException e) {
- throw new GFacHandlerException(e.getMessage(), e);
- } catch (Exception e) {
- try {
- status.setTransferState(TransferState.FAILED);
- detail.setTransferStatus(status);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error in retrieving results", e);
- }
-
- }
-
- public void initProperties(Map<String, String> properties) throws GFacHandlerException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
deleted file mode 100644
index e972f0d..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.gfac.provider.impl;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.MappingFactory;
-import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.MessageContext;
-import org.apache.airavata.gfac.context.security.SSHSecurityContext;
-import org.apache.airavata.gfac.handler.GFacHandlerException;
-import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.AbstractProvider;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.util.GFACSSHUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.CommandExecutor;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
-import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-/**
- * Execute application using remote SSH
- */
-public class SSHProvider extends AbstractProvider {
- private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
- private Cluster cluster;
- private String jobID = null;
- private String taskID = null;
- // we keep gsisshprovider to support qsub submission incase of hpc scenario with ssh
- private boolean hpcType = false;
-
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- super.initialize(jobExecutionContext);
- if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
- try {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
- taskID = jobExecutionContext.getTaskData().getTaskID();
- if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) {
- jobID = "SSH_" + jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis();
- cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
-
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
- details.setJobID(taskID);
- details.setJobDescription(remoteFile);
- jobExecutionContext.setJobDetails(details);
- JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, null);
- details.setJobDescription(jobDescriptor.toXML());
-
- GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP);
- log.info(remoteFile);
- try {
- File runscript = createShellScript(jobExecutionContext);
- cluster.scpTo(remoteFile, runscript.getAbsolutePath());
- } catch (Exception e) {
- throw new GFacProviderException(e.getLocalizedMessage(), e);
- }
- }else{
- hpcType = true;
- }
- }
-
-
- public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- if (!hpcType) {
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- try {
- /*
- * Execute
- */
- String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
- details.setJobDescription(execuable);
-
-// GFacUtils.updateJobStatus(details, JobState.SUBMITTED);
- RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + execuable + "; " + execuable);
-
- StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
-
- CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), jobIDReaderCommandOutput);
- String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource");
-
- log.info("stdout=" + stdOutputString);
-
-// GFacUtils.updateJobStatus(details, JobState.COMPLETE);
- } catch (Exception e) {
- throw new GFacProviderException(e.getMessage(), e);
- } finally {
- if (cluster != null) {
- try {
- cluster.disconnect();
- } catch (SSHApiException e) {
- throw new GFacProviderException(e.getMessage(), e);
- }
- }
- }
- } else {
- try {
- jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
- HostDescriptionType host = jobExecutionContext.getApplicationContext().
- getHostDescription().getType();
- HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
- getApplicationDeploymentDescription().getType();
- JobDetails jobDetails = new JobDetails();
- String taskID = jobExecutionContext.getTaskData().getTaskID();
- try {
- Cluster cluster = null;
- if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) != null) {
- cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
- }
- if (cluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- // This installed path is a mandetory field, because this could change based on the computing resource
- JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
-
- log.info(jobDescriptor.toXML());
-
- jobDetails.setJobDescription(jobDescriptor.toXML());
-
- String jobID = cluster.submitBatchJob(jobDescriptor);
- jobExecutionContext.setJobDetails(jobDetails);
- if (jobID == null) {
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- } else {
- jobDetails.setJobID(jobID);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
- }
-
- } catch (SSHApiException e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- } catch (Exception e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- }
- } catch (GFacException e) {
- throw new GFacProviderException(e.getMessage(), e);
- }
- }
- }
-
- public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
-
- }
-
-
- public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
- throw new NotImplementedException();
- }
-
-
- private File createShellScript(JobExecutionContext context) throws IOException {
- ApplicationDeploymentDescriptionType app = context.getApplicationContext()
- .getApplicationDeploymentDescription().getType();
- String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis()
- + new Random().nextLong();
-
- File shellScript = File.createTempFile(uniqueDir, "sh");
- OutputStream out = new FileOutputStream(shellScript);
-
- out.write("#!/bin/bash\n".getBytes());
- out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes());
- out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes());
- out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n")
- .getBytes());
- // get the env of the host and the application
- NameValuePairType[] env = app.getApplicationEnvironmentArray();
-
- Map<String, String> nv = new HashMap<String, String>();
- if (env != null) {
- for (int i = 0; i < env.length; i++) {
- String key = env[i].getName();
- String value = env[i].getValue();
- nv.put(key, value);
- }
- }
- for (Entry<String, String> entry : nv.entrySet()) {
- log.debug("Env[" + entry.getKey() + "] = " + entry.getValue());
- out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
-
- }
-
- // prepare the command
- final String SPACE = " ";
- StringBuffer cmd = new StringBuffer();
- cmd.append(app.getExecutableLocation());
- cmd.append(SPACE);
-
- MessageContext input = context.getInMessageContext();
- ;
- Map<String, Object> inputs = input.getParameters();
- Set<String> keys = inputs.keySet();
- for (String paramName : keys) {
- ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
- if ("URIArray".equals(actualParameter.getType().getType().toString())) {
- String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
- for (String value : values) {
- cmd.append(value);
- cmd.append(SPACE);
- }
- } else {
- String paramValue = MappingFactory.toString(actualParameter);
- cmd.append(paramValue);
- cmd.append(SPACE);
- }
- }
- // We redirect the error and stdout to remote files, they will be read
- // in later
- cmd.append(SPACE);
- cmd.append("1>");
- cmd.append(SPACE);
- cmd.append(app.getStandardOutput());
- cmd.append(SPACE);
- cmd.append("2>");
- cmd.append(SPACE);
- cmd.append(app.getStandardError());
-
- String cmdStr = cmd.toString();
- log.info("Command = " + cmdStr);
- out.write((cmdStr + "\n").getBytes());
- String message = "\"execuationSuceeded\"";
- out.write(("echo " + message + "\n").getBytes());
- out.close();
-
- return shellScript;
- }
-
- public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
- }
- /**
- * This method will read standard output and if there's any it will be parsed
- * @param jobIDReaderCommandOutput
- * @param errorMsg
- * @return
- * @throws SSHApiException
- */
- private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException {
- String stdOutputString = jobIDReaderCommandOutput.getStdOutputString();
- String stdErrorString = jobIDReaderCommandOutput.getStdErrorString();
-
- if(stdOutputString == null || stdOutputString.isEmpty() || (stdErrorString != null && !stdErrorString.isEmpty())){
- log.error("Standard Error output : " + stdErrorString);
- throw new SSHApiException(errorMsg + stdErrorString);
- }
- return stdOutputString;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
new file mode 100644
index 0000000..cba476e
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
@@ -0,0 +1,152 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.StringUtil;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.handler.AbstractHandler;
+import org.apache.airavata.gfac.handler.GFacHandlerException;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.*;
+
+/**
+ * This handler will copy input data from gateway machine to airavata
+ * installed machine, later running handlers can copy the input files to computing resource
+ * <Handler class="AdvancedSCPOutputHandler">
+ <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+ <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+ <property name="userName" value="airavata"/>
+ <property name="hostName" value="gw98.iu.xsede.org"/>
+ <property name="inputPath" value="/home/airavata/outputData"/>
+ */
+public class AdvancedSCPInputHandler extends AbstractHandler {
+ private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
+
+ private String password = null;
+
+ private String publicKeyPath;
+
+ private String passPhrase;
+
+ private String privateKeyPath;
+
+ private String userName;
+
+ private String hostName;
+
+ private String inputPath;
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException {
+ password = properties.get("password");
+ passPhrase = properties.get("passPhrase");
+ privateKeyPath = properties.get("privateKeyPath");
+ publicKeyPath = properties.get("publicKeyPath");
+ userName = properties.get("userName");
+ hostName = properties.get("hostName");
+ inputPath = properties.get("inputPath");
+ }
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ MessageContext inputNew = new MessageContext();
+ try{
+ if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
+ try {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
+ .getApplicationDeploymentDescription().getType();
+
+ AuthenticationInfo authenticationInfo = null;
+ if (password != null) {
+ authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
+ } else {
+ authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+ this.passPhrase);
+ }
+ // Server info
+ ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
+ Cluster pbsCluster = null;
+ // here doesn't matter what the job manager is because we are only doing some file handling
+ // not really dealing with monitoring or job submission, so we pa
+ pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+ String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID();
+ (new File(parentPath)).mkdirs();
+ MessageContext input = jobExecutionContext.getInMessageContext();
+ Set<String> parameters = input.getParameters().keySet();
+ for (String paramName : parameters) {
+ ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
+ String paramValue = MappingFactory.toString(actualParameter);
+ //TODO: Review this with type
+ if ("URI".equals(actualParameter.getType().getType().toString())) {
+ ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(pbsCluster, paramValue, parentPath));
+ } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+ List<String> newFiles = new ArrayList<String>();
+ for (String paramValueEach : split) {
+ String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath);
+ newFiles.add(stageInputFiles);
+ }
+ ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+ }
+ inputNew.getParameters().put(paramName, actualParameter);
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+ }
+ jobExecutionContext.setInMessageContext(inputNew);
+ }
+
+ private String stageInputFiles(Cluster cluster, String paramValue, String parentPath) throws GFacException {
+ try {
+ cluster.scpFrom(paramValue, parentPath);
+ return "file://" + parentPath + File.separator + (new File(paramValue)).getName();
+ } catch (SSHApiException e) {
+ log.error("Error tranfering remote file to local file, remote path: " + paramValue);
+ throw new GFacException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
new file mode 100644
index 0000000..79ad49a
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.handler.AbstractHandler;
+import org.apache.airavata.gfac.handler.GFacHandlerException;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * This handler will copy outputs from airavata installed local directory
+ * to a remote location, prior to this handler SCPOutputHandler should be invoked
+ * Should add following configuration to gfac-config.xml and configure the keys properly
+ * <Handler class="AdvancedSCPOutputHandler">
+ <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+ <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+ <property name="userName" value="airavata"/>
+ <property name="hostName" value="gw98.iu.xsede.org"/>
+ <property name="outputPath" value="/home/airavata/outputData"/>
+ <property name="passPhrase" value="/home/airavata/outputData"/>
+ <property name="password" value="/home/airavata/outputData"/>
+
+ */
+public class AdvancedSCPOutputHandler extends AbstractHandler {
+ private static final Logger log = LoggerFactory.getLogger(AdvancedSCPOutputHandler.class);
+
+ private String password = null;
+
+ private String publicKeyPath;
+
+ private String passPhrase;
+
+ private String privateKeyPath;
+
+ private String userName;
+
+ private String hostName;
+
+ private String outputPath;
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException {
+ password = properties.get("password");
+ passPhrase = properties.get("passPhrase");
+ privateKeyPath = properties.get("privateKeyPath");
+ publicKeyPath = properties.get("publicKeyPath");
+ userName = properties.get("userName");
+ hostName = properties.get("hostName");
+ outputPath = properties.get("outputPath");
+ }
+
+ @Override
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ try {
+ if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
+ try {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
+ .getApplicationDeploymentDescription().getType();
+ String standardError = app.getStandardError();
+ String standardOutput = app.getStandardOutput();
+ String outputDataDirectory = app.getOutputDataDirectory();
+
+ AuthenticationInfo authenticationInfo = null;
+ if (password != null) {
+ authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
+ } else {
+ authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+ this.passPhrase);
+ }
+ // Server info
+ ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
+
+ Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+ outputPath = outputPath + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID()
+ + File.separator;
+ pbsCluster.makeDirectory(outputPath);
+ pbsCluster.scpTo(outputPath, standardError);
+ pbsCluster.scpTo(outputPath, standardOutput);
+ for (String files : jobExecutionContext.getOutputFiles()) {
+ pbsCluster.scpTo(outputPath, files);
+ }
+ } catch (SSHApiException e) {
+ log.error("Error transfering files to remote host : " + hostName + " with the user: " + userName);
+ log.error(e.getMessage());
+ throw new GFacHandlerException(e);
+ } catch (GFacException e) {
+ throw new GFacHandlerException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
new file mode 100644
index 0000000..0040de0
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import java.util.Map;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.handler.AbstractHandler;
+import org.apache.airavata.gfac.handler.GFacHandlerException;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SSHDirectorySetupHandler extends AbstractHandler {
+ private static final Logger log = LoggerFactory.getLogger(SSHDirectorySetupHandler.class);
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ try {
+ if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ }
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ } catch (GFacException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+
+ log.info("Setup SSH job directorties");
+ super.invoke(jobExecutionContext);
+ makeDirectory(jobExecutionContext);
+
+ }
+ private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ try{
+ Cluster cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+ if (cluster == null) {
+ throw new GFacHandlerException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ String workingDirectory = app.getScratchWorkingDirectory();
+ cluster.makeDirectory(workingDirectory);
+ cluster.makeDirectory(app.getScratchWorkingDirectory());
+ cluster.makeDirectory(app.getInputDataDirectory());
+ cluster.makeDirectory(app.getOutputDataDirectory());
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+ status.setTransferState(TransferState.DIRECTORY_SETUP);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Working directory = " + workingDirectory);
+
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+ } catch (SSHApiException e) {
+ throw new GFacHandlerException("Error executing the Handler: " + SSHDirectorySetupHandler.class, e);
+ } catch (Exception e) {
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ try {
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error executing the Handler: " + SSHDirectorySetupHandler.class, e);
+ }
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
new file mode 100644
index 0000000..3d445e0
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
@@ -0,0 +1,143 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.StringUtil;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.handler.AbstractHandler;
+import org.apache.airavata.gfac.handler.GFacHandlerException;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SSHInputHandler extends AbstractHandler {
+
+ private static final Logger log = LoggerFactory.getLogger(SSHInputHandler.class);
+
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+ MessageContext inputNew = new MessageContext();
+ try {
+
+ if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
+ try {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+ log.info("Invoking SCPInputHandler");
+ super.invoke(jobExecutionContext);
+
+
+ MessageContext input = jobExecutionContext.getInMessageContext();
+ Set<String> parameters = input.getParameters().keySet();
+ for (String paramName : parameters) {
+ ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
+ String paramValue = MappingFactory.toString(actualParameter);
+ //TODO: Review this with type
+ if ("URI".equals(actualParameter.getType().getType().toString())) {
+ ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue));
+ } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+ List<String> newFiles = new ArrayList<String>();
+ for (String paramValueEach : split) {
+ String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
+ status.setTransferState(TransferState.UPLOAD);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+ newFiles.add(stageInputFiles);
+ }
+ ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+ }
+ inputNew.getParameters().put(paramName, actualParameter);
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ try {
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+ }
+ jobExecutionContext.setInMessageContext(inputNew);
+ }
+
+ private static String stageInputFiles(JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException {
+ Cluster cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+ if (cluster == null) {
+ throw new GFacException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ int i = paramValue.lastIndexOf(File.separator);
+ String substring = paramValue.substring(i + 1);
+ try {
+ String targetFile = app.getInputDataDirectory() + File.separator + substring;
+ if(paramValue.startsWith("file")){
+ paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
+ }
+ cluster.scpTo(targetFile, paramValue);
+ return targetFile;
+ } catch (SSHApiException e) {
+ throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+ }
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException {
+
+ }
+}