You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/16 17:21:05 UTC
[1/2] airavata git commit: Added Factory , HPCRemoteCluster ,
SSHUtils and SCPFileTransferTask classes
Repository: airavata
Updated Branches:
refs/heads/master 869df7ecd -> e6622078b
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 3be4774..15d0167 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -20,30 +20,23 @@
*/
package org.apache.airavata.gfac.server;
-import com.google.common.eventbus.EventBus;
-import org.apache.airavata.common.exception.AiravataStartupException;
-import org.apache.airavata.common.utils.LocalEventPublisher;
-import org.apache.airavata.gfac.core.GFacConstants;
-import org.apache.airavata.gfac.impl.GFacWorker;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.AiravataStartupException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.log.AiravataLogger;
-import org.apache.airavata.common.log.AiravataLoggerFactory;
import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
-import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFac;
-import org.apache.airavata.gfac.core.handler.ThreadedHandler;
-import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.core.GFacConstants;
+import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.cpi.GfacService;
import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
import org.apache.airavata.gfac.impl.BetterGfacImpl;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.gfac.impl.GFacWorker;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.MessagingConstants;
@@ -53,15 +46,13 @@ import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
-import org.apache.airavata.model.experiment.ExperimentState;
-import org.apache.airavata.model.experiment.ExperimentStatus;
+import org.apache.airavata.model.status.ExperimentState;
+import org.apache.airavata.model.status.ExperimentStatus;
+import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
import org.apache.airavata.registry.cpi.RegistryException;
-import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
@@ -70,7 +61,6 @@ import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
@@ -91,8 +81,6 @@ public class GfacServerHandler implements GfacService.Iface {
private LocalEventPublisher localEventPublisher;
private String airavataServerHostPort;
private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
- private static File gfacConfigFile;
- private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
private ExecutorService executorService;
@@ -102,9 +90,6 @@ public class GfacServerHandler implements GfacService.Iface {
initZkDataStructure();
initAMQPClient();
executorService = Executors.newFixedThreadPool(ServerSettings.getGFacThreadPoolSize());
- localEventPublisher = new LocalEventPublisher(new EventBus());
- experimentCatalog = RegistryFactory.getDefaultExpCatalog();
- appCatalog = RegistryFactory.getAppCatalog();
startStatusUpdators(experimentCatalog, curatorClient, localEventPublisher, rabbitMQTaskLaunchConsumer);
} catch (Exception e) {
throw new AiravataStartupException("Gfac Server Initialization error ", e);
@@ -117,9 +102,7 @@ public class GfacServerHandler implements GfacService.Iface {
}
private void startCuratorClient() throws ApplicationSettingsException {
- String connectionSting = ServerSettings.getZookeeperConnection();
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
- curatorClient = CuratorFrameworkFactory.newClient(connectionSting, retryPolicy);
+ curatorClient = Factory.getCuratorClient();
curatorClient.start();
}
@@ -174,18 +157,14 @@ public class GfacServerHandler implements GfacService.Iface {
log.info("-----------------------------------" + requestCount + "-----------------------------------------");
log.info(experimentId, "GFac Received submit job request for the Experiment: {} process: {}", experimentId,
processId);
- ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId);
- processContext.setAppCatalog(appCatalog);
- processContext.setExperimentCatalog(experimentCatalog);
- processContext.setCuratorClient(curatorClient);
- processContext.setLocalEventPublisher(localEventPublisher);
+
try {
- executorService.execute(new GFacWorker(processContext));
- } catch (AiravataException e) {
+ executorService.execute(new GFacWorker(experimentId,processId, gatewayId, tokenId));
+ } catch (GFacException e) {
log.error("Failed to submit process", e);
return false;
}
- return true;
+ return true;
}
public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException {
@@ -294,7 +273,7 @@ public class GfacServerHandler implements GfacService.Iface {
ThriftUtils.createThriftFromBytes(bytes, event);
// update experiment status to executing
ExperimentStatus status = new ExperimentStatus();
- status.setExperimentState(ExperimentState.EXECUTING);
+ status.setState(ExperimentState.EXECUTING);
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status, event.getExperimentId());
try {
[2/2] airavata git commit: Added Factory , HPCRemoteCluster ,
SSHUtils and SCPFileTransferTask classes
Posted by sh...@apache.org.
Added Factory , HPCRemoteCluster , SSHUtils and SCPFileTransferTask classes
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e6622078
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e6622078
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e6622078
Branch: refs/heads/master
Commit: e6622078b2607cbfa7a6b3274ac470ce8d6fbd9a
Parents: 869df7e
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Tue Jun 16 11:20:55 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Tue Jun 16 11:20:55 2015 -0400
----------------------------------------------------------------------
.../core/AbstractJobManagerConfiguration.java | 40 ++
.../airavata/gfac/core/GFacConstants.java | 2 +
.../apache/airavata/gfac/core/GFacEngine.java | 41 ++
.../apache/airavata/gfac/core/GFacUtils.java | 3 -
.../core/authentication/AuthenticationInfo.java | 9 +-
.../authentication/SSHKeyAuthentication.java | 45 +-
.../SSHPasswordAuthentication.java | 29 +-
.../core/cluster/AbstractRemoteCluster.java | 39 ++
.../airavata/gfac/core/cluster/JobStatus.java | 87 ----
.../gfac/core/cluster/OutputParser.java | 1 +
.../gfac/core/cluster/RemoteCluster.java | 230 +++++-----
.../gfac/core/context/ProcessContext.java | 10 +
.../gsi/ssh/api/job/LSFJobConfiguration.java | 1 +
.../gsi/ssh/impl/GSISSHAbstractCluster.java | 185 ++++----
.../gfac/gsi/ssh/impl/HPCRemoteCluster.java | 178 +++++++-
.../org/apache/airavata/gfac/impl/Factory.java | 96 ++++
.../apache/airavata/gfac/impl/GFacEngine.java | 127 ------
.../airavata/gfac/impl/GFacEngineImpl.java | 128 ++++++
.../apache/airavata/gfac/impl/GFacWorker.java | 28 +-
.../org/apache/airavata/gfac/impl/SSHUtils.java | 443 +++++++++++++++++++
.../gfac/impl/task/SCPFileTransferTask.java | 98 ++++
.../airavata/gfac/server/GfacServerHandler.java | 51 +--
22 files changed, 1354 insertions(+), 517 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/AbstractJobManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/AbstractJobManagerConfiguration.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/AbstractJobManagerConfiguration.java
new file mode 100644
index 0000000..7f1ff5a
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/AbstractJobManagerConfiguration.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.core;
+
+import org.apache.airavata.gfac.core.cluster.OutputParser;
+import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
+
+import java.util.Map;
+
+public abstract class AbstractJobManagerConfiguration implements JobManagerConfiguration {
+ final String jobManagerBinPath;
+ final Map<JobManagerCommand, String> jobManagerCommands;
+ final OutputParser outputParser;
+
+ public AbstractJobManagerConfiguration(String jobManagerBinDir, Map<JobManagerCommand, String> jobManagerCommands,
+ OutputParser outputParser) {
+ this.jobManagerBinPath = jobManagerBinDir;
+ this.jobManagerCommands = jobManagerCommands;
+ this.outputParser = outputParser;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
index 621eeac..2706f27 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
@@ -53,6 +53,8 @@ public class GFacConstants {
public static final String ZOOKEEPER_SERVERS_NODE = "/servers";
public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac";
public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments";
+ public static final String ZOOKEEPER_DELIVERYTAG_NODE = "/deliveryTag";
+ public static final String ZOOKEEPER_CANCEL_LISTENER_NODE = "/cancelListener";
public static final String PROP_WORKFLOW_INSTANCE_ID = "workflow.instance.id";
public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id";
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
new file mode 100644
index 0000000..c70ddb5
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.core;
+
+import org.apache.airavata.gfac.core.context.ProcessContext;
+
+public interface GFacEngine {
+
+
+ public ProcessContext populateProcessContext(String experimentId, String processId, String gatewayId, String tokenId) throws GFacException;
+
+ public void createTaskChain(ProcessContext processContext) throws GFacException;
+
+ public void executeProcess(ProcessContext processContext) throws GFacException ;
+
+ public void recoverProcess(ProcessContext processContext) throws GFacException ;
+
+ public void runProcessOutflow(ProcessContext processContext) throws GFacException ;
+
+ public void recoverProcessOutflow(ProcessContext processContext) throws GFacException ;
+
+ public void cancelProcess() throws GFacException ;
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index 3a8c1c5..ae48ed7 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -101,9 +101,6 @@ public class GFacUtils {
private GFacUtils() {
}
- public static ProcessContext populateProcessContext(ProcessContext processContext) {
- return processContext;
- }
/**
* Read data from inputStream and convert it to String.
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/AuthenticationInfo.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/AuthenticationInfo.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/AuthenticationInfo.java
index 2a01e9d..87af69f 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/AuthenticationInfo.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/AuthenticationInfo.java
@@ -1,4 +1,4 @@
-package org.apache.airavata.gfac.core.authentication;/*
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,14 +19,11 @@ package org.apache.airavata.gfac.core.authentication;/*
*
*/
-/**
- * User: AmilaJ (amilaj@apache.org)
- * Date: 10/4/13
- * Time: 11:25 AM
- */
+package org.apache.airavata.gfac.core.authentication;
/**
* An empty interface that represents authentication data to the API.
*/
public interface AuthenticationInfo {
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHKeyAuthentication.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHKeyAuthentication.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHKeyAuthentication.java
index 41b8c9e..94beadd 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHKeyAuthentication.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHKeyAuthentication.java
@@ -28,19 +28,34 @@ package org.apache.airavata.gfac.core.authentication;/*
/**
* Abstracts out common methods for SSH key authentication.
*/
-public interface SSHKeyAuthentication extends AuthenticationInfo {
-
- /**
- * This is needed only if private key and public keys are encrypted.
- * If they are not encrypted we can just return null.
- * @return User should return pass phrase if keys are encrypted. If not null.
- */
- String getPassPhrase();
-
- /**
- * Callback with the banner message. API user can get hold of banner message
- * by implementing this method.
- * @param message The banner message.
- */
- void bannerMessage(String message);
+public class SSHKeyAuthentication implements AuthenticationInfo {
+
+ private String userName;
+ private String privateKeyFilePath;
+ private String publicKeyFilePath;
+ private String passphrase;
+
+ public SSHKeyAuthentication(String userName, String privateKeyFilePath, String publicKeyFilePath, String
+ passphrase) {
+ this.userName = userName;
+ this.privateKeyFilePath = privateKeyFilePath;
+ this.publicKeyFilePath = publicKeyFilePath;
+ this.passphrase = passphrase;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public String getPrivateKeyFilePath() {
+ return privateKeyFilePath;
+ }
+
+ public String getPublicKeyFilePath() {
+ return publicKeyFilePath;
+ }
+
+ public String getPassphrase() {
+ return passphrase;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHPasswordAuthentication.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHPasswordAuthentication.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHPasswordAuthentication.java
index e5b867b..2ca2e7e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHPasswordAuthentication.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHPasswordAuthentication.java
@@ -22,22 +22,23 @@
package org.apache.airavata.gfac.core.authentication;
/**
- * User: AmilaJ (amilaj@apache.org)
- * Date: 10/4/13
- * Time: 11:22 AM
- */
-
-/**
* Password authentication for vanilla SSH.
*/
-public interface SSHPasswordAuthentication extends AuthenticationInfo {
+public class SSHPasswordAuthentication implements AuthenticationInfo {
+
+ private String userName;
+ private String password;
+
+ public SSHPasswordAuthentication(String userName, String password) {
+ this.userName = userName;
+ this.password = password;
+ }
- /**
- * Gets the password for given host name and given user name.
- * @param userName The connecting user name name.
- * @param hostName The connecting host.
- * @return Password for the given user.
- */
- String getPassword(String userName, String hostName);
+ public String getUserName() {
+ return userName;
+ }
+ public String getPassword() {
+ return password;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/AbstractRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/AbstractRemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/AbstractRemoteCluster.java
new file mode 100644
index 0000000..3487224
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/AbstractRemoteCluster.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.core.cluster;
+
+import org.apache.airavata.gfac.core.JobManagerConfiguration;
+
+import java.util.Map;
+
+public class AbstractRemoteCluster {
+
+ ServerInfo serverInfo;
+ JobManagerConfiguration jobManagerConfiguration;
+ Map<String,String> authenticationParam;
+
+ public AbstractRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration, Map<String,
+ String> authenticationParam) {
+ this.serverInfo = serverInfo;
+ this.jobManagerConfiguration = jobManagerConfiguration;
+ this.authenticationParam = authenticationParam;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobStatus.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobStatus.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobStatus.java
index 6e8e144..f784aa6 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobStatus.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobStatus.java
@@ -20,91 +20,4 @@
*/
package org.apache.airavata.gfac.core.cluster;
- /**
- * This will contains all the PBS specific job statuses.
- * C - Job is completed after having run/
- * E - Job is exiting after having run.
- * H - Job is held.
- * Q - job is queued, eligible to run or routed.
- * R - job is running.
- * T - job is being moved to new location.
- * W - job is waiting for its execution time
- * (-a option) to be reached.
- * S - (Unicos only) job is suspend.
- */
- public enum JobStatus {
- C, E, H, Q, R, T, W, S,U,F,CA,CD,CF,CG,NF,PD,PR,TO,qw,t,r,h,Er,Eqw,PEND,RUN,PSUSP,USUSP,SSUSP,DONE,EXIT,UNKWN,ZOMBI;
- public static JobStatus fromString(String status){
- if(status != null){
- if("C".equals(status)){
- return JobStatus.C;
- }else if("E".equals(status)){
- return JobStatus.E;
- }else if("H".equals(status)){
- return JobStatus.H;
- }else if("Q".equals(status)){
- return JobStatus.Q;
- }else if("R".equals(status)){
- return JobStatus.R;
- }else if("T".equals(status)){
- return JobStatus.T;
- }else if("W".equals(status)){
- return JobStatus.W;
- }else if("S".equals(status)){
- return JobStatus.S;
- }else if("F".equals(status)){
- return JobStatus.F;
- }else if("S".equals(status)){
- return JobStatus.S;
- }else if("CA".equals(status)){
- return JobStatus.CA;
- }else if("CF".equals(status)){
- return JobStatus.CF;
- }else if("CD".equals(status)){
- return JobStatus.CD;
- }else if("CG".equals(status)){
- return JobStatus.CG;
- }else if("NF".equals(status)){
- return JobStatus.NF;
- }else if("PD".equals(status)){
- return JobStatus.PD;
- }else if("PR".equals(status)){
- return JobStatus.PR;
- }else if("TO".equals(status)){
- return JobStatus.TO;
- }else if("U".equals(status)){
- return JobStatus.U;
- }else if("qw".equals(status)){
- return JobStatus.qw;
- }else if("t".equals(status)){
- return JobStatus.t;
- }else if("r".equals(status)){
- return JobStatus.r;
- }else if("h".equals(status)){
- return JobStatus.h;
- }else if("Er".equals(status)){
- return JobStatus.Er;
- }else if("Eqw".equals(status)){
- return JobStatus.Er;
- }else if("RUN".equals(status)){ // LSF starts here
- return JobStatus.RUN;
- }else if("PEND".equals(status)){
- return JobStatus.PEND;
- }else if("DONE".equals(status)){
- return JobStatus.DONE;
- }else if("PSUSP".equals(status)){
- return JobStatus.PSUSP;
- }else if("USUSP".equals(status)){
- return JobStatus.USUSP;
- }else if("SSUSP".equals(status)){
- return JobStatus.SSUSP;
- }else if("EXIT".equals(status)){
- return JobStatus.EXIT;
- }else if("ZOMBI".equals(status)){
- return JobStatus.ZOMBI;
- }
- }
- return JobStatus.U;
- }
- }
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
index 658a5bc..521e23f 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
@@ -22,6 +22,7 @@ package org.apache.airavata.gfac.core.cluster;
import org.apache.airavata.gfac.core.JobDescriptor;
import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.model.status.JobStatus;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
index d61bd0c..e438a37 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
@@ -21,8 +21,8 @@
package org.apache.airavata.gfac.core.cluster;
import com.jcraft.jsch.Session;
-import org.apache.airavata.gfac.core.JobDescriptor;
import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.model.status.JobStatus;
import java.util.List;
import java.util.Map;
@@ -34,128 +34,110 @@ import java.util.Map;
*/
public interface RemoteCluster { // FIXME: replace SSHApiException with suitable exception.
- /**
- * This will submit a job to the cluster with a given pbs file and some parameters
- *
- * @param pbsFilePath path of the pbs file
- * @param workingDirectory working directory where pbs should has to copy
- * @return jobId after successful job submission
- * @throws SSHApiException throws exception during error
- */
- public String submitBatchJobWithScript(String pbsFilePath, String workingDirectory) throws SSHApiException;
-
- /**
- * This will submit the given job and not performing any monitoring
- *
- * @param jobDescriptor job descriptor to submit to cluster, this contains all the parameter
- * @return jobID after successful job submission.
- * @throws SSHApiException throws exception during error
- */
- public String submitBatchJob(JobDescriptor jobDescriptor) throws SSHApiException;
-
- /**
- * This will copy the localFile to remoteFile location in configured cluster
- *
- * @param remoteFile remote file location, this can be a directory too
- * @param localFile local file path of the file which needs to copy to remote location
- * @throws SSHApiException throws exception during error
- */
- public void scpTo(String remoteFile, String localFile) throws SSHApiException;
-
- /**
- * This will copy a remote file in path rFile to local file lFile
- * @param remoteFile remote file path, this has to be a full qualified path
- * @param localFile This is the local file to copy, this can be a directory too
- * @throws SSHApiException
- */
- public void scpFrom(String remoteFile, String localFile) throws SSHApiException;
-
- /**
- * This will copy a remote file in path rFile to local file lFile
- * @param remoteFile remote file path, this has to be a full qualified path
- * @param localFile This is the local file to copy, this can be a directory too
- * @throws SSHApiException
- */
- public void scpThirdParty(String remoteFileSorce, String remoteFileTarget) throws SSHApiException;
-
- /**
- * This will create directories in computing resources
- * @param directoryPath the full qualified path for the directory user wants to create
- * @throws SSHApiException throws during error
- */
- public void makeDirectory(String directoryPath) throws SSHApiException;
-
-
- /**
- * This will get the job description of a job which is there in the cluster
- * if jbo is not available with the given ID it returns
- * @param jobID jobId has to pass
- * @return Returns full job description of the job which submitted successfully
- * @throws SSHApiException throws exception during error
- */
- public JobDescriptor getJobDescriptorById(String jobID) throws SSHApiException;
-
- /**
- * This will delete the given job from the queue
- *
- * @param jobID jobId of the job which user wants to delete
- * @return return the description of the deleted job
- * @throws SSHApiException throws exception during error
- */
- public JobDescriptor cancelJob(String jobID) throws SSHApiException;
-
- /**
- * This will get the job status of the the job associated with this jobId
- *
- * @param jobID jobId of the job user want to get the status
- * @return job status of the given jobID
- * @throws SSHApiException throws exception during error
- */
- public JobStatus getJobStatus(String jobID) throws SSHApiException;
- /**
- * This will get the job status of the the job associated with this jobId
- *
- * @param jobName jobName of the job user want to get the status
- * @return jobId of the given jobName
- * @throws SSHApiException throws exception during error
- */
- public String getJobIdByJobName(String jobName, String userName) throws SSHApiException;
-
- /**
- * This method can be used to poll the jobstatuses based on the given
- * user but we should pass the jobID list otherwise we will get unwanted
- * job statuses which submitted by different middleware outside apache
- * airavata with the same uername which we are not considering
- * @param userName userName of the jobs which required to get the status
- * @param jobIDs precises set of jobIDs
- * @return
- */
- public void getJobStatuses(String userName,Map<String,JobStatus> jobIDs)throws SSHApiException;
- /**
- * This will list directories in computing resources
- * @param directoryPath the full qualified path for the directory user wants to create
- * @throws SSHApiException throws during error
- */
- public List<String> listDirectory(String directoryPath) throws SSHApiException;
-
- /**
- * This method can be used to get created ssh session
- * to reuse the created session.
- * @throws SSHApiException
- */
- public Session getSession() throws SSHApiException;
-
- /**
- * This method can be used to close the connections initialized
- * to handle graceful shutdown of the system
- * @throws SSHApiException
- */
- public void disconnect() throws SSHApiException;
-
- /**
- * This gives the server Info
- * @return
- */
- public ServerInfo getServerInfo();
+ /**
+ * This will submit a job to the cluster with a given pbs file and some parameters
+ *
+ * @param jobScriptFilePath path of the job script file
+ * @param workingDirectory working directory where pbs should has to copy
+ * @return jobId after successful job submission
+ * @throws SSHApiException throws exception during error
+ */
+ public String submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException;
+
+ /**
+ * This will copy the localFile to remoteFile location in configured cluster
+ *
+ * @param sourceFile remote file location, this can be a directory too
+ * @param destinationFile local file path of the file which needs to copy to remote location
+ * @throws SSHApiException throws exception during error
+ */
+ public void scpTo(String sourceFile, String destinationFile) throws SSHApiException;
+
+ /**
+ * This will copy a remote file in path rFile to local file lFile
+ *
+ * @param sourceFile remote file path, this has to be a full qualified path
+ * @param destinationFile This is the local file to copy, this can be a directory too
+ */
+ public void scpFrom(String sourceFile, String destinationFile) throws SSHApiException;
+
+ /**
+ * This wil copy source remote file to target remote file.
+ *
+ * @param remoteFileSource remote file path, this has to be a full qualified path
+ * @param remoteFileTarget This is the local file to copy, this can be a directory too
+ */
+ public void scpThirdParty(String remoteFileSource, String remoteFileTarget) throws SSHApiException;
+
+ /**
+ * This will create directories in computing resources
+ *
+ * @param directoryPath the full qualified path for the directory user wants to create
+ * @throws SSHApiException throws during error
+ */
+ public void makeDirectory(String directoryPath) throws SSHApiException;
+
+ /**
+ * This will delete the given job from the queue
+ *
+ * @param jobID jobId of the job which user wants to delete
+ * @return return the description of the deleted job
+ * @throws SSHApiException throws exception during error
+ */
+ public boolean cancelJob(String jobID) throws SSHApiException;
+
+ /**
+ * This will get the job status of the the job associated with this jobId
+ *
+ * @param jobID jobId of the job user want to get the status
+ * @return job status of the given jobID
+ * @throws SSHApiException throws exception during error
+ */
+ public JobStatus getJobStatus(String jobID) throws SSHApiException;
+
+ /**
+ * This will get the job status of the the job associated with this jobId
+ *
+ * @param jobName jobName of the job user want to get the status
+ * @return jobId of the given jobName
+ * @throws SSHApiException throws exception during error
+ */
+ public String getJobIdByJobName(String jobName, String userName) throws SSHApiException;
+
+ /**
+ * This method can be used to poll the jobstatuses based on the given
+ * user but we should pass the jobID list otherwise we will get unwanted
+ * job statuses which submitted by different middleware outside apache
+ * airavata with the same uername which we are not considering
+ *
+ * @param userName userName of the jobs which required to get the status
+ * @param jobIDs precises set of jobIDs
+ */
+ public void getJobStatuses(String userName, Map<String, JobStatus> jobIDs) throws SSHApiException;
+
+ /**
+ * This will list directories in computing resources
+ *
+ * @param directoryPath the full qualified path for the directory user wants to create
+ * @throws SSHApiException throws during error
+ */
+ public List<String> listDirectory(String directoryPath) throws SSHApiException;
+
+ /**
+ * This method can be used to get created ssh session
+ * to reuse the created session.
+ */
+ public Session getSession() throws SSHApiException;
+
+ /**
+ * This method can be used to close the connections initialized
+ * to handle graceful shutdown of the system
+ */
+ public void disconnect() throws SSHApiException;
+
+ /**
+ * This gives the server Info
+ */
+ public ServerInfo getServerInfo();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 10c881c..16943fe 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -31,6 +31,7 @@ import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.curator.framework.CuratorFramework;
import java.util.List;
+import java.util.Map;
public class ProcessContext {
// process model
@@ -46,6 +47,7 @@ public class ProcessContext {
private List<Task> taskChain;
private GatewayResourceProfile gatewayResourceProfile;
private RemoteCluster remoteCluster;
+ private Map<String, String> sshProperties;
public ProcessContext(String processId, String gatewayId, String tokenId) {
this.processId = processId;
@@ -138,4 +140,12 @@ public class ProcessContext {
public void setRemoteCluster(RemoteCluster remoteCluster) {
this.remoteCluster = remoteCluster;
}
+
+ public Map<String, String> getSshProperties() {
+ return sshProperties;
+ }
+
+ public void setSshProperties(Map<String, String> sshProperties) {
+ this.sshProperties = sshProperties;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/api/job/LSFJobConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/api/job/LSFJobConfiguration.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/api/job/LSFJobConfiguration.java
index 26941cd..9e2a913 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/api/job/LSFJobConfiguration.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/api/job/LSFJobConfiguration.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.gfac.gsi.ssh.api.job;
+import org.apache.airavata.gfac.core.AbstractJobManagerConfiguration;
import org.apache.airavata.gfac.core.JobManagerConfiguration;
import org.apache.airavata.gfac.core.cluster.OutputParser;
import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/GSISSHAbstractCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/GSISSHAbstractCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/GSISSHAbstractCluster.java
index 113e4ec..5f44843 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/GSISSHAbstractCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/GSISSHAbstractCluster.java
@@ -37,7 +37,6 @@ import org.apache.airavata.gfac.core.authentication.SSHPasswordAuthentication;
import org.apache.airavata.gfac.core.authentication.SSHPublicKeyAuthentication;
import org.apache.airavata.gfac.core.authentication.SSHPublicKeyFileAuthentication;
import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.core.cluster.JobStatus;
import org.apache.airavata.gfac.core.cluster.OutputParser;
import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
import org.apache.airavata.gfac.core.cluster.ServerInfo;
@@ -47,6 +46,7 @@ import org.apache.airavata.gfac.gsi.ssh.jsch.ExtendedJSch;
import org.apache.airavata.gfac.gsi.ssh.util.SSHAPIUIKeyboardInteractive;
import org.apache.airavata.gfac.gsi.ssh.util.SSHKeyPasswordHandler;
import org.apache.airavata.gfac.gsi.ssh.util.SSHUtils;
+import org.apache.airavata.model.status.JobStatus;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -235,11 +235,11 @@ public class GSISSHAbstractCluster implements RemoteCluster {
}
}
- public synchronized JobDescriptor cancelJob(String jobID) throws SSHApiException {
+ public synchronized boolean cancelJob(String jobID) throws SSHApiException {
JobStatus jobStatus = getJobStatus(jobID);
if (jobStatus == null || jobStatus == JobStatus.U) {
log.info("Validation before cancel is failed, couldn't found job in remote host to cancel. Job may be already completed|failed|canceled");
- return null;
+ return false;
}
RawCommandInfo rawCommandInfo = jobManagerConfiguration.getCancelCommand(jobID);
@@ -249,15 +249,10 @@ public class GSISSHAbstractCluster implements RemoteCluster {
String outputifAvailable = getOutputifAvailable(stdOutReader, "Error reading output of job submission", jobManagerConfiguration.getBaseCancelCommand());
// this might not be the case for all teh resources, if so Cluster implementation can override this method
// because here after cancelling we try to get the job description and return it back
- try {
- return this.getJobDescriptorById(jobID);
- } catch (Exception e) {
- //its ok to fail to get status when the job is gone
- return null;
- }
+ return true;
}
- public synchronized String submitBatchJobWithScript(String scriptPath, String workingDirectory) throws SSHApiException {
+ public synchronized String submitBatchJob(String scriptPath, String workingDirectory) throws SSHApiException {
this.scpTo(workingDirectory, scriptPath);
// since this is a constant we do not ask users to fill this
@@ -278,85 +273,6 @@ public class GSISSHAbstractCluster implements RemoteCluster {
return outputParser.parseJobSubmission(outputifAvailable);
}
-
- @Override
- public synchronized String submitBatchJob(JobDescriptor jobDescriptor) throws SSHApiException {
- TransformerFactory factory = TransformerFactory.newInstance();
- URL resource = this.getClass().getClassLoader().getResource(jobManagerConfiguration.getJobDescriptionTemplateName());
-
- if (resource == null) {
- String error = "System configuration file '" + jobManagerConfiguration.getJobDescriptionTemplateName()
- + "' not found in the classpath";
- throw new SSHApiException(error);
- }
-
- Source xslt = new StreamSource(new File(resource.getPath()));
- Transformer transformer;
- StringWriter results = new StringWriter();
- File tempPBSFile = null;
- try {
- // generate the pbs script using xslt
- transformer = factory.newTransformer(xslt);
- Source text = new StreamSource(new ByteArrayInputStream(jobDescriptor.toXML().getBytes()));
- transformer.transform(text, new StreamResult(results));
- String scriptContent = results.toString().replaceAll("^[ |\t]*\n$", "");
- if (scriptContent.startsWith("\n")) {
- scriptContent = scriptContent.substring(1);
- }
-// log.debug("generated PBS:" + results.toString());
-
- // creating a temporary file using pbs script generated above
- int number = new SecureRandom().nextInt();
- number = (number < 0 ? -number : number);
-
- tempPBSFile = new File(Integer.toString(number) + jobManagerConfiguration.getScriptExtension());
- FileUtils.writeStringToFile(tempPBSFile, scriptContent);
-
- //reusing submitBatchJobWithScript method to submit a job
- String jobID = null;
- int retry = 3;
- while(retry>0) {
- try {
- jobID = this.submitBatchJobWithScript(tempPBSFile.getAbsolutePath(),
- jobDescriptor.getWorkingDirectory());
- retry=0;
- } catch (SSHApiException e) {
- retry--;
- if(retry==0) {
- throw e;
- }else{
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e1) {
- log.error(e1.getMessage(), e1);
- }
- log.error("Error occured during job submission but doing a retry");
- }
- }
- }
- log.debug("Job has successfully submitted, JobID : " + jobID);
- if (jobID != null) {
- return jobID.replace("\n", "");
- } else {
- return null;
- }
- } catch (TransformerConfigurationException e) {
- throw new SSHApiException("Error parsing PBS transformation", e);
- } catch (TransformerException e) {
- throw new SSHApiException("Error generating PBS script", e);
- } catch (IOException e) {
- throw new SSHApiException("An exception occurred while connecting to server." +
- "Connecting server - " + serverInfo.getHost() + ":" + serverInfo.getPort() +
- " connecting user name - "
- + serverInfo.getUserName(), e);
- } finally {
- if (tempPBSFile != null) {
- tempPBSFile.delete();
- }
- }
- }
-
-
public void generateJobScript(JobDescriptor jobDescriptor) throws SSHApiException {
TransformerFactory factory = TransformerFactory.newInstance();
URL resource = this.getClass().getClassLoader().getResource(jobManagerConfiguration.getJobDescriptionTemplateName());
@@ -767,4 +683,95 @@ public class GSISSHAbstractCluster implements RemoteCluster {
// Oh well. They don't have a known hosts in home.
}
}
+
+
+ /**
+ * This will contains all the PBS specific job statuses.
+ * C - Job is completed after having run/
+ * E - Job is exiting after having run.
+ * H - Job is held.
+ * Q - job is queued, eligible to run or routed.
+ * R - job is running.
+ * T - job is being moved to new location.
+ * W - job is waiting for its execution time
+ * (-a option) to be reached.
+ * S - (Unicos only) job is suspend.
+ */
+ public enum HPCJobStatus {
+ C, E, H, Q, R, T, W, S,U,F,CA,CD,CF,CG,NF,PD,PR,TO,qw,t,r,h,Er,Eqw,PEND,RUN,PSUSP,USUSP,SSUSP,DONE,EXIT,UNKWN,ZOMBI;
+
+ public static HPCJobStatus fromString(String status){
+ if(status != null){
+ if("C".equals(status)){
+ return HPCJobStatus.C;
+ }else if("E".equals(status)){
+ return HPCJobStatus.E;
+ }else if("H".equals(status)){
+ return HPCJobStatus.H;
+ }else if("Q".equals(status)){
+ return HPCJobStatus.Q;
+ }else if("R".equals(status)){
+ return HPCJobStatus.R;
+ }else if("T".equals(status)){
+ return HPCJobStatus.T;
+ }else if("W".equals(status)){
+ return HPCJobStatus.W;
+ }else if("S".equals(status)){
+ return HPCJobStatus.S;
+ }else if("F".equals(status)){
+ return HPCJobStatus.F;
+ }else if("S".equals(status)){
+ return HPCJobStatus.S;
+ }else if("CA".equals(status)){
+ return HPCJobStatus.CA;
+ }else if("CF".equals(status)){
+ return HPCJobStatus.CF;
+ }else if("CD".equals(status)){
+ return HPCJobStatus.CD;
+ }else if("CG".equals(status)){
+ return HPCJobStatus.CG;
+ }else if("NF".equals(status)){
+ return HPCJobStatus.NF;
+ }else if("PD".equals(status)){
+ return HPCJobStatus.PD;
+ }else if("PR".equals(status)){
+ return HPCJobStatus.PR;
+ }else if("TO".equals(status)){
+ return HPCJobStatus.TO;
+ }else if("U".equals(status)){
+ return HPCJobStatus.U;
+ }else if("qw".equals(status)){
+ return HPCJobStatus.qw;
+ }else if("t".equals(status)){
+ return HPCJobStatus.t;
+ }else if("r".equals(status)){
+ return HPCJobStatus.r;
+ }else if("h".equals(status)){
+ return HPCJobStatus.h;
+ }else if("Er".equals(status)){
+ return HPCJobStatus.Er;
+ }else if("Eqw".equals(status)){
+ return HPCJobStatus.Er;
+ }else if("RUN".equals(status)){ // LSF starts here
+ return HPCJobStatus.RUN;
+ }else if("PEND".equals(status)){
+ return HPCJobStatus.PEND;
+ }else if("DONE".equals(status)){
+ return HPCJobStatus.DONE;
+ }else if("PSUSP".equals(status)){
+ return HPCJobStatus.PSUSP;
+ }else if("USUSP".equals(status)){
+ return HPCJobStatus.USUSP;
+ }else if("SSUSP".equals(status)){
+ return HPCJobStatus.SSUSP;
+ }else if("EXIT".equals(status)){
+ return HPCJobStatus.EXIT;
+ }else if("ZOMBI".equals(status)){
+ return HPCJobStatus.ZOMBI;
+ }
+ }
+ return HPCJobStatus.U;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/HPCRemoteCluster.java
index cc688e2..e1d9c27 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/HPCRemoteCluster.java
@@ -20,27 +20,191 @@
*/
package org.apache.airavata.gfac.gsi.ssh.impl;
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import com.jcraft.jsch.UserInfo;
+import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.gfac.core.cluster.CommandInfo;
+import org.apache.airavata.gfac.core.cluster.CommandOutput;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.gfac.core.cluster.ServerInfo;
import org.apache.airavata.gfac.core.JobManagerConfiguration;
+import org.apache.airavata.model.status.JobStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.Map;
+
/**
* This is the default implementation of a cluster.
* this has most of the methods to be used by the end user of the
* library.
*/
-public class HPCRemoteCluster extends GSISSHAbstractCluster {
+public class HPCRemoteCluster implements RemoteCluster{
private static final Logger log = LoggerFactory.getLogger(HPCRemoteCluster.class);
+ private final SSHKeyAuthentication authentication;
+ private final ServerInfo serverInfo;
+ private final JobManagerConfiguration jobManagerConfiguration;
+ private final JSch jSch;
+ private Session session;
+
+ public HPCRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration, AuthenticationInfo
+ authenticationInfo) throws AiravataException {
+ try {
+ this.serverInfo = serverInfo;
+ this.jobManagerConfiguration = jobManagerConfiguration;
+ if (authenticationInfo instanceof SSHKeyAuthentication) {
+ authentication = (SSHKeyAuthentication) authenticationInfo;
+ } else {
+ throw new AiravataException("Support ssh key authentication only");
+ }
+
+ jSch = new JSch();
+ jSch.addIdentity(authentication.getPrivateKeyFilePath(), authentication.getPublicKeyFilePath(), authentication
+ .getPassphrase().getBytes());
+ session = jSch.getSession(serverInfo.getUserName(), serverInfo.getHost(), serverInfo.getPort());
+ session.setUserInfo(new DefaultUserInfo(serverInfo.getUserName(), null, authentication.getPassphrase()));
+ session.connect(); // 0 connection timeout
+ } catch (JSchException e) {
+ throw new AiravataException("JSch initialization error ", e);
+ }
+ }
+
+ @Override
+ public String submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException {
+
+ return null;
+ }
+
+ @Override
+ public void scpTo(String sourceFile, String destinationFile) throws SSHApiException {
+
+ }
+
+ @Override
+ public void scpFrom(String sourceFile, String destinationFile) throws SSHApiException {
+
+ }
+
+ @Override
+ public void scpThirdParty(String remoteFileSource, String remoteFileTarget) throws SSHApiException {
+
+ }
+
+ @Override
+ public void makeDirectory(String directoryPath) throws SSHApiException {
+
+ }
+
+ @Override
+ public boolean cancelJob(String jobID) throws SSHApiException {
+ return false;
+ }
+
+ @Override
+ public JobStatus getJobStatus(String jobID) throws SSHApiException {
+ return null;
+ }
+
+ @Override
+ public String getJobIdByJobName(String jobName, String userName) throws SSHApiException {
+ return null;
+ }
+
+ @Override
+ public void getJobStatuses(String userName, Map<String, JobStatus> jobIDs) throws SSHApiException {
+
+ }
+
+ @Override
+ public List<String> listDirectory(String directoryPath) throws SSHApiException {
+ return null;
+ }
+
+ @Override
+ public Session getSession() throws SSHApiException {
+ return null;
+ }
+
+ @Override
+ public void disconnect() throws SSHApiException {
+
+ }
+
+ private void executeCommand(CommandInfo commandInfo, CommandOutput commandOutput) throws SSHApiException {
+ String command = commandInfo.getCommand();
+ ChannelExec channelExec = null;
+ try {
+ if (!session.isConnected()) {
+ session.connect();
+ }
+ channelExec = ((ChannelExec) session.openChannel("exec"));
+ channelExec.setCommand(command);
+ channelExec.setInputStream(null);
+ channelExec.setErrStream(commandOutput.getStandardError());
+ channelExec.connect();
+ commandOutput.onOutput(channelExec);
+ } catch (JSchException e) {
+ throw new SSHApiException("Unable to execute command - ", e);
+ }finally {
+ //Only disconnecting the channel, session can be reused
+ channelExec.disconnect();
+ }
+ }
+
+ @Override
+ public ServerInfo getServerInfo() {
+ return this.serverInfo;
+ }
+
+ private class DefaultUserInfo implements UserInfo {
+
+ private String userName;
+ private String password;
+ private String passphrase;
+
+ public DefaultUserInfo(String userName, String password, String passphrase) {
+ this.userName = userName;
+ this.password = password;
+ this.passphrase = passphrase;
+ }
+
+ @Override
+ public String getPassphrase() {
+ return null;
+ }
+
+ @Override
+ public String getPassword() {
+ return null;
+ }
+
+ @Override
+ public boolean promptPassword(String s) {
+ return false;
+ }
+
+ @Override
+ public boolean promptPassphrase(String s) {
+ return false;
+ }
+
+ @Override
+ public boolean promptYesNo(String s) {
+ return false;
+ }
+ @Override
+ public void showMessage(String s) {
- public HPCRemoteCluster(JobManagerConfiguration jobManagerConfiguration) {
- super(jobManagerConfiguration);
- }
- public HPCRemoteCluster(ServerInfo serverInfo, AuthenticationInfo authenticationInfo, JobManagerConfiguration config) throws SSHApiException {
- super(serverInfo, authenticationInfo,config);
- }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
new file mode 100644
index 0000000..5ab2e96
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl;
+
+import com.google.common.eventbus.EventBus;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.LocalEventPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.GFacEngine;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+import java.util.Map;
+
+public abstract class Factory {
+
+ private static GFacEngine engine;
+ private static Map<String, RemoteCluster> remoteClusterMap;
+ private static LocalEventPublisher localEventPublisher;
+ private static CuratorFramework curatorClient;
+
+ public static GFacEngine getGFacEngine() throws GFacException {
+ if (engine == null) {
+ synchronized (GFacEngineImpl.class) {
+ if (engine == null) {
+ engine = new GFacEngineImpl();
+ }
+ }
+ }
+ return engine;
+ }
+
+ public static RemoteCluster getRemoteCluster(ServerInfo serverInfo) {
+ return remoteClusterMap.get(serverInfo.getHost());
+ }
+
+ public static ExperimentCatalog getDefaultExpCatalog() throws RegistryException {
+ return RegistryFactory.getDefaultExpCatalog();
+ }
+
+ public static AppCatalog getDefaultAppCatalog() throws AppCatalogException {
+ return RegistryFactory.getAppCatalog();
+ }
+
+ public static LocalEventPublisher getLocalEventPublisher() {
+ if (localEventPublisher == null) {
+ synchronized (LocalEventPublisher.class) {
+ if (localEventPublisher == null) {
+ localEventPublisher = new LocalEventPublisher(new EventBus());
+ }
+ }
+ }
+ return localEventPublisher;
+ }
+
+ public static CuratorFramework getCuratorClient() throws ApplicationSettingsException {
+ if (curatorClient == null) {
+ synchronized (Factory.class) {
+ if (curatorClient == null) {
+ String connectionSting = ServerSettings.getZookeeperConnection();
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+ curatorClient = CuratorFrameworkFactory.newClient(connectionSting, retryPolicy);
+ }
+ }
+ }
+ return curatorClient;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java
deleted file mode 100644
index 9c32913..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-
-package org.apache.airavata.gfac.impl;
-
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.config.DataTransferTaskConfig;
-import org.apache.airavata.gfac.core.config.GFacYamlConfigruation;
-import org.apache.airavata.gfac.core.config.JobSubmitterTaskConfig;
-import org.apache.airavata.gfac.core.config.ResourceConfig;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.gfac.core.task.Task;
-import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
-import org.apache.airavata.model.application.io.DataType;
-import org.apache.airavata.model.application.io.InputDataObjectType;
-import org.apache.airavata.model.process.ProcessModel;
-import org.apache.airavata.model.task.TaskModel;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class GFacEngine {
- private static GFacEngine engine;
- Map<JobSubmissionProtocol, Task> jobSubmissionTask;
- Map<DataMovementProtocol, Task> dataMovementTask;
- Map<ResourceJobManagerType, ResourceConfig> resources;
-
-
- private GFacEngine() throws GFacException {
- GFacYamlConfigruation config = new GFacYamlConfigruation();
- for (JobSubmitterTaskConfig jobSubmitterTaskConfig : config.getJobSbumitters()) {
- jobSubmissionTask.put(jobSubmitterTaskConfig.getSubmissionProtocol(), null);
- }
-
- for (DataTransferTaskConfig dataTransferTaskConfig : config.getFileTransferTasks()) {
- dataMovementTask.put(dataTransferTaskConfig.getTransferProtocol(), null);
- }
-
- for (ResourceConfig resourceConfig : config.getResourceConfiguration()) {
- resources.put(resourceConfig.getJobManagerType(), resourceConfig);
- }
- }
-
- public static GFacEngine getInstance() throws GFacException {
- if (engine == null) {
- synchronized (GFacEngine.class) {
- if (engine == null) {
- engine = new GFacEngine();
- }
- }
- }
- return engine;
- }
-
- public ProcessContext populateProcessContext(ProcessContext processContext) {
- processContext.setProcessModel(new ProcessModel()); // TODO: get rocess model from app catalog
- // TODO: set datamovement protocol and jobsubmission protocol
- //TODO: set up gatewayResourceProfile.
- return processContext;
- }
-
- public void createTaskChain(ProcessContext processContext) throws GFacException {
- List<InputDataObjectType> processInputs = processContext.getProcessModel().getProcessInputs();
- List<TaskModel> taskChain = new ArrayList<>();
- if (processInputs != null) {
- for (InputDataObjectType processInput : processInputs) {
- DataType type = processInput.getType();
- switch (type) {
- case STDERR:
- //
- break;
- case STDOUT:
- //
- break;
- case URI:
- // add URI type Task
- break;
- default:
- // nothing to do
- break;
- }
- }
- }
- }
-
- public void executeProcess(ProcessContext processContext) throws GFacException {
-
-
- }
-
- public void recoverProcess(ProcessContext processContext) throws GFacException {
-
- }
-
- public void runProcessOutflow(ProcessContext processContext) throws GFacException {
-
- }
-
- public void recoverProcessOutflow(ProcessContext processContext) throws GFacException {
-
- }
-
- public void cancelProcess() throws GFacException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
new file mode 100644
index 0000000..b2f9fd3
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl;
+
+import org.apache.airavata.gfac.core.GFacEngine;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.config.DataTransferTaskConfig;
+import org.apache.airavata.gfac.core.config.GFacYamlConfigruation;
+import org.apache.airavata.gfac.core.config.JobSubmitterTaskConfig;
+import org.apache.airavata.gfac.core.config.ResourceConfig;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.task.TaskModel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class GFacEngineImpl implements GFacEngine {
+ private static GFacEngineImpl engine;
+ Map<JobSubmissionProtocol, Task> jobSubmissionTask;
+ Map<DataMovementProtocol, Task> dataMovementTask;
+ Map<ResourceJobManagerType, ResourceConfig> resources;
+
+
+ public GFacEngineImpl() throws GFacException {
+ GFacYamlConfigruation config = new GFacYamlConfigruation();
+ for (JobSubmitterTaskConfig jobSubmitterTaskConfig : config.getJobSbumitters()) {
+ jobSubmissionTask.put(jobSubmitterTaskConfig.getSubmissionProtocol(), null);
+ }
+
+ for (DataTransferTaskConfig dataTransferTaskConfig : config.getFileTransferTasks()) {
+ dataMovementTask.put(dataTransferTaskConfig.getTransferProtocol(), null);
+ }
+
+ for (ResourceConfig resourceConfig : config.getResourceConfiguration()) {
+ resources.put(resourceConfig.getJobManagerType(), resourceConfig);
+ }
+ }
+
+ @Override
+ public ProcessContext populateProcessContext(String experimentId, String processId, String gatewayId, String
+ tokenId) throws GFacException {
+ ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId);
+ processContext.setProcessModel(new ProcessModel());
+ // TODO: get process model from app catalog
+ // TODO: set datamovement protocol and jobsubmission protocol
+ // TODO: set up gatewayResourceProfile.
+ // TODO: set RemoteCluster
+ return processContext;
+ }
+
+ @Override
+ public void createTaskChain(ProcessContext processContext) throws GFacException {
+ List<InputDataObjectType> processInputs = processContext.getProcessModel().getProcessInputs();
+ List<TaskModel> taskChain = new ArrayList<>();
+ if (processInputs != null) {
+ for (InputDataObjectType processInput : processInputs) {
+ DataType type = processInput.getType();
+ switch (type) {
+ case STDERR:
+ //
+ break;
+ case STDOUT:
+ //
+ break;
+ case URI:
+ // add URI type Task
+
+ break;
+ default:
+ // nothing to do
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void executeProcess(ProcessContext processContext) throws GFacException {
+
+ }
+
+ @Override
+ public void recoverProcess(ProcessContext processContext) throws GFacException {
+
+ }
+
+ @Override
+ public void runProcessOutflow(ProcessContext processContext) throws GFacException {
+
+ }
+
+ @Override
+ public void recoverProcessOutflow(ProcessContext processContext) throws GFacException {
+
+ }
+
+ @Override
+ public void cancelProcess() throws GFacException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index f7616b3..63f0088 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -21,9 +21,8 @@
package org.apache.airavata.gfac.impl;
-import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.gfac.core.GFacEngine;
import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.context.ProcessContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,30 +30,42 @@ import org.slf4j.LoggerFactory;
public class GFacWorker implements Runnable {
private static final Logger log = LoggerFactory.getLogger(GFacWorker.class);
- private final ProcessContext processContext;
+ private ProcessContext processContext;
+ private String experimentId;
+ private String processId;
+ private String gatewayId;
+ private String tokenId;
- public GFacWorker(ProcessContext processContext) throws AiravataException {
+
+ public GFacWorker(ProcessContext processContext) throws GFacException {
if (processContext == null) {
- throw new AiravataException("Worker must initialize with valide processContext, Process context is null");
+ throw new GFacException("Worker must initialize with valide processContext, Process context is null");
}
this.processContext = processContext;
}
+ public GFacWorker(String experimentId, String processId, String gatewayId, String tokenId) throws GFacException {
+ this.experimentId = experimentId;
+ this.processId = processId;
+ this.gatewayId = gatewayId;
+ this.tokenId = tokenId;
+ }
@Override
public void run() {
try {
- GFacEngine engine = GFacEngine.getInstance();
+ GFacEngine engine = Factory.getGFacEngine();
ProcessType type = getProcessType(processContext);
+ if (processContext == null) {
+ processContext = engine.populateProcessContext(experimentId, processId, gatewayId, tokenId);
+ }
try {
switch (type) {
case NEW:
- engine.populateProcessContext(processContext);
engine.createTaskChain(processContext);
engine.executeProcess(processContext);
break;
case RECOVER:
// recover the process
- engine.populateProcessContext(processContext);
engine.createTaskChain(processContext);
engine.recoverProcess(processContext);
break;
@@ -64,7 +75,6 @@ public class GFacWorker implements Runnable {
break;
case RECOVER_OUTFLOW:
// recover outflow task;
- engine.populateProcessContext(processContext);
engine.recoverProcessOutflow(processContext);
}
} catch (GFacException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
new file mode 100644
index 0000000..c0b458c
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
@@ -0,0 +1,443 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.gsi.ssh.impl.StandardOutReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Utility class to do all ssh and scp related things.
+ */
+public class SSHUtils {
+ private static final Logger log = LoggerFactory.getLogger(SSHUtils.class);
+
+
+ /**
+ * This will copy a local file to a remote location
+ *
+ * @param remoteFile remote location you want to transfer the file, this cannot be a directory, if user pass
+ * a dirctory we do copy it to that directory but we simply return the directory name
+ * todo handle the directory name as input and return the proper final output file name
+ * @param localFile Local file to transfer, this can be a directory
+ * @param session
+ * @return returns the final remote file path, so that users can use the new file location
+ * @throws IOException
+ * @throws JSchException
+ * @throws SSHApiException
+ */
+ public static String scpTo(String remoteFile, String localFile, Session session) throws IOException, JSchException, SSHApiException {
+ FileInputStream fis = null;
+ String prefix = null;
+ if (new File(localFile).isDirectory()) {
+ prefix = localFile + File.separator;
+ }
+ boolean ptimestamp = true;
+
+ // exec 'scp -t rfile' remotely
+ String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile;
+ Channel channel = session.openChannel("exec");
+
+ StandardOutReader stdOutReader = new StandardOutReader();
+ ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+ ((ChannelExec) channel).setCommand(command);
+
+ // get I/O streams for remote scp
+ OutputStream out = channel.getOutputStream();
+ InputStream in = channel.getInputStream();
+
+ channel.connect();
+
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+
+ File _lfile = new File(localFile);
+
+ if (ptimestamp) {
+ command = "T" + (_lfile.lastModified() / 1000) + " 0";
+ // The access time should be sent here,
+ // but it is not accessible with JavaAPI ;-<
+ command += (" " + (_lfile.lastModified() / 1000) + " 0\n");
+ out.write(command.getBytes());
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+ }
+
+ // send "C0644 filesize filename", where filename should not include '/'
+ long filesize = _lfile.length();
+ command = "C0644 " + filesize + " ";
+ if (localFile.lastIndexOf('/') > 0) {
+ command += localFile.substring(localFile.lastIndexOf('/') + 1);
+ } else {
+ command += localFile;
+ }
+ command += "\n";
+ out.write(command.getBytes());
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+
+ // send a content of lFile
+ fis = new FileInputStream(localFile);
+ byte[] buf = new byte[1024];
+ while (true) {
+ int len = fis.read(buf, 0, buf.length);
+ if (len <= 0) break;
+ out.write(buf, 0, len); //out.flush();
+ }
+ fis.close();
+ fis = null;
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+ out.close();
+ stdOutReader.onOutput(channel);
+
+
+ channel.disconnect();
+ if (stdOutReader.getStdErrorString().contains("scp:")) {
+ throw new SSHApiException(stdOutReader.getStdErrorString());
+ }
+ //since remote file is always a file we just return the file
+ return remoteFile;
+ }
+
+ /**
+ * This method will copy a remote file to a local directory
+ *
+ * @param remoteFile remote file path, this has to be a full qualified path
+ * @param localFile This is the local file to copy, this can be a directory too
+ * @param session
+ * @return returns the final local file path of the new file came from the remote resource
+ */
+ public static void scpFrom(String remoteFile, String localFile, Session session) throws IOException, JSchException, SSHApiException {
+ FileOutputStream fos = null;
+ try {
+ String prefix = null;
+ if (new File(localFile).isDirectory()) {
+ prefix = localFile + File.separator;
+ }
+
+ // exec 'scp -f remotefile' remotely
+ String command = "scp -f " + remoteFile;
+ Channel channel = session.openChannel("exec");
+ ((ChannelExec) channel).setCommand(command);
+
+ StandardOutReader stdOutReader = new StandardOutReader();
+ ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+ // get I/O streams for remote scp
+ OutputStream out = channel.getOutputStream();
+ InputStream in = channel.getInputStream();
+
+ channel.connect();
+
+ byte[] buf = new byte[1024];
+
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+
+ while (true) {
+ int c = checkAck(in);
+ if (c != 'C') {
+ break;
+ }
+
+ // read '0644 '
+ in.read(buf, 0, 5);
+
+ long filesize = 0L;
+ while (true) {
+ if (in.read(buf, 0, 1) < 0) {
+ // error
+ break;
+ }
+ if (buf[0] == ' ') break;
+ filesize = filesize * 10L + (long) (buf[0] - '0');
+ }
+
+ String file = null;
+ for (int i = 0; ; i++) {
+ in.read(buf, i, 1);
+ if (buf[i] == (byte) 0x0a) {
+ file = new String(buf, 0, i);
+ break;
+ }
+ }
+
+ //System.out.println("filesize="+filesize+", file="+file);
+
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+
+ // read a content of lfile
+ fos = new FileOutputStream(prefix == null ? localFile : prefix + file);
+ int foo;
+ while (true) {
+ if (buf.length < filesize) foo = buf.length;
+ else foo = (int) filesize;
+ foo = in.read(buf, 0, foo);
+ if (foo < 0) {
+ // error
+ break;
+ }
+ fos.write(buf, 0, foo);
+ filesize -= foo;
+ if (filesize == 0L) break;
+ }
+ fos.close();
+ fos = null;
+
+ if (checkAck(in) != 0) {
+ String error = "Error transfering the file content";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+ }
+ stdOutReader.onOutput(channel);
+ if (stdOutReader.getStdErrorString().contains("scp:")) {
+ throw new SSHApiException(stdOutReader.getStdErrorString());
+ }
+
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ } finally {
+ try {
+ if (fos != null) fos.close();
+ } catch (Exception ee) {
+ }
+ }
+ }
+
+ /**
+ * This method will copy a remote file to a local directory
+ *
+ * @param remoteFileSource remote file path, this has to be a full qualified path
+ * @param remoteFileTarget This is the local file to copy, this can be a directory too
+ * @param session JSch Session object
+ * @return returns the final local file path of the new file came from the remote resource
+ */
+ public static void scpThirdParty(String remoteFileSource, String remoteFileTarget, Session session) throws IOException, JSchException, SSHApiException {
+ FileOutputStream fos = null;
+ try {
+ String prefix = null;
+
+ // exec 'scp -f remotefile' remotely
+ String command = "scp -3 " + remoteFileSource + " " + remoteFileTarget;
+ Channel channel = session.openChannel("exec");
+ ((ChannelExec) channel).setCommand(command);
+
+ StandardOutReader stdOutReader = new StandardOutReader();
+ ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+ // get I/O streams for remote scp
+ OutputStream out = channel.getOutputStream();
+ InputStream in = channel.getInputStream();
+
+ channel.connect();
+
+ byte[] buf = new byte[1024];
+
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+
+ while (true) {
+ int c = checkAck(in);
+ if (c != 'C') {
+ break;
+ }
+
+ // read '0644 '
+ in.read(buf, 0, 5);
+
+ long filesize = 0L;
+ while (true) {
+ if (in.read(buf, 0, 1) < 0) {
+ // error
+ break;
+ }
+ if (buf[0] == ' ') break;
+ filesize = filesize * 10L + (long) (buf[0] - '0');
+ }
+ int foo;
+ while (true) {
+ if (buf.length < filesize) foo = buf.length;
+ else foo = (int) filesize;
+
+ int len = in.read(buf, 0, foo);
+ if (len <= 0) break;
+ out.write(buf, 0, len);
+ }
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error transfering the file content";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+
+ }
+ out.close();
+
+ stdOutReader.onOutput(channel);
+ if (stdOutReader.getStdErrorString().contains("scp:")) {
+ throw new SSHApiException(stdOutReader.getStdErrorString());
+ }
+
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ } finally {
+ try {
+ if (fos != null) fos.close();
+ } catch (Exception ee) {
+ }
+ }
+ }
+
+ public static void makeDirectory(String path, Session session) throws IOException, JSchException, SSHApiException {
+
+ // exec 'scp -t rfile' remotely
+ String command = "mkdir -p " + path;
+ Channel channel = session.openChannel("exec");
+ StandardOutReader stdOutReader = new StandardOutReader();
+
+ ((ChannelExec) channel).setCommand(command);
+
+
+ ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+ try {
+ channel.connect();
+ } catch (JSchException e) {
+
+ channel.disconnect();
+// session.disconnect();
+
+ throw new SSHApiException("Unable to retrieve command output. Command - " + command +
+ " on server - " + session.getHost() + ":" + session.getPort() +
+ " connecting user name - "
+ + session.getUserName(), e);
+ }
+ stdOutReader.onOutput(channel);
+ if (stdOutReader.getStdErrorString().contains("mkdir:")) {
+ throw new SSHApiException(stdOutReader.getStdErrorString());
+ }
+
+ channel.disconnect();
+ }
+
+ public static List<String> listDirectory(String path, Session session) throws IOException, JSchException, SSHApiException {
+
+ // exec 'scp -t rfile' remotely
+ String command = "ls " + path;
+ Channel channel = session.openChannel("exec");
+ StandardOutReader stdOutReader = new StandardOutReader();
+
+ ((ChannelExec) channel).setCommand(command);
+
+
+ ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+ try {
+ channel.connect();
+ } catch (JSchException e) {
+
+ channel.disconnect();
+// session.disconnect();
+
+ throw new SSHApiException("Unable to retrieve command output. Command - " + command +
+ " on server - " + session.getHost() + ":" + session.getPort() +
+ " connecting user name - "
+ + session.getUserName(), e);
+ }
+ stdOutReader.onOutput(channel);
+ stdOutReader.getStdOutputString();
+ if (stdOutReader.getStdErrorString().contains("ls:")) {
+ throw new SSHApiException(stdOutReader.getStdErrorString());
+ }
+ channel.disconnect();
+ return Arrays.asList(stdOutReader.getStdOutputString().split("\n"));
+ }
+
+
+ static int checkAck(InputStream in) throws IOException {
+ int b = in.read();
+ if (b == 0) return b;
+ if (b == -1) return b;
+
+ if (b == 1 || b == 2) {
+ StringBuffer sb = new StringBuffer();
+ int c;
+ do {
+ c = in.read();
+ sb.append((char) c);
+ }
+ while (c != '\n');
+ if (b == 1) { // error
+ System.out.print(sb.toString());
+ }
+ if (b == 2) { // fatal error
+ System.out.print(sb.toString());
+ }
+ }
+ return b;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java
new file mode 100644
index 0000000..49560a6
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+
+public class SCPFileTransferTask implements Task {
+
+ public static final int DEFAULT_SSH_PORT = 22;
+ private String password;
+ private String publicKeyPath;
+ private String passPhrase;
+ private String privateKeyPath;
+ private String userName;
+ private String hostName;
+ private String inputPath;
+
+
+ @Override
+ public void init(Map<String, String> properties) throws TaskException {
+ password = properties.get("password");
+ passPhrase = properties.get("passPhrase");
+ privateKeyPath = properties.get("privateKeyPath");
+ publicKeyPath = properties.get("publicKeyPath");
+ userName = properties.get("userName");
+ hostName = properties.get("hostName");
+ inputPath = properties.get("inputPath");
+ }
+
+ @Override
+ public TaskState execute(TaskContext taskContext) throws TaskException {
+ DataStagingTaskModel dataStagingTaskModel = new DataStagingTaskModel();
+ try {
+ URL sourceURL = new URL(dataStagingTaskModel.getSource());
+ URL destinationURL = new URL(dataStagingTaskModel.getDestination());
+
+ if (sourceURL.getProtocol().equalsIgnoreCase("file")) { // local --> Airavata --> RemoteCluster
+ taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURL.getPath(),
+ dataStagingTaskModel.getDestination());
+ } else { // PGA(client) --> Airavata --> RemoteCluster
+ // PGA(client) --> Airavata
+ JSch jsch = new JSch();
+ jsch.addIdentity(privateKeyPath, passPhrase);
+ Session session = jsch.getSession(userName, hostName, DEFAULT_SSH_PORT);
+ SSHUtils.scpFrom(sourceURL.getPath(), inputPath, session);
+
+ // Airavata --> RemoteCluster
+ taskContext.getParentProcessContext().getRemoteCluster().scpTo(destinationURL.getPath(), inputPath);
+ }
+ } catch (MalformedURLException e) {
+ throw new TaskException("Wrong source or destination file path.", e);
+ } catch (SSHApiException e) {
+ throw new TaskException("Scp attempt failed", e);
+ } catch (JSchException | IOException e) {
+ throw new TaskException("Scp failed", e);
+ }
+ return null;
+ }
+
+ @Override
+ public TaskState recover(TaskContext taskContext) throws TaskException {
+ return null;
+ }
+
+
+}