Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/16 17:21:05 UTC

[1/2] airavata git commit: Added Factory, HPCRemoteCluster, SSHUtils and SCPFileTransferTask classes

Repository: airavata
Updated Branches:
  refs/heads/master 869df7ecd -> e6622078b


http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 3be4774..15d0167 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -20,30 +20,23 @@
 */
 package org.apache.airavata.gfac.server;
 
-import com.google.common.eventbus.EventBus;
-import org.apache.airavata.common.exception.AiravataStartupException;
-import org.apache.airavata.common.utils.LocalEventPublisher;
-import org.apache.airavata.gfac.core.GFacConstants;
-import org.apache.airavata.gfac.impl.GFacWorker;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.AiravataStartupException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.log.AiravataLogger;
-import org.apache.airavata.common.log.AiravataLoggerFactory;
 import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.LocalEventPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
-import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFac;
-import org.apache.airavata.gfac.core.handler.ThreadedHandler;
-import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.core.GFacConstants;
+import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
 import org.apache.airavata.gfac.impl.BetterGfacImpl;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.gfac.impl.GFacWorker;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.MessagingConstants;
@@ -53,15 +46,13 @@ import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
 import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
-import org.apache.airavata.model.experiment.ExperimentState;
-import org.apache.airavata.model.experiment.ExperimentStatus;
+import org.apache.airavata.model.status.ExperimentState;
+import org.apache.airavata.model.status.ExperimentStatus;
+import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.registry.cpi.ExperimentCatalog;
 import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
 import org.apache.airavata.registry.cpi.RegistryException;
-import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
@@ -70,7 +61,6 @@ import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
@@ -91,8 +81,6 @@ public class GfacServerHandler implements GfacService.Iface {
     private LocalEventPublisher localEventPublisher;
     private String airavataServerHostPort;
     private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
-    private static File gfacConfigFile;
-    private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
     private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
     private ExecutorService executorService;
 
@@ -102,9 +90,6 @@ public class GfacServerHandler implements GfacService.Iface {
             initZkDataStructure();
             initAMQPClient();
 	        executorService = Executors.newFixedThreadPool(ServerSettings.getGFacThreadPoolSize());
-	        localEventPublisher = new LocalEventPublisher(new EventBus());
-            experimentCatalog = RegistryFactory.getDefaultExpCatalog();
-            appCatalog = RegistryFactory.getAppCatalog();
             startStatusUpdators(experimentCatalog, curatorClient, localEventPublisher, rabbitMQTaskLaunchConsumer);
         } catch (Exception e) {
             throw new AiravataStartupException("Gfac Server Initialization error ", e);
@@ -117,9 +102,7 @@ public class GfacServerHandler implements GfacService.Iface {
     }
 
     private void startCuratorClient() throws ApplicationSettingsException {
-        String connectionSting = ServerSettings.getZookeeperConnection();
-        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
-        curatorClient = CuratorFrameworkFactory.newClient(connectionSting, retryPolicy);
+		curatorClient = Factory.getCuratorClient();
         curatorClient.start();
     }
 
@@ -174,18 +157,14 @@ public class GfacServerHandler implements GfacService.Iface {
         log.info("-----------------------------------" + requestCount + "-----------------------------------------");
         log.info(experimentId, "GFac Received submit job request for the Experiment: {} process: {}", experimentId,
                 processId);
-        ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId);
-        processContext.setAppCatalog(appCatalog);
-        processContext.setExperimentCatalog(experimentCatalog);
-        processContext.setCuratorClient(curatorClient);
-        processContext.setLocalEventPublisher(localEventPublisher);
+
         try {
-	        executorService.execute(new GFacWorker(processContext));
-        } catch (AiravataException e) {
+	        executorService.execute(new GFacWorker(experimentId,processId, gatewayId, tokenId));
+        } catch (GFacException e) {
             log.error("Failed to submit process", e);
             return false;
         }
-        return true;
+	    return true;
     }
 
     public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException {
@@ -294,7 +273,7 @@ public class GfacServerHandler implements GfacService.Iface {
                     ThriftUtils.createThriftFromBytes(bytes, event);
                     // update experiment status to executing
                     ExperimentStatus status = new ExperimentStatus();
-                    status.setExperimentState(ExperimentState.EXECUTING);
+                    status.setState(ExperimentState.EXECUTING);
                     status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
                     experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status, event.getExperimentId());
                     try {


[2/2] airavata git commit: Added Factory, HPCRemoteCluster, SSHUtils and SCPFileTransferTask classes

Posted by sh...@apache.org.
Added Factory, HPCRemoteCluster, SSHUtils and SCPFileTransferTask classes


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e6622078
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e6622078
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e6622078

Branch: refs/heads/master
Commit: e6622078b2607cbfa7a6b3274ac470ce8d6fbd9a
Parents: 869df7e
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Tue Jun 16 11:20:55 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Tue Jun 16 11:20:55 2015 -0400

----------------------------------------------------------------------
 .../core/AbstractJobManagerConfiguration.java   |  40 ++
 .../airavata/gfac/core/GFacConstants.java       |   2 +
 .../apache/airavata/gfac/core/GFacEngine.java   |  41 ++
 .../apache/airavata/gfac/core/GFacUtils.java    |   3 -
 .../core/authentication/AuthenticationInfo.java |   9 +-
 .../authentication/SSHKeyAuthentication.java    |  45 +-
 .../SSHPasswordAuthentication.java              |  29 +-
 .../core/cluster/AbstractRemoteCluster.java     |  39 ++
 .../airavata/gfac/core/cluster/JobStatus.java   |  87 ----
 .../gfac/core/cluster/OutputParser.java         |   1 +
 .../gfac/core/cluster/RemoteCluster.java        | 230 +++++-----
 .../gfac/core/context/ProcessContext.java       |  10 +
 .../gsi/ssh/api/job/LSFJobConfiguration.java    |   1 +
 .../gsi/ssh/impl/GSISSHAbstractCluster.java     | 185 ++++----
 .../gfac/gsi/ssh/impl/HPCRemoteCluster.java     | 178 +++++++-
 .../org/apache/airavata/gfac/impl/Factory.java  |  96 ++++
 .../apache/airavata/gfac/impl/GFacEngine.java   | 127 ------
 .../airavata/gfac/impl/GFacEngineImpl.java      | 128 ++++++
 .../apache/airavata/gfac/impl/GFacWorker.java   |  28 +-
 .../org/apache/airavata/gfac/impl/SSHUtils.java | 443 +++++++++++++++++++
 .../gfac/impl/task/SCPFileTransferTask.java     |  98 ++++
 .../airavata/gfac/server/GfacServerHandler.java |  51 +--
 22 files changed, 1354 insertions(+), 517 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/AbstractJobManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/AbstractJobManagerConfiguration.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/AbstractJobManagerConfiguration.java
new file mode 100644
index 0000000..7f1ff5a
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/AbstractJobManagerConfiguration.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.core;
+
+import org.apache.airavata.gfac.core.cluster.OutputParser;
+import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
+
+import java.util.Map;
+
+public abstract class AbstractJobManagerConfiguration implements JobManagerConfiguration {
+	final String jobManagerBinPath;
+	final Map<JobManagerCommand, String> jobManagerCommands;
+	final OutputParser outputParser;
+
+	public AbstractJobManagerConfiguration(String jobManagerBinDir, Map<JobManagerCommand, String> jobManagerCommands,
+	                                       OutputParser outputParser) {
+		this.jobManagerBinPath = jobManagerBinDir;
+		this.jobManagerCommands = jobManagerCommands;
+		this.outputParser = outputParser;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
index 621eeac..2706f27 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
@@ -53,6 +53,8 @@ public class GFacConstants {
 	public static final String ZOOKEEPER_SERVERS_NODE = "/servers";
 	public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac";
 	public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments";
+	public static final String ZOOKEEPER_DELIVERYTAG_NODE = "/deliveryTag";
+	public static final String ZOOKEEPER_CANCEL_LISTENER_NODE = "/cancelListener";
 
 	public static final String PROP_WORKFLOW_INSTANCE_ID = "workflow.instance.id";
 	public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id";
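The two new constants are plain ZooKeeper path segments. Below is a purely illustrative sketch of composing paths with Curator's ZKPaths (already used by GfacServerHandler); the node layout GFac actually uses is not shown in this commit, so the paths are hypothetical.

    import org.apache.airavata.gfac.core.GFacConstants;
    import org.apache.curator.utils.ZKPaths;

    public class ZkPathSketch {
        // Purely illustrative: the real experiment node layout is not part of this diff.
        public static String deliveryTagPath(String experimentId) {
            String experimentNode = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId);
            return experimentNode + GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE;
        }

        public static String cancelListenerPath(String experimentId) {
            String experimentNode = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId);
            return experimentNode + GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE;
        }
    }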

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
new file mode 100644
index 0000000..c70ddb5
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.core;
+
+import org.apache.airavata.gfac.core.context.ProcessContext;
+
+public interface GFacEngine {
+
+
+	public ProcessContext populateProcessContext(String experimentId, String processId, String gatewayId, String tokenId) throws GFacException;
+
+	public void createTaskChain(ProcessContext processContext) throws GFacException;
+
+	public void executeProcess(ProcessContext processContext) throws GFacException ;
+
+	public void recoverProcess(ProcessContext processContext) throws GFacException ;
+
+	public void runProcessOutflow(ProcessContext processContext) throws GFacException ;
+
+	public void recoverProcessOutflow(ProcessContext processContext) throws GFacException ;
+
+	public void cancelProcess() throws GFacException ;
+}
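The interface suggests a populate, create-task-chain, execute, then outflow lifecycle. A minimal happy-path driver sketch follows, assuming an engine implementation (such as the GFacEngineImpl added in this commit) is supplied by the caller; the phase comments describe the presumed intent of each call.

    import org.apache.airavata.gfac.core.GFacEngine;
    import org.apache.airavata.gfac.core.GFacException;
    import org.apache.airavata.gfac.core.context.ProcessContext;

    public class ProcessDriverSketch {
        // Happy-path lifecycle only; recovery and cancellation hooks are left out.
        public static void run(GFacEngine engine, String experimentId, String processId,
                               String gatewayId, String tokenId) throws GFacException {
            ProcessContext context = engine.populateProcessContext(experimentId, processId, gatewayId, tokenId);
            engine.createTaskChain(context);    // build the task chain for this process
            engine.executeProcess(context);     // inflow: stage inputs and submit the job
            engine.runProcessOutflow(context);  // outflow: stage outputs after the job completes
        }
    }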

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index 3a8c1c5..ae48ed7 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -101,9 +101,6 @@ public class GFacUtils {
 	private GFacUtils() {
 	}
 
-	public static ProcessContext populateProcessContext(ProcessContext processContext) {
-		return processContext;
-	}
 
 	/**
 	 * Read data from inputStream and convert it to String.

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/AuthenticationInfo.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/AuthenticationInfo.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/AuthenticationInfo.java
index 2a01e9d..87af69f 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/AuthenticationInfo.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/AuthenticationInfo.java
@@ -1,4 +1,4 @@
-package org.apache.airavata.gfac.core.authentication;/*
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,14 +19,11 @@ package org.apache.airavata.gfac.core.authentication;/*
  *
  */
 
-/**
- * User: AmilaJ (amilaj@apache.org)
- * Date: 10/4/13
- * Time: 11:25 AM
- */
+package org.apache.airavata.gfac.core.authentication;
 
 /**
  * An empty interface that represents authentication data to the API.
  */
 public interface AuthenticationInfo {
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHKeyAuthentication.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHKeyAuthentication.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHKeyAuthentication.java
index 41b8c9e..94beadd 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHKeyAuthentication.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHKeyAuthentication.java
@@ -28,19 +28,34 @@ package org.apache.airavata.gfac.core.authentication;/*
 /**
  * Abstracts out common methods for SSH key authentication.
  */
-public interface SSHKeyAuthentication extends AuthenticationInfo {
-
-    /**
-     * This is needed only if private key and public keys are encrypted.
-     * If they are not encrypted we can just return null.
-     * @return User should return pass phrase if keys are encrypted. If not null.
-     */
-    String getPassPhrase();
-
-    /**
-     * Callback with the banner message. API user can get hold of banner message
-     * by implementing this method.
-     * @param message The banner message.
-     */
-    void bannerMessage(String message);
+public class SSHKeyAuthentication implements AuthenticationInfo {
+
+	private String userName;
+	private String privateKeyFilePath;
+	private String publicKeyFilePath;
+	private String passphrase;
+
+	public SSHKeyAuthentication(String userName, String privateKeyFilePath, String publicKeyFilePath, String
+			passphrase) {
+		this.userName = userName;
+		this.privateKeyFilePath = privateKeyFilePath;
+		this.publicKeyFilePath = publicKeyFilePath;
+		this.passphrase = passphrase;
+	}
+
+	public String getUserName() {
+		return userName;
+	}
+
+	public String getPrivateKeyFilePath() {
+		return privateKeyFilePath;
+	}
+
+	public String getPublicKeyFilePath() {
+		return publicKeyFilePath;
+	}
+
+	public String getPassphrase() {
+		return passphrase;
+	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHPasswordAuthentication.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHPasswordAuthentication.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHPasswordAuthentication.java
index e5b867b..2ca2e7e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHPasswordAuthentication.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/authentication/SSHPasswordAuthentication.java
@@ -22,22 +22,23 @@
 package org.apache.airavata.gfac.core.authentication;
 
 /**
- * User: AmilaJ (amilaj@apache.org)
- * Date: 10/4/13
- * Time: 11:22 AM
- */
-
-/**
  * Password authentication for vanilla SSH.
  */
-public interface SSHPasswordAuthentication extends AuthenticationInfo {
+public class SSHPasswordAuthentication implements AuthenticationInfo {
+
+	private String userName;
+	private String password;
+
+	public SSHPasswordAuthentication(String userName, String password) {
+		this.userName = userName;
+		this.password = password;
+	}
 
-    /**
-     * Gets the password for given host name and given user name.
-     * @param userName The connecting user name name.
-     * @param hostName The connecting host.
-     * @return Password for the given user.
-     */
-    String getPassword(String userName, String hostName);
+	public String getUserName() {
+		return userName;
+	}
 
+	public String getPassword() {
+		return password;
+	}
 }
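Both authentication types are now plain value holders rather than callback-style interfaces. A short construction sketch; all credential values below are placeholders.

    import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
    import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
    import org.apache.airavata.gfac.core.authentication.SSHPasswordAuthentication;

    public class AuthenticationSketch {
        // All credential values are placeholders; both objects are passed around as AuthenticationInfo.
        public static AuthenticationInfo keyAuth() {
            return new SSHKeyAuthentication("airavata",
                    "/home/airavata/.ssh/id_rsa",
                    "/home/airavata/.ssh/id_rsa.pub",
                    "key-passphrase");
        }

        public static AuthenticationInfo passwordAuth() {
            return new SSHPasswordAuthentication("airavata", "secret");
        }
    }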

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/AbstractRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/AbstractRemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/AbstractRemoteCluster.java
new file mode 100644
index 0000000..3487224
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/AbstractRemoteCluster.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.core.cluster;
+
+import org.apache.airavata.gfac.core.JobManagerConfiguration;
+
+import java.util.Map;
+
+public class AbstractRemoteCluster {
+
+	ServerInfo serverInfo;
+	JobManagerConfiguration jobManagerConfiguration;
+	Map<String,String> authenticationParam;
+
+	public AbstractRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration, Map<String,
+			String> authenticationParam) {
+		this.serverInfo = serverInfo;
+		this.jobManagerConfiguration = jobManagerConfiguration;
+		this.authenticationParam = authenticationParam;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobStatus.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobStatus.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobStatus.java
index 6e8e144..f784aa6 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobStatus.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobStatus.java
@@ -20,91 +20,4 @@
 */
  package org.apache.airavata.gfac.core.cluster;
 
- /**
-  * This will contains all the PBS specific job statuses.
-  * C -  Job is completed after having run/
-  * E -  Job is exiting after having run.
-  * H -  Job is held.
-  * Q -  job is queued, eligible to run or routed.
-  * R -  job is running.
-  * T -  job is being moved to new location.
-  * W -  job is waiting for its execution time
-  * (-a option) to be reached.
-  * S -  (Unicos only) job is suspend.
-  */
- public enum JobStatus {
-     C, E, H, Q, R, T, W, S,U,F,CA,CD,CF,CG,NF,PD,PR,TO,qw,t,r,h,Er,Eqw,PEND,RUN,PSUSP,USUSP,SSUSP,DONE,EXIT,UNKWN,ZOMBI;
 
-     public static JobStatus fromString(String status){
-        if(status != null){
-            if("C".equals(status)){
-                return JobStatus.C;
-            }else if("E".equals(status)){
-                return JobStatus.E;
-            }else if("H".equals(status)){
-                return JobStatus.H;
-            }else if("Q".equals(status)){
-                return JobStatus.Q;
-            }else if("R".equals(status)){
-                return JobStatus.R;
-            }else if("T".equals(status)){
-                return JobStatus.T;
-            }else if("W".equals(status)){
-                return JobStatus.W;
-            }else if("S".equals(status)){
-                return JobStatus.S;
-            }else if("F".equals(status)){
-                return JobStatus.F;
-            }else if("S".equals(status)){
-                return JobStatus.S;
-            }else if("CA".equals(status)){
-                return JobStatus.CA;
-            }else if("CF".equals(status)){
-                return JobStatus.CF;
-            }else if("CD".equals(status)){
-                return JobStatus.CD;
-            }else if("CG".equals(status)){
-                return JobStatus.CG;
-            }else if("NF".equals(status)){
-                return JobStatus.NF;
-            }else if("PD".equals(status)){
-                return JobStatus.PD;
-            }else if("PR".equals(status)){
-                return JobStatus.PR;
-            }else if("TO".equals(status)){
-                return JobStatus.TO;
-            }else if("U".equals(status)){
-                return JobStatus.U;
-            }else if("qw".equals(status)){
-                return JobStatus.qw;
-            }else if("t".equals(status)){
-                return JobStatus.t;
-            }else if("r".equals(status)){
-                return JobStatus.r;
-            }else if("h".equals(status)){
-                return JobStatus.h;
-            }else if("Er".equals(status)){
-                return JobStatus.Er;
-            }else if("Eqw".equals(status)){
-                return JobStatus.Er;
-            }else if("RUN".equals(status)){      // LSF starts here
-                return JobStatus.RUN;
-            }else if("PEND".equals(status)){
-                return JobStatus.PEND;
-            }else if("DONE".equals(status)){
-                return JobStatus.DONE;
-            }else if("PSUSP".equals(status)){
-                return JobStatus.PSUSP;
-            }else if("USUSP".equals(status)){
-                return JobStatus.USUSP;
-            }else if("SSUSP".equals(status)){
-                return JobStatus.SSUSP;
-            }else if("EXIT".equals(status)){
-                return JobStatus.EXIT;
-            }else if("ZOMBI".equals(status)){
-                return JobStatus.ZOMBI;
-            }
-        }
-         return JobStatus.U;
-     }
- }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
index 658a5bc..521e23f 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
@@ -22,6 +22,7 @@ package org.apache.airavata.gfac.core.cluster;
 
 import org.apache.airavata.gfac.core.JobDescriptor;
 import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.model.status.JobStatus;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
index d61bd0c..e438a37 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
@@ -21,8 +21,8 @@
 package org.apache.airavata.gfac.core.cluster;
 
 import com.jcraft.jsch.Session;
-import org.apache.airavata.gfac.core.JobDescriptor;
 import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.model.status.JobStatus;
 
 import java.util.List;
 import java.util.Map;
@@ -34,128 +34,110 @@ import java.util.Map;
  */
 public interface RemoteCluster { // FIXME: replace SSHApiException with suitable exception.
 
-    /**
-     * This will submit a job to the cluster with a given pbs file and some parameters
-     *
-     * @param pbsFilePath  path of the pbs file
-     * @param workingDirectory working directory where pbs should has to copy
-     * @return jobId after successful job submission
-     * @throws SSHApiException throws exception during error
-     */
-    public String submitBatchJobWithScript(String pbsFilePath, String workingDirectory) throws SSHApiException;
-
-    /**
-     * This will submit the given job and not performing any monitoring
-     *
-     * @param jobDescriptor  job descriptor to submit to cluster, this contains all the parameter
-     * @return jobID after successful job submission.
-     * @throws SSHApiException  throws exception during error
-     */
-    public String submitBatchJob(JobDescriptor jobDescriptor) throws SSHApiException;
-
-    /**
-     * This will copy the localFile to remoteFile location in configured cluster
-     *
-     * @param remoteFile remote file location, this can be a directory too
-     * @param localFile local file path of the file which needs to copy to remote location
-     * @throws SSHApiException throws exception during error
-     */
-    public void scpTo(String remoteFile, String localFile) throws SSHApiException;
-
-    /**
-     * This will copy a remote file in path rFile to local file lFile
-     * @param remoteFile remote file path, this has to be a full qualified path
-     * @param localFile This is the local file to copy, this can be a directory too
-     * @throws SSHApiException
-     */
-    public void scpFrom(String remoteFile, String localFile) throws SSHApiException;
-
-    /**
-     * This will copy a remote file in path rFile to local file lFile
-     * @param remoteFile remote file path, this has to be a full qualified path
-     * @param localFile This is the local file to copy, this can be a directory too
-     * @throws SSHApiException
-     */
-    public void scpThirdParty(String remoteFileSorce, String remoteFileTarget) throws SSHApiException;
-    
-    /**
-     * This will create directories in computing resources
-     * @param directoryPath the full qualified path for the directory user wants to create
-     * @throws SSHApiException throws during error
-     */
-    public void makeDirectory(String directoryPath) throws SSHApiException;
-
-
-    /**
-     * This will get the job description of a job which is there in the cluster
-     * if jbo is not available with the given ID it returns
-     * @param jobID jobId has to pass
-     * @return Returns full job description of the job which submitted successfully
-     * @throws SSHApiException throws exception during error
-     */
-    public JobDescriptor getJobDescriptorById(String jobID) throws SSHApiException;
-
-    /**
-     * This will delete the given job from the queue
-     *
-     * @param jobID  jobId of the job which user wants to delete
-     * @return return the description of the deleted job
-     * @throws SSHApiException throws exception during error
-     */
-    public JobDescriptor cancelJob(String jobID) throws SSHApiException;
-
-    /**
-     * This will get the job status of the the job associated with this jobId
-     *
-     * @param jobID jobId of the job user want to get the status
-     * @return job status of the given jobID
-     * @throws SSHApiException throws exception during error
-     */
-    public JobStatus getJobStatus(String jobID) throws SSHApiException;
-    /**
-     * This will get the job status of the the job associated with this jobId
-     *
-     * @param jobName jobName of the job user want to get the status
-     * @return jobId of the given jobName
-     * @throws SSHApiException throws exception during error
-     */
-    public String getJobIdByJobName(String jobName, String userName) throws SSHApiException;
-
-    /**
-     * This method can be used to poll the jobstatuses based on the given
-     * user but we should pass the jobID list otherwise we will get unwanted
-     * job statuses which submitted by different middleware outside apache
-     * airavata with the same uername which we are not considering
-     * @param userName userName of the jobs which required to get the status
-     * @param jobIDs precises set of jobIDs
-     * @return
-     */
-    public void getJobStatuses(String userName,Map<String,JobStatus> jobIDs)throws SSHApiException;
-    /**
-     * This will list directories in computing resources
-     * @param directoryPath the full qualified path for the directory user wants to create
-     * @throws SSHApiException throws during error
-     */
-    public List<String> listDirectory(String directoryPath) throws SSHApiException;
-
-    /**
-     * This method can be used to get created ssh session
-     * to reuse the created session.
-     * @throws SSHApiException
-     */
-    public Session getSession() throws SSHApiException;
-    
-    /**
-     * This method can be used to close the connections initialized
-     * to handle graceful shutdown of the system
-     * @throws SSHApiException
-     */
-    public void disconnect() throws SSHApiException;
-
-    /**
-     * This gives the server Info
-     * @return
-     */
-    public ServerInfo getServerInfo();
+	/**
+	 * This will submit a job to the cluster using the given job script file and parameters
+	 *
+	 * @param jobScriptFilePath path of the job script file
+	 * @param workingDirectory  working directory to which the job script is copied
+	 * @return jobId after successful job submission
+	 * @throws SSHApiException throws exception during error
+	 */
+	public String submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException;
+
+	/**
+	 * This will copy a local file to a remote location in the configured cluster
+	 *
+	 * @param sourceFile      remote file location; this can be a directory too
+	 * @param destinationFile local file path of the file which needs to be copied to the remote location
+	 * @throws SSHApiException throws exception during error
+	 */
+	public void scpTo(String sourceFile, String destinationFile) throws SSHApiException;
+
+	/**
+	 * This will copy a remote file to a local destination
+	 *
+	 * @param sourceFile      remote file path; this has to be a fully qualified path
+	 * @param destinationFile the local destination to copy to; this can be a directory too
+	 */
+	public void scpFrom(String sourceFile, String destinationFile) throws SSHApiException;
+
+	/**
+	 * This will copy the source remote file to the target remote file.
+	 *
+	 * @param remoteFileSource remote source file path; this has to be a fully qualified path
+	 * @param remoteFileTarget remote target file path; this can be a directory too
+	 */
+	public void scpThirdParty(String remoteFileSource, String remoteFileTarget) throws SSHApiException;
+
+	/**
+	 * This will create directories on the compute resource
+	 *
+	 * @param directoryPath the fully qualified path of the directory the user wants to create
+	 * @throws SSHApiException throws during error
+	 */
+	public void makeDirectory(String directoryPath) throws SSHApiException;
+
+	/**
+	 * This will cancel the given job and remove it from the queue
+	 *
+	 * @param jobID jobId of the job which the user wants to cancel
+	 * @return true if the cancel command was issued successfully, false if the job could not be found
+	 * @throws SSHApiException throws exception during error
+	 */
+	public boolean cancelJob(String jobID) throws SSHApiException;
+
+	/**
+	 * This will get the job status of the job associated with this jobId
+	 *
+	 * @param jobID jobId of the job user want to get the status
+	 * @return job status of the given jobID
+	 * @throws SSHApiException throws exception during error
+	 */
+	public JobStatus getJobStatus(String jobID) throws SSHApiException;
+
+	/**
+	 * This will look up the jobId of the job with the given name, submitted by the given user
+	 *
+	 * @param jobName name of the job whose jobId is required
+	 * @return jobId of the given jobName
+	 * @throws SSHApiException throws exception during error
+	 */
+	public String getJobIdByJobName(String jobName, String userName) throws SSHApiException;
+
+	/**
+	 * This method can be used to poll the job statuses for the given user. The jobID
+	 * map must be passed as well; otherwise the result would include unwanted statuses
+	 * of jobs submitted by other middleware outside Apache Airavata under the same
+	 * user name, which we do not want to consider.
+	 *
+	 * @param userName userName of the jobs whose statuses are required
+	 * @param jobIDs   precise set of jobIDs
+	 */
+	public void getJobStatuses(String userName, Map<String, JobStatus> jobIDs) throws SSHApiException;
+
+	/**
+	 * This will list the given directory on the compute resource
+	 *
+	 * @param directoryPath the fully qualified path of the directory the user wants to list
+	 * @throws SSHApiException throws during error
+	 */
+	public List<String> listDirectory(String directoryPath) throws SSHApiException;
+
+	/**
+	 * This method can be used to get the created SSH session
+	 * so that it can be reused.
+	 */
+	public Session getSession() throws SSHApiException;
+
+	/**
+	 * This method can be used to close the connections initialized
+	 * to handle graceful shutdown of the system
+	 */
+	public void disconnect() throws SSHApiException;
+
+	/**
+	 * This gives the server Info
+	 */
+	public ServerInfo getServerInfo();
 
 }
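A minimal caller sketch against the trimmed-down interface, assuming an implementation instance is supplied elsewhere; paths are placeholders and error handling is omitted.

    import org.apache.airavata.gfac.core.SSHApiException;
    import org.apache.airavata.gfac.core.cluster.RemoteCluster;
    import org.apache.airavata.model.status.JobStatus;

    public class RemoteClusterUsageSketch {
        // Creates a remote working directory, submits a locally generated job script
        // (the implementation copies it to the working directory) and polls its status.
        public static JobStatus submitAndCheck(RemoteCluster cluster) throws SSHApiException {
            String workingDir = "/scratch/airavata/exp-0001"; // placeholder path
            cluster.makeDirectory(workingDir);
            String jobId = cluster.submitBatchJob("/tmp/job.pbs", workingDir);
            return cluster.getJobStatus(jobId);
        }
    }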

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 10c881c..16943fe 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -31,6 +31,7 @@ import org.apache.airavata.registry.cpi.ExperimentCatalog;
 import org.apache.curator.framework.CuratorFramework;
 
 import java.util.List;
+import java.util.Map;
 
 public class ProcessContext {
 	// process model
@@ -46,6 +47,7 @@ public class ProcessContext {
 	private List<Task> taskChain;
 	private GatewayResourceProfile gatewayResourceProfile;
 	private RemoteCluster remoteCluster;
+	private Map<String, String> sshProperties;
 
 	public ProcessContext(String processId, String gatewayId, String tokenId) {
 		this.processId = processId;
@@ -138,4 +140,12 @@ public class ProcessContext {
 	public void setRemoteCluster(RemoteCluster remoteCluster) {
 		this.remoteCluster = remoteCluster;
 	}
+
+	public Map<String, String> getSshProperties() {
+		return sshProperties;
+	}
+
+	public void setSshProperties(Map<String, String> sshProperties) {
+		this.sshProperties = sshProperties;
+	}
 }
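A small sketch of wiring the new field; the property key names are hypothetical, since the keys the SSH layer actually reads are not part of this diff.

    import org.apache.airavata.gfac.core.context.ProcessContext;

    import java.util.HashMap;
    import java.util.Map;

    public class ProcessContextSketch {
        // Key names below are hypothetical placeholders.
        public static ProcessContext withSshProperties(String processId, String gatewayId, String tokenId) {
            ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId);
            Map<String, String> sshProperties = new HashMap<String, String>();
            sshProperties.put("userName", "airavata");
            sshProperties.put("privateKeyFilePath", "/home/airavata/.ssh/id_rsa");
            processContext.setSshProperties(sshProperties);
            return processContext;
        }
    }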

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/api/job/LSFJobConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/api/job/LSFJobConfiguration.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/api/job/LSFJobConfiguration.java
index 26941cd..9e2a913 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/api/job/LSFJobConfiguration.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/api/job/LSFJobConfiguration.java
@@ -20,6 +20,7 @@
 */
 package org.apache.airavata.gfac.gsi.ssh.api.job;
 
+import org.apache.airavata.gfac.core.AbstractJobManagerConfiguration;
 import org.apache.airavata.gfac.core.JobManagerConfiguration;
 import org.apache.airavata.gfac.core.cluster.OutputParser;
 import org.apache.airavata.gfac.core.cluster.RawCommandInfo;

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/GSISSHAbstractCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/GSISSHAbstractCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/GSISSHAbstractCluster.java
index 113e4ec..5f44843 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/GSISSHAbstractCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/GSISSHAbstractCluster.java
@@ -37,7 +37,6 @@ import org.apache.airavata.gfac.core.authentication.SSHPasswordAuthentication;
 import org.apache.airavata.gfac.core.authentication.SSHPublicKeyAuthentication;
 import org.apache.airavata.gfac.core.authentication.SSHPublicKeyFileAuthentication;
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.core.cluster.JobStatus;
 import org.apache.airavata.gfac.core.cluster.OutputParser;
 import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
 import org.apache.airavata.gfac.core.cluster.ServerInfo;
@@ -47,6 +46,7 @@ import org.apache.airavata.gfac.gsi.ssh.jsch.ExtendedJSch;
 import org.apache.airavata.gfac.gsi.ssh.util.SSHAPIUIKeyboardInteractive;
 import org.apache.airavata.gfac.gsi.ssh.util.SSHKeyPasswordHandler;
 import org.apache.airavata.gfac.gsi.ssh.util.SSHUtils;
+import org.apache.airavata.model.status.JobStatus;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -235,11 +235,11 @@ public class GSISSHAbstractCluster implements RemoteCluster {
         }
     }
 
-    public synchronized JobDescriptor cancelJob(String jobID) throws SSHApiException {
+    public synchronized boolean cancelJob(String jobID) throws SSHApiException {
         JobStatus jobStatus = getJobStatus(jobID);
         if (jobStatus == null || jobStatus == JobStatus.U) {
             log.info("Validation before cancel is failed, couldn't found job in remote host to cancel. Job may be already completed|failed|canceled");
-            return null;
+            return false;
         }
         RawCommandInfo rawCommandInfo = jobManagerConfiguration.getCancelCommand(jobID);
 
@@ -249,15 +249,10 @@ public class GSISSHAbstractCluster implements RemoteCluster {
         String outputifAvailable = getOutputifAvailable(stdOutReader, "Error reading output of job submission", jobManagerConfiguration.getBaseCancelCommand());
         // this might not be the case for all teh resources, if so Cluster implementation can override this method
         // because here after cancelling we try to get the job description and return it back
-        try {
-            return this.getJobDescriptorById(jobID);
-        } catch (Exception e) {
-            //its ok to fail to get status when the job is gone
-            return null;
-        }
+	    return true;
     }
 
-    public synchronized String submitBatchJobWithScript(String scriptPath, String workingDirectory) throws SSHApiException {
+    public synchronized String submitBatchJob(String scriptPath, String workingDirectory) throws SSHApiException {
         this.scpTo(workingDirectory, scriptPath);
 
         // since this is a constant we do not ask users to fill this
@@ -278,85 +273,6 @@ public class GSISSHAbstractCluster implements RemoteCluster {
         return  outputParser.parseJobSubmission(outputifAvailable);
     }
 
-
-    @Override
-    public synchronized String submitBatchJob(JobDescriptor jobDescriptor) throws SSHApiException {
-        TransformerFactory factory = TransformerFactory.newInstance();
-        URL resource = this.getClass().getClassLoader().getResource(jobManagerConfiguration.getJobDescriptionTemplateName());
-
-        if (resource == null) {
-            String error = "System configuration file '" + jobManagerConfiguration.getJobDescriptionTemplateName()
-                    + "' not found in the classpath";
-            throw new SSHApiException(error);
-        }
-
-        Source xslt = new StreamSource(new File(resource.getPath()));
-        Transformer transformer;
-        StringWriter results = new StringWriter();
-        File tempPBSFile = null;
-        try {
-            // generate the pbs script using xslt
-            transformer = factory.newTransformer(xslt);
-            Source text = new StreamSource(new ByteArrayInputStream(jobDescriptor.toXML().getBytes()));
-            transformer.transform(text, new StreamResult(results));
-            String scriptContent = results.toString().replaceAll("^[ |\t]*\n$", "");
-            if (scriptContent.startsWith("\n")) {
-                scriptContent = scriptContent.substring(1);
-            }
-//            log.debug("generated PBS:" + results.toString());
-
-            // creating a temporary file using pbs script generated above
-            int number = new SecureRandom().nextInt();
-            number = (number < 0 ? -number : number);
-
-            tempPBSFile = new File(Integer.toString(number) + jobManagerConfiguration.getScriptExtension());
-            FileUtils.writeStringToFile(tempPBSFile, scriptContent);
-
-            //reusing submitBatchJobWithScript method to submit a job
-            String jobID = null;
-            int retry = 3;
-            while(retry>0) {
-                try {
-                    jobID = this.submitBatchJobWithScript(tempPBSFile.getAbsolutePath(),
-                            jobDescriptor.getWorkingDirectory());
-                    retry=0;
-                } catch (SSHApiException e) {
-                    retry--;
-                    if(retry==0) {
-                        throw e;
-                    }else{
-                        try {
-                            Thread.sleep(5000);
-                        } catch (InterruptedException e1) {
-                            log.error(e1.getMessage(), e1);
-                        }
-                        log.error("Error occured during job submission but doing a retry");
-                    }
-                }
-            }
-            log.debug("Job has successfully submitted, JobID : " + jobID);
-            if (jobID != null) {
-                return jobID.replace("\n", "");
-            } else {
-                return null;
-            }
-            } catch (TransformerConfigurationException e) {
-            throw new SSHApiException("Error parsing PBS transformation", e);
-        } catch (TransformerException e) {
-            throw new SSHApiException("Error generating PBS script", e);
-        } catch (IOException e) {
-            throw new SSHApiException("An exception occurred while connecting to server." +
-                    "Connecting server - " + serverInfo.getHost() + ":" + serverInfo.getPort() +
-                    " connecting user name - "
-                    + serverInfo.getUserName(), e);
-        } finally {
-            if (tempPBSFile != null) {
-                tempPBSFile.delete();
-            }
-        }
-    }
-
-
     public void generateJobScript(JobDescriptor jobDescriptor) throws SSHApiException {
         TransformerFactory factory = TransformerFactory.newInstance();
         URL resource = this.getClass().getClassLoader().getResource(jobManagerConfiguration.getJobDescriptionTemplateName());
@@ -767,4 +683,95 @@ public class GSISSHAbstractCluster implements RemoteCluster {
 			// Oh well. They don't have a known hosts in home.
 		}
 	}
+
+
+	/**
+	 * This contains all the PBS-specific job statuses.
+	 * C -  Job is completed after having run.
+	 * E -  Job is exiting after having run.
+	 * H -  Job is held.
+	 * Q -  job is queued, eligible to run or routed.
+	 * R -  job is running.
+	 * T -  job is being moved to new location.
+	 * W -  job is waiting for its execution time
+	 * (-a option) to be reached.
+	 * S -  (Unicos only) job is suspended.
+	 */
+	public enum HPCJobStatus {
+		C, E, H, Q, R, T, W, S,U,F,CA,CD,CF,CG,NF,PD,PR,TO,qw,t,r,h,Er,Eqw,PEND,RUN,PSUSP,USUSP,SSUSP,DONE,EXIT,UNKWN,ZOMBI;
+
+		public static HPCJobStatus fromString(String status){
+			if(status != null){
+				if("C".equals(status)){
+					return HPCJobStatus.C;
+				}else if("E".equals(status)){
+					return HPCJobStatus.E;
+				}else if("H".equals(status)){
+					return HPCJobStatus.H;
+				}else if("Q".equals(status)){
+					return HPCJobStatus.Q;
+				}else if("R".equals(status)){
+					return HPCJobStatus.R;
+				}else if("T".equals(status)){
+					return HPCJobStatus.T;
+				}else if("W".equals(status)){
+					return HPCJobStatus.W;
+				}else if("S".equals(status)){
+					return HPCJobStatus.S;
+				}else if("F".equals(status)){
+					return HPCJobStatus.F;
+				}else if("S".equals(status)){
+					return HPCJobStatus.S;
+				}else if("CA".equals(status)){
+					return HPCJobStatus.CA;
+				}else if("CF".equals(status)){
+					return HPCJobStatus.CF;
+				}else if("CD".equals(status)){
+					return HPCJobStatus.CD;
+				}else if("CG".equals(status)){
+					return HPCJobStatus.CG;
+				}else if("NF".equals(status)){
+					return HPCJobStatus.NF;
+				}else if("PD".equals(status)){
+					return HPCJobStatus.PD;
+				}else if("PR".equals(status)){
+					return HPCJobStatus.PR;
+				}else if("TO".equals(status)){
+					return HPCJobStatus.TO;
+				}else if("U".equals(status)){
+					return HPCJobStatus.U;
+				}else if("qw".equals(status)){
+					return HPCJobStatus.qw;
+				}else if("t".equals(status)){
+					return HPCJobStatus.t;
+				}else if("r".equals(status)){
+					return HPCJobStatus.r;
+				}else if("h".equals(status)){
+					return HPCJobStatus.h;
+				}else if("Er".equals(status)){
+					return HPCJobStatus.Er;
+				}else if("Eqw".equals(status)){
+					return HPCJobStatus.Er;
+				}else if("RUN".equals(status)){      // LSF starts here
+					return HPCJobStatus.RUN;
+				}else if("PEND".equals(status)){
+					return HPCJobStatus.PEND;
+				}else if("DONE".equals(status)){
+					return HPCJobStatus.DONE;
+				}else if("PSUSP".equals(status)){
+					return HPCJobStatus.PSUSP;
+				}else if("USUSP".equals(status)){
+					return HPCJobStatus.USUSP;
+				}else if("SSUSP".equals(status)){
+					return HPCJobStatus.SSUSP;
+				}else if("EXIT".equals(status)){
+					return HPCJobStatus.EXIT;
+				}else if("ZOMBI".equals(status)){
+					return HPCJobStatus.ZOMBI;
+				}
+			}
+			return HPCJobStatus.U;
+		}
+	}
+
 }
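The nested enum keeps the behaviour of the removed top-level JobStatus: fromString maps a raw scheduler token to a constant and falls back to U for anything unrecognised. A small usage sketch:

    import org.apache.airavata.gfac.gsi.ssh.impl.GSISSHAbstractCluster.HPCJobStatus;

    public class HPCJobStatusSketch {
        public static void main(String[] args) {
            // Known PBS and LSF tokens map to their constants; unknown tokens fall back to U.
            HPCJobStatus running = HPCJobStatus.fromString("R");    // PBS running
            HPCJobStatus pending = HPCJobStatus.fromString("PEND"); // LSF pending
            HPCJobStatus unknown = HPCJobStatus.fromString("XYZ");  // -> HPCJobStatus.U
            System.out.println(running + " " + pending + " " + unknown);
        }
    }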

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/HPCRemoteCluster.java
index cc688e2..e1d9c27 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/HPCRemoteCluster.java
@@ -20,27 +20,191 @@
 */
 package org.apache.airavata.gfac.gsi.ssh.impl;
 
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import com.jcraft.jsch.UserInfo;
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
 import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.gfac.core.cluster.CommandInfo;
+import org.apache.airavata.gfac.core.cluster.CommandOutput;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.cluster.ServerInfo;
 import org.apache.airavata.gfac.core.JobManagerConfiguration;
+import org.apache.airavata.model.status.JobStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+import java.util.Map;
+
 
 /**
  * This is the default implementation of a cluster.
  * this has most of the methods to be used by the end user of the
  * library.
  */
-public class HPCRemoteCluster extends GSISSHAbstractCluster {
+public class HPCRemoteCluster implements RemoteCluster{
     private static final Logger log = LoggerFactory.getLogger(HPCRemoteCluster.class);
+	private final SSHKeyAuthentication authentication;
+	private final ServerInfo serverInfo;
+	private final JobManagerConfiguration jobManagerConfiguration;
+	private final JSch jSch;
+	private Session session;
+
+	public HPCRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration, AuthenticationInfo
+			authenticationInfo) throws AiravataException {
+		try {
+			this.serverInfo = serverInfo;
+			this.jobManagerConfiguration = jobManagerConfiguration;
+			if (authenticationInfo instanceof SSHKeyAuthentication) {
+				authentication = (SSHKeyAuthentication) authenticationInfo;
+			} else {
+				throw new AiravataException("Support ssh key authentication only");
+			}
+
+			jSch = new JSch();
+			jSch.addIdentity(authentication.getPrivateKeyFilePath(), authentication.getPublicKeyFilePath(), authentication
+					.getPassphrase().getBytes());
+			session = jSch.getSession(serverInfo.getUserName(), serverInfo.getHost(), serverInfo.getPort());
+			session.setUserInfo(new DefaultUserInfo(serverInfo.getUserName(), null, authentication.getPassphrase()));
+			session.connect(); // 0 connection timeout
+		} catch (JSchException e) {
+			throw new AiravataException("JSch initialization error ", e);
+		}
+	}
+
+	@Override
+	public String submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException {
+		
+		return null;
+	}
+
+	@Override
+	public void scpTo(String sourceFile, String destinationFile) throws SSHApiException {
+
+	}
+
+	@Override
+	public void scpFrom(String sourceFile, String destinationFile) throws SSHApiException {
+
+	}
+
+	@Override
+	public void scpThirdParty(String remoteFileSource, String remoteFileTarget) throws SSHApiException {
+
+	}
+
+	@Override
+	public void makeDirectory(String directoryPath) throws SSHApiException {
+
+	}
+
+	@Override
+	public boolean cancelJob(String jobID) throws SSHApiException {
+		return false;
+	}
+
+	@Override
+	public JobStatus getJobStatus(String jobID) throws SSHApiException {
+		return null;
+	}
+
+	@Override
+	public String getJobIdByJobName(String jobName, String userName) throws SSHApiException {
+		return null;
+	}
+
+	@Override
+	public void getJobStatuses(String userName, Map<String, JobStatus> jobIDs) throws SSHApiException {
+
+	}
+
+	@Override
+	public List<String> listDirectory(String directoryPath) throws SSHApiException {
+		return null;
+	}
+
+	@Override
+	public Session getSession() throws SSHApiException {
+		return null;
+	}
+
+	@Override
+	public void disconnect() throws SSHApiException {
+
+	}
+
+	private void executeCommand(CommandInfo commandInfo, CommandOutput commandOutput) throws SSHApiException {
+		String command = commandInfo.getCommand();
+		ChannelExec channelExec = null;
+		try {
+			if (!session.isConnected()) {
+				session.connect();
+			}
+			channelExec = ((ChannelExec) session.openChannel("exec"));
+			channelExec.setCommand(command);
+			channelExec.setInputStream(null);
+			channelExec.setErrStream(commandOutput.getStandardError());
+			channelExec.connect();
+			commandOutput.onOutput(channelExec);
+		} catch (JSchException e) {
+			throw new SSHApiException("Unable to execute command: " + command, e);
+		} finally {
+			// Only disconnect the channel; the session can be reused.
+			if (channelExec != null) {
+				channelExec.disconnect();
+			}
+		}
+	}
+
+	@Override
+	public ServerInfo getServerInfo() {
+		return this.serverInfo;
+	}
+
+	private class DefaultUserInfo implements UserInfo {
+
+		private String userName;
+		private String password;
+		private String passphrase;
+
+		public DefaultUserInfo(String userName, String password, String passphrase) {
+			this.userName = userName;
+			this.password = password;
+			this.passphrase = passphrase;
+		}
+
+		@Override
+		public String getPassphrase() {
+			return passphrase;
+		}
+
+		@Override
+		public String getPassword() {
+			return password;
+		}
+
+		@Override
+		public boolean promptPassword(String s) {
+			return false;
+		}
+
+		@Override
+		public boolean promptPassphrase(String s) {
+			return false;
+		}
+
+		@Override
+		public boolean promptYesNo(String s) {
+			return true; // auto-accept host key prompts; consider verifying against known_hosts instead
+		}
 
+		@Override
+		public void showMessage(String s) {
 
-    public HPCRemoteCluster(JobManagerConfiguration jobManagerConfiguration) {
-        super(jobManagerConfiguration);
-    }
-    public HPCRemoteCluster(ServerInfo serverInfo, AuthenticationInfo authenticationInfo, JobManagerConfiguration config) throws SSHApiException {
-        super(serverInfo, authenticationInfo,config);
-    }
+		}
+	}
 }
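
For reference, a minimal standalone sketch of the JSch exec-channel pattern that executeCommand() above relies on; the host, user, key paths and command are placeholders, and host-key checking is relaxed here purely for illustration:

import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class JSchExecSketch {
	public static void main(String[] args) throws Exception {
		JSch jsch = new JSch();
		// key paths and passphrase are placeholders
		jsch.addIdentity("/home/user/.ssh/id_rsa", "/home/user/.ssh/id_rsa.pub", "passphrase".getBytes());
		Session session = jsch.getSession("user", "hpc.example.org", 22);
		session.setConfig("StrictHostKeyChecking", "no"); // illustration only; use known_hosts in production
		session.connect();

		ChannelExec exec = (ChannelExec) session.openChannel("exec");
		exec.setCommand("qstat -u user"); // in HPCRemoteCluster the command would come from a CommandInfo
		exec.setInputStream(null);
		exec.setErrStream(System.err);
		BufferedReader stdout = new BufferedReader(new InputStreamReader(exec.getInputStream()));
		exec.connect();

		String line;
		while ((line = stdout.readLine()) != null) {
			System.out.println(line);
		}
		exec.disconnect();    // disconnect the channel only; the session can be reused
		session.disconnect();
	}
}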

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
new file mode 100644
index 0000000..5ab2e96
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl;
+
+import com.google.common.eventbus.EventBus;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.LocalEventPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.GFacEngine;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class Factory {
+
+	private static GFacEngine engine;
+	private static Map<String, RemoteCluster> remoteClusterMap = new HashMap<>();
+	private static LocalEventPublisher localEventPublisher;
+	private static CuratorFramework curatorClient;
+
+	public static GFacEngine getGFacEngine() throws GFacException {
+		if (engine == null) {
+			synchronized (GFacEngineImpl.class) {
+				if (engine == null) {
+					engine = new GFacEngineImpl();
+				}
+			}
+		}
+		return engine;
+	}
+
+	public static RemoteCluster getRemoteCluster(ServerInfo serverInfo) {
+		return remoteClusterMap.get(serverInfo.getHost());
+	}
+
+	public static ExperimentCatalog getDefaultExpCatalog() throws RegistryException {
+		return RegistryFactory.getDefaultExpCatalog();
+	}
+
+	public static AppCatalog getDefaultAppCatalog() throws AppCatalogException {
+		return RegistryFactory.getAppCatalog();
+	}
+
+	public static LocalEventPublisher getLocalEventPublisher() {
+		if (localEventPublisher == null) {
+			synchronized (LocalEventPublisher.class) {
+				if (localEventPublisher == null) {
+					localEventPublisher = new LocalEventPublisher(new EventBus());
+				}
+			}
+		}
+		return localEventPublisher;
+	}
+
+	public static CuratorFramework getCuratorClient() throws ApplicationSettingsException {
+		if (curatorClient == null) {
+			synchronized (Factory.class) {
+				if (curatorClient == null) {
+					String connectionString = ServerSettings.getZookeeperConnection();
+					RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+					curatorClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
+				}
+			}
+		}
+		return curatorClient;
+	}
+}
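
A hedged usage sketch for the Curator client returned above: Curator clients must be start()ed before any operation, and the factory as written leaves that to the caller. The znode path below is a placeholder, not an actual Airavata path:

import org.apache.airavata.gfac.impl.Factory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;

public class CuratorUsageSketch {
	public static byte[] readNode(String path) throws Exception {
		CuratorFramework client = Factory.getCuratorClient();
		if (client.getState() != CuratorFrameworkState.STARTED) {
			client.start(); // required before any ZooKeeper operation
		}
		return client.getData().forPath(path); // e.g. a placeholder path such as "/gfac-experiments/exp-123"
	}
}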

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java
deleted file mode 100644
index 9c32913..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-
-package org.apache.airavata.gfac.impl;
-
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.config.DataTransferTaskConfig;
-import org.apache.airavata.gfac.core.config.GFacYamlConfigruation;
-import org.apache.airavata.gfac.core.config.JobSubmitterTaskConfig;
-import org.apache.airavata.gfac.core.config.ResourceConfig;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.gfac.core.task.Task;
-import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
-import org.apache.airavata.model.application.io.DataType;
-import org.apache.airavata.model.application.io.InputDataObjectType;
-import org.apache.airavata.model.process.ProcessModel;
-import org.apache.airavata.model.task.TaskModel;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class GFacEngine {
-	private static GFacEngine engine;
-	Map<JobSubmissionProtocol, Task> jobSubmissionTask;
-	Map<DataMovementProtocol, Task> dataMovementTask;
-	Map<ResourceJobManagerType, ResourceConfig> resources;
-
-
-	private GFacEngine() throws GFacException {
-		GFacYamlConfigruation config = new GFacYamlConfigruation();
-		for (JobSubmitterTaskConfig jobSubmitterTaskConfig : config.getJobSbumitters()) {
-			jobSubmissionTask.put(jobSubmitterTaskConfig.getSubmissionProtocol(), null);
-		}
-
-		for (DataTransferTaskConfig dataTransferTaskConfig : config.getFileTransferTasks()) {
-			dataMovementTask.put(dataTransferTaskConfig.getTransferProtocol(), null);
-		}
-
-		for (ResourceConfig resourceConfig : config.getResourceConfiguration()) {
-			resources.put(resourceConfig.getJobManagerType(), resourceConfig);
-		}
-	}
-
-	public static GFacEngine getInstance() throws GFacException {
-		if (engine == null) {
-			synchronized (GFacEngine.class) {
-				if (engine == null) {
-					engine = new GFacEngine();
-				}
-			}
-		}
-		return engine;
-	}
-
-	public ProcessContext populateProcessContext(ProcessContext processContext) {
-		processContext.setProcessModel(new ProcessModel()); // TODO: get rocess model from app catalog
-		// TODO: set datamovement protocol and jobsubmission protocol
-		//TODO: set up gatewayResourceProfile.
-		return processContext;
-	}
-
-	public void createTaskChain(ProcessContext processContext) throws GFacException {
-		List<InputDataObjectType> processInputs = processContext.getProcessModel().getProcessInputs();
-		List<TaskModel> taskChain = new ArrayList<>();
-		if (processInputs != null) {
-			for (InputDataObjectType processInput : processInputs) {
-				DataType type = processInput.getType();
-				switch (type) {
-					case STDERR:
-						//
-						break;
-					case STDOUT:
-						//
-						break;
-					case URI:
-						// add URI type Task
-						break;
-					default:
-						// nothing to do
-						break;
-				}
-			}
-		}
-	}
-
-	public void executeProcess(ProcessContext processContext) throws GFacException {
-
-
-	}
-
-	public void recoverProcess(ProcessContext processContext) throws GFacException {
-
-	}
-
-	public void runProcessOutflow(ProcessContext processContext) throws GFacException {
-
-	}
-
-	public void recoverProcessOutflow(ProcessContext processContext) throws GFacException {
-
-	}
-
-	public void cancelProcess() throws GFacException {
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
new file mode 100644
index 0000000..b2f9fd3
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl;
+
+import org.apache.airavata.gfac.core.GFacEngine;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.config.DataTransferTaskConfig;
+import org.apache.airavata.gfac.core.config.GFacYamlConfigruation;
+import org.apache.airavata.gfac.core.config.JobSubmitterTaskConfig;
+import org.apache.airavata.gfac.core.config.ResourceConfig;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.task.TaskModel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GFacEngineImpl implements GFacEngine {
+	private static GFacEngineImpl engine;
+	Map<JobSubmissionProtocol, Task> jobSubmissionTask = new HashMap<>();
+	Map<DataMovementProtocol, Task> dataMovementTask = new HashMap<>();
+	Map<ResourceJobManagerType, ResourceConfig> resources = new HashMap<>();
+
+
+	public GFacEngineImpl() throws GFacException {
+		GFacYamlConfigruation config = new GFacYamlConfigruation();
+		for (JobSubmitterTaskConfig jobSubmitterTaskConfig : config.getJobSbumitters()) {
+			jobSubmissionTask.put(jobSubmitterTaskConfig.getSubmissionProtocol(), null);
+		}
+
+		for (DataTransferTaskConfig dataTransferTaskConfig : config.getFileTransferTasks()) {
+			dataMovementTask.put(dataTransferTaskConfig.getTransferProtocol(), null);
+		}
+
+		for (ResourceConfig resourceConfig : config.getResourceConfiguration()) {
+			resources.put(resourceConfig.getJobManagerType(), resourceConfig);
+		}
+	}
+
+	@Override
+	public ProcessContext populateProcessContext(String experimentId, String processId, String gatewayId, String
+			tokenId) throws GFacException {
+		ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId);
+		processContext.setProcessModel(new ProcessModel());
+		// TODO: get process model from app catalog
+		// TODO: set datamovement protocol and jobsubmission protocol
+		// TODO: set up gatewayResourceProfile.
+		// TODO: set RemoteCluster
+		return processContext;
+	}
+
+	@Override
+	public void createTaskChain(ProcessContext processContext) throws GFacException {
+		List<InputDataObjectType> processInputs = processContext.getProcessModel().getProcessInputs();
+		List<TaskModel> taskChain = new ArrayList<>();
+		if (processInputs != null) {
+			for (InputDataObjectType processInput : processInputs) {
+				DataType type = processInput.getType();
+				switch (type) {
+					case STDERR:
+						//
+						break;
+					case STDOUT:
+						//
+						break;
+					case URI:
+						// add URI type Task
+
+						break;
+					default:
+						// nothing to do
+						break;
+				}
+			}
+		}
+	}
+
+	@Override
+	public void executeProcess(ProcessContext processContext) throws GFacException {
+
+	}
+
+	@Override
+	public void recoverProcess(ProcessContext processContext) throws GFacException {
+
+	}
+
+	@Override
+	public void runProcessOutflow(ProcessContext processContext) throws GFacException {
+
+	}
+
+	@Override
+	public void recoverProcessOutflow(ProcessContext processContext) throws GFacException {
+
+	}
+
+	@Override
+	public void cancelProcess() throws GFacException {
+
+	}
+}
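
The constructor above still registers null placeholders for each configured task. A hedged sketch of one way those could be filled in via reflection, assuming (hypothetically) that the config objects expose the implementing class name through a getTaskClass() accessor; the actual wiring may differ:

import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.task.Task;

// Hedged sketch only; getTaskClass() is an assumed accessor, not part of the committed API.
public final class TaskReflectionSketch {
	private TaskReflectionSketch() {}

	public static Task instantiateTask(String taskClassName) throws GFacException {
		try {
			return (Task) Class.forName(taskClassName).getDeclaredConstructor().newInstance();
		} catch (ReflectiveOperationException e) {
			throw new GFacException("Could not instantiate task class " + taskClassName + ": " + e.getMessage());
		}
	}
}

// inside the constructor loop, something like:
//   jobSubmissionTask.put(jobSubmitterTaskConfig.getSubmissionProtocol(),
//           TaskReflectionSketch.instantiateTask(jobSubmitterTaskConfig.getTaskClass())); // getTaskClass() is hypothetical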

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index f7616b3..63f0088 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -21,9 +21,8 @@
 
 package org.apache.airavata.gfac.impl;
 
-import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,30 +30,42 @@ import org.slf4j.LoggerFactory;
 public class GFacWorker implements Runnable {
 
 	private static final Logger log = LoggerFactory.getLogger(GFacWorker.class);
-	private final ProcessContext processContext;
+	private ProcessContext processContext;
+	private String experimentId;
+	private String processId;
+	private String gatewayId;
+	private String tokenId;
 
-    public   GFacWorker(ProcessContext processContext) throws AiravataException {
+
+    public GFacWorker(ProcessContext processContext) throws GFacException {
         if (processContext == null) {
-            throw new AiravataException("Worker must initialize with valide processContext, Process context is null");
+            throw new GFacException("GFacWorker must be initialized with a valid ProcessContext; the given ProcessContext is null");
         }
 	    this.processContext = processContext;
     }
+	public GFacWorker(String experimentId, String processId, String gatewayId, String tokenId) throws GFacException {
+		this.experimentId = experimentId;
+		this.processId = processId;
+		this.gatewayId = gatewayId;
+		this.tokenId = tokenId;
+	}
 
     @Override
     public void run() {
 	    try {
-		    GFacEngine engine = GFacEngine.getInstance();
+		    GFacEngine engine = Factory.getGFacEngine();
+		    if (processContext == null) {
+			    processContext = engine.populateProcessContext(experimentId, processId, gatewayId, tokenId);
+		    }
 		    ProcessType type = getProcessType(processContext);
 		    try {
 			    switch (type) {
 				    case NEW:
-					    engine.populateProcessContext(processContext);
 					    engine.createTaskChain(processContext);
 					    engine.executeProcess(processContext);
 					    break;
 				    case RECOVER:
 					    // recover the process
-					    engine.populateProcessContext(processContext);
 					    engine.createTaskChain(processContext);
 					    engine.recoverProcess(processContext);
 					    break;
@@ -64,7 +75,6 @@ public class GFacWorker implements Runnable {
 					    break;
 				    case RECOVER_OUTFLOW:
 					    // recover  outflow task;
-					    engine.populateProcessContext(processContext);
 					    engine.recoverProcessOutflow(processContext);
 			    }
 		    } catch (GFacException e) {
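
A hedged sketch of launching the worker with the new id-based constructor; the real server presumably uses its own thread pool, and the ids here are placeholders:

import org.apache.airavata.gfac.impl.GFacWorker;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WorkerLaunchSketch {
	public static void main(String[] args) throws Exception {
		ExecutorService pool = Executors.newFixedThreadPool(4);
		pool.submit(new GFacWorker("exp-123", "proc-456", "default-gateway", "cred-token-789"));
		pool.shutdown();
	}
}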

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
new file mode 100644
index 0000000..c0b458c
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
@@ -0,0 +1,443 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.gsi.ssh.impl.StandardOutReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Utility class for SSH- and SCP-related operations.
+ */
+public class SSHUtils {
+	private static final Logger log = LoggerFactory.getLogger(SSHUtils.class);
+
+
+	/**
+	 * This will copy a local file to a remote location
+	 *
+	 * @param remoteFile remote location to transfer the file to; this should not be a directory. If the user passes
+	 *                   a directory we still copy into it, but we simply return the directory name.
+	 *                   todo handle the directory name as input and return the proper final output file name
+	 * @param localFile  local file to transfer; this can be a directory
+	 * @param session    JSch session to use for the transfer
+	 * @return the final remote file path, so that callers can use the new file location
+	 * @throws IOException
+	 * @throws JSchException
+	 * @throws SSHApiException
+	 */
+	public static String scpTo(String remoteFile, String localFile, Session session) throws IOException, JSchException, SSHApiException {
+		FileInputStream fis = null;
+		String prefix = null;
+		if (new File(localFile).isDirectory()) {
+			prefix = localFile + File.separator;
+		}
+		boolean ptimestamp = true;
+
+		// exec 'scp -t rfile' remotely
+		String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile;
+		Channel channel = session.openChannel("exec");
+
+		StandardOutReader stdOutReader = new StandardOutReader();
+		((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+		((ChannelExec) channel).setCommand(command);
+
+		// get I/O streams for remote scp
+		OutputStream out = channel.getOutputStream();
+		InputStream in = channel.getInputStream();
+
+		channel.connect();
+
+		if (checkAck(in) != 0) {
+			String error = "Remote scp reported an error (bad acknowledgement)";
+			log.error(error);
+			throw new SSHApiException(error);
+		}
+
+		File _lfile = new File(localFile);
+
+		if (ptimestamp) {
+			command = "T" + (_lfile.lastModified() / 1000) + " 0";
+			// The access time should be sent here,
+			// but it is not accessible with JavaAPI ;-<
+			command += (" " + (_lfile.lastModified() / 1000) + " 0\n");
+			out.write(command.getBytes());
+			out.flush();
+			if (checkAck(in) != 0) {
+				String error = "Remote scp reported an error (bad acknowledgement)";
+				log.error(error);
+				throw new SSHApiException(error);
+			}
+		}
+
+		// send "C0644 filesize filename", where filename should not include '/'
+		long filesize = _lfile.length();
+		command = "C0644 " + filesize + " ";
+		if (localFile.lastIndexOf('/') > 0) {
+			command += localFile.substring(localFile.lastIndexOf('/') + 1);
+		} else {
+			command += localFile;
+		}
+		command += "\n";
+		out.write(command.getBytes());
+		out.flush();
+		if (checkAck(in) != 0) {
+			String error = "Remote scp reported an error (bad acknowledgement)";
+			log.error(error);
+			throw new SSHApiException(error);
+		}
+
+		// send a content of lFile
+		fis = new FileInputStream(localFile);
+		byte[] buf = new byte[1024];
+		while (true) {
+			int len = fis.read(buf, 0, buf.length);
+			if (len <= 0) break;
+			out.write(buf, 0, len); //out.flush();
+		}
+		fis.close();
+		fis = null;
+		// send '\0'
+		buf[0] = 0;
+		out.write(buf, 0, 1);
+		out.flush();
+		if (checkAck(in) != 0) {
+			String error = "Remote scp reported an error (bad acknowledgement)";
+			log.error(error);
+			throw new SSHApiException(error);
+		}
+		out.close();
+		stdOutReader.onOutput(channel);
+
+
+		channel.disconnect();
+		if (stdOutReader.getStdErrorString().contains("scp:")) {
+			throw new SSHApiException(stdOutReader.getStdErrorString());
+		}
+		//since remote file is always a file  we just return the file
+		return remoteFile;
+	}
+
+	/**
+	 * This method will copy a remote file to a local directory
+	 *
+	 * @param remoteFile remote file path; this has to be a fully qualified path
+	 * @param localFile  local file or directory to copy the remote file into
+	 * @param session    JSch session to use for the transfer
+	 */
+	public static void scpFrom(String remoteFile, String localFile, Session session) throws IOException, JSchException, SSHApiException {
+		FileOutputStream fos = null;
+		try {
+			String prefix = null;
+			if (new File(localFile).isDirectory()) {
+				prefix = localFile + File.separator;
+			}
+
+			// exec 'scp -f remotefile' remotely
+			String command = "scp -f " + remoteFile;
+			Channel channel = session.openChannel("exec");
+			((ChannelExec) channel).setCommand(command);
+
+			StandardOutReader stdOutReader = new StandardOutReader();
+			((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+			// get I/O streams for remote scp
+			OutputStream out = channel.getOutputStream();
+			InputStream in = channel.getInputStream();
+
+			channel.connect();
+
+			byte[] buf = new byte[1024];
+
+			// send '\0'
+			buf[0] = 0;
+			out.write(buf, 0, 1);
+			out.flush();
+
+			while (true) {
+				int c = checkAck(in);
+				if (c != 'C') {
+					break;
+				}
+
+				// read '0644 '
+				in.read(buf, 0, 5);
+
+				long filesize = 0L;
+				while (true) {
+					if (in.read(buf, 0, 1) < 0) {
+						// error
+						break;
+					}
+					if (buf[0] == ' ') break;
+					filesize = filesize * 10L + (long) (buf[0] - '0');
+				}
+
+				String file = null;
+				for (int i = 0; ; i++) {
+					in.read(buf, i, 1);
+					if (buf[i] == (byte) 0x0a) {
+						file = new String(buf, 0, i);
+						break;
+					}
+				}
+
+				//System.out.println("filesize="+filesize+", file="+file);
+
+				// send '\0'
+				buf[0] = 0;
+				out.write(buf, 0, 1);
+				out.flush();
+
+				// read a content of lfile
+				fos = new FileOutputStream(prefix == null ? localFile : prefix + file);
+				int foo;
+				while (true) {
+					if (buf.length < filesize) foo = buf.length;
+					else foo = (int) filesize;
+					foo = in.read(buf, 0, foo);
+					if (foo < 0) {
+						// error
+						break;
+					}
+					fos.write(buf, 0, foo);
+					filesize -= foo;
+					if (filesize == 0L) break;
+				}
+				fos.close();
+				fos = null;
+
+				if (checkAck(in) != 0) {
+					String error = "Error transferring the file content";
+					log.error(error);
+					throw new SSHApiException(error);
+				}
+
+				// send '\0'
+				buf[0] = 0;
+				out.write(buf, 0, 1);
+				out.flush();
+			}
+			stdOutReader.onOutput(channel);
+			if (stdOutReader.getStdErrorString().contains("scp:")) {
+				throw new SSHApiException(stdOutReader.getStdErrorString());
+			}
+
+		} catch (Exception e) {
+			log.error(e.getMessage(), e);
+			throw new SSHApiException("Failed to copy remote file " + remoteFile + " to " + localFile, e);
+		} finally {
+			try {
+				if (fos != null) fos.close();
+			} catch (Exception ee) {
+			}
+		}
+	}
+
+	/**
+	 * This method performs a third-party copy between two remote locations through this session (scp -3).
+	 *
+	 * @param remoteFileSource remote source file path; this has to be a fully qualified path
+	 * @param remoteFileTarget remote target file path to copy the source file to
+	 * @param session          JSch Session object
+	 */
+	public static void scpThirdParty(String remoteFileSource, String remoteFileTarget, Session session) throws IOException, JSchException, SSHApiException {
+		FileOutputStream fos = null;
+		try {
+			String prefix = null;
+
+			// exec 'scp -3 source target' remotely (third-party copy driven through this session)
+			String command = "scp -3 " + remoteFileSource + " " + remoteFileTarget;
+			Channel channel = session.openChannel("exec");
+			((ChannelExec) channel).setCommand(command);
+
+			StandardOutReader stdOutReader = new StandardOutReader();
+			((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+			// get I/O streams for remote scp
+			OutputStream out = channel.getOutputStream();
+			InputStream in = channel.getInputStream();
+
+			channel.connect();
+
+			byte[] buf = new byte[1024];
+
+			// send '\0'
+			buf[0] = 0;
+			out.write(buf, 0, 1);
+			out.flush();
+
+			while (true) {
+				int c = checkAck(in);
+				if (c != 'C') {
+					break;
+				}
+
+				// read '0644 '
+				in.read(buf, 0, 5);
+
+				long filesize = 0L;
+				while (true) {
+					if (in.read(buf, 0, 1) < 0) {
+						// error
+						break;
+					}
+					if (buf[0] == ' ') break;
+					filesize = filesize * 10L + (long) (buf[0] - '0');
+				}
+				int foo;
+				while (true) {
+					if (buf.length < filesize) foo = buf.length;
+					else foo = (int) filesize;
+
+					int len = in.read(buf, 0, foo);
+					if (len <= 0) break;
+					out.write(buf, 0, len);
+				}
+				// send '\0'
+				buf[0] = 0;
+				out.write(buf, 0, 1);
+				out.flush();
+				if (checkAck(in) != 0) {
+					String error = "Error transferring the file content";
+					log.error(error);
+					throw new SSHApiException(error);
+				}
+
+			}
+			out.close();
+
+			stdOutReader.onOutput(channel);
+			if (stdOutReader.getStdErrorString().contains("scp:")) {
+				throw new SSHApiException(stdOutReader.getStdErrorString());
+			}
+
+		} catch (Exception e) {
+			log.error(e.getMessage(), e);
+			throw new SSHApiException("Third-party copy from " + remoteFileSource + " to " + remoteFileTarget + " failed", e);
+		} finally {
+			try {
+				if (fos != null) fos.close();
+			} catch (Exception ee) {
+			}
+		}
+	}
+
+	public static void makeDirectory(String path, Session session) throws IOException, JSchException, SSHApiException {
+
+		// exec 'mkdir -p path' remotely
+		String command = "mkdir -p " + path;
+		Channel channel = session.openChannel("exec");
+		StandardOutReader stdOutReader = new StandardOutReader();
+
+		((ChannelExec) channel).setCommand(command);
+
+
+		((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+		try {
+			channel.connect();
+		} catch (JSchException e) {
+
+			channel.disconnect();
+//            session.disconnect();
+
+			throw new SSHApiException("Unable to retrieve command output. Command - " + command +
+					" on server - " + session.getHost() + ":" + session.getPort() +
+					" connecting user name - "
+					+ session.getUserName(), e);
+		}
+		stdOutReader.onOutput(channel);
+		if (stdOutReader.getStdErrorString().contains("mkdir:")) {
+			throw new SSHApiException(stdOutReader.getStdErrorString());
+		}
+
+		channel.disconnect();
+	}
+
+	public static List<String> listDirectory(String path, Session session) throws IOException, JSchException, SSHApiException {
+
+		// exec 'ls path' remotely
+		String command = "ls " + path;
+		Channel channel = session.openChannel("exec");
+		StandardOutReader stdOutReader = new StandardOutReader();
+
+		((ChannelExec) channel).setCommand(command);
+
+
+		((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+		try {
+			channel.connect();
+		} catch (JSchException e) {
+
+			channel.disconnect();
+//            session.disconnect();
+
+			throw new SSHApiException("Unable to retrieve command output. Command - " + command +
+					" on server - " + session.getHost() + ":" + session.getPort() +
+					" connecting user name - "
+					+ session.getUserName(), e);
+		}
+		stdOutReader.onOutput(channel);
+		if (stdOutReader.getStdErrorString().contains("ls:")) {
+			throw new SSHApiException(stdOutReader.getStdErrorString());
+		}
+		channel.disconnect();
+		return Arrays.asList(stdOutReader.getStdOutputString().split("\n"));
+	}
+
+
+	static int checkAck(InputStream in) throws IOException {
+		int b = in.read();
+		if (b == 0) return b;
+		if (b == -1) return b;
+
+		if (b == 1 || b == 2) {
+			StringBuilder sb = new StringBuilder();
+			int c;
+			do {
+				c = in.read();
+				sb.append((char) c);
+			}
+			while (c != '\n');
+			if (b == 1) { // error
+				log.error(sb.toString());
+			}
+			if (b == 2) { // fatal error
+				log.error(sb.toString());
+			}
+		}
+		return b;
+	}
+
+}
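
A hedged usage sketch driving the helpers above over a plain JSch session; host, user, key and remote paths are placeholders, and host-key checking is relaxed for illustration only:

import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;
import org.apache.airavata.gfac.impl.SSHUtils;

public class SSHUtilsUsageSketch {
	public static void main(String[] args) throws Exception {
		JSch jsch = new JSch();
		jsch.addIdentity("/home/user/.ssh/id_rsa");
		Session session = jsch.getSession("user", "hpc.example.org", 22);
		session.setConfig("StrictHostKeyChecking", "no"); // illustration only
		session.connect();

		SSHUtils.makeDirectory("/scratch/user/job-0001", session);
		SSHUtils.scpTo("/scratch/user/job-0001/input.dat", "/tmp/input.dat", session);
		for (String entry : SSHUtils.listDirectory("/scratch/user/job-0001", session)) {
			System.out.println(entry);
		}
		session.disconnect();
	}
}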

http://git-wip-us.apache.org/repos/asf/airavata/blob/e6622078/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java
new file mode 100644
index 0000000..49560a6
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+
+public class SCPFileTransferTask implements Task {
+
+	public static final int DEFAULT_SSH_PORT = 22;
+	private String password;
+	private String publicKeyPath;
+	private String passPhrase;
+	private String privateKeyPath;
+	private String userName;
+	private String hostName;
+	private String inputPath;
+
+
+	@Override
+	public void init(Map<String, String> properties) throws TaskException {
+		password = properties.get("password");
+		passPhrase = properties.get("passPhrase");
+		privateKeyPath = properties.get("privateKeyPath");
+		publicKeyPath = properties.get("publicKeyPath");
+		userName = properties.get("userName");
+		hostName = properties.get("hostName");
+		inputPath = properties.get("inputPath");
+	}
+
+	@Override
+	public TaskState execute(TaskContext taskContext) throws TaskException {
+		DataStagingTaskModel dataStagingTaskModel = new DataStagingTaskModel();
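+		// TODO: obtain this model from the task context / stored task model; a freshly constructed
+		// DataStagingTaskModel has no source or destination, so the URL parsing below will fail.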
+		try {
+			URL sourceURL = new URL(dataStagingTaskModel.getSource());
+			URL destinationURL = new URL(dataStagingTaskModel.getDestination());
+
+			if (sourceURL.getProtocol().equalsIgnoreCase("file")) {  //  local --> Airavata --> RemoteCluster
+				taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURL.getPath(),
+						destinationURL.getPath());
+			} else { // PGA(client) --> Airavata --> RemoteCluster
+				// PGA(client) --> Airavata
+				JSch jsch = new JSch();
+				jsch.addIdentity(privateKeyPath, passPhrase);
+				Session session = jsch.getSession(userName, hostName, DEFAULT_SSH_PORT);
+				session.connect();
+				SSHUtils.scpFrom(sourceURL.getPath(), inputPath, session);
+				session.disconnect();
+
+				// Airavata --> RemoteCluster
+				taskContext.getParentProcessContext().getRemoteCluster().scpTo(inputPath, destinationURL.getPath());
+			}
+		} catch (MalformedURLException e) {
+			throw new TaskException("Wrong source or destination file path.", e);
+		} catch (SSHApiException e) {
+			throw new TaskException("Scp attempt failed", e);
+		} catch (JSchException | IOException e) {
+			throw new TaskException("Scp failed", e);
+		}
+		return null;
+	}
+
+	@Override
+	public TaskState recover(TaskContext taskContext) throws TaskException {
+		return null;
+	}
+
+
+}
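
A hedged sketch of wiring up the task with the property keys init() reads; values are placeholders, and execute() additionally needs a TaskContext supplied by the GFac framework:

import org.apache.airavata.gfac.impl.task.SCPFileTransferTask;

import java.util.HashMap;
import java.util.Map;

public class ScpTaskInitSketch {
	public static void main(String[] args) throws Exception {
		Map<String, String> props = new HashMap<>();
		props.put("privateKeyPath", "/home/airavata/.ssh/id_rsa");
		props.put("publicKeyPath", "/home/airavata/.ssh/id_rsa.pub");
		props.put("passPhrase", "changeit");
		props.put("userName", "airavata");
		props.put("hostName", "gateway-storage.example.org");
		props.put("inputPath", "/tmp/airavata/staging");

		SCPFileTransferTask task = new SCPFileTransferTask();
		task.init(props);
		// task.execute(taskContext) would follow once the framework builds the TaskContext
	}
}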