You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/04 22:15:22 UTC
[09/81] [abbrv] airavata git commit: Refactored gfac sub modules,
merged gfac-ssh, gfac-gsissh, gfac-local,
gfac-monitor and gsissh modules and created gfac-impl,
moved implementation from gfac-core to gfac-impl
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
deleted file mode 100644
index 0a2aa8d..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.ssh.handler;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.AbstractHandler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
-import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.*;
-
-/**
- * This handler will copy outputs from airavata installed local directory
- * to a remote location, prior to this handler SCPOutputHandler should be invoked
- * Should add following configuration to gfac-config.xml and configure the keys properly
- * <Handler class="AdvancedSCPOutputHandler">
- <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
- <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
- <property name="userName" value="airavata"/>
- <property name="hostName" value="gw98.iu.xsede.org"/>
- <property name="outputPath" value="/home/airavata/outputData"/>
- <property name="passPhrase" value="/home/airavata/outputData"/>
- <property name="password" value="/home/airavata/outputData"/>
-
- */
-public class AdvancedSCPOutputHandler extends AbstractHandler {
- private static final Logger log = LoggerFactory.getLogger(AdvancedSCPOutputHandler.class);
-
- public static final int DEFAULT_SSH_PORT = 22;
-
- private String password = null;
-
- private String publicKeyPath;
-
- private String passPhrase;
-
- private String privateKeyPath;
-
- private String userName;
-
- private String hostName;
-
- private String outputPath;
-
-
- public void initProperties(Properties properties) throws GFacHandlerException {
- password = (String)properties.get("password");
- passPhrase = (String)properties.get("passPhrase");
- privateKeyPath = (String)properties.get("privateKeyPath");
- publicKeyPath = (String)properties.get("publicKeyPath");
- userName = (String)properties.get("userName");
- hostName = (String)properties.get("hostName");
- outputPath = (String)properties.get("outputPath");
- }
-
- @Override
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- Cluster pbsCluster = null;
- AuthenticationInfo authenticationInfo = null;
- if (password != null) {
- authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
- } else {
- authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
- this.passPhrase);
- }
- try {
- String hostName = jobExecutionContext.getHostName();
- if (jobExecutionContext.getSecurityContext(hostName) == null) {
- try {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- try {
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- } catch (GFacException e1) {
- log.error(e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
- String standardError = jobExecutionContext.getStandardError();
- String standardOutput = jobExecutionContext.getStandardOutput();
- super.invoke(jobExecutionContext);
- // Server info
- if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null && jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir() != null){
- try{
- URL outputPathURL = new URL(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir());
- this.userName = outputPathURL.getUserInfo();
- this.hostName = outputPathURL.getHost();
- outputPath = outputPathURL.getPath();
- } catch (MalformedURLException e) {
- log.error(e.getLocalizedMessage(),e);
- }
- }
- String key = GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, this.userName, this.hostName, DEFAULT_SSH_PORT);
- pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster();
- if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null && !jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().isPersistOutputData()){
- outputPath = outputPath + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID()
- + File.separator;
- pbsCluster.makeDirectory(outputPath);
- }
- pbsCluster.scpTo(outputPath, standardError);
- pbsCluster.scpTo(outputPath, standardOutput);
- List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>();
- Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
- Set<String> keys = output.keySet();
- for (String paramName : keys) {
- OutputDataObjectType outputDataObjectType = (OutputDataObjectType) output.get(paramName);
- if (outputDataObjectType.getType() == DataType.URI) {
- // for failed jobs outputs are not generated. So we should not download outputs
- if (GFacUtils.isFailedJob(jobExecutionContext)){
- continue;
- }
- String downloadFile = outputDataObjectType.getValue();
- if(downloadFile == null || !(new File(downloadFile).isFile())){
- GFacUtils.saveErrorDetails(jobExecutionContext, "Empty Output returned from the application", CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacHandlerException("Empty Output returned from the application.." );
- }
- pbsCluster.scpTo(outputPath, downloadFile);
- String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar)+1, downloadFile.length());
- OutputDataObjectType dataObjectType = new OutputDataObjectType();
- dataObjectType.setValue(outputPath + File.separatorChar + fileName);
- dataObjectType.setName(paramName);
- dataObjectType.setType(DataType.URI);
- dataObjectType.setIsRequired(outputDataObjectType.isIsRequired());
- dataObjectType.setRequiredToAddedToCommandLine(outputDataObjectType.isRequiredToAddedToCommandLine());
- dataObjectType.setApplicationArgument(outputDataObjectType.getApplicationArgument());
- dataObjectType.setSearchQuery(outputDataObjectType.getSearchQuery());
- outputArray.add(dataObjectType);
- }else if (outputDataObjectType.getType() == DataType.STDOUT) {
- pbsCluster.scpTo(outputPath, standardOutput);
- String fileName = standardOutput.substring(standardOutput.lastIndexOf(File.separatorChar)+1, standardOutput.length());
- OutputDataObjectType dataObjectType = new OutputDataObjectType();
- dataObjectType.setValue(outputPath + File.separatorChar + fileName);
- dataObjectType.setName(paramName);
- dataObjectType.setType(DataType.STDOUT);
- dataObjectType.setIsRequired(outputDataObjectType.isIsRequired());
- dataObjectType.setRequiredToAddedToCommandLine(outputDataObjectType.isRequiredToAddedToCommandLine());
- dataObjectType.setApplicationArgument(outputDataObjectType.getApplicationArgument());
- dataObjectType.setSearchQuery(outputDataObjectType.getSearchQuery());
- outputArray.add(dataObjectType);
- }else if (outputDataObjectType.getType() == DataType.STDERR) {
- pbsCluster.scpTo(outputPath, standardError);
- String fileName = standardError.substring(standardError.lastIndexOf(File.separatorChar)+1, standardError.length());
- OutputDataObjectType dataObjectType = new OutputDataObjectType();
- dataObjectType.setValue(outputPath + File.separatorChar + fileName);
- dataObjectType.setName(paramName);
- dataObjectType.setType(DataType.STDERR);
- dataObjectType.setIsRequired(outputDataObjectType.isIsRequired());
- dataObjectType.setRequiredToAddedToCommandLine(outputDataObjectType.isRequiredToAddedToCommandLine());
- dataObjectType.setApplicationArgument(outputDataObjectType.getApplicationArgument());
- dataObjectType.setSearchQuery(outputDataObjectType.getSearchQuery());
- outputArray.add(dataObjectType);
- }
- }
- registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
- } catch (SSHApiException e) {
- try {
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- } catch (GFacException e1) {
- log.error(e1.getLocalizedMessage());
- }
- log.error("Error transfering files to remote host : " + hostName + " with the user: " + userName);
- log.error(e.getMessage());
- throw new GFacHandlerException(e);
- } catch (Exception e) {
- try {
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- } catch (GFacException e1) {
- log.error(e1.getLocalizedMessage());
- }
- throw new GFacHandlerException(e);
- }
- }
-
- @Override
- public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- // TODO: Auto generated method body.
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
deleted file mode 100644
index 93d0ed0..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package org.apache.airavata.gfac.ssh.handler;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.AbstractHandler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
-import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
-import org.apache.airavata.gfac.ssh.util.HandleOutputs;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.apache.airavata.registry.cpi.RegistryException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class NewSSHOutputHandler extends AbstractHandler{
-
- private static final Logger log = LoggerFactory.getLogger(NewSSHOutputHandler.class);
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- String hostAddress = jobExecutionContext.getHostName();
- Cluster cluster = null;
- // Security Context and connection
- try {
- if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- }
- cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
- if (cluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- } catch (Exception e) {
- log.error(e.getMessage());
- try {
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- } catch (GFacException e1) {
- log.error(e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
-
- super.invoke(jobExecutionContext);
- List<OutputDataObjectType> outputArray = HandleOutputs.handleOutputs(jobExecutionContext, cluster);
- try {
- registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
- } catch (RegistryException e) {
- throw new GFacHandlerException(e);
- }
-
-
- }
-
- @Override
- public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- // TODO: Auto generated method body.
- }
-
- @Override
- public void initProperties(Properties properties) throws GFacHandlerException {
- // TODO Auto-generated method stub
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
deleted file mode 100644
index a985bd3..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.ssh.handler;
-
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.AbstractHandler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
-import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.model.workspace.experiment.*;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Properties;
-
-public class SSHDirectorySetupHandler extends AbstractHandler {
- private static final Logger log = LoggerFactory.getLogger(SSHDirectorySetupHandler.class);
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- try {
- String hostAddress = jobExecutionContext.getHostName();
- if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- }
- } catch (Exception e) {
- log.error(e.getMessage());
- try {
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- } catch (GFacException e1) {
- log.error(e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
-
- log.info("Setup SSH job directorties");
- super.invoke(jobExecutionContext);
- makeDirectory(jobExecutionContext);
-
- }
-
- @Override
- public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- // TODO: Auto generated method body.
- }
-
- private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- Cluster cluster = null;
- try{
- String hostAddress = jobExecutionContext.getHostName();
- cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
- if (cluster == null) {
- throw new GFacHandlerException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- String workingDirectory = jobExecutionContext.getWorkingDir();
- cluster.makeDirectory(workingDirectory);
- if(!jobExecutionContext.getInputDir().equals(workingDirectory))
- cluster.makeDirectory(jobExecutionContext.getInputDir());
- if(!jobExecutionContext.getOutputDir().equals(workingDirectory))
- cluster.makeDirectory(jobExecutionContext.getOutputDir());
-
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
- status.setTransferState(TransferState.DIRECTORY_SETUP);
- detail.setTransferStatus(status);
- detail.setTransferDescription("Working directory = " + workingDirectory);
-
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
- } catch (Exception e) {
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
- status.setTransferState(TransferState.FAILED);
- detail.setTransferStatus(status);
- detail.setTransferDescription("Working directory = " + jobExecutionContext.getWorkingDir());
- try {
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error executing the Handler: " + SSHDirectorySetupHandler.class, e);
- }
-
- }
-
- public void initProperties(Properties properties) throws GFacHandlerException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
deleted file mode 100644
index b2210a9..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.ssh.handler;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.context.MessageContext;
-import org.apache.airavata.gfac.core.handler.AbstractHandler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
-import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.workspace.experiment.*;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-public class SSHInputHandler extends AbstractHandler {
-
- private static final Logger log = LoggerFactory.getLogger(SSHInputHandler.class);
-
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- DataTransferDetails detail = new DataTransferDetails();
- detail.setTransferDescription("Input Data Staging");
- TransferStatus status = new TransferStatus();
- int index = 0;
- int oldIndex = 0;
- List<String> oldFiles = new ArrayList<String>();
- StringBuffer data = new StringBuffer("|");
- MessageContext inputNew = new MessageContext();
- Cluster cluster = null;
-
- try {
- String hostAddress = jobExecutionContext.getHostName();
- if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
- try {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- try {
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- } catch (GFacException e1) {
- log.error(e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
-
- cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
- if (cluster == null) {
- throw new GFacException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- log.info("Invoking SCPInputHandler");
- super.invoke(jobExecutionContext);
-
-
- MessageContext input = jobExecutionContext.getInMessageContext();
- Set<String> parameters = input.getParameters().keySet();
- for (String paramName : parameters) {
- InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName);
- String paramValue = inputParamType.getValue();
- //TODO: Review this with type
- if (inputParamType.getType() == DataType.URI) {
- if (index < oldIndex) {
- log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
- inputParamType.setValue(oldFiles.get(index));
- data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
- } else {
- String stageInputFile = stageInputFiles(cluster, jobExecutionContext, paramValue);
- inputParamType.setValue(stageInputFile);
- StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString());
- status.setTransferState(TransferState.UPLOAD);
- detail.setTransferStatus(status);
- detail.setTransferDescription("Input Data Staged: " + stageInputFile);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
- GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
- }
- }// FIXME: what is the thrift model DataType equivalent for URIArray type?
-// else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
-// if (index < oldIndex) {
-// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
-// ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
-// data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
-// }else{
-// List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
-// List<String> newFiles = new ArrayList<String>();
-// for (String paramValueEach : split) {
-// String stageInputFiles = stageInputFiles(cluster,jobExecutionContext, paramValueEach);
-// status.setTransferState(TransferState.UPLOAD);
-// detail.setTransferStatus(status);
-// detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
-// registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-// newFiles.add(stageInputFiles);
-// StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
-// GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
-// }
-// ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
-// }
-// }
- inputNew.getParameters().put(paramName, inputParamType);
- }
- } catch (Exception e) {
- log.error(e.getMessage());
- status.setTransferState(TransferState.FAILED);
- detail.setTransferStatus(status);
- try {
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
- }
- jobExecutionContext.setInMessageContext(inputNew);
- }
-
- @Override
- public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- // TODO: Auto generated method body.
- }
-
- private static String stageInputFiles(Cluster cluster, JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException {
- int i = paramValue.lastIndexOf(File.separator);
- String substring = paramValue.substring(i + 1);
- try {
- String targetFile = jobExecutionContext.getInputDir() + File.separator + substring;
- if(paramValue.startsWith("scp:")){
- paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
- cluster.scpThirdParty(paramValue, targetFile);
- }else{
- if(paramValue.startsWith("file")){
- paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
- }
- boolean success = false;
- int j = 1;
- while(!success){
- try {
- cluster.scpTo(targetFile, paramValue);
- success = true;
- } catch (Exception e) {
- log.info(e.getLocalizedMessage());
- Thread.sleep(2000);
- if(j==3) {
- throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
- }
- }
- j++;
- }
- }
- return targetFile;
- } catch (Exception e) {
- throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
- }
- }
-
- public void initProperties(Properties properties) throws GFacHandlerException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
deleted file mode 100644
index f7fd2f4..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.ssh.handler;
-
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.AbstractHandler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.core.utils.OutputUtils;
-import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
-import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.model.workspace.experiment.TransferState;
-import org.apache.airavata.model.workspace.experiment.TransferStatus;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-public class SSHOutputHandler extends AbstractHandler {
- private static final Logger log = LoggerFactory.getLogger(SSHOutputHandler.class);
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- String hostAddress = jobExecutionContext.getHostName();
- try {
- if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- }
- } catch (Exception e) {
- log.error(e.getMessage());
- try {
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- } catch (GFacException e1) {
- log.error(e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
-
- super.invoke(jobExecutionContext);
- DataTransferDetails detail = new DataTransferDetails();
- detail.setTransferDescription("Output data staging");
- TransferStatus status = new TransferStatus();
-
- Cluster cluster = null;
- try {
- cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
- if (cluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
-
- // Get the Stdouts and StdErrs
- String timeStampedExperimentID = GFacUtils.createUniqueNameWithDate(jobExecutionContext.getExperimentID());
-
- TaskDetails taskData = jobExecutionContext.getTaskData();
- String outputDataDir = ServerSettings.getSetting(Constants.OUTPUT_DATA_DIR, File.separator + "tmp");
- File localStdOutFile;
- File localStdErrFile;
- //FIXME: AdvancedOutput is remote location and third party transfer should work to make this work
-// if (taskData.getAdvancedOutputDataHandling() != null) {
-// outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
-// }
- if (outputDataDir == null) {
- outputDataDir = File.separator + "tmp";
- }
- outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID();
- (new File(outputDataDir)).mkdirs();
-
-
- localStdOutFile = new File(outputDataDir + File.separator + timeStampedExperimentID + "stdout");
- localStdErrFile = new File(outputDataDir + File.separator + timeStampedExperimentID + "stderr");
-// cluster.makeDirectory(outputDataDir);
- int i = 0;
- String stdOutStr = "";
- while (stdOutStr.isEmpty()) {
- try {
- cluster.scpFrom(jobExecutionContext.getStandardOutput(), localStdOutFile.getAbsolutePath());
- stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
- } catch (Exception e) {
- log.error(e.getLocalizedMessage());
- Thread.sleep(2000);
- }
- i++;
- if (i == 3) break;
- }
- Thread.sleep(1000);
- cluster.scpFrom(jobExecutionContext.getStandardError(), localStdErrFile.getAbsolutePath());
- Thread.sleep(1000);
-
- String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
- status.setTransferState(TransferState.STDOUT_DOWNLOAD);
- detail.setTransferStatus(status);
- detail.setTransferDescription("STDOUT:" + localStdOutFile.getAbsolutePath());
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
- status.setTransferState(TransferState.STDERROR_DOWNLOAD);
- detail.setTransferStatus(status);
- detail.setTransferDescription("STDERR:" + localStdErrFile.getAbsolutePath());
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
-
- List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>();
- Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
- Set<String> keys = output.keySet();
- for (String paramName : keys) {
- OutputDataObjectType actualParameter = (OutputDataObjectType) output.get(paramName);
- if (DataType.URI == actualParameter.getType()) {
- List<String> outputList = null;
- int retry = 3;
- while (retry > 0) {
- outputList = cluster.listDirectory(jobExecutionContext.getOutputDir());
- if (outputList.size() > 0) {
- break;
- }
- retry--;
- Thread.sleep(2000);
- }
-
- if (outputList.size() == 0 || outputList.get(0).isEmpty() || outputList.size() > 1) {
- OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray);
- Set<String> strings = output.keySet();
- outputArray.clear();
- for (String key : strings) {
- OutputDataObjectType actualParameter1 = (OutputDataObjectType) output.get(key);
- if (DataType.URI == actualParameter1.getType()) {
- String downloadFile = actualParameter1.getValue();
- cluster.scpFrom(downloadFile, outputDataDir);
- String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar) + 1, downloadFile.length());
- String localFile = outputDataDir + File.separator + fileName;
- jobExecutionContext.addOutputFile(localFile);
- actualParameter1.setValue(localFile);
- OutputDataObjectType dataObjectType = new OutputDataObjectType();
- dataObjectType.setValue(localFile);
- dataObjectType.setName(key);
- dataObjectType.setType(DataType.URI);
- outputArray.add(dataObjectType);
- }else if (DataType.STDOUT == actualParameter.getType()) {
- String fileName = localStdOutFile.getName();
- String localFile = outputDataDir + File.separator + fileName;
- jobExecutionContext.addOutputFile(localFile);
- actualParameter.setValue(localFile);
- OutputDataObjectType dataObjectType = new OutputDataObjectType();
- dataObjectType.setValue(localFile);
- dataObjectType.setName(key);
- dataObjectType.setType(DataType.STDOUT);
- outputArray.add(dataObjectType);
- }else if (DataType.STDERR == actualParameter.getType()) {
- String fileName = localStdErrFile.getName();
- String localFile = outputDataDir + File.separator + fileName;
- jobExecutionContext.addOutputFile(localFile);
- actualParameter.setValue(localFile);
- OutputDataObjectType dataObjectType = new OutputDataObjectType();
- dataObjectType.setValue(localFile);
- dataObjectType.setName(key);
- dataObjectType.setType(DataType.STDERR);
- outputArray.add(dataObjectType);
- }
- }
- break;
- } else if (outputList.size() == 1) {//FIXME: Ultrascan case
- String valueList = outputList.get(0);
- cluster.scpFrom(jobExecutionContext.getOutputDir() + File.separator + valueList, outputDataDir);
- String outputPath = outputDataDir + File.separator + valueList;
- jobExecutionContext.addOutputFile(outputPath);
- actualParameter.setValue(outputPath);
- OutputDataObjectType dataObjectType = new OutputDataObjectType();
- dataObjectType.setValue(outputPath);
- dataObjectType.setName(paramName);
- dataObjectType.setType(DataType.URI);
- outputArray.add(dataObjectType);
- }
- } else {
- OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray);
- }
- }
- if (outputArray == null || outputArray.isEmpty()) {
- log.error("Empty Output returned from the Application, Double check the application and ApplicationDescriptor output Parameter Names");
- if (jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() == null) {
- throw new GFacHandlerException(
- "Empty Output returned from the Application, Double check the application"
- + "and ApplicationDescriptor output Parameter Names");
- }
- }
- jobExecutionContext.setStandardError(localStdErrFile.getAbsolutePath());
- jobExecutionContext.setStandardOutput(localStdOutFile.getAbsolutePath());
- jobExecutionContext.setOutputDir(outputDataDir);
- status.setTransferState(TransferState.DOWNLOAD);
- detail.setTransferStatus(status);
- detail.setTransferDescription(outputDataDir);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
-
- } catch (Exception e) {
- try {
- status.setTransferState(TransferState.FAILED);
- detail.setTransferStatus(status);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error in retrieving results", e);
- }
-
- }
-
- @Override
- public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- // TODO: Auto generated method body.
- }
-
- public void initProperties(Properties properties) throws GFacHandlerException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
deleted file mode 100644
index cc6cca0..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.gfac.ssh.provider.impl;
-
-import org.airavata.appcatalog.cpi.AppCatalogException;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.ExecutionMode;
-import org.apache.airavata.gfac.GFacConfiguration;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.context.MessageContext;
-import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.handler.ThreadedHandler;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
-import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.core.provider.AbstractProvider;
-import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.gfac.core.states.GfacExperimentState;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
-import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
-import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
-import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.CommandExecutor;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.gsi.ssh.impl.JobStatus;
-import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
-import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
-import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
-import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.xmlbeans.XmlException;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-import java.io.*;
-import java.net.URL;
-import java.util.*;
-
-/**
- * Execute application using remote SSH
- */
-public class SSHProvider extends AbstractProvider {
- private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
- private Cluster cluster;
- private String jobID = null;
- private String taskID = null;
- // we keep gsisshprovider to support qsub submission incase of hpc scenario with ssh
- private boolean hpcType = false;
-
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- try {
- super.initialize(jobExecutionContext);
- String hostAddress = jobExecutionContext.getHostName();
- ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager();
- ResourceJobManagerType resourceJobManagerType = resourceJobManager.getResourceJobManagerType();
- if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- }
- taskID = jobExecutionContext.getTaskData().getTaskID();
-
- JobSubmissionProtocol preferredJobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol();
- if (preferredJobSubmissionProtocol == JobSubmissionProtocol.SSH && resourceJobManagerType == ResourceJobManagerType.FORK) {
- jobID = "SSH_" + jobExecutionContext.getHostName() + "_" + Calendar.getInstance().getTimeInMillis();
- cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
-
- String remoteFile = jobExecutionContext.getWorkingDir() + File.separatorChar + Constants.EXECUTABLE_NAME;
- details.setJobID(taskID);
- details.setJobDescription(remoteFile);
- jobExecutionContext.setJobDetails(details);
- // FIXME : Why cluster is passed as null
- JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, cluster);
- details.setJobDescription(jobDescriptor.toXML());
-
- GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP);
- log.info(remoteFile);
- File runscript = createShellScript(jobExecutionContext);
- cluster.scpTo(remoteFile, runscript.getAbsolutePath());
- } else {
- hpcType = true;
- }
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- } catch (Exception e) {
- throw new GFacProviderException(e.getLocalizedMessage(), e);
- }
- }
-
-
- public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- if (!hpcType) {
- try {
- /*
- * Execute
- */
- String executable = jobExecutionContext.getWorkingDir() + File.separatorChar + Constants.EXECUTABLE_NAME;
- details.setJobDescription(executable);
- RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + executable + "; " + executable);
- StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
- log.info("Executing RawCommand : " + rawCommandInfo.getCommand());
- CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), jobIDReaderCommandOutput);
- String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource");
- log.info("stdout=" + stdOutputString);
- } catch (Exception e) {
- throw new GFacProviderException(e.getMessage(), e);
- }
- } else {
- try {
- StringBuffer data = new StringBuffer();
- jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
- JobDetails jobDetails = new JobDetails();
- String hostAddress = jobExecutionContext.getHostName();
- MonitorPublisher monitorPublisher = jobExecutionContext.getMonitorPublisher();
- try {
- Cluster cluster = null;
- if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- }
- cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
- if (cluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- // This installed path is a mandetory field, because this could change based on the computing resource
- JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, cluster);
- jobDetails.setJobName(jobDescriptor.getJobName());
- log.info(jobDescriptor.toXML());
- jobDetails.setJobDescription(jobDescriptor.toXML());
- String jobID = cluster.submitBatchJob(jobDescriptor);
- if (jobID != null && !jobID.isEmpty()) {
- jobDetails.setJobID(jobID);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
- , GfacExperimentState.JOBSUBMITTED));
- jobExecutionContext.setJobDetails(jobDetails);
- if (verifyJobSubmissionByJobId(cluster, jobID)) {
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
- , GfacExperimentState.JOBSUBMITTED));
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED);
- }
- } else {
- jobExecutionContext.setJobDetails(jobDetails);
- String verifyJobId = verifyJobSubmission(cluster, jobDetails);
- if (verifyJobId != null && !verifyJobId.isEmpty()) {
- // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
- jobID = verifyJobId;
- jobDetails.setJobID(jobID);
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
- , GfacExperimentState.JOBSUBMITTED));
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED);
- }
- }
-
- if (jobID == null || jobID.isEmpty()) {
- log.error("Couldn't find remote jobId for JobName:" + jobDetails.getJobName() + ", ExperimentId:" + jobExecutionContext.getExperimentID());
- GFacUtils.updateExperimentStatus(jobExecutionContext.getExperimentID(), ExperimentState.FAILED);
- return;
- }
- data.append("jobDesc=").append(jobDescriptor.toXML());
- data.append(",jobId=").append(jobDetails.getJobID());
- monitor(jobExecutionContext);
- } catch (SSHApiException e) {
- String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- } catch (Exception e) {
- String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- } finally {
- log.info("Saving data for future recovery: ");
- log.info(data.toString());
- GFacUtils.saveHandlerData(jobExecutionContext, data, this.getClass().getName());
- }
- } catch (GFacException e) {
- throw new GFacProviderException(e.getMessage(), e);
- }
- }
- }
-
- private boolean verifyJobSubmissionByJobId(Cluster cluster, String jobID) throws SSHApiException {
- JobStatus status = cluster.getJobStatus(jobID);
- return status != null && status != JobStatus.U;
- }
-
- private String verifyJobSubmission(Cluster cluster, JobDetails jobDetails) {
- String jobName = jobDetails.getJobName();
- String jobId = null;
- try {
- jobId = cluster.getJobIdByJobName(jobName, cluster.getServerInfo().getUserName());
- } catch (SSHApiException e) {
- log.error("Error while verifying JobId from JobName");
- }
- return jobId;
- }
-
- public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
-
- }
-
- public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- JobDetails jobDetails = jobExecutionContext.getJobDetails();
- StringBuffer data = new StringBuffer();
- String hostAddress = jobExecutionContext.getHostName();
- if (!hpcType) {
- throw new NotImplementedException();
- } else {
- Cluster cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
- if (cluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- // This installed path is a mandetory field, because this could change based on the computing resource
- if (jobDetails == null) {
- log.error("There is not JobDetails, Cancel request can't be performed !!!");
- return false;
- }
- try {
- if (jobDetails.getJobID() != null) {
- if (cluster.cancelJob(jobDetails.getJobID()) != null) {
- // if this operation success without any exceptions, we can assume cancel operation succeeded.
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
- return true;
- } else {
- log.info("Job Cancel operation failed");
- }
- } else {
- log.error("No Job Id is set, so cannot perform the cancel operation !!!");
- throw new GFacProviderException("Cancel request failed to cancel job as JobId is null in Job Execution Context");
- }
- } catch (SSHApiException e) {
- String error = "Cancel request failed " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
- log.error(error);
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-// throw new GFacProviderException(error, e);
- } catch (Exception e) {
- String error = "Cancel request failed " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
- log.error(error);
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-// throw new GFacProviderException(error, e);
- }
- return false;
- }
- }
-
- private File createShellScript(JobExecutionContext context) throws IOException {
- String uniqueDir = jobExecutionContext.getApplicationName() + System.currentTimeMillis()
- + new Random().nextLong();
-
- File shellScript = File.createTempFile(uniqueDir, "sh");
- OutputStream out = new FileOutputStream(shellScript);
-
- out.write("#!/bin/bash\n".getBytes());
- out.write(("cd " + jobExecutionContext.getWorkingDir() + "\n").getBytes());
- out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getInputDir() + "\n").getBytes());
- out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getOutputDir() + "\n")
- .getBytes());
- // get the env of the host and the application
- List<SetEnvPaths> envPathList = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getSetEnvironment();
- for (SetEnvPaths setEnvPaths : envPathList) {
- log.debug("Env[" + setEnvPaths.getName() + "] = " + setEnvPaths.getValue());
- out.write(("export " + setEnvPaths.getName() + "=" + setEnvPaths.getValue() + "\n").getBytes());
- }
-
- // prepare the command
- final String SPACE = " ";
- StringBuffer cmd = new StringBuffer();
- cmd.append(jobExecutionContext.getExecutablePath());
- cmd.append(SPACE);
-
- MessageContext input = context.getInMessageContext();
- Map<String, Object> inputs = input.getParameters();
- Set<String> keys = inputs.keySet();
- for (String paramName : keys) {
- InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName);
- //if ("URIArray".equals(actualParameter.getType().getType().toString())) {
- if (inputParamType.getType() == DataType.URI) {
- String value = inputParamType.getValue();
- cmd.append(value);
- cmd.append(SPACE);
- } else {
- String paramValue = inputParamType.getValue();
- cmd.append(paramValue);
- cmd.append(SPACE);
- }
- }
- // We redirect the error and stdout to remote files, they will be read
- // in later
- cmd.append(SPACE);
- cmd.append("1>");
- cmd.append(SPACE);
- cmd.append(jobExecutionContext.getStandardOutput());
- cmd.append(SPACE);
- cmd.append("2>");
- cmd.append(SPACE);
- cmd.append(jobExecutionContext.getStandardError());
-
- String cmdStr = cmd.toString();
- log.info("Command = " + cmdStr);
- out.write((cmdStr + "\n").getBytes());
- String message = "\"execuationSuceeded\"";
- out.write(("echo " + message + "\n").getBytes());
- out.close();
-
- return shellScript;
- }
-
- public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
- }
-
- /**
- * This method will read standard output and if there's any it will be parsed
- *
- * @param jobIDReaderCommandOutput
- * @param errorMsg
- * @return
- * @throws SSHApiException
- */
- private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException {
- String stdOutputString = jobIDReaderCommandOutput.getStdOutputString();
- String stdErrorString = jobIDReaderCommandOutput.getStdErrorString();
-
- if (stdOutputString == null || stdOutputString.isEmpty() || (stdErrorString != null && !stdErrorString.isEmpty())) {
- log.error("Standard Error output : " + stdErrorString);
- throw new SSHApiException(errorMsg + stdErrorString);
- }
- return stdOutputString;
- }
-
- public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- // have to implement the logic to recover a gfac failure
- initialize(jobExecutionContext);
- if(hpcType) {
- log.info("Invoking Recovering for the Experiment: " + jobExecutionContext.getExperimentID());
- String hostName = jobExecutionContext.getHostName();
- String jobId = "";
- String jobDesc = "";
- String jobName = "";
- try {
- String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName());
- String[] split = pluginData.split(",");
- if (split.length < 2) {
- this.execute(jobExecutionContext);
- return;
- }
- jobDesc = split[0].substring(8);
- jobId = split[1].substring(6);
- try {
- JobDescriptor jobDescriptor = JobDescriptor.fromXML(jobDesc);
- jobName = jobDescriptor.getJobName();
- } catch (XmlException e) {
- log.error(e.getMessage(), e);
- log.error("Cannot parse plugin data stored, but trying to recover");
-
- }
- log.info("Following data have recovered: ");
- log.info("Job Description: " + jobDesc);
- log.info("Job Id: " + jobId);
- if (jobName.isEmpty() || jobId.isEmpty() || "none".equals(jobId) ||
- "".equals(jobId)) {
- log.info("Cannot recover data so submitting the job again !!!");
- this.execute(jobExecutionContext);
- return;
- }
- } catch (Exception e) {
- log.error("Error while recovering provider", e);
- }
- try {
- // Now we are we have enough data to recover
- JobDetails jobDetails = new JobDetails();
- jobDetails.setJobDescription(jobDesc);
- jobDetails.setJobID(jobId);
- jobDetails.setJobName(jobName);
- jobExecutionContext.setJobDetails(jobDetails);
- if (jobExecutionContext.getSecurityContext(hostName) == null) {
- try {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
- monitor(jobExecutionContext);
- } catch (Exception e) {
- log.error("Error while recover the job", e);
- throw new GFacProviderException("Error delegating already ran job to Monitoring", e);
- }
- }else{
- log.info("We do not handle non hpc recovery so we simply run the Job directly");
- this.execute(jobExecutionContext);
- }
- }
-
- @Override
- public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
- String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
- SSHJobSubmission sshJobSubmission = null;
- try {
- sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
- } catch (AppCatalogException e) {
- throw new GFacException("Error while reading compute resource", e);
- }
- MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
- if (monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
- try {
- EmailBasedMonitor emailBasedMonitor = EmailMonitorFactory.getEmailBasedMonitor(
- sshJobSubmission.getResourceJobManager().getResourceJobManagerType());
- emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
- } catch (AiravataException e) {
- throw new GFacHandlerException("Error while activating email job monitoring ", e);
- }
- return;
- }
- } else {
- throw new IllegalArgumentException("Monitoring is implemented only for SSH, "
- + jobExecutionContext.getPreferredJobSubmissionProtocol().name() + " is not yet implemented");
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/security/SSHSecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/security/SSHSecurityContext.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/security/SSHSecurityContext.java
deleted file mode 100644
index c406c41..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/security/SSHSecurityContext.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.gfac.ssh.security;
-
-import java.io.IOException;
-
-import net.schmizz.sshj.SSHClient;
-import net.schmizz.sshj.connection.channel.direct.Session;
-import net.schmizz.sshj.userauth.keyprovider.KeyProvider;
-
-import org.apache.airavata.gfac.SecurityContext;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handle SSH security
- */
-public class SSHSecurityContext implements SecurityContext {
- private static final Logger log = LoggerFactory.getLogger(SSHSecurityContext.class);
-
- private String username;
- private String privateKeyLoc;
- private String keyPass;
- private SSHClient sshClient;
- private Session session;
-
- private Cluster pbsCluster;
-
- public String getUsername() {
- return username;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public String getPrivateKeyLoc() {
- return privateKeyLoc;
- }
-
- public void setPrivateKeyLoc(String privateKeyLoc) {
- this.privateKeyLoc = privateKeyLoc;
- }
-
- public String getKeyPass() {
- return keyPass;
- }
-
- public void setKeyPass(String keyPass) {
- this.keyPass = keyPass;
- }
-
- public void closeSession(Session session) {
- if (session != null) {
- try {
- session.close();
- } catch (Exception e) {
- log.warn("Cannot Close SSH Session");
- }
- }
- }
-
- public Session getSession(String hostAddress) throws IOException {
- try {
- if (sshClient == null) {
- sshClient = new SSHClient();
- }
- if (getSSHClient().isConnected())
- return getSSHClient().startSession();
-
- KeyProvider pkey = getSSHClient().loadKeys(getPrivateKeyLoc(), getKeyPass());
-
- getSSHClient().loadKnownHosts();
-
- getSSHClient().connect(hostAddress);
- getSSHClient().authPublickey(getUsername(), pkey);
- session = getSSHClient().startSession();
- return session;
-
- } catch (NullPointerException ne) {
- throw new SecurityException("Cannot load security context for SSH", ne);
- }
- }
-
- public SSHClient getSSHClient() {
- if (sshClient == null) {
- sshClient = new SSHClient();
- }
- return sshClient;
- }
-
- public void setPbsCluster(Cluster pbsCluster) {
- this.pbsCluster = pbsCluster;
- }
-
- public Cluster getPbsCluster() {
- return this.pbsCluster;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java
deleted file mode 100644
index f09a662..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.ssh.security;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.IOUtil;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.credential.store.credential.Credential;
-import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
-import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.RequestData;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gsi.ssh.api.authentication.SSHPublicKeyFileAuthentication;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.Properties;
-
-public class TokenizedSSHAuthInfo implements SSHPublicKeyFileAuthentication {
- protected static final Logger log = LoggerFactory.getLogger(TokenizedSSHAuthInfo.class);
-
- private String publicKeyFile;
-
- private String privateKeyFile;
-
- private String passPhrase = null;
-
- private SSHCredential gssCredentials = null;
-
- private CredentialReader credentialReader;
-
- private RequestData requestData;
-
- public TokenizedSSHAuthInfo(CredentialReader credentialReader, RequestData requestData) {
- this.credentialReader = credentialReader;
- this.requestData = requestData;
- }
-
- public TokenizedSSHAuthInfo(RequestData requestData) {
- this.requestData = requestData;
- }
-
- public String getPublicKeyFile(String userName, String hostName) {
- return publicKeyFile;
- }
-
- public String getPrivateKeyFile(String userName, String hostName) {
- return privateKeyFile;
- }
-
- public String getPassPhrase() {
- return passPhrase;
- }
-
- public void bannerMessage(String message) {
-
- }
-
- public SSHCredential getCredentials() throws SecurityException {
-
- if (gssCredentials == null) {
-
- try {
- gssCredentials = getCredentialsFromStore();
- } catch (Exception e) {
- log.error("An exception occurred while retrieving credentials from the credential store. " +
- "Will continue with my proxy user name and password. Provided TokenId:" + requestData.getTokenId() + e.getMessage(), e);
- }
-
- if (gssCredentials == null) {
- System.out.println("Authenticating with provided token failed, so falling back to authenticate with defaultCredentials");
- try {
- gssCredentials = getDefaultCredentials();
- } catch (Exception e) {
- throw new SecurityException("Error retrieving my proxy using username password",e.getCause());
- }
- }
- // if still null, throw an exception
- if (gssCredentials == null) {
- throw new SecurityException("Unable to retrieve my proxy credentials to continue operation.");
- }
- }
-
- return gssCredentials;
- }
-
-
- /**
- * Reads the credentials from credential store.
- *
- * @return If token is found in the credential store, will return a valid credential. Else returns null.
- * @throws Exception If an error occurred while retrieving credentials.
- */
- public SSHCredential getCredentialsFromStore() throws Exception {
-
- if (getCredentialReader() == null) {
- credentialReader = GFacUtils.getCredentialReader();
- if(credentialReader == null){
- return null;
- }
- }
-
- Credential credential = getCredentialReader().getCredential(getRequestData().getGatewayId(),
- getRequestData().getTokenId());
-
- if (credential instanceof SSHCredential) {
- SSHCredential credential1 = (SSHCredential) credential;
- this.publicKeyFile = writeFileToDisk(credential1.getPublicKey());
- this.privateKeyFile = writeFileToDisk(credential1.getPrivateKey());
- this.passPhrase = credential1.getPassphrase();
- System.out.println(this.publicKeyFile);
- System.out.println(this.privateKeyFile);
- System.out.println(this.passPhrase);
- this.getRequestData().setRequestUser(credential1.getPortalUserName());
- return credential1;
- } else {
- log.info("Could not find SSH credentials for token - " + getRequestData().getTokenId() + " and "
- + "gateway id - " + getRequestData().getGatewayId());
- }
-
- return null;
- }
-
- /**
- * Gets the default proxy certificate.
- *
- * @return Default my proxy credentials.
- * @throws org.apache.airavata.gfac.GFacException If an error occurred while retrieving credentials.
- * @throws org.apache.airavata.common.exception.ApplicationSettingsException
- */
- public SSHCredential getDefaultCredentials() throws GFacException, ApplicationSettingsException, IOException {
- Properties configurationProperties = ServerSettings.getProperties();
- String sshUserName = configurationProperties.getProperty(Constants.SSH_USER_NAME);
- this.getRequestData().setRequestUser(sshUserName);
- this.privateKeyFile = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY);
- this.publicKeyFile = configurationProperties.getProperty(Constants.SSH_PUBLIC_KEY);
- this.passPhrase = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY_PASS);
- this.getRequestData().setRequestUser(sshUserName);
- return new SSHCredential(IOUtil.readToByteArray(new File(this.privateKeyFile)), IOUtil.readToByteArray(new File(this.publicKeyFile)), this.passPhrase, requestData.getGatewayId(), sshUserName);
- }
-
- public CredentialReader getCredentialReader() {
- return credentialReader;
- }
-
- public RequestData getRequestData() {
- return requestData;
- }
-
- private String writeFileToDisk(byte[] data) {
- File temp = null;
- try {
- temp = File.createTempFile("id_rsa", "");
- //write it
- FileOutputStream bw = new FileOutputStream(temp);
- bw.write(data);
- bw.close();
- } catch (IOException e) {
- log.error(e.getMessage(), e);
- }
- return temp.getAbsolutePath();
- }
-}