You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/03 20:14:37 UTC
[20/39] airavata git commit: Refactored gfac sub modules,
merged gfac-ssh, gfac-gsissh, gfac-local,
gfac-monitor and gsissh modules and create gface-impl,
removed implementation from gfac-core to gfac-impl
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
new file mode 100644
index 0000000..e53fe09
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -0,0 +1,280 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.util;
+
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.GFacHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.HostMonitorData;
+import org.apache.airavata.gfac.monitor.UserMonitorData;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+public class CommonUtils {
+ private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(CommonUtils.class);
+
+ public static String getChannelID(MonitorID monitorID) {
+ return monitorID.getUserName() + "-" + monitorID.getComputeResourceDescription().getHostName();
+ }
+
+ public static String getRoutingKey(MonitorID monitorID) {
+ return "*." + monitorID.getUserName() + "." + monitorID.getComputeResourceDescription().getIpAddresses().get(0);
+ }
+
+ public static String getChannelID(String userName,String hostAddress) {
+ return userName + "-" + hostAddress;
+ }
+
+ public static String getRoutingKey(String userName,String hostAddress) {
+ return "*." + userName + "." + hostAddress;
+ }
+
+ public static void addMonitortoQueue(BlockingQueue<UserMonitorData> queue, MonitorID monitorID, JobExecutionContext jobExecutionContext) throws AiravataMonitorException {
+ synchronized (queue) {
+ Iterator<UserMonitorData> iterator = queue.iterator();
+ while (iterator.hasNext()) {
+ UserMonitorData next = iterator.next();
+ if (next.getUserName().equals(monitorID.getUserName())) {
+ // then this is the right place to update
+ List<HostMonitorData> monitorIDs = next.getHostMonitorData();
+ for (HostMonitorData host : monitorIDs) {
+ if (isEqual(host.getComputeResourceDescription(), monitorID.getComputeResourceDescription())) {
+ // ok we found right place to add this monitorID
+ host.addMonitorIDForHost(monitorID);
+ logger.debugId(monitorID.getJobID(), "Added new job to the monitoring queue, experiment {}," +
+ " task {}", monitorID.getExperimentID(), monitorID.getTaskID());
+ return;
+ }
+ }
+ // there is a userMonitor object for this user name but no Hosts for this host
+ // so we have to create new Hosts
+ HostMonitorData hostMonitorData = new HostMonitorData(jobExecutionContext);
+ hostMonitorData.addMonitorIDForHost(monitorID);
+ next.addHostMonitorData(hostMonitorData);
+ logger.debugId(monitorID.getJobID(), "Added new job to the monitoring queue, experiment {}," +
+ " task {}", monitorID.getExperimentID(), monitorID.getTaskID());
+ return;
+ }
+ }
+ HostMonitorData hostMonitorData = new HostMonitorData(jobExecutionContext);
+ hostMonitorData.addMonitorIDForHost(monitorID);
+
+ UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName());
+ userMonitorData.addHostMonitorData(hostMonitorData);
+ try {
+ queue.put(userMonitorData);
+ logger.debugId(monitorID.getJobID(), "Added new job to the monitoring queue, experiment {}," +
+ " task {}", monitorID.getExperimentID(), monitorID.getTaskID());
+ } catch (InterruptedException e) {
+ throw new AiravataMonitorException(e);
+ }
+ }
+ }
+
+ private static boolean isEqual(ComputeResourceDescription comRes_1, ComputeResourceDescription comRes_2) {
+ return comRes_1.getComputeResourceId().equals(comRes_2.getComputeResourceId()) &&
+ comRes_1.getHostName().equals(comRes_2.getHostName());
+ }
+
+ public static boolean isTheLastJobInQueue(BlockingQueue<MonitorID> queue,MonitorID monitorID){
+ Iterator<MonitorID> iterator = queue.iterator();
+ while(iterator.hasNext()){
+ MonitorID next = iterator.next();
+ if (monitorID.getUserName().equals(next.getUserName()) &&
+ CommonUtils.isEqual(monitorID.getComputeResourceDescription(), next.getComputeResourceDescription())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * This method doesn't have to be synchronized because it will be invoked by HPCPullMonitor which already synchronized
+ * @param monitorID
+ * @throws AiravataMonitorException
+ */
+ public static void removeMonitorFromQueue(UserMonitorData userMonitorData, MonitorID monitorID) throws AiravataMonitorException {
+ if (userMonitorData.getUserName().equals(monitorID.getUserName())) {
+ // then this is the right place to update
+ List<HostMonitorData> hostMonitorData = userMonitorData.getHostMonitorData();
+ Iterator<HostMonitorData> iterator1 = hostMonitorData.iterator();
+ while (iterator1.hasNext()) {
+ HostMonitorData iHostMonitorID = iterator1.next();
+ if (isEqual(iHostMonitorID.getComputeResourceDescription(), monitorID.getComputeResourceDescription())) {
+ Iterator<MonitorID> iterator2 = iHostMonitorID.getMonitorIDs().iterator();
+ while (iterator2.hasNext()) {
+ MonitorID iMonitorID = iterator2.next();
+ if (iMonitorID.getJobID().equals(monitorID.getJobID())
+ || iMonitorID.getJobName().equals(monitorID.getJobName())) {
+ // OK we found the object, we cannot do list.remove(object) states of two objects
+ // could be different, thats why we check the jobID
+ iterator2.remove();
+ logger.infoId(monitorID.getJobID(), "Removed the jobId: {} JobName: {} from monitoring last " +
+ "status:{}", monitorID.getJobID(),monitorID.getJobName(), monitorID.getStatus().toString());
+
+ return;
+ }
+ }
+ }
+ }
+ }
+ logger.info("Cannot find the given MonitorID in the queue with userName " +
+ monitorID.getUserName() + " and jobID " + monitorID.getJobID());
+ logger.info("This might not be an error because someone else removed this job from the queue");
+ }
+
+
+ public static void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ handler.initProperties(handlerClassName.getProperties());
+ } catch (ClassNotFoundException e) {
+ logger.error(e.getMessage());
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ logger.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ logger.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
+ try {
+ handler.invoke(jobExecutionContext);
+ } catch (Exception e) {
+ // TODO: Better error reporting.
+ throw new GFacException("Error Executing a OutFlow Handler", e);
+ }
+ }
+ }
+
+ /**
+ * Update job count for a given set of paths.
+ * @param curatorClient - CuratorFramework instance
+ * @param changeCountMap - map of change job count with relevant path
+ * @param isAdd - Should add or reduce existing job count by the given job count.
+ */
+ public static void updateZkWithJobCount(CuratorFramework curatorClient, final Map<String, Integer> changeCountMap, boolean isAdd) {
+ StringBuilder changeZNodePaths = new StringBuilder();
+ try {
+ for (String path : changeCountMap.keySet()) {
+ if (isAdd) {
+ CommonUtils.checkAndCreateZNode(curatorClient, path);
+ }
+ byte[] byteData = curatorClient.getData().forPath(path);
+ String nodeData;
+ if (byteData == null) {
+ if (isAdd) {
+ curatorClient.setData().withVersion(-1).forPath(path, String.valueOf(changeCountMap.get(path)).getBytes());
+ } else {
+ // This is not possible, but we handle in case there any data zookeeper communication failure
+ logger.warn("Couldn't reduce job count in " + path + " as it returns null data. Hence reset the job count to 0");
+ curatorClient.setData().withVersion(-1).forPath(path, "0".getBytes());
+ }
+ } else {
+ nodeData = new String(byteData);
+ if (isAdd) {
+ curatorClient.setData().withVersion(-1).forPath(path,
+ String.valueOf(changeCountMap.get(path) + Integer.parseInt(nodeData)).getBytes());
+ } else {
+ int previousCount = Integer.parseInt(nodeData);
+ int removeCount = changeCountMap.get(path);
+ if (previousCount >= removeCount) {
+ curatorClient.setData().withVersion(-1).forPath(path,
+ String.valueOf(previousCount - removeCount).getBytes());
+ } else {
+ // This is not possible, do we need to reset the job count to 0 ?
+ logger.error("Requested remove job count is " + removeCount +
+ " which is higher than the existing job count " + previousCount
+ + " in " + path + " path.");
+ }
+ }
+ }
+ changeZNodePaths.append(path).append(":");
+ }
+
+ // update stat node to trigger orchestrator watchers
+ if (changeCountMap.size() > 0) {
+ changeZNodePaths.deleteCharAt(changeZNodePaths.length() - 1);
+ curatorClient.setData().withVersion(-1).forPath("/" + Constants.STAT, changeZNodePaths.toString().getBytes());
+ }
+ } catch (Exception e) {
+ logger.error("Error while writing job count to zookeeper", e);
+ }
+
+ }
+
+ /**
+ * Increase job count by one and update the zookeeper
+ * @param monitorID - Job monitorId
+ */
+ public static void increaseZkJobCount(MonitorID monitorID) {
+ Map<String, Integer> addMap = new HashMap<String, Integer>();
+ addMap.put(CommonUtils.getJobCountUpdatePath(monitorID), 1);
+ updateZkWithJobCount(monitorID.getJobExecutionContext().getCuratorClient(), addMap, true);
+ }
+
+ /**
+ * Construct and return the path for a given MonitorID , eg: /stat/{username}/{resourceName}/job
+ * @param monitorID - Job monitorId
+ * @return
+ */
+ public static String getJobCountUpdatePath(MonitorID monitorID){
+ return new StringBuilder("/").append(Constants.STAT).append("/").append(monitorID.getUserName())
+ .append("/").append(monitorID.getComputeResourceDescription().getHostName()).append("/").append(Constants.JOB).toString();
+ }
+
+ /**
+ * Check whether znode is exist in given path if not create a new znode
+ * @param curatorClient - zookeeper instance
+ * @param path - path to check znode
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ private static void checkAndCreateZNode(CuratorFramework curatorClient , String path) throws Exception {
+ if (curatorClient.checkExists().forPath(path) == null) { // if znode doesn't exist
+ if (path.lastIndexOf("/") > 1) { // recursively traverse to parent znode and check parent exist
+ checkAndCreateZNode(curatorClient, (path.substring(0, path.lastIndexOf("/"))));
+ }
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java
new file mode 100644
index 0000000..08c3f67
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java
@@ -0,0 +1,164 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.util;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+
+
+import java.io.*;
+import java.security.*;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.security.spec.InvalidKeySpecException;
+
+public class X509Helper {
+
+ static {
+ // parsing of RSA key fails without this
+ java.security.Security.addProvider(new BouncyCastleProvider());
+ }
+
+
+
+ public static KeyStore keyStoreFromPEM(String proxyFile,
+ String keyPassPhrase) throws IOException,
+ CertificateException,
+ NoSuchAlgorithmException,
+ InvalidKeySpecException,
+ KeyStoreException {
+ return keyStoreFromPEM(proxyFile,proxyFile,keyPassPhrase);
+ }
+
+ public static KeyStore keyStoreFromPEM(String certFile,
+ String keyFile,
+ String keyPassPhrase) throws IOException,
+ CertificateException,
+ NoSuchAlgorithmException,
+ InvalidKeySpecException,
+ KeyStoreException {
+ CertificateFactory cf = CertificateFactory.getInstance("X.509");
+ X509Certificate cert = (X509Certificate)cf.generateCertificate(new FileInputStream(certFile));
+ //System.out.println(cert.toString());
+
+ // this works for proxy files, too, since it skips over the certificate
+ BufferedReader reader = new BufferedReader(new FileReader(keyFile));
+ String line = null;
+ StringBuilder builder = new StringBuilder();
+ boolean inKey = false;
+ while((line=reader.readLine()) != null) {
+ if (line.contains("-----BEGIN RSA PRIVATE KEY-----")) {
+ inKey = true;
+ }
+ if (inKey) {
+ builder.append(line);
+ builder.append(System.getProperty("line.separator"));
+ }
+ if (line.contains("-----END RSA PRIVATE KEY-----")) {
+ inKey = false;
+ }
+ }
+ String privKeyPEM = builder.toString();
+ //System.out.println(privKeyPEM);
+
+ // using BouncyCastle
+// PEMReader pemParser = new PEMReader(new StringReader(privKeyPEM));
+// Object object = pemParser.readObject();
+//
+// PrivateKey privKey = null;
+// if(object instanceof KeyPair){
+// privKey = ((KeyPair)object).getPrivate();
+// }
+ // PEMParser from BouncyCastle is good for reading PEM files, but I didn't want to add that dependency
+ /*
+ // Base64 decode the data
+ byte[] encoded = javax.xml.bind.DatatypeConverter.parseBase64Binary(privKeyPEM);
+
+ // PKCS8 decode the encoded RSA private key
+ java.security.spec.PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded);
+ KeyFactory kf = KeyFactory.getInstance("RSA");
+ PrivateKey privKey = kf.generatePrivate(keySpec);
+ //RSAPrivateKey privKey = (RSAPrivateKey)kf.generatePrivate(keySpec);
+ */
+ //System.out.println(privKey.toString());
+
+// KeyStore keyStore = KeyStore.getInstance("PKCS12");
+// keyStore.load(null,null);
+//
+// KeyStore.PrivateKeyEntry entry =
+// new KeyStore.PrivateKeyEntry(privKey,
+// new java.security.cert.Certificate[] {(java.security.cert.Certificate)cert});
+// KeyStore.PasswordProtection prot = new KeyStore.PasswordProtection(keyPassPhrase.toCharArray());
+// keyStore.setEntry(cert.getSubjectX500Principal().getName(), entry, prot);
+
+// return keyStore;
+ //TODO: Problem with BouncyCastle version used in gsissh
+ throw new CertificateException("Method not implemented");
+
+ }
+
+
+ public static KeyStore trustKeyStoreFromCertDir() throws IOException,
+ KeyStoreException,
+ CertificateException,
+ NoSuchAlgorithmException, ApplicationSettingsException {
+ return trustKeyStoreFromCertDir(ServerSettings.getSetting("trusted.cert.location"));
+ }
+
+ public static KeyStore trustKeyStoreFromCertDir(String certDir) throws IOException,
+ KeyStoreException,
+ CertificateException,
+ NoSuchAlgorithmException {
+ KeyStore ks = KeyStore.getInstance("JKS");
+ ks.load(null,null);
+
+ File dir = new File(certDir);
+ for(File file : dir.listFiles()) {
+ if (!file.isFile()) {
+ continue;
+ }
+ if (!file.getName().endsWith(".0")) {
+ continue;
+ }
+
+ try {
+ //System.out.println("reading file "+file.getName());
+ CertificateFactory cf = CertificateFactory.getInstance("X.509");
+ X509Certificate cert = (X509Certificate) cf.generateCertificate(new FileInputStream(file));
+ //System.out.println(cert.toString());
+
+ KeyStore.TrustedCertificateEntry entry = new KeyStore.TrustedCertificateEntry(cert);
+
+ ks.setEntry(cert.getSubjectX500Principal().getName(), entry, null);
+ } catch (KeyStoreException e) {
+ } catch (CertificateParsingException e) {
+ continue;
+ }
+
+ }
+
+ return ks;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java
new file mode 100644
index 0000000..74642dc
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.context;
+
+import org.apache.airavata.gfac.ssh.api.ServerInfo;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+
+public class SSHAuthWrapper {
+ private ServerInfo serverInfo;
+
+ private AuthenticationInfo authenticationInfo;
+
+ private String key;
+
+ public SSHAuthWrapper(ServerInfo serverInfo, AuthenticationInfo authenticationInfo, String key) {
+ this.serverInfo = serverInfo;
+ this.authenticationInfo = authenticationInfo;
+ this.key = key;
+ }
+
+ public ServerInfo getServerInfo() {
+ return serverInfo;
+ }
+
+ public AuthenticationInfo getAuthenticationInfo() {
+ return authenticationInfo;
+ }
+
+ public String getKey() {
+ return key;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
new file mode 100644
index 0000000..9481188
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
@@ -0,0 +1,229 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.gfac.ssh.api.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+
+/**
+ * This handler will copy input data from gateway machine to airavata
+ * installed machine, later running handlers can copy the input files to computing resource
+ * <Handler class="AdvancedSCPOutputHandler">
+ * <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+ * <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+ * <property name="userName" value="airavata"/>
+ * <property name="hostName" value="gw98.iu.xsede.org"/>
+ * <property name="inputPath" value="/home/airavata/outputData"/>
+ */
+public class AdvancedSCPInputHandler extends AbstractHandler {
+ private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
+ public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
+ public static final int DEFAULT_SSH_PORT = 22;
+
+ private String password = null;
+
+ private String publicKeyPath;
+
+ private String passPhrase;
+
+ private String privateKeyPath;
+
+ private String userName;
+
+ private String hostName;
+
+ private String inputPath;
+
+ public void initProperties(Properties properties) throws GFacHandlerException {
+ password = (String) properties.get("password");
+ passPhrase = (String) properties.get("passPhrase");
+ privateKeyPath = (String) properties.get("privateKeyPath");
+ publicKeyPath = (String) properties.get("publicKeyPath");
+ userName = (String) properties.get("userName");
+ hostName = (String) properties.get("hostName");
+ inputPath = (String) properties.get("inputPath");
+ }
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ super.invoke(jobExecutionContext);
+ int index = 0;
+ int oldIndex = 0;
+ List<String> oldFiles = new ArrayList<String>();
+ MessageContext inputNew = new MessageContext();
+ StringBuffer data = new StringBuffer("|");
+ Cluster pbsCluster = null;
+
+ try {
+ String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName());
+ if (pluginData != null) {
+ try {
+ oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim());
+ oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(","));
+ if (oldIndex == oldFiles.size()) {
+ log.info("Old data looks good !!!!");
+ } else {
+ oldIndex = 0;
+ oldFiles.clear();
+ }
+ } catch (NumberFormatException e) {
+ log.error("Previously stored data " + pluginData + " is wrong so we continue the operations");
+ }
+ }
+
+ AuthenticationInfo authenticationInfo = null;
+ if (password != null) {
+ authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
+ } else {
+ authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+ this.passPhrase);
+ }
+
+ // Server info
+ String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID();
+ if (index < oldIndex) {
+ parentPath = oldFiles.get(index);
+ data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
+ } else {
+ (new File(parentPath)).mkdirs();
+ StringBuffer temp = new StringBuffer(data.append(parentPath).append(",").toString());
+ GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+ }
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+ // here doesn't matter what the job manager is because we are only doing some file handling
+ // not really dealing with monitoring or job submission, so we pa
+
+ MessageContext input = jobExecutionContext.getInMessageContext();
+ Set<String> parameters = input.getParameters().keySet();
+ for (String paramName : parameters) {
+ InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName);
+ String paramValue = inputParamType.getValue();
+ // TODO: Review this with type
+ if (inputParamType.getType() == DataType.URI) {
+ try {
+ URL file = new URL(paramValue);
+ String key = file.getUserInfo() + file.getHost() + DEFAULT_SSH_PORT;
+ GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, file.getUserInfo(), file.getHost(), DEFAULT_SSH_PORT);
+ pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster();
+ paramValue = file.getPath();
+ } catch (MalformedURLException e) {
+ String key = this.userName + this.hostName + DEFAULT_SSH_PORT;
+ GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, this.userName, this.hostName, DEFAULT_SSH_PORT);
+ pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster();
+ log.error(e.getLocalizedMessage(), e);
+ }
+
+ if (index < oldIndex) {
+ log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+ inputParamType.setValue(oldFiles.get(index));
+ data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
+ } else {
+ String stageInputFile = stageInputFiles(pbsCluster, paramValue, parentPath);
+ inputParamType.setValue(stageInputFile);
+ StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString());
+ status.setTransferState(TransferState.UPLOAD);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Input Data Staged: " + stageInputFile);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+ GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+ }
+ }
+ // FIXME: what is the thrift model DataType equivalent for URIArray type?
+// else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+// List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+// List<String> newFiles = new ArrayList<String>();
+// for (String paramValueEach : split) {
+// try {
+// URL file = new URL(paramValue);
+// this.userName = file.getUserInfo();
+// this.hostName = file.getHost();
+// paramValueEach = file.getPath();
+// } catch (MalformedURLException e) {
+// log.error(e.getLocalizedMessage(), e);
+// }
+// if (index < oldIndex) {
+// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+// newFiles.add(oldFiles.get(index));
+// data.append(oldFiles.get(index++)).append(",");
+// } else {
+// String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath);
+// StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
+// GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+// newFiles.add(stageInputFiles);
+// }
+// }
+// ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+// }
+ inputNew.getParameters().put(paramName, inputParamType);
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+ }
+ jobExecutionContext.setInMessageContext(inputNew);
+ }
+
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ this.invoke(jobExecutionContext);
+ }
+
+ private String stageInputFiles(Cluster cluster, String paramValue, String parentPath) throws GFacException {
+ try {
+ cluster.scpFrom(paramValue, parentPath);
+ return "file://" + parentPath + File.separator + (new File(paramValue)).getName();
+ } catch (SSHApiException e) {
+ log.error("Error tranfering remote file to local file, remote path: " + paramValue);
+ throw new GFacException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
new file mode 100644
index 0000000..320f236
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
@@ -0,0 +1,225 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.gfac.ssh.api.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+
+/**
+ * This handler will copy outputs from airavata installed local directory
+ * to a remote location, prior to this handler SCPOutputHandler should be invoked
+ * Should add following configuration to gfac-config.xml and configure the keys properly
+ * <Handler class="AdvancedSCPOutputHandler">
+ <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+ <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+ <property name="userName" value="airavata"/>
+ <property name="hostName" value="gw98.iu.xsede.org"/>
+ <property name="outputPath" value="/home/airavata/outputData"/>
+ <property name="passPhrase" value="/home/airavata/outputData"/>
+ <property name="password" value="/home/airavata/outputData"/>
+
+ */
+public class AdvancedSCPOutputHandler extends AbstractHandler {
+ private static final Logger log = LoggerFactory.getLogger(AdvancedSCPOutputHandler.class);
+
+ public static final int DEFAULT_SSH_PORT = 22;
+
+ private String password = null;
+
+ private String publicKeyPath;
+
+ private String passPhrase;
+
+ private String privateKeyPath;
+
+ private String userName;
+
+ private String hostName;
+
+ private String outputPath;
+
+
+ public void initProperties(Properties properties) throws GFacHandlerException {
+ password = (String)properties.get("password");
+ passPhrase = (String)properties.get("passPhrase");
+ privateKeyPath = (String)properties.get("privateKeyPath");
+ publicKeyPath = (String)properties.get("publicKeyPath");
+ userName = (String)properties.get("userName");
+ hostName = (String)properties.get("hostName");
+ outputPath = (String)properties.get("outputPath");
+ }
+
+ @Override
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ Cluster pbsCluster = null;
+ AuthenticationInfo authenticationInfo = null;
+ if (password != null) {
+ authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
+ } else {
+ authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+ this.passPhrase);
+ }
+ try {
+ String hostName = jobExecutionContext.getHostName();
+ if (jobExecutionContext.getSecurityContext(hostName) == null) {
+ try {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+ String standardError = jobExecutionContext.getStandardError();
+ String standardOutput = jobExecutionContext.getStandardOutput();
+ super.invoke(jobExecutionContext);
+ // Server info
+ if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null && jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir() != null){
+ try{
+ URL outputPathURL = new URL(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir());
+ this.userName = outputPathURL.getUserInfo();
+ this.hostName = outputPathURL.getHost();
+ outputPath = outputPathURL.getPath();
+ } catch (MalformedURLException e) {
+ log.error(e.getLocalizedMessage(),e);
+ }
+ }
+ String key = GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, this.userName, this.hostName, DEFAULT_SSH_PORT);
+ pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster();
+ if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null && !jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().isPersistOutputData()){
+ outputPath = outputPath + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID()
+ + File.separator;
+ pbsCluster.makeDirectory(outputPath);
+ }
+ pbsCluster.scpTo(outputPath, standardError);
+ pbsCluster.scpTo(outputPath, standardOutput);
+ List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>();
+ Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+ Set<String> keys = output.keySet();
+ for (String paramName : keys) {
+ OutputDataObjectType outputDataObjectType = (OutputDataObjectType) output.get(paramName);
+ if (outputDataObjectType.getType() == DataType.URI) {
+ // for failed jobs outputs are not generated. So we should not download outputs
+ if (GFacUtils.isFailedJob(jobExecutionContext)){
+ continue;
+ }
+ String downloadFile = outputDataObjectType.getValue();
+ if(downloadFile == null || !(new File(downloadFile).isFile())){
+ GFacUtils.saveErrorDetails(jobExecutionContext, "Empty Output returned from the application", CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacHandlerException("Empty Output returned from the application.." );
+ }
+ pbsCluster.scpTo(outputPath, downloadFile);
+ String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar)+1, downloadFile.length());
+ OutputDataObjectType dataObjectType = new OutputDataObjectType();
+ dataObjectType.setValue(outputPath + File.separatorChar + fileName);
+ dataObjectType.setName(paramName);
+ dataObjectType.setType(DataType.URI);
+ dataObjectType.setIsRequired(outputDataObjectType.isIsRequired());
+ dataObjectType.setRequiredToAddedToCommandLine(outputDataObjectType.isRequiredToAddedToCommandLine());
+ dataObjectType.setApplicationArgument(outputDataObjectType.getApplicationArgument());
+ dataObjectType.setSearchQuery(outputDataObjectType.getSearchQuery());
+ outputArray.add(dataObjectType);
+ }else if (outputDataObjectType.getType() == DataType.STDOUT) {
+ pbsCluster.scpTo(outputPath, standardOutput);
+ String fileName = standardOutput.substring(standardOutput.lastIndexOf(File.separatorChar)+1, standardOutput.length());
+ OutputDataObjectType dataObjectType = new OutputDataObjectType();
+ dataObjectType.setValue(outputPath + File.separatorChar + fileName);
+ dataObjectType.setName(paramName);
+ dataObjectType.setType(DataType.STDOUT);
+ dataObjectType.setIsRequired(outputDataObjectType.isIsRequired());
+ dataObjectType.setRequiredToAddedToCommandLine(outputDataObjectType.isRequiredToAddedToCommandLine());
+ dataObjectType.setApplicationArgument(outputDataObjectType.getApplicationArgument());
+ dataObjectType.setSearchQuery(outputDataObjectType.getSearchQuery());
+ outputArray.add(dataObjectType);
+ }else if (outputDataObjectType.getType() == DataType.STDERR) {
+ pbsCluster.scpTo(outputPath, standardError);
+ String fileName = standardError.substring(standardError.lastIndexOf(File.separatorChar)+1, standardError.length());
+ OutputDataObjectType dataObjectType = new OutputDataObjectType();
+ dataObjectType.setValue(outputPath + File.separatorChar + fileName);
+ dataObjectType.setName(paramName);
+ dataObjectType.setType(DataType.STDERR);
+ dataObjectType.setIsRequired(outputDataObjectType.isIsRequired());
+ dataObjectType.setRequiredToAddedToCommandLine(outputDataObjectType.isRequiredToAddedToCommandLine());
+ dataObjectType.setApplicationArgument(outputDataObjectType.getApplicationArgument());
+ dataObjectType.setSearchQuery(outputDataObjectType.getSearchQuery());
+ outputArray.add(dataObjectType);
+ }
+ }
+ registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
+ } catch (SSHApiException e) {
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ log.error("Error transfering files to remote host : " + hostName + " with the user: " + userName);
+ log.error(e.getMessage());
+ throw new GFacHandlerException(e);
+ } catch (Exception e) {
+ try {
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException(e);
+ }
+ }
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
new file mode 100644
index 0000000..61a1805
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
@@ -0,0 +1,78 @@
+package org.apache.airavata.gfac.ssh.handler;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.util.HandleOutputs;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NewSSHOutputHandler extends AbstractHandler{
+
+ private static final Logger log = LoggerFactory.getLogger(NewSSHOutputHandler.class);
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ String hostAddress = jobExecutionContext.getHostName();
+ Cluster cluster = null;
+ // Security Context and connection
+ try {
+ if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ }
+ cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
+ if (cluster == null) {
+ throw new GFacProviderException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+
+ super.invoke(jobExecutionContext);
+ List<OutputDataObjectType> outputArray = HandleOutputs.handleOutputs(jobExecutionContext, cluster);
+ try {
+ registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
+ } catch (RegistryException e) {
+ throw new GFacHandlerException(e);
+ }
+
+
+ }
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
+ @Override
+ public void initProperties(Properties properties) throws GFacHandlerException {
+ // TODO Auto-generated method stub
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
new file mode 100644
index 0000000..fb86dd3
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Properties;
+
+public class SSHDirectorySetupHandler extends AbstractHandler {
+ private static final Logger log = LoggerFactory.getLogger(SSHDirectorySetupHandler.class);
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ try {
+ String hostAddress = jobExecutionContext.getHostName();
+ if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+
+ log.info("Setup SSH job directorties");
+ super.invoke(jobExecutionContext);
+ makeDirectory(jobExecutionContext);
+
+ }
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
+ private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ Cluster cluster = null;
+ try{
+ String hostAddress = jobExecutionContext.getHostName();
+ cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
+ if (cluster == null) {
+ throw new GFacHandlerException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ String workingDirectory = jobExecutionContext.getWorkingDir();
+ cluster.makeDirectory(workingDirectory);
+ if(!jobExecutionContext.getInputDir().equals(workingDirectory))
+ cluster.makeDirectory(jobExecutionContext.getInputDir());
+ if(!jobExecutionContext.getOutputDir().equals(workingDirectory))
+ cluster.makeDirectory(jobExecutionContext.getOutputDir());
+
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+ status.setTransferState(TransferState.DIRECTORY_SETUP);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Working directory = " + workingDirectory);
+
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+ } catch (Exception e) {
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Working directory = " + jobExecutionContext.getWorkingDir());
+ try {
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error executing the Handler: " + SSHDirectorySetupHandler.class, e);
+ }
+
+ }
+
+ public void initProperties(Properties properties) throws GFacHandlerException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
new file mode 100644
index 0000000..277ff0e
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
@@ -0,0 +1,198 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+public class SSHInputHandler extends AbstractHandler {
+
+ private static final Logger log = LoggerFactory.getLogger(SSHInputHandler.class);
+
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ DataTransferDetails detail = new DataTransferDetails();
+ detail.setTransferDescription("Input Data Staging");
+ TransferStatus status = new TransferStatus();
+ int index = 0;
+ int oldIndex = 0;
+ List<String> oldFiles = new ArrayList<String>();
+ StringBuffer data = new StringBuffer("|");
+ MessageContext inputNew = new MessageContext();
+ Cluster cluster = null;
+
+ try {
+ String hostAddress = jobExecutionContext.getHostName();
+ if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
+ try {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+
+ cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
+ if (cluster == null) {
+ throw new GFacException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ log.info("Invoking SCPInputHandler");
+ super.invoke(jobExecutionContext);
+
+
+ MessageContext input = jobExecutionContext.getInMessageContext();
+ Set<String> parameters = input.getParameters().keySet();
+ for (String paramName : parameters) {
+ InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName);
+ String paramValue = inputParamType.getValue();
+ //TODO: Review this with type
+ if (inputParamType.getType() == DataType.URI) {
+ if (index < oldIndex) {
+ log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+ inputParamType.setValue(oldFiles.get(index));
+ data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
+ } else {
+ String stageInputFile = stageInputFiles(cluster, jobExecutionContext, paramValue);
+ inputParamType.setValue(stageInputFile);
+ StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString());
+ status.setTransferState(TransferState.UPLOAD);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Input Data Staged: " + stageInputFile);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+ GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+ }
+ }// FIXME: what is the thrift model DataType equivalent for URIArray type?
+// else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+// if (index < oldIndex) {
+// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+// ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
+// data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
+// }else{
+// List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+// List<String> newFiles = new ArrayList<String>();
+// for (String paramValueEach : split) {
+// String stageInputFiles = stageInputFiles(cluster,jobExecutionContext, paramValueEach);
+// status.setTransferState(TransferState.UPLOAD);
+// detail.setTransferStatus(status);
+// detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
+// registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+// newFiles.add(stageInputFiles);
+// StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
+// GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+// }
+// ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+// }
+// }
+ inputNew.getParameters().put(paramName, inputParamType);
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+ }
+ jobExecutionContext.setInMessageContext(inputNew);
+ }
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
+ private static String stageInputFiles(Cluster cluster, JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException {
+ int i = paramValue.lastIndexOf(File.separator);
+ String substring = paramValue.substring(i + 1);
+ try {
+ String targetFile = jobExecutionContext.getInputDir() + File.separator + substring;
+ if(paramValue.startsWith("scp:")){
+ paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
+ cluster.scpThirdParty(paramValue, targetFile);
+ }else{
+ if(paramValue.startsWith("file")){
+ paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
+ }
+ boolean success = false;
+ int j = 1;
+ while(!success){
+ try {
+ cluster.scpTo(targetFile, paramValue);
+ success = true;
+ } catch (Exception e) {
+ log.info(e.getLocalizedMessage());
+ Thread.sleep(2000);
+ if(j==3) {
+ throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+ }
+ }
+ j++;
+ }
+ }
+ return targetFile;
+ } catch (Exception e) {
+ throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+ }
+ }
+
+ public void initProperties(Properties properties) throws GFacHandlerException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
new file mode 100644
index 0000000..7c5538a
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
@@ -0,0 +1,256 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.impl.OutputUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+public class SSHOutputHandler extends AbstractHandler {
+ private static final Logger log = LoggerFactory.getLogger(SSHOutputHandler.class);
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ String hostAddress = jobExecutionContext.getHostName();
+ try {
+ if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+
+ super.invoke(jobExecutionContext);
+ DataTransferDetails detail = new DataTransferDetails();
+ detail.setTransferDescription("Output data staging");
+ TransferStatus status = new TransferStatus();
+
+ Cluster cluster = null;
+ try {
+ cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
+ if (cluster == null) {
+ throw new GFacProviderException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+
+ // Get the Stdouts and StdErrs
+ String timeStampedExperimentID = GFacUtils.createUniqueNameWithDate(jobExecutionContext.getExperimentID());
+
+ TaskDetails taskData = jobExecutionContext.getTaskData();
+ String outputDataDir = ServerSettings.getSetting(Constants.OUTPUT_DATA_DIR, File.separator + "tmp");
+ File localStdOutFile;
+ File localStdErrFile;
+ //FIXME: AdvancedOutput is remote location and third party transfer should work to make this work
+// if (taskData.getAdvancedOutputDataHandling() != null) {
+// outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
+// }
+ if (outputDataDir == null) {
+ outputDataDir = File.separator + "tmp";
+ }
+ outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID();
+ (new File(outputDataDir)).mkdirs();
+
+
+ localStdOutFile = new File(outputDataDir + File.separator + timeStampedExperimentID + "stdout");
+ localStdErrFile = new File(outputDataDir + File.separator + timeStampedExperimentID + "stderr");
+// cluster.makeDirectory(outputDataDir);
+ int i = 0;
+ String stdOutStr = "";
+ while (stdOutStr.isEmpty()) {
+ try {
+ cluster.scpFrom(jobExecutionContext.getStandardOutput(), localStdOutFile.getAbsolutePath());
+ stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
+ } catch (Exception e) {
+ log.error(e.getLocalizedMessage());
+ Thread.sleep(2000);
+ }
+ i++;
+ if (i == 3) break;
+ }
+ Thread.sleep(1000);
+ cluster.scpFrom(jobExecutionContext.getStandardError(), localStdErrFile.getAbsolutePath());
+ Thread.sleep(1000);
+
+ String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
+ status.setTransferState(TransferState.STDOUT_DOWNLOAD);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("STDOUT:" + localStdOutFile.getAbsolutePath());
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+ status.setTransferState(TransferState.STDERROR_DOWNLOAD);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("STDERR:" + localStdErrFile.getAbsolutePath());
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+
+ List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>();
+ Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+ Set<String> keys = output.keySet();
+ for (String paramName : keys) {
+ OutputDataObjectType actualParameter = (OutputDataObjectType) output.get(paramName);
+ if (DataType.URI == actualParameter.getType()) {
+ List<String> outputList = null;
+ int retry = 3;
+ while (retry > 0) {
+ outputList = cluster.listDirectory(jobExecutionContext.getOutputDir());
+ if (outputList.size() > 0) {
+ break;
+ }
+ retry--;
+ Thread.sleep(2000);
+ }
+
+ if (outputList.size() == 0 || outputList.get(0).isEmpty() || outputList.size() > 1) {
+ OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray);
+ Set<String> strings = output.keySet();
+ outputArray.clear();
+ for (String key : strings) {
+ OutputDataObjectType actualParameter1 = (OutputDataObjectType) output.get(key);
+ if (DataType.URI == actualParameter1.getType()) {
+ String downloadFile = actualParameter1.getValue();
+ cluster.scpFrom(downloadFile, outputDataDir);
+ String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar) + 1, downloadFile.length());
+ String localFile = outputDataDir + File.separator + fileName;
+ jobExecutionContext.addOutputFile(localFile);
+ actualParameter1.setValue(localFile);
+ OutputDataObjectType dataObjectType = new OutputDataObjectType();
+ dataObjectType.setValue(localFile);
+ dataObjectType.setName(key);
+ dataObjectType.setType(DataType.URI);
+ outputArray.add(dataObjectType);
+ }else if (DataType.STDOUT == actualParameter.getType()) {
+ String fileName = localStdOutFile.getName();
+ String localFile = outputDataDir + File.separator + fileName;
+ jobExecutionContext.addOutputFile(localFile);
+ actualParameter.setValue(localFile);
+ OutputDataObjectType dataObjectType = new OutputDataObjectType();
+ dataObjectType.setValue(localFile);
+ dataObjectType.setName(key);
+ dataObjectType.setType(DataType.STDOUT);
+ outputArray.add(dataObjectType);
+ }else if (DataType.STDERR == actualParameter.getType()) {
+ String fileName = localStdErrFile.getName();
+ String localFile = outputDataDir + File.separator + fileName;
+ jobExecutionContext.addOutputFile(localFile);
+ actualParameter.setValue(localFile);
+ OutputDataObjectType dataObjectType = new OutputDataObjectType();
+ dataObjectType.setValue(localFile);
+ dataObjectType.setName(key);
+ dataObjectType.setType(DataType.STDERR);
+ outputArray.add(dataObjectType);
+ }
+ }
+ break;
+ } else if (outputList.size() == 1) {//FIXME: Ultrascan case
+ String valueList = outputList.get(0);
+ cluster.scpFrom(jobExecutionContext.getOutputDir() + File.separator + valueList, outputDataDir);
+ String outputPath = outputDataDir + File.separator + valueList;
+ jobExecutionContext.addOutputFile(outputPath);
+ actualParameter.setValue(outputPath);
+ OutputDataObjectType dataObjectType = new OutputDataObjectType();
+ dataObjectType.setValue(outputPath);
+ dataObjectType.setName(paramName);
+ dataObjectType.setType(DataType.URI);
+ outputArray.add(dataObjectType);
+ }
+ } else {
+ OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray);
+ }
+ }
+ if (outputArray == null || outputArray.isEmpty()) {
+ log.error("Empty Output returned from the Application, Double check the application and ApplicationDescriptor output Parameter Names");
+ if (jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() == null) {
+ throw new GFacHandlerException(
+ "Empty Output returned from the Application, Double check the application"
+ + "and ApplicationDescriptor output Parameter Names");
+ }
+ }
+ jobExecutionContext.setStandardError(localStdErrFile.getAbsolutePath());
+ jobExecutionContext.setStandardOutput(localStdOutFile.getAbsolutePath());
+ jobExecutionContext.setOutputDir(outputDataDir);
+ status.setTransferState(TransferState.DOWNLOAD);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription(outputDataDir);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+ registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
+
+ } catch (Exception e) {
+ try {
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error in retrieving results", e);
+ }
+
+ }
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
+ public void initProperties(Properties properties) throws GFacHandlerException {
+
+ }
+}