You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/16 23:12:42 UTC
[1/2] airavata git commit: Added Scp input and output data staging
tasks
Repository: airavata
Updated Branches:
refs/heads/master 2bb805a02 -> f6d34cc64
Added Scp input and output data staging tasks
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e2235afa
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e2235afa
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e2235afa
Branch: refs/heads/master
Commit: e2235afa9f520ef14e3a44c57c07b962a56e022c
Parents: d05c0a1
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Tue Jun 16 17:12:18 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Tue Jun 16 17:12:18 2015 -0400
----------------------------------------------------------------------
.../gfac/impl/DefaultHostScheduler.java | 37 +++++++
.../org/apache/airavata/gfac/impl/Factory.java | 9 +-
.../gfac/impl/GfacInternalStatusUpdator.java | 104 ------------------
.../airavata/gfac/impl/InputHandlerWorker.java | 52 ---------
.../airavata/gfac/impl/OutHandlerWorker.java | 86 ---------------
.../apache/airavata/gfac/impl/OutputUtils.java | 110 -------------------
.../gfac/impl/task/AbstractSCPTask.java | 62 +++++++++++
.../gfac/impl/task/SCPFileTransferTask.java | 98 -----------------
.../gfac/impl/task/SCPInputDataStageTask.java | 78 +++++++++++++
.../gfac/impl/task/SCPOutputDataStatgeTask.java | 70 ++++++++++++
10 files changed, 254 insertions(+), 452 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/DefaultHostScheduler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/DefaultHostScheduler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/DefaultHostScheduler.java
new file mode 100644
index 0000000..d9b6a76
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/DefaultHostScheduler.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl;
+
+import org.apache.airavata.gfac.core.scheduler.HostScheduler;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+
+import java.util.List;
+
+public class DefaultHostScheduler implements HostScheduler { // trivial scheduler: always picks the first registered host
+ @Override
+ public ComputeResourceDescription schedule(List<ComputeResourceDescription> registeredHosts) {
+ if (registeredHosts == null || registeredHosts.isEmpty()) {
+ return null; // no registered hosts to choose from; callers must handle a null result
+ } else {
+ return registeredHosts.get(0); // return the first host in the list.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index a0d3a9b..93c8de9 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -31,6 +31,7 @@ import org.apache.airavata.gfac.core.JobManagerConfiguration;
import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.gfac.core.cluster.ServerInfo;
import org.apache.airavata.gfac.core.monitor.JobMonitor;
+import org.apache.airavata.gfac.core.scheduler.HostScheduler;
import org.apache.airavata.gfac.impl.job.LSFJobConfiguration;
import org.apache.airavata.gfac.impl.job.LSFOutputParser;
import org.apache.airavata.gfac.impl.job.PBSJobConfiguration;
@@ -40,7 +41,6 @@ import org.apache.airavata.gfac.impl.job.SlurmOutputParser;
import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
import org.apache.airavata.gfac.impl.job.UGEOutputParser;
import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
-import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.AppCatalog;
@@ -119,7 +119,7 @@ public abstract class Factory {
return null; // TODO write a job monitor for this.
} else {
if (emailBasedMonitor == null) {
- synchronized (EmailMonitorFactory.class){
+ synchronized (EmailBasedMonitor.class){
if (emailBasedMonitor == null) {
emailBasedMonitor = new EmailBasedMonitor(resourceJobManagerType);
emailBasedMonitor.setDate(startMonitorDate);
@@ -146,4 +146,9 @@ public abstract class Factory {
public static JobManagerConfiguration getLSFJobManager(String installedPath) {
return new LSFJobConfiguration("LSFTemplate.xslt", ".lsf", installedPath, new LSFOutputParser());
}
+
+ public static HostScheduler getHostScheduler() {
+ return new DefaultHostScheduler(); // a new DefaultHostScheduler instance is created on every call
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GfacInternalStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GfacInternalStatusUpdator.java
deleted file mode 100644
index a45eb23..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GfacInternalStatusUpdator.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.impl;
-
-import com.google.common.eventbus.Subscribe;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.listener.AbstractActivityListener;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
-import org.apache.airavata.gfac.core.states.GfacExperimentState;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-
-public class GfacInternalStatusUpdator implements AbstractActivityListener, Watcher {
- private final static Logger logger = LoggerFactory.getLogger(GfacInternalStatusUpdator.class);
-
- private CuratorFramework curatorClient;
-
- private static Integer mutex = -1;
-
- @Subscribe
- public void updateZK(GfacExperimentStateChangeRequest statusChangeRequest) throws Exception {
- logger.info("Gfac internal state changed to: " + statusChangeRequest.getState().toString());
- MonitorID monitorID = statusChangeRequest.getMonitorID();
- String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
- String experimentPath = experimentNode + File.separator + ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)
- + File.separator + statusChangeRequest.getMonitorID().getExperimentID();
- Stat exists = null;
- if(!(GfacExperimentState.COMPLETED.equals(statusChangeRequest.getState()) || GfacExperimentState.FAILED.equals(statusChangeRequest.getState()))) {
- exists = curatorClient.checkExists().forPath(experimentPath);
- if (exists == null) {
- logger.error("ZK path: " + experimentPath + " does not exists !!");
- return;
- }
- Stat state = curatorClient.checkExists().forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
- if (state == null) {
- // state znode has to be created
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).
- forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- String.valueOf(statusChangeRequest.getState().getValue()).getBytes());
- } else {
- curatorClient.setData().withVersion(state.getVersion()).forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- String.valueOf(statusChangeRequest.getState().getValue()).getBytes());
- }
- }
- switch (statusChangeRequest.getState()) {
- case COMPLETED:
- logger.info("Experiment Completed, So removing the ZK entry for the experiment" + monitorID.getExperimentID());
- logger.info("Zookeeper experiment Path: " + experimentPath);
- break;
- case FAILED:
- logger.info("Experiment Failed, So removing the ZK entry for the experiment" + monitorID.getExperimentID());
- logger.info("Zookeeper experiment Path: " + experimentPath);
- break;
- default:
- }
- }
-
- public void setup(Object... configurations) {
- for (Object configuration : configurations) {
- if (configuration instanceof CuratorFramework) {
- this.curatorClient = (CuratorFramework) configuration;
- }
- }
- }
-
- public void process(WatchedEvent watchedEvent) {
- logger.info(watchedEvent.getPath());
- synchronized (mutex) {
- Event.KeeperState state = watchedEvent.getState();
- if (state == Event.KeeperState.SyncConnected) {
- mutex.notify();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/InputHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/InputHandlerWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/InputHandlerWorker.java
deleted file mode 100644
index 461cc1e..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/InputHandlerWorker.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.impl;
-
-import org.apache.airavata.gfac.core.GFac;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class InputHandlerWorker implements Runnable {
- private static Logger log = LoggerFactory.getLogger(InputHandlerWorker.class);
-
- String experimentId;
- String taskId;
- String gatewayId;
- String tokenId;
-
- GFac gfac;
- public InputHandlerWorker(GFac gfac, String experimentId,String taskId,String gatewayId, String tokenId) {
- this.gfac = gfac;
- this.experimentId = experimentId;
- this.taskId = taskId;
- this.gatewayId = gatewayId;
- this.tokenId = tokenId;
- }
-
- @Override
- public void run() {
- try {
- gfac.submitJob(experimentId, taskId, gatewayId, tokenId);
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
deleted file mode 100644
index 227550c..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.impl;
-
-import org.apache.airavata.common.utils.LocalEventPublisher;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.GFac;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.model.messaging.event.TaskIdentifier;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
-import org.apache.airavata.model.status.TaskState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-
-public class OutHandlerWorker implements Runnable {
- private final static Logger logger = LoggerFactory.getLogger(OutHandlerWorker.class);
-
- private GFac gfac;
-
- private MonitorID monitorID;
-
- private LocalEventPublisher localEventPublisher;
- private JobExecutionContext jEC;
-
- public OutHandlerWorker(GFac gfac, MonitorID monitorID,LocalEventPublisher localEventPublisher) {
- this.gfac = gfac;
- this.monitorID = monitorID;
- this.localEventPublisher = localEventPublisher;
- this.jEC = monitorID.getJobExecutionContext();
- }
-
- public OutHandlerWorker(JobExecutionContext jEC) {
- this.jEC = jEC;
- this.gfac = jEC.getGfac();
- this.localEventPublisher = jEC.getLocalEventPublisher();
- }
-
- @Override
- public void run() {
- try {
-// gfac.invokeOutFlowHandlers(monitorID.getJobExecutionContext());
- gfac.invokeOutFlowHandlers(jEC);
- } catch (Exception e) {
- logger.error(e.getMessage(),e);
- TaskIdentifier taskIdentifier = new TaskIdentifier(monitorID.getTaskID(), monitorID.getWorkflowNodeID(),monitorID.getExperimentID(), monitorID.getJobExecutionContext().getGatewayID());
- //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
- localEventPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentifier));
- try {
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(monitorID.getJobExecutionContext(), errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- } catch (GFacException e1) {
- logger.error("Error while persisting error details", e);
- }
- logger.info(e.getLocalizedMessage(), e);
- // Save error details to registry
-
- }
-// localEventPublisher.publish(monitorID.getStatus());
- localEventPublisher.publish(jEC.getJobDetails().getJobStatus());
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutputUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutputUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutputUtils.java
deleted file mode 100644
index 904839c..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutputUtils.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.impl;
-
-import org.apache.airavata.common.utils.StringUtil;
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class OutputUtils {
- private static String regexPattern = "\\s*=\\s*(.*)\\r?\\n";
-
- public static void fillOutputFromStdout(Map<String, Object> output, String stdout, String stderr, List<OutputDataObjectType> outputArray) throws Exception {
- // this is no longer correct
-// if (stdout == null || stdout.equals("")) {
-// throw new GFacHandlerException("Standard output is empty.");
-// }
-
- Set<String> keys = output.keySet();
- OutputDataObjectType actual = null;
- OutputDataObjectType resultOutput = null;
- for (String paramName : keys) {
- actual = (OutputDataObjectType) output.get(paramName);
- // if parameter value is not already set, we let it go
-
- if (actual == null) {
- continue;
- }
- resultOutput = new OutputDataObjectType();
- if (DataType.STDOUT == actual.getType()) {
- actual.setValue(stdout);
- resultOutput.setName(paramName);
- resultOutput.setType(DataType.STDOUT);
- resultOutput.setValue(stdout);
- outputArray.add(resultOutput);
- } else if (DataType.STDERR == actual.getType()) {
- actual.setValue(stderr);
- resultOutput.setName(paramName);
- resultOutput.setType(DataType.STDERR);
- resultOutput.setValue(stderr);
- outputArray.add(resultOutput);
- }
-// else if ("URI".equals(actual.getType().getType().toString())) {
-// continue;
-// }
- else {
- String parseStdout = parseStdout(stdout, paramName);
- if (parseStdout != null) {
- actual.setValue(parseStdout);
- resultOutput.setName(paramName);
- resultOutput.setType(DataType.STRING);
- resultOutput.setValue(parseStdout);
- outputArray.add(resultOutput);
- }
- }
- }
- }
-
- private static String parseStdout(String stdout, String outParam) throws Exception {
- String regex = Pattern.quote(outParam) + regexPattern;
- String match = null;
- Pattern pattern = Pattern.compile(regex);
- Matcher matcher = pattern.matcher(stdout);
- while (matcher.find()) {
- match = matcher.group(1);
- }
- if (match != null) {
- match = match.trim();
- return match;
- }
- return null;
- }
-
- public static String[] parseStdoutArray(String stdout, String outParam) throws Exception {
- String regex = Pattern.quote(outParam) + regexPattern;
- StringBuffer match = new StringBuffer();
- Pattern pattern = Pattern.compile(regex);
- Matcher matcher = pattern.matcher(stdout);
- while (matcher.find()) {
- match.append(matcher.group(1) + StringUtil.DELIMETER);
- }
- if (match != null && match.length() >0) {
- return StringUtil.getElementsFromString(match.toString());
- }
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
new file mode 100644
index 0000000..9abc380
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.status.TaskState;
+
+import java.util.Map;
+
+public class AbstractSCPTask implements Task { // holds the connection settings shared by the SCP staging tasks; note it is not declared abstract despite its name
+ protected static final int DEFAULT_SSH_PORT = 22;
+ protected String password;
+ protected String publicKeyPath;
+ protected String passPhrase;
+ protected String privateKeyPath;
+ protected String userName;
+ protected String hostName;
+ protected String inputPath;
+
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException { // copies settings out of propertyMap; a key that is absent leaves the field null, no validation is done
+ password = propertyMap.get("password");
+ passPhrase = propertyMap.get("passPhrase");
+ privateKeyPath = propertyMap.get("privateKeyPath");
+ publicKeyPath = propertyMap.get("publicKeyPath");
+ userName = propertyMap.get("userName");
+ hostName = propertyMap.get("hostName");
+ inputPath = propertyMap.get("inputPath");
+ }
+
+ @Override
+ public TaskState execute(TaskContext taskContext) throws TaskException {
+ return null; // placeholder: does nothing here, subclasses are expected to override execute
+ }
+
+ @Override
+ public TaskState recover(TaskContext taskContext) throws TaskException {
+ return null; // recovery is not implemented in this class
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java
deleted file mode 100644
index 49560a6..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.gfac.impl.task;
-
-import com.jcraft.jsch.JSch;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
-import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.Task;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.SSHUtils;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.task.DataStagingTaskModel;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-public class SCPFileTransferTask implements Task {
-
- public static final int DEFAULT_SSH_PORT = 22;
- private String password;
- private String publicKeyPath;
- private String passPhrase;
- private String privateKeyPath;
- private String userName;
- private String hostName;
- private String inputPath;
-
-
- @Override
- public void init(Map<String, String> properties) throws TaskException {
- password = properties.get("password");
- passPhrase = properties.get("passPhrase");
- privateKeyPath = properties.get("privateKeyPath");
- publicKeyPath = properties.get("publicKeyPath");
- userName = properties.get("userName");
- hostName = properties.get("hostName");
- inputPath = properties.get("inputPath");
- }
-
- @Override
- public TaskState execute(TaskContext taskContext) throws TaskException {
- DataStagingTaskModel dataStagingTaskModel = new DataStagingTaskModel();
- try {
- URL sourceURL = new URL(dataStagingTaskModel.getSource());
- URL destinationURL = new URL(dataStagingTaskModel.getDestination());
-
- if (sourceURL.getProtocol().equalsIgnoreCase("file")) { // local --> Airavata --> RemoteCluster
- taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURL.getPath(),
- dataStagingTaskModel.getDestination());
- } else { // PGA(client) --> Airavata --> RemoteCluster
- // PGA(client) --> Airavata
- JSch jsch = new JSch();
- jsch.addIdentity(privateKeyPath, passPhrase);
- Session session = jsch.getSession(userName, hostName, DEFAULT_SSH_PORT);
- SSHUtils.scpFrom(sourceURL.getPath(), inputPath, session);
-
- // Airavata --> RemoteCluster
- taskContext.getParentProcessContext().getRemoteCluster().scpTo(destinationURL.getPath(), inputPath);
- }
- } catch (MalformedURLException e) {
- throw new TaskException("Wrong source or destination file path.", e);
- } catch (SSHApiException e) {
- throw new TaskException("Scp attempt failed", e);
- } catch (JSchException | IOException e) {
- throw new TaskException("Scp failed", e);
- }
- return null;
- }
-
- @Override
- public TaskState recover(TaskContext taskContext) throws TaskException {
- return null;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
new file mode 100644
index 0000000..8a74b3d
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+public class SCPInputDataStageTask extends AbstractSCPTask { // stages input data onto the process's RemoteCluster via scp
+
+ public SCPInputDataStageTask() {
+ }
+
+ @Override
+ public TaskState execute(TaskContext taskContext) throws TaskException {
+ DataStagingTaskModel dataStagingTaskModel = new DataStagingTaskModel(); // NOTE(review): the model is newly constructed here, not read from taskContext, so source/destination are presumably unset — confirm
+ try {
+ URL sourceURL = new URL(dataStagingTaskModel.getSource());
+ URL destinationURL = new URL(dataStagingTaskModel.getDestination());
+
+ if (sourceURL.getProtocol().equalsIgnoreCase("file")) { // source is a file: URL: local --> Airavata --> RemoteCluster
+ taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURL.getPath(),
+ dataStagingTaskModel.getDestination()); // passes the raw destination string, not destinationURL.getPath()
+ } else { // any other protocol: PGA(client) --> Airavata --> RemoteCluster
+ // step 1: PGA(client) --> Airavata, using the key pair and host configured through init()
+ JSch jsch = new JSch();
+ jsch.addIdentity(privateKeyPath, passPhrase);
+ Session session = jsch.getSession(userName, hostName, DEFAULT_SSH_PORT); // NOTE(review): session.connect() is not called in this method — verify SSHUtils.scpFrom connects it
+ SSHUtils.scpFrom(sourceURL.getPath(), inputPath, session);
+
+ // step 2: Airavata --> RemoteCluster. NOTE(review): arguments here are (destination path, inputPath) while the scpTo call in the file: branch passes the source first — confirm the order
+ taskContext.getParentProcessContext().getRemoteCluster().scpTo(destinationURL.getPath(), inputPath);
+ }
+ } catch (MalformedURLException e) {
+ throw new TaskException("Wrong source or destination file path.", e);
+ } catch (SSHApiException e) {
+ throw new TaskException("Scp attempt failed", e);
+ } catch (JSchException | IOException e) {
+ throw new TaskException("Scp failed", e);
+ }
+ return null; // NOTE(review): no TaskState is returned even when the transfer succeeds
+ }
+
+ @Override
+ public TaskState recover(TaskContext taskContext) throws TaskException {
+ return null; // recovery is not implemented for this task
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
new file mode 100644
index 0000000..d9cf1ba
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+public class SCPOutputDataStatgeTask extends AbstractSCPTask { // Output staging task: pulls the model's source from the remote cluster, then optionally pushes it on over an SSH session.
+
+
+ @Override // Runs the output staging; see the inline notes for open review questions.
+ public TaskState execute(TaskContext taskContext) throws TaskException { // every URL/SSH/IO failure below is rethrown as TaskException with its cause attached
+ DataStagingTaskModel dataStagingTaskModel = new DataStagingTaskModel(); // NOTE(review): built empty and never populated in this method -- presumably it should be obtained from taskContext; confirm
+ try {
+ URL sourceURL = new URL(dataStagingTaskModel.getSource());
+ URL destinationURL = new URL(dataStagingTaskModel.getDestination());
+ JSch jsch = new JSch();
+ jsch.addIdentity(privateKeyPath, passPhrase); // privateKeyPath/passPhrase are not declared in this class; presumably inherited from AbstractSCPTask
+ Session session = jsch.getSession(userName, hostName, DEFAULT_SSH_PORT); // created unconditionally but only used inside the "file" branch below; NOTE(review): no connect()/disconnect() is visible here
+ // RemoteCluster --> Airavata
+ taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURL.getPath(), inputPath); // source path is copied to inputPath
+
+ if (destinationURL.getProtocol().equalsIgnoreCase("file")) { // NOTE(review): the push to the client runs only for a "file" destination -- confirm this condition is not inverted
+ // Airavata --> PGA(Client)
+ SSHUtils.scpTo(inputPath, destinationURL.getPath(), session); // inputPath is copied to the destination path using the session above
+ }
+ } catch (MalformedURLException e) {
+ throw new TaskException("Wrong source or destination file path.", e);
+ } catch (SSHApiException e) {
+ throw new TaskException("Scp attempt failed", e);
+ } catch (JSchException | IOException e) {
+ throw new TaskException("Scp failed", e);
+ }
+ return null; // NOTE(review): success path returns null rather than a TaskState value -- confirm callers tolerate null
+ }
+
+ @Override // Recovery hook for this task; the body performs no work.
+ public TaskState recover(TaskContext taskContext) throws TaskException { // taskContext is not used
+ return null; // stub: always returns null, no recovery logic is implemented
+ }
+}
[2/2] airavata git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/airavata
Posted by sh...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/f6d34cc6
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/f6d34cc6
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/f6d34cc6
Branch: refs/heads/master
Commit: f6d34cc64fe034d7d6a01d8c3f71aa303b638a49
Parents: e2235af 2bb805a
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Tue Jun 16 17:12:34 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Tue Jun 16 17:12:34 2015 -0400
----------------------------------------------------------------------
.../apache/airavata/common/utils/Constants.java | 5 +
.../apache/airavata/gfac/core/GFacUtils.java | 506 ++++++++++---------
.../gfac/impl/task/JobSubmissionTaskImpl.java | 47 +-
3 files changed, 319 insertions(+), 239 deletions(-)
----------------------------------------------------------------------