You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/03 20:14:46 UTC
[29/39] airavata git commit: Refactored gfac sub modules,
merged gfac-ssh, gfac-gsissh, gfac-local,
gfac-monitor and gsissh modules and created gfac-impl,
moved implementation from gfac-core to gfac-impl
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
deleted file mode 100644
index f027ccb..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.core.utils;
-
-import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.cpi.GFac;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.model.messaging.event.TaskIdentifier;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.TaskState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-
-public class OutHandlerWorker implements Runnable {
- private final static Logger logger = LoggerFactory.getLogger(OutHandlerWorker.class);
-
- private GFac gfac;
-
- private MonitorID monitorID;
-
- private MonitorPublisher monitorPublisher;
- private JobExecutionContext jEC;
-
- public OutHandlerWorker(GFac gfac, MonitorID monitorID,MonitorPublisher monitorPublisher) {
- this.gfac = gfac;
- this.monitorID = monitorID;
- this.monitorPublisher = monitorPublisher;
- this.jEC = monitorID.getJobExecutionContext();
- }
-
- public OutHandlerWorker(JobExecutionContext jEC) {
- this.jEC = jEC;
- this.gfac = jEC.getGfac();
- this.monitorPublisher = jEC.getMonitorPublisher();
- }
-
- @Override
- public void run() {
- try {
-// gfac.invokeOutFlowHandlers(monitorID.getJobExecutionContext());
- gfac.invokeOutFlowHandlers(jEC);
- } catch (Exception e) {
- logger.error(e.getMessage(),e);
- TaskIdentifier taskIdentifier = new TaskIdentifier(monitorID.getTaskID(), monitorID.getWorkflowNodeID(),monitorID.getExperimentID(), monitorID.getJobExecutionContext().getGatewayID());
- //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
- monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentifier));
- try {
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(monitorID.getJobExecutionContext(), errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- } catch (GFacException e1) {
- logger.error("Error while persisting error details", e);
- }
- logger.info(e.getLocalizedMessage(), e);
- // Save error details to registry
-
- }
-// monitorPublisher.publish(monitorID.getStatus());
- monitorPublisher.publish(jEC.getJobDetails().getJobStatus());
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutputUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutputUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutputUtils.java
deleted file mode 100644
index 3c8bbf0..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutputUtils.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.core.utils;
-
-import org.apache.airavata.common.utils.StringUtil;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class OutputUtils {
- private static String regexPattern = "\\s*=\\s*(.*)\\r?\\n";
-
- public static void fillOutputFromStdout(Map<String, Object> output, String stdout, String stderr, List<OutputDataObjectType> outputArray) throws Exception {
- // this is no longer correct
-// if (stdout == null || stdout.equals("")) {
-// throw new GFacHandlerException("Standard output is empty.");
-// }
-
- Set<String> keys = output.keySet();
- OutputDataObjectType actual = null;
- OutputDataObjectType resultOutput = null;
- for (String paramName : keys) {
- actual = (OutputDataObjectType) output.get(paramName);
- // if parameter value is not already set, we let it go
-
- if (actual == null) {
- continue;
- }
- resultOutput = new OutputDataObjectType();
- if (DataType.STDOUT == actual.getType()) {
- actual.setValue(stdout);
- resultOutput.setName(paramName);
- resultOutput.setType(DataType.STDOUT);
- resultOutput.setValue(stdout);
- outputArray.add(resultOutput);
- } else if (DataType.STDERR == actual.getType()) {
- actual.setValue(stderr);
- resultOutput.setName(paramName);
- resultOutput.setType(DataType.STDERR);
- resultOutput.setValue(stderr);
- outputArray.add(resultOutput);
- }
-// else if ("URI".equals(actual.getType().getType().toString())) {
-// continue;
-// }
- else {
- String parseStdout = parseStdout(stdout, paramName);
- if (parseStdout != null) {
- actual.setValue(parseStdout);
- resultOutput.setName(paramName);
- resultOutput.setType(DataType.STRING);
- resultOutput.setValue(parseStdout);
- outputArray.add(resultOutput);
- }
- }
- }
- }
-
- private static String parseStdout(String stdout, String outParam) throws Exception {
- String regex = Pattern.quote(outParam) + regexPattern;
- String match = null;
- Pattern pattern = Pattern.compile(regex);
- Matcher matcher = pattern.matcher(stdout);
- while (matcher.find()) {
- match = matcher.group(1);
- }
- if (match != null) {
- match = match.trim();
- return match;
- }
- return null;
- }
-
- public static String[] parseStdoutArray(String stdout, String outParam) throws Exception {
- String regex = Pattern.quote(outParam) + regexPattern;
- StringBuffer match = new StringBuffer();
- Pattern pattern = Pattern.compile(regex);
- Matcher matcher = pattern.matcher(stdout);
- while (matcher.find()) {
- match.append(matcher.group(1) + StringUtil.DELIMETER);
- }
- if (match != null && match.length() >0) {
- return StringUtil.getElementsFromString(match.toString());
- }
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-gsissh/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/pom.xml b/modules/gfac/gfac-gsissh/pom.xml
deleted file mode 100644
index 81c3ec9..0000000
--- a/modules/gfac/gfac-gsissh/pom.xml
+++ /dev/null
@@ -1,117 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under
- the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
- obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to
- in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
- ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under
- the License. -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>org.apache.airavata</groupId>
- <artifactId>gfac</artifactId>
- <version>0.16-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <modelVersion>4.0.0</modelVersion>
- <artifactId>airavata-gfac-gsissh</artifactId>
- <name>Airavata GFac GSI-SSH implementation</name>
- <description>This is the extension of </description>
- <url>http://airavata.apache.org/</url>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>airavata-gfac-email-monitor</artifactId>
- <version>${project.version}</version>
- </dependency>
- <!-- Logging -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
-
- <!-- GFAC dependencies -->
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>airavata-gfac-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>airavata-gfac-ssh</artifactId>
- <version>${project.version}</version>
- </dependency>
- <!-- Credential Store -->
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>airavata-credential-store</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>airavata-server-configuration</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>airavata-client-configuration</artifactId>
- <scope>test</scope>
- </dependency>
-
-
- <!-- Test -->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.testng</groupId>
- <artifactId>testng</artifactId>
- <version>6.1.1</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <scope>test</scope>
- </dependency>
-
- <!-- gsi-ssh api dependencies -->
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>gsissh</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>airavata-data-models</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.jcraft</groupId>
- <artifactId>jsch</artifactId>
- <version>0.1.50</version>
- </dependency>
- <dependency>
- <groupId>org.apache.xmlbeans</groupId>
- <artifactId>xmlbeans</artifactId>
- <version>${xmlbeans.version}</version>
- </dependency>
- <dependency>
- <groupId>net.schmizz</groupId>
- <artifactId>sshj</artifactId>
- <version>0.6.1</version>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java
deleted file mode 100644
index b4790c7..0000000
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.gsissh.handler;
-
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.AbstractHandler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
-import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.model.workspace.experiment.*;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Properties;
-
-public class GSISSHDirectorySetupHandler extends AbstractHandler {
- private static final Logger log = LoggerFactory.getLogger(GSISSHDirectorySetupHandler.class);
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- try {
- String hostAddress = jobExecutionContext.getHostName();
- if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
- GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
- }
- } catch (Exception e) {
- try {
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- } catch (GFacException e1) {
- log.error(e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
-
- log.info("Setup SSH job directorties");
- super.invoke(jobExecutionContext);
- makeDirectory(jobExecutionContext);
- }
- private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- Cluster cluster = null;
- try {
- String hostAddress = jobExecutionContext.getHostName();
- cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
- if (cluster == null) {
- try {
- GFacUtils.saveErrorDetails(jobExecutionContext, "Security context is not set properly", CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- } catch (GFacException e1) {
- log.error(e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
-
- String workingDirectory = jobExecutionContext.getWorkingDir();
- cluster.makeDirectory(workingDirectory);
- if(!jobExecutionContext.getInputDir().equals(workingDirectory))
- cluster.makeDirectory(jobExecutionContext.getInputDir());
- if(!jobExecutionContext.getOutputDir().equals(workingDirectory))
- cluster.makeDirectory(jobExecutionContext.getOutputDir());
-
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
- status.setTransferState(TransferState.DIRECTORY_SETUP);
- detail.setTransferStatus(status);
- detail.setTransferDescription("Working directory = " + workingDirectory);
-
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
- } catch (Exception e) {
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
- detail.setTransferDescription("Working directory = " + jobExecutionContext.getWorkingDir());
- status.setTransferState(TransferState.FAILED);
- detail.setTransferStatus(status);
- try {
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error executing the Handler: " + GSISSHDirectorySetupHandler.class, e);
- }
- }
-
- public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- this.invoke(jobExecutionContext);
- }
-
- public void initProperties(Properties properties) throws GFacHandlerException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
deleted file mode 100644
index 3b36e86..0000000
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.gsissh.handler;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.context.MessageContext;
-import org.apache.airavata.gfac.core.handler.AbstractHandler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
-import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.TransferState;
-import org.apache.airavata.model.workspace.experiment.TransferStatus;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * Recoverability for this handler assumes the same input values will come in the second
- * run, and assumes nobody changes the registry between the original submission and the re-submission
- */
-public class GSISSHInputHandler extends AbstractHandler {
- private static final Logger log = LoggerFactory.getLogger(GSISSHInputHandler.class);
-
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- super.invoke(jobExecutionContext);
- int index = 0;
- int oldIndex = 0;
- List<String> oldFiles = new ArrayList<String>();
- MessageContext inputNew = new MessageContext();
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
- StringBuffer data = new StringBuffer("|");
- Cluster cluster = null;
-
- try {
- String hostAddress = jobExecutionContext.getHostName();
- if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
- GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
- }
-
- cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
- if (cluster == null) {
- throw new GFacException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
-
- String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName());
- if (pluginData != null) {
- try {
- oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim());
- oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(","));
- if (oldIndex == oldFiles.size()) {
- log.info("Old data looks good !!!!");
- } else {
- oldIndex = 0;
- oldFiles.clear();
- }
- } catch (NumberFormatException e) {
- log.error("Previously stored data " + pluginData + " is wrong so we continue the operations");
- }
- }
- if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
- try {
- GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- try {
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- } catch (GFacException e1) {
- log.error(e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
- log.info("Invoking SCPInputHandler");
-
- MessageContext input = jobExecutionContext.getInMessageContext();
- Set<String> parameters = input.getParameters().keySet();
- for (String paramName : parameters) {
- InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName);
- String paramValue = inputParamType.getValue();
- //TODO: Review this with type
- if (inputParamType.getType() == DataType.URI) {
- if (index < oldIndex) {
- log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
- inputParamType.setValue(oldFiles.get(index));
- data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
- } else {
- String stageInputFile = stageInputFiles(cluster, jobExecutionContext, paramValue);
- inputParamType.setValue(stageInputFile);
- StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString());
- status.setTransferState(TransferState.UPLOAD);
- detail.setTransferStatus(status);
- detail.setTransferDescription("Input Data Staged: " + stageInputFile);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
- GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
- }
- } // FIXME: what is the thrift model DataType equivalent for URIArray type?
-// else if ("URIArray".equals(inputParamType.getType().getType().toString())) {
-// List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
-// List<String> newFiles = new ArrayList<String>();
-// for (String paramValueEach : split) {
-// if (index < oldIndex) {
-// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
-// newFiles.add(oldFiles.get(index));
-// data.append(oldFiles.get(index++)).append(",");
-// } else {
-// String stageInputFiles = stageInputFiles(cluster, jobExecutionContext, paramValueEach);
-// status.setTransferState(TransferState.UPLOAD);
-// detail.setTransferStatus(status);
-// detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
-// registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-// StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
-// GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
-// newFiles.add(stageInputFiles);
-// }
-//
-// }
-// ((URIArrayType) inputParamType.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
-// }
- inputNew.getParameters().put(paramName, inputParamType);
- }
- } catch (Exception e) {
- log.error(e.getMessage());
- status.setTransferState(TransferState.FAILED);
- detail.setTransferDescription(e.getLocalizedMessage());
- detail.setTransferStatus(status);
- try {
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
- }
- jobExecutionContext.setInMessageContext(inputNew);
- }
-
- private static String stageInputFiles(Cluster cluster, JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException {
- int i = paramValue.lastIndexOf(File.separator);
- String substring = paramValue.substring(i + 1);
- try {
- String targetFile = jobExecutionContext.getInputDir() + File.separator + substring;
- if (paramValue.startsWith("file")) {
- paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
- }
- boolean success = false;
- int j = 1;
- while(!success){
- try {
- cluster.scpTo(targetFile, paramValue);
- success = true;
- } catch (Exception e) {
- log.info(e.getLocalizedMessage());
- Thread.sleep(2000);
- if(j==3) {
- throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
- }
- }
- j++;
- }
- return targetFile;
- } catch (Exception e) {
- throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
- }
- }
-
- public void initProperties(Properties properties) throws GFacHandlerException {
-
- }
-
- public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- this.invoke(jobExecutionContext);
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java
deleted file mode 100644
index 18dcb97..0000000
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.gsissh.handler;
-
-//import org.apache.airavata.commons.gfac.type.ActualParameter;
-//import org.apache.airavata.commons.gfac.type.MappingFactory;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.AbstractHandler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.core.utils.OutputUtils;
-import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
-import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.model.workspace.experiment.TransferState;
-import org.apache.airavata.model.workspace.experiment.TransferStatus;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * Output handler that copies a finished job's stdout, stderr and output files from the remote
- * cluster into a local directory and records the results in the registry.
- *
- * Progress is checkpointed through {@code GFacUtils.saveHandlerData} in the form
- * {@code <count>|<file>,<file>,...}; {@link #recover} re-runs {@link #invoke}, which reuses the
- * files listed in a consistent checkpoint instead of downloading them again.
- */
-public class GSISSHOutputHandler extends AbstractHandler {
-    private static final Logger log = LoggerFactory.getLogger(GSISSHOutputHandler.class);
-
-    /**
-     * Downloads stdout/stderr and the declared outputs of the job and publishes them.
-     *
-     * @param jobExecutionContext context of the finished job
-     * @throws GFacHandlerException if the security context cannot be created, a transfer fails,
-     *                              or the application produced no output
-     */
-    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
-        super.invoke(jobExecutionContext);
-        int index = 0;
-        int oldIndex = 0;
-        List<String> oldFiles = new ArrayList<String>();
-        StringBuffer data = new StringBuffer("|");
-        String hostAddress = jobExecutionContext.getHostName();
-        try {
-            if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
-                GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
-            }
-        } catch (Exception e) {
-            try {
-                GFacUtils.saveErrorDetails(jobExecutionContext, describeCause(e), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-            } catch (GFacException e1) {
-                log.error(e1.getLocalizedMessage());
-            }
-            log.error(e.getMessage());
-            throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
-        }
-        DataTransferDetails detail = new DataTransferDetails();
-        TransferStatus status = new TransferStatus();
-
-        Cluster cluster = null;
-
-        try {
-            cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
-            if (cluster == null) {
-                GFacUtils.saveErrorDetails(jobExecutionContext, "Security context is not set properly", CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
-
-                throw new GFacProviderException("Security context is not set properly");
-            } else {
-                log.info("Successfully retrieved the Security Context");
-            }
-
-            // Restore the checkpoint of a previous run, if there is a consistent one.
-            String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName());
-            if (pluginData != null) {
-                String[] sections = pluginData.split("\\|");
-                if (sections.length < 2) {
-                    log.error("Previously stored data " + pluginData + " is wrong so we continue the operations");
-                } else {
-                    try {
-                        int storedIndex = Integer.parseInt(sections[0].trim());
-                        // Copy into a real list: Arrays.asList returns a fixed-size view.
-                        List<String> storedFiles = new ArrayList<String>(Arrays.asList(sections[1].split(",")));
-                        if (storedIndex == storedFiles.size()) {
-                            log.info("Old data looks good !!!!");
-                            oldIndex = storedIndex;
-                            oldFiles = storedFiles;
-                        }
-                        // otherwise the checkpoint is inconsistent and everything is fetched again
-                    } catch (NumberFormatException e) {
-                        log.error("Previously stored data " + pluginData + " is wrong so we continue the operations");
-                    }
-                }
-            }
-
-            File localStdOutFile;
-            File localStdErrFile;
-            //FIXME: AdvancedOutput is remote location and third party transfer should work to make this work;
-            // until then results are always collected under /tmp.
-            String outputDataDir = File.separator + "tmp" + File.separator
-                    + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID();
-            (new File(outputDataDir)).mkdirs();
-
-            // Get the Stdouts and StdErrs
-            String stdOutStr = "";
-            if (index < oldIndex) {
-                localStdOutFile = new File(oldFiles.get(index));
-                data.append(oldFiles.get(index++)).append(",");
-                // The file was downloaded by the previous run; its content is still needed below.
-                stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
-            } else {
-                int i = 0;
-                localStdOutFile = new File(outputDataDir + File.separator + jobExecutionContext.getApplicationName() + ".stdout");
-                while (stdOutStr.isEmpty()) {
-                    try {
-                        cluster.scpFrom(jobExecutionContext.getStandardOutput(), localStdOutFile.getAbsolutePath());
-                        stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
-                    } catch (Exception e) {
-                        log.error(e.getLocalizedMessage());
-                        Thread.sleep(2000);
-                    }
-                    i++;
-                    if (i == 3) break;
-                }
-                checkpoint(jobExecutionContext, data, localStdOutFile.getAbsolutePath(), ++index);
-            }
-            if (index < oldIndex) {
-                localStdErrFile = new File(oldFiles.get(index));
-                data.append(oldFiles.get(index++)).append(",");
-            } else {
-                localStdErrFile = new File(outputDataDir + File.separator + jobExecutionContext.getApplicationName() + ".stderr");
-                cluster.scpFrom(jobExecutionContext.getStandardError(), localStdErrFile.getAbsolutePath());
-                checkpoint(jobExecutionContext, data, localStdErrFile.getAbsolutePath(), ++index);
-            }
-
-            String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
-            status.setTransferState(TransferState.STDOUT_DOWNLOAD);
-            detail.setTransferStatus(status);
-            detail.setTransferDescription("STDOUT:" + localStdOutFile.getAbsolutePath());
-            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
-            status.setTransferState(TransferState.STDERROR_DOWNLOAD);
-            detail.setTransferStatus(status);
-            detail.setTransferDescription("STDERR:" + localStdErrFile.getAbsolutePath());
-            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
-            //todo this is a mess we have to fix this
-            List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>();
-            Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
-            Set<String> keys = output.keySet();
-            for (String paramName : keys) {
-                OutputDataObjectType outputDataObjectType = (OutputDataObjectType) output.get(paramName);
-                if (DataType.URI == outputDataObjectType.getType()) {
-
-                    // Poll the remote output directory (at most three listings) until it shows
-                    // at least one non-empty entry; every unsuccessful listing waits 20 seconds.
-                    List<String> outputList = null;
-                    int retry = 3;
-                    while (retry > 0) {
-                        outputList = cluster.listDirectory(jobExecutionContext.getOutputDir());
-                        boolean onlyBlankEntry = outputList.size() == 1 && outputList.get(0).isEmpty();
-                        if (outputList.size() > 0 && !onlyBlankEntry) {
-                            break;
-                        }
-                        Thread.sleep(10000);
-                        retry--;
-                        Thread.sleep(10000);
-                    }
-                    if (outputList.size() == 0 || outputList.get(0).isEmpty() || outputList.size() > 1) {
-                        OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray);
-                        Set<String> strings = output.keySet();
-                        outputArray.clear();
-                        for (String key : strings) {
-                            OutputDataObjectType current = (OutputDataObjectType) output.get(key);
-                            DataType type = current.getType();
-                            if (DataType.URI != type && DataType.STDOUT != type && DataType.STDERR != type) {
-                                continue;
-                            }
-                            String localFile;
-                            if (index < oldIndex) {
-                                localFile = oldFiles.get(index++);
-                                data.append(localFile).append(",");
-                            } else {
-                                String fileName;
-                                if (DataType.URI == type) {
-                                    String downloadFile = current.getValue();
-                                    cluster.scpFrom(downloadFile, outputDataDir);
-                                    fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar) + 1);
-                                } else if (DataType.STDOUT == type) {
-                                    fileName = localStdOutFile.getName();
-                                } else {
-                                    fileName = localStdErrFile.getName();
-                                }
-                                localFile = outputDataDir + File.separator + fileName;
-                                checkpoint(jobExecutionContext, data, localFile, ++index);
-                            }
-                            jobExecutionContext.addOutputFile(localFile);
-                            current.setValue(localFile);
-                            outputArray.add(newOutput(key, localFile, type));
-                        }
-                        // All parameters were handled by the inner loop; leave the outer one too.
-                        break;
-                    } else if (outputList.size() == 1) { //FIXME: this is ultrascan specific
-                        String valueList = outputList.get(0);
-                        String outputFile;
-                        if (index < oldIndex) {
-                            outputFile = oldFiles.get(index);
-                            data.append(oldFiles.get(index++)).append(",");
-                        } else {
-                            cluster.scpFrom(jobExecutionContext.getOutputDir() + File.separator + valueList, outputDataDir);
-                            outputFile = outputDataDir + File.separator + valueList;
-                            checkpoint(jobExecutionContext, data, outputFile, ++index);
-                        }
-                        // Registered exactly once, for both the recovered and the fresh path.
-                        jobExecutionContext.addOutputFile(outputFile);
-                        outputDataObjectType.setValue(outputFile);
-                        // NOTE(review): the published value is the remote file name, not the local path - confirm intended.
-                        outputArray.add(newOutput(paramName, valueList, DataType.URI));
-                    }
-                } else {
-                    OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray);
-                }
-            }
-            if (outputArray.isEmpty() && jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() == null) {
-                throw new GFacHandlerException(
-                        "Empty Output returned from the Application, Double check the application "
-                                + "and ApplicationDescriptor output Parameter Names"
-                );
-            }
-            // From here on the context points at the local copies instead of the remote files.
-            jobExecutionContext.setStandardError(localStdErrFile.getAbsolutePath());
-            jobExecutionContext.setStandardOutput(localStdOutFile.getAbsolutePath());
-            jobExecutionContext.setOutputDir(outputDataDir);
-            status.setTransferState(TransferState.DOWNLOAD);
-            detail.setTransferStatus(status);
-            detail.setTransferDescription(outputDataDir);
-            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-            registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
-            fireTaskOutputChangeEvent(jobExecutionContext, outputArray);
-        } catch (Exception e) {
-            if (e instanceof InterruptedException) {
-                // One of the waits above was interrupted; keep the flag set for the caller.
-                Thread.currentThread().interrupt();
-            }
-            try {
-                status.setTransferState(TransferState.FAILED);
-                detail.setTransferStatus(status);
-                detail.setTransferDescription(e.getLocalizedMessage());
-                registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-                GFacUtils.saveErrorDetails(jobExecutionContext, describeCause(e), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
-            } catch (Exception e1) {
-                throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
-            }
-            throw new GFacHandlerException("Error in retrieving results", e);
-        }
-    }
-
-    /** No-op: this handler does not read anything from the supplied properties. */
-    public void initProperties(Properties properties) throws GFacHandlerException {
-
-    }
-
-    /** Recovery re-runs {@link #invoke}, which picks up the stored checkpoint. */
-    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
-        this.invoke(jobExecutionContext);
-    }
-
-    /** Appends {@code file} to the running file list and stores "count|files" as recovery data. */
-    private void checkpoint(JobExecutionContext jobExecutionContext, StringBuffer data, String file, int count) throws GFacException {
-        StringBuffer snapshot = new StringBuffer(data.append(file).append(",").toString());
-        GFacUtils.saveHandlerData(jobExecutionContext, snapshot.insert(0, count), this.getClass().getName());
-    }
-
-    /** Builds the output object that is published to the registry for one parameter. */
-    private static OutputDataObjectType newOutput(String name, String value, DataType type) {
-        OutputDataObjectType result = new OutputDataObjectType();
-        result.setValue(value);
-        result.setName(name);
-        result.setType(type);
-        return result;
-    }
-
-    /** Describes the cause of {@code error}, or {@code error} itself when it has no cause. */
-    private static String describeCause(Throwable error) {
-        Throwable cause = error.getCause();
-        return (cause != null ? cause : error).toString();
-    }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java
deleted file mode 100644
index ed94312..0000000
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.airavata.gfac.gsissh.handler;
-
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.AbstractHandler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
-import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
-import org.apache.airavata.gfac.ssh.util.HandleOutputs;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.apache.airavata.registry.cpi.RegistryException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Output handler that makes sure a GSI security context exists for the job's host and then
- * delegates the output handling to {@code HandleOutputs.handleOutputs}, storing the result as
- * the experiment output in the registry.
- */
-public class NewGSISSHOutputHandler extends AbstractHandler{
-    private static final Logger log = LoggerFactory.getLogger(NewGSISSHOutputHandler.class);
-
-    /**
-     * @param jobExecutionContext context of the finished job
-     * @throws GFacHandlerException if the security context or cluster cannot be obtained, or the
-     *                              outputs cannot be written to the registry
-     */
-    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
-        super.invoke(jobExecutionContext);
-        String hostAddress = jobExecutionContext.getHostName();
-        try {
-            if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
-                GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
-            }
-        } catch (Exception e) {
-            try {
-                GFacUtils.saveErrorDetails(jobExecutionContext, describeCause(e), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-            } catch (GFacException e1) {
-                log.error(e1.getLocalizedMessage());
-            }
-            log.error(e.getMessage());
-            throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
-        }
-        Cluster cluster = null;
-
-        try {
-            cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
-            if (cluster == null) {
-                GFacUtils.saveErrorDetails(jobExecutionContext, "Security context is not set properly", CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
-
-                throw new GFacProviderException("Security context is not set properly");
-            } else {
-                log.info("Successfully retrieved the Security Context");
-            }
-        } catch (Exception e) {
-            log.error(e.getMessage());
-            try {
-                // The exception thrown above carries no cause, so the cause must not be dereferenced blindly.
-                GFacUtils.saveErrorDetails(jobExecutionContext, describeCause(e), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-            } catch (GFacException e1) {
-                log.error(e1.getLocalizedMessage());
-            }
-            throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
-        }
-
-        // NOTE(review): super.invoke was already called at the top of this method; the second
-        // call looks redundant but is kept because its effects are not visible here - confirm.
-        super.invoke(jobExecutionContext);
-        List<OutputDataObjectType> outputArray = HandleOutputs.handleOutputs(jobExecutionContext, cluster);
-        try {
-            registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
-        } catch (RegistryException e) {
-            throw new GFacHandlerException(e);
-        }
-    }
-
-    /** Not implemented yet: recovery currently does nothing for this handler. */
-    @Override
-    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
-        // TODO: Auto generated method body.
-    }
-
-    /** No-op: this handler does not read anything from the supplied properties. */
-    @Override
-    public void initProperties(Properties properties) throws GFacHandlerException {
-        // TODO Auto-generated method stub
-
-    }
-
-    /** Describes the cause of {@code error}, or {@code error} itself when it has no cause. */
-    private static String describeCause(Throwable error) {
-        Throwable cause = error.getCause();
-        return (cause != null ? cause : error).toString();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
deleted file mode 100644
index 36aac4c..0000000
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.gsissh.provider.impl;
-
-import org.airavata.appcatalog.cpi.AppCatalogException;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.gfac.ExecutionMode;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.handler.ThreadedHandler;
-import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.core.provider.AbstractProvider;
-import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
-import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
-import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
-import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
-import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
-import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.List;
-import java.util.Map;
-
-//import org.apache.airavata.schemas.gfac.GsisshHostType;
-
-public class GSISSHProvider extends AbstractProvider {
- private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class);
-
- /**
- * No-op: this provider does not read anything from the supplied properties.
- *
- * @param properties provider configuration; ignored by this implementation
- */
- public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
- }
-
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- super.initialize(jobExecutionContext);
- try {
- String hostAddress = jobExecutionContext.getHostName();
- if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
- GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
- }
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- } catch (GFacException e) {
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
-
- public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- log.info("Invoking GSISSH Provider Invoke ...");
- StringBuffer data = new StringBuffer();
- jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
- ComputeResourceDescription computeResourceDescription = jobExecutionContext.getApplicationContext()
- .getComputeResourceDescription();
- ApplicationDeploymentDescription appDeployDesc = jobExecutionContext.getApplicationContext()
- .getApplicationDeploymentDescription();
- JobDetails jobDetails = new JobDetails();
- Cluster cluster = null;
-
- try {
- if (jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName()) != null) {
- cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName())).getPbsCluster();
- }
- if (cluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- // This installed path is a mandetory field, because this could change based on the computing resource
- JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, cluster);
- jobDetails.setJobName(jobDescriptor.getJobName());
-
- log.info(jobDescriptor.toXML());
- data.append("jobDesc=").append(jobDescriptor.toXML());
- jobDetails.setJobDescription(jobDescriptor.toXML());
- String jobID = cluster.submitBatchJob(jobDescriptor);
- jobExecutionContext.setJobDetails(jobDetails);
- if (jobID == null) {
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- } else {
- jobDetails.setJobID(jobID.split("\\.")[0]);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
- }
- data.append(",jobId=").append(jobDetails.getJobID());
-
- // Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler
- // to perform monitoring, daemon handlers can be accessed from anywhere
- monitor(jobExecutionContext);
- // we know this host is type GsiSSHHostType
- } catch (Exception e) {
- String error = "Error submitting the job to host " + computeResourceDescription.getHostName() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- } finally {
- log.info("Saving data for future recovery: ");
- log.info(data.toString());
- GFacUtils.saveHandlerData(jobExecutionContext, data, this.getClass().getName());
- }
-
- }
-
- /**
- * Currently a no-op: the whole implementation below is commented out, so none of the
- * parameters is used and nothing is removed from any monitor handler.
- *
- * The disabled code walked the daemon handlers and invoked the pull or push monitor handler
- * that matches the submission's monitor mode.
- */
- public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException {
-/* List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
- if (daemonHandlers == null) {
- daemonHandlers = BetterGfacImpl.getDaemonHandlers();
- }
- ThreadedHandler pullMonitorHandler = null;
- ThreadedHandler pushMonitorHandler = null;
- MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
- for (ThreadedHandler threadedHandler : daemonHandlers) {
- if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
- pullMonitorHandler = threadedHandler;
- if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
- jobExecutionContext.setProperty("cancel","true");
- pullMonitorHandler.invoke(jobExecutionContext);
- } else {
- log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
- " to handle by the GridPullMonitorHandler");
- }
- } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
- pushMonitorHandler = threadedHandler;
- if ( monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
- pushMonitorHandler.invoke(jobExecutionContext);
- } else {
- log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
- " to handle by the GridPushMonitorHandler");
- }
- }
- // have to handle the GridPushMonitorHandler logic
- }
- if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
- log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
- ", execution is configured as asynchronous, so Outhandler will not be invoked");
- }*/
- }
-
- /**
- * No-op: this provider has nothing to release after a job.
- *
- * @param jobExecutionContext context of the job; unused
- */
- public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- // Intentionally empty.
- }
-
- public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
- //To change body of implemented methods use File | Settings | File Templates.
- log.info("canceling the job status in GSISSHProvider!!!!!");
- JobDetails jobDetails = jobExecutionContext.getJobDetails();
- String hostName = jobExecutionContext.getHostName();
- try {
- Cluster cluster = null;
- if (jobExecutionContext.getSecurityContext(hostName) == null) {
- GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
- }
- cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostName)).getPbsCluster();
- if (cluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- // This installed path is a mandetory field, because this could change based on the computing resource
- if(jobDetails == null) {
- log.error("There is not JobDetails so cancelations cannot perform !!!");
- return false;
- }
- if (jobDetails.getJobID() != null) {
- // if this operation success without any exceptions, we can assume cancel operation succeeded.
- cluster.cancelJob(jobDetails.getJobID());
- } else {
- log.error("No Job Id is set, so cannot perform the cancel operation !!!");
- return false;
- }
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
- return true;
- // we know this host is type GsiSSHHostType
- } catch (SSHApiException e) {
- String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- } catch (Exception e) {
- String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- }
- }
-
- /**
-  * Recovers a job after a gfac failure from the handler data written by {@code execute}
-  * ("jobDesc=&lt;xml&gt;,jobId=&lt;id&gt;").
-  *
-  * When no usable job id was stored (no data, no id, or the id "none") the job is executed
-  * again; otherwise the stored job is handed to monitoring.
-  *
-  * @param jobExecutionContext context of the job being recovered
-  * @throws GFacProviderException if the stored data cannot be read, the re-execution fails, or
-  *                               the job cannot be handed to monitoring
-  */
- public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
-     // have to implement the logic to recover a gfac failure
-     log.info("Invoking Recovering for the Experiment: " + jobExecutionContext.getExperimentID());
-     String hostName = jobExecutionContext.getHostName();
-
-     String pluginData;
-     try {
-         pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName());
-     } catch (Exception e) {
-         log.error("Error while recovering provider", e);
-         throw new GFacProviderException("Error recovering provider", e);
-     }
-
-     final String descPrefix = "jobDesc=";
-     final String idMarker = ",jobId=";
-     String jobDesc = "";
-     String jobId = "";
-     // The descriptor XML may itself contain commas, so locate the LAST ",jobId=" marker
-     // instead of splitting the whole string on ','.
-     int idStart = pluginData == null ? -1 : pluginData.lastIndexOf(idMarker);
-     if (idStart >= 0 && pluginData.startsWith(descPrefix)) {
-         jobDesc = pluginData.substring(descPrefix.length(), idStart);
-         jobId = pluginData.substring(idStart + idMarker.length());
-     }
-
-     if ("".equals(jobId) || "none".equals(jobId)) {
-         // Nothing was submitted (or nothing usable was stored): run the job again.
-         try {
-             this.execute(jobExecutionContext);
-         } catch (GFacException e) {
-             log.error("Error while recovering provider", e);
-             throw new GFacProviderException("Error recovering provider", e);
-         }
-         return;
-     }
-
-     log.info("Following data have recovered: ");
-     log.info("Job Description: " + jobDesc);
-     log.info("Job Id: " + jobId);
-     try {
-         // Now we have enough data to recover
-         JobDetails jobDetails = new JobDetails();
-         jobDetails.setJobDescription(jobDesc);
-         jobDetails.setJobID(jobId);
-         jobExecutionContext.setJobDetails(jobDetails);
-         if (jobExecutionContext.getSecurityContext(hostName) == null) {
-             try {
-                 GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
-             } catch (ApplicationSettingsException e) {
-                 log.error(e.getMessage());
-                 throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
-             }
-         }
-         monitor(jobExecutionContext);
-     } catch (Exception e) {
-         log.error("Error while recover the job", e);
-         throw new GFacProviderException("Error delegating already ran job to Monitoring", e);
-     }
- }
-
-    /**
-     * Registers the job with the email-based monitor when the preferred job submission
-     * protocol is SSH and the SSH job submission is configured for
-     * {@code JOB_EMAIL_NOTIFICATION_MONITOR}.
-     *
-     * <p>For any other protocol or monitor mode this method currently does nothing. A
-     * fallback that looked up {@code GridPullMonitorHandler} / {@code GridPushMonitorHandler}
-     * among {@code BetterGfacImpl.getDaemonHandlers()} and invoked them used to follow the
-     * email branch, but it had been commented out.
-     *
-     * @param jobExecutionContext context of the job to monitor
-     * @throws GFacException if the SSH job submission cannot be read from the app catalog, or
-     *                       (as {@code GFacHandlerException}) if email monitoring cannot be activated
-     */
-    @Override
-    public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
-        String submissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
-        SSHJobSubmission submission;
-        try {
-            submission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(submissionInterfaceId);
-        } catch (AppCatalogException e) {
-            throw new GFacException("Error while reading compute resource", e);
-        }
-
-        // Only SSH submissions are considered for email monitoring.
-        if (jobExecutionContext.getPreferredJobSubmissionProtocol() != JobSubmissionProtocol.SSH) {
-            return;
-        }
-        // An unset (null) mode never equals the enum constant, so no separate null check is needed.
-        if (submission.getMonitorMode() != MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
-            return;
-        }
-
-        try {
-            EmailMonitorFactory
-                    .getEmailBasedMonitor(submission.getResourceJobManager().getResourceJobManagerType())
-                    .addToJobMonitorMap(jobExecutionContext);
-        } catch (AiravataException e) {
-            throw new GFacHandlerException("Error while activating email job monitoring ", e);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
deleted file mode 100644
index 46e7acd..0000000
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.gsissh.security;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.credential.store.credential.Credential;
-import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
-import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.gfac.AbstractSecurityContext;
-import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.RequestData;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.globus.gsi.X509Credential;
-import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
-import org.globus.gsi.provider.GlobusProvider;
-import org.globus.myproxy.GetParams;
-import org.globus.myproxy.MyProxy;
-import org.globus.myproxy.MyProxyException;
-import org.gridforum.jgss.ExtendedGSSCredential;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.security.Security;
-import java.security.cert.X509Certificate;
-
-/**
- * Handles GRID related security.
- *
- * <p>In addition to what {@link AbstractSecurityContext} provides, this context holds a
- * reference to a {@link Cluster}, exposed through {@link #getPbsCluster()} and
- * {@link #setPbsCluster(Cluster)}.
- */
-public class GSISecurityContext extends AbstractSecurityContext {
-
-    protected static final Logger log = LoggerFactory.getLogger(GSISecurityContext.class);
-
-    /** Cluster associated with this context; {@code null} until one is supplied. */
-    private Cluster pbsCluster = null;
-
-    /**
-     * Creates a context backed by a credential reader and request data, bound to a cluster.
-     *
-     * @param credentialReader passed to the superclass constructor
-     * @param requestData      passed to the superclass constructor
-     * @param pbsCluster       cluster to associate with this context
-     */
-    public GSISecurityContext(CredentialReader credentialReader, RequestData requestData, Cluster pbsCluster) {
-        super(credentialReader, requestData);
-        this.pbsCluster = pbsCluster;
-    }
-
-    /**
-     * Creates a context backed by a credential reader and request data, with no cluster set.
-     *
-     * @param credentialReader passed to the superclass constructor
-     * @param requestData      passed to the superclass constructor
-     */
-    public GSISecurityContext(CredentialReader credentialReader, RequestData requestData) {
-        super(credentialReader, requestData);
-    }
-
-    /**
-     * Creates a context that only carries a cluster.
-     *
-     * @param pbsCluster cluster to associate with this context
-     */
-    public GSISecurityContext(Cluster pbsCluster) {
-        // Assign the field directly rather than calling the overridable setter, so a subclass
-        // override is never invoked on a partially constructed instance.
-        this.pbsCluster = pbsCluster;
-    }
-
-    /**
-     * @return the cluster associated with this context, or {@code null} if none was set
-     */
-    public Cluster getPbsCluster() {
-        return pbsCluster;
-    }
-
-    /**
-     * @param pbsCluster cluster to associate with this context
-     */
-    public void setPbsCluster(Cluster pbsCluster) {
-        this.pbsCluster = pbsCluster;
-    }
-}