Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/03 20:14:27 UTC

[10/39] airavata git commit: Refactored gfac submodules: merged gfac-ssh, gfac-gsissh, gfac-local, gfac-monitor and gsissh modules to create gfac-impl, and moved the implementation from gfac-core to gfac-impl

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/cpi/gfac_cpi_serviceConstants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/cpi/gfac_cpi_serviceConstants.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/cpi/gfac_cpi_serviceConstants.java
new file mode 100644
index 0000000..14fd7fe
--- /dev/null
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/cpi/gfac_cpi_serviceConstants.java
@@ -0,0 +1,55 @@
+    /*
+     * Licensed to the Apache Software Foundation (ASF) under one or more
+     * contributor license agreements.  See the NOTICE file distributed with
+     * this work for additional information regarding copyright ownership.
+     * The ASF licenses this file to You under the Apache License, Version 2.0
+     * (the "License"); you may not use this file except in compliance with
+     * the License.  You may obtain a copy of the License at
+     *
+     *     http://www.apache.org/licenses/LICENSE-2.0
+     *
+     * Unless required by applicable law or agreed to in writing, software
+     * distributed under the License is distributed on an "AS IS" BASIS,
+     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+     * See the License for the specific language governing permissions and
+     * limitations under the License.
+     */
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.airavata.gfac.cpi;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("all") public class gfac_cpi_serviceConstants {
+
+  public static final String GFAC_CPI_VERSION = "0.13.0";
+
+}

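The generated constant above is the contract version that the GFac CPI exposes over Thrift; the service handler added later in this patch returns it from getGFACServiceVersion(). As a purely illustrative sketch — the host, port, and transport choices are assumptions, not part of this commit — a client could compare its compiled-in constant against a running server like this:

    import org.apache.airavata.gfac.cpi.GfacService;
    import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class GfacVersionCheckSketch {
        public static void main(String[] args) throws Exception {
            // Assumed host/port; GfacServer defaults to port 8950 (see below).
            TTransport transport = new TSocket("localhost", 8950);
            transport.open();
            GfacService.Client client = new GfacService.Client(new TBinaryProtocol(transport));
            // The server-side handler returns gfac_cpi_serviceConstants.GFAC_CPI_VERSION ("0.13.0").
            String serverVersion = client.getGFACServiceVersion();
            System.out.println("Client CPI version: " + gfac_cpi_serviceConstants.GFAC_CPI_VERSION
                    + ", server reports: " + serverVersion);
            transport.close();
        }
    }
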
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
new file mode 100644
index 0000000..b076145
--- /dev/null
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
@@ -0,0 +1,143 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.server;
+
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.IServer;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.cpi.GfacService;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+
+public class GfacServer implements IServer{
+
+    private final static Logger logger = LoggerFactory.getLogger(GfacServer.class);
+	private static final String SERVER_NAME = "Gfac Server";
+	private static final String SERVER_VERSION = "1.0";
+
+    private IServer.ServerStatus status;
+
+	private TServer server;
+
+	public GfacServer() {
+		setStatus(IServer.ServerStatus.STOPPED);
+	}
+
+    public void startGfacServer(GfacService.Processor<GfacServerHandler> gfacServerHandlerProcessor)
+            throws Exception {
+        try {
+            final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.GFAC_SERVER_PORT, "8950"));
+            final String serverHost = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST, null);
+
+            InetSocketAddress inetSocketAddress = new InetSocketAddress(serverHost, serverPort);
+
+			TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
+
+            server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).processor(gfacServerHandlerProcessor));
+
+            new Thread() {
+				public void run() {
+					server.serve();
+					setStatus(IServer.ServerStatus.STOPPED);
+					logger.info("Gfac Server Stopped.");
+				}
+			}.start();
+			new Thread() {
+				public void run() {
+					while(!server.isServing()){
+						try {
+							Thread.sleep(500);
+						} catch (InterruptedException e) {
+							break;
+						}
+					}
+					if (server.isServing()){
+						setStatus(IServer.ServerStatus.STARTED);
+			            logger.info("Started Gfac Server on Port " + serverPort);
+			            logger.info("Listening to Gfac Clients ....");
+					}
+				}
+			}.start();
+        } catch (TTransportException e) {
+            logger.error(e.getMessage());
+            setStatus(IServer.ServerStatus.FAILED);
+        }
+    }
+
+    public static void main(String[] args) {
+    	try {
+			new GfacServer().start();
+		} catch (Exception e) {
+            logger.error(e.getMessage(), e);
+		}
+    }
+
+	public void start() throws Exception {
+		setStatus(IServer.ServerStatus.STARTING);
+        GfacService.Processor<GfacServerHandler> gfacService =
+                new GfacService.Processor<GfacServerHandler>(new GfacServerHandler());
+		startGfacServer(gfacService);
+	}
+
+	public void stop() throws Exception {
+        if (server!=null && server.isServing()){
+			setStatus(IServer.ServerStatus.STOPING);
+			server.stop();
+		}
+		GFacThreadPoolExecutor.getCachedThreadPool().shutdownNow();
+
+	}
+
+	public void restart() throws Exception {
+		stop();
+		start();
+	}
+
+	public void configure() throws Exception {
+		// TODO Auto-generated method stub
+
+	}
+
+	public IServer.ServerStatus getStatus() throws Exception {
+		return status;
+	}
+
+	private void setStatus(IServer.ServerStatus stat){
+		status=stat;
+		status.updateTime();
+	}
+
+	public String getName() {
+		return SERVER_NAME;
+	}
+
+	public String getVersion() {
+		return SERVER_VERSION;
+	}
+}

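For context (not part of this commit), once GfacServer is serving, the orchestrator talks to it through the generated GfacService.Client. A minimal sketch of submitting a task over Thrift; the host, port, and identifiers below are assumptions:

    import org.apache.airavata.gfac.cpi.GfacService;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class GfacSubmitSketch {
        public static void main(String[] args) throws Exception {
            // GFAC_SERVER_PORT defaults to 8950 in GfacServer above; the host is an assumption.
            TTransport transport = new TSocket("localhost", 8950);
            transport.open();
            GfacService.Client client = new GfacService.Client(new TBinaryProtocol(transport));
            // submitJob hands the work to GFacThreadPoolExecutor via an InputHandlerWorker
            // and returns immediately, so 'true' here only means the request was accepted.
            boolean accepted = client.submitJob("sample-experiment-id", "sample-task-id",
                    "sample-gateway-id", "sample-credential-token");
            System.out.println("Accepted: " + accepted);
            transport.close();
        }
    }
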
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
new file mode 100644
index 0000000..77a89cc
--- /dev/null
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -0,0 +1,421 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.server;
+
+import com.google.common.eventbus.EventBus;
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.gfac.GFacConfiguration;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
+import org.apache.airavata.gfac.core.GFac;
+import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.utils.InputHandlerWorker;
+import org.apache.airavata.gfac.cpi.GfacService;
+import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.PublisherFactory;
+import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
+import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.model.workspace.experiment.ExperimentStatus;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPathExpressionException;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+public class GfacServerHandler implements GfacService.Iface {
+    private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class);
+    private static RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
+    private static int requestCount=0;
+    private Registry registry;
+    private AppCatalog appCatalog;
+    private String gatewayName;
+    private String airavataUserName;
+    private CuratorFramework curatorClient;
+    private MonitorPublisher publisher;
+    private String gfacServer;
+    private String gfacExperiments;
+    private String airavataServerHostPort;
+    private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
+    private static File gfacConfigFile;
+    private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
+    private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
+
+    public GfacServerHandler() throws Exception {
+        try {
+            // start curator client
+            String zkhostPort = AiravataZKUtils.getZKhostPort();
+            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+            curatorClient = CuratorFrameworkFactory.newClient(zkhostPort, retryPolicy);
+            curatorClient.start();
+            gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
+            gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+            airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
+                    + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
+            storeServerConfig();
+            publisher = new MonitorPublisher(new EventBus());
+            registry = RegistryFactory.getDefaultRegistry();
+            appCatalog = AppCatalogFactory.getAppCatalog();
+            setGatewayProperties();
+            startDaemonHandlers();
+            // initializing Better Gfac Instance
+            BetterGfacImpl.getInstance().init(registry, appCatalog, curatorClient, publisher);
+            if (ServerSettings.isGFacPassiveMode()) {
+                rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
+                rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
+            }
+            startStatusUpdators(registry, curatorClient, publisher, rabbitMQTaskLaunchConsumer);
+
+        } catch (Exception e) {
+            throw new Exception("Error initialising GFAC", e);
+        }
+    }
+
+    public static void main(String[] args) {
+        RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = null;
+        try {
+            rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
+            rabbitMQTaskLaunchConsumer.listen(new TestHandler());
+        } catch (AiravataException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+    private void storeServerConfig() throws Exception {
+        Stat stat = curatorClient.checkExists().forPath(gfacServer);
+        if (stat == null) {
+            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+                    .forPath(gfacServer, new byte[0]);
+        }
+        String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
+        String instanceNode = gfacServer + File.separator + instanceId;
+        stat = curatorClient.checkExists().forPath(instanceNode);
+        if (stat == null) {
+            curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(instanceNode, airavataServerHostPort.getBytes());
+            curatorClient.getChildren().watched().forPath(instanceNode);
+        }
+        stat = curatorClient.checkExists().forPath(gfacExperiments);
+        if (stat == null) {
+            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(gfacExperiments, airavataServerHostPort.getBytes());
+        }
+        stat = curatorClient.checkExists().forPath(gfacExperiments + File.separator + instanceId);
+        if (stat == null) {
+            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+                    .forPath(gfacExperiments + File.separator + instanceId, airavataServerHostPort.getBytes());
+        }
+    }
+
+    private long byteArrayToLong(byte[] data) {
+        long value = 0;
+        for (int i = 0; i < data.length; i++)
+        {
+            value += ((long) data[i] & 0xffL) << (8 * i);
+        }
+        return value;
+    }
+
+    public String getGFACServiceVersion() throws TException {
+        return gfac_cpi_serviceConstants.GFAC_CPI_VERSION;
+    }
+
+    /**
+     * After creating the experiment data and task data, the orchestrator invokes this
+     * operation for each task of an experiment to run the actual job-related actions.
+     *
+     * @param experimentId
+     * @param taskId
+     * @param gatewayId the gateway id is inferred from the security context and passed on to gfac
+     * @param tokenId
+     * @return success/failure
+     */
+    public boolean submitJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException {
+        requestCount++;
+        logger.info("-----------------------------------------------------" + requestCount + "-----------------------------------------------------");
+        logger.infoId(experimentId, "GFac Received submit job request for the Experiment: {} TaskId: {}", experimentId, taskId);
+        InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(BetterGfacImpl.getInstance(), experimentId,
+                taskId, gatewayId, tokenId);
+//        try {
+//            if( gfac.submitJob(experimentId, taskId, gatewayId)){
+        logger.debugId(experimentId, "Submitted job to the Gfac Implementation, experiment {}, task {}, gateway " +
+                "{}", experimentId, taskId, gatewayId);
+
+        GFacThreadPoolExecutor.getCachedThreadPool().execute(inputHandlerWorker);
+
+        // we immediately return when we have a threadpool
+        return true;
+    }
+
+    public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException {
+        logger.infoId(experimentId, "GFac Received cancel job request for Experiment: {} TaskId: {} ", experimentId, taskId);
+        try {
+            if (BetterGfacImpl.getInstance().cancel(experimentId, taskId, gatewayId, tokenId)) {
+                logger.debugId(experimentId, "Successfully cancelled job, experiment {} , task {}", experimentId, taskId);
+                return true;
+            } else {
+                logger.errorId(experimentId, "Job cancellation failed, experiment {} , task {}", experimentId, taskId);
+                return false;
+            }
+        } catch (Exception e) {
+            logger.errorId(experimentId, "Error cancelling the experiment {}.", experimentId);
+            throw new TException("Error cancelling the experiment : " + e.getMessage(), e);
+        }
+    }
+
+    public Registry getRegistry() {
+        return registry;
+    }
+
+    public void setRegistry(Registry registry) {
+        this.registry = registry;
+    }
+
+    public String getGatewayName() {
+        return gatewayName;
+    }
+
+    public void setGatewayName(String gatewayName) {
+        this.gatewayName = gatewayName;
+    }
+
+    public String getAiravataUserName() {
+        return airavataUserName;
+    }
+
+    public void setAiravataUserName(String airavataUserName) {
+        this.airavataUserName = airavataUserName;
+    }
+
+    protected void setGatewayProperties() throws ApplicationSettingsException {
+        setAiravataUserName(ServerSettings.getDefaultUser());
+        setGatewayName(ServerSettings.getDefaultUserGateway());
+    }
+
+    private GFac getGfac() throws TException {
+        GFac gFac = BetterGfacImpl.getInstance();
+        gFac.init(registry, appCatalog, curatorClient, publisher);
+        return gFac;
+    }
+
+    public void startDaemonHandlers() {
+        List<GFacHandlerConfig> daemonHandlerConfig = null;
+        String className = null;
+        try {
+            URL resource = GfacServerHandler.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+            if (resource != null) {
+                gfacConfigFile = new File(resource.getPath());
+            }
+            daemonHandlerConfig = GFacConfiguration.getDaemonHandlers(gfacConfigFile);
+            for (GFacHandlerConfig handlerConfig : daemonHandlerConfig) {
+                className = handlerConfig.getClassName();
+                Class<?> aClass = Class.forName(className).asSubclass(ThreadedHandler.class);
+                ThreadedHandler threadedHandler = (ThreadedHandler) aClass.newInstance();
+                threadedHandler.initProperties(handlerConfig.getProperties());
+                daemonHandlers.add(threadedHandler);
+            }
+        } catch (ParserConfigurationException | IOException | XPathExpressionException | ClassNotFoundException |
+                InstantiationException | IllegalAccessException | GFacHandlerException | SAXException e) {
+            logger.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+        }
+        for (ThreadedHandler tHandler : daemonHandlers) {
+            (new Thread(tHandler)).start();
+        }
+    }
+
+
+    public static void startStatusUpdators(Registry registry, CuratorFramework curatorClient, MonitorPublisher publisher,
+
+                                           RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
+        try {
+            String[] listenerClassList = ServerSettings.getActivityListeners();
+            Publisher rabbitMQPublisher = PublisherFactory.createActivityPublisher();
+            for (String listenerClass : listenerClassList) {
+                Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
+                AbstractActivityListener abstractActivityListener = aClass.newInstance();
+                activityListeners.add(abstractActivityListener);
+                abstractActivityListener.setup(publisher, registry, curatorClient, rabbitMQPublisher, rabbitMQTaskLaunchConsumer);
+                logger.info("Registering listener: " + listenerClass);
+                publisher.registerListener(abstractActivityListener);
+            }
+        } catch (Exception e) {
+            logger.error("Error loading the listener classes configured in airavata-server.properties", e);
+        }
+    }
+    private static  class TestHandler implements MessageHandler{
+        @Override
+        public Map<String, Object> getProperties() {
+            Map<String, Object> props = new HashMap<String, Object>();
+            ArrayList<String> keys = new ArrayList<String>();
+            keys.add(ServerSettings.getLaunchQueueName());
+            keys.add(ServerSettings.getCancelQueueName());
+            props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys);
+            props.put(MessagingConstants.RABBIT_QUEUE, ServerSettings.getLaunchQueueName());
+            return props;
+        }
+
+        @Override
+        public void onMessage(MessageContext message) {
+            TaskSubmitEvent event = new TaskSubmitEvent();
+            TBase messageEvent = message.getEvent();
+            byte[] bytes = new byte[0];
+            try {
+                bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                ThriftUtils.createThriftFromBytes(bytes, event);
+                System.out.println(event.getExperimentId());
+            } catch (TException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    private class TaskLaunchMessageHandler implements MessageHandler {
+        private String experimentNode;
+        private String nodeName;
+
+        public TaskLaunchMessageHandler() {
+            experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+            nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME,"gfac-node0");
+        }
+
+        public Map<String, Object> getProperties() {
+            Map<String, Object> props = new HashMap<String, Object>();
+            ArrayList<String> keys = new ArrayList<String>();
+            keys.add(ServerSettings.getLaunchQueueName());
+            keys.add(ServerSettings.getCancelQueueName());
+            props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys);
+            props.put(MessagingConstants.RABBIT_QUEUE, ServerSettings.getLaunchQueueName());
+            return props;
+        }
+
+        public void onMessage(MessageContext message) {
+            System.out.println("Message received with message id '" + message.getMessageId()
+                    + "' and with message type '" + message.getType() + "'");
+            if (message.getType().equals(MessageType.LAUNCHTASK)) {
+                try {
+                    TaskSubmitEvent event = new TaskSubmitEvent();
+                    TBase messageEvent = message.getEvent();
+                    byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                    ThriftUtils.createThriftFromBytes(bytes, event);
+                    // update experiment status to executing
+                    ExperimentStatus status = new ExperimentStatus();
+                    status.setExperimentState(ExperimentState.EXECUTING);
+                    status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+                    registry.update(RegistryModelType.EXPERIMENT_STATUS, status, event.getExperimentId());
+                    experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+                    try {
+                        GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), curatorClient,
+                                experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
+                        AiravataZKUtils.getExpStatePath(event.getExperimentId());
+                        submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
+                        rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+                    }
+                } catch (TException e) {
+                    logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
+                } catch (RegistryException e) {
+                    logger.error("Error while updating experiment status", e);
+                }
+            } else if (message.getType().equals(MessageType.TERMINATETASK)) {
+                boolean cancelSuccess = false;
+                TaskTerminateEvent event = new TaskTerminateEvent();
+                TBase messageEvent = message.getEvent();
+                try {
+                    byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                    ThriftUtils.createThriftFromBytes(bytes, event);
+                    boolean saveDeliveryTagSuccess = GFacUtils.setExperimentCancel(event.getExperimentId(), curatorClient, message.getDeliveryTag());
+                    if (saveDeliveryTagSuccess) {
+                        cancelSuccess = cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
+                        System.out.println("Message received with message id '" + message.getMessageId()
+                                + "' and with message type '" + message.getType() + "'");
+                    } else {
+                        throw new GFacException("Terminate Task failed to save delivery tag : " + String.valueOf(message.getDeliveryTag()) + " \n" +
+                                "This happens when another cancel operation is being processed or the experiment is in one of the final states: completed|failed|cancelled.");
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }finally {
+                    if (cancelSuccess) {
+                        // if the cancel succeeded, AiravataExperimentStatusUpdator will send an ack for this message.
+                    } else {
+                        try {
+                            if (GFacUtils.ackCancelRequest(event.getExperimentId(), curatorClient)) {
+                                if (!rabbitMQTaskLaunchConsumer.isOpen()) {
+                                    rabbitMQTaskLaunchConsumer.reconnect();
+                                }
+                                rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+                            }
+                        } catch (Exception e) {
+                            logger.error("Error while acking the cancel request, experimentId: " + event.getExperimentId());
+                        }
+                    }
+                }
+            }
+        }
+    }
+}

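To make the ZooKeeper bookkeeping above easier to follow: storeServerConfig() registers this GFac instance as an ephemeral node /gfac-server/<instance-id> whose data is the host:port string, and creates a persistent /gfac-experiments/<instance-id> node under which experiment entries are added. A minimal Curator sketch that reads the registration back — the connection string and instance id are assumptions taken from the defaults used in this handler:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class GfacRegistrationLookupSketch {
        public static void main(String[] args) throws Exception {
            // Connection string is an assumption; in a deployment it comes from AiravataZKUtils.getZKhostPort().
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    "localhost:2181", new ExponentialBackoffRetry(1000, 5));
            client.start();
            // "gfac-node0" is the default ZOOKEEPER_GFAC_SERVER_NAME used above; the node's
            // data is the "host:port" string written by storeServerConfig().
            byte[] data = client.getData().forPath("/gfac-server/gfac-node0");
            System.out.println("GFac instance registered at: " + new String(data));
            client.close();
        }
    }
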
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/main/resources/gsissh.properties
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/resources/gsissh.properties b/modules/gfac/gfac-service/src/main/resources/gsissh.properties
new file mode 100644
index 0000000..3fdf76d
--- /dev/null
+++ b/modules/gfac/gfac-service/src/main/resources/gsissh.properties
@@ -0,0 +1,26 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+###########################################################################
+# Specifies system-level configurations as key/value pairs.
+###########################################################################
+
+StrictHostKeyChecking=no
+ssh.session.timeout=360000

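These two keys are consumed by the gsissh layer when it opens SSH sessions; the consuming class is not part of this patch, but loading them from the classpath is plain java.util.Properties usage, sketched below for reference:

    import java.io.InputStream;
    import java.util.Properties;

    public class GsisshConfigSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            try (InputStream in = GsisshConfigSketch.class.getClassLoader()
                    .getResourceAsStream("gsissh.properties")) {
                props.load(in);
            }
            // Values as shipped in this file: "no" and "360000" (milliseconds).
            System.out.println("StrictHostKeyChecking = " + props.getProperty("StrictHostKeyChecking"));
            System.out.println("ssh.session.timeout   = " + props.getProperty("ssh.session.timeout"));
        }
    }
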
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/GfacClientFactoryTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/GfacClientFactoryTest.java b/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/GfacClientFactoryTest.java
new file mode 100644
index 0000000..21c137f
--- /dev/null
+++ b/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/GfacClientFactoryTest.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.client;
+
+//import org.apache.airavata.client.AiravataAPIFactory;
+//import org.apache.airavata.client.api.AiravataAPI;
+//import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
+//import org.apache.airavata.client.tools.DocumentCreator;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.client.util.Initialize;
+import org.apache.airavata.gfac.cpi.GfacService;
+import org.apache.airavata.gfac.server.GfacServer;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+
+public class GfacClientFactoryTest {
+    private final static Logger logger = LoggerFactory.getLogger(GfacClientFactoryTest.class);
+//    private DocumentCreator documentCreator;
+    private GfacService.Client gfacClient;
+    private Registry registry;
+    private int NUM_CONCURRENT_REQUESTS = 1;
+    Initialize initialize;
+    GfacServer service;
+    private static ServerCnxnFactory cnxnFactory;
+/*
+    @Test
+    public void setUp() {
+    	AiravataUtils.setExecutionAsServer();
+        initialize = new Initialize("registry-derby.sql");
+        initialize.initializeDB();
+        AiravataZKUtils.startEmbeddedZK(cnxnFactory);
+        try {
+            service = (new GfacServer());
+            service.start();
+            registry = RegistryFactory.getDefaultRegistry();
+        } catch (Exception e) {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+        AiravataUtils.setExecutionAsServer();
+        documentCreator = new DocumentCreator(getAiravataAPI());
+        documentCreator.createLocalHostDocs();
+
+        try {
+            service.stop();
+            cnxnFactory.shutdown();
+        } catch (Exception e) {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+
+    }
+
+    private AiravataAPI getAiravataAPI() {
+        AiravataAPI airavataAPI = null;
+            try {
+                String systemUserName = ServerSettings.getDefaultUser();
+                String gateway = ServerSettings.getDefaultUserGateway();
+                airavataAPI = AiravataAPIFactory.getAPI(gateway, systemUserName);
+            } catch (ApplicationSettingsException e) {
+                e.printStackTrace();
+            } catch (AiravataAPIInvocationException e) {
+                e.printStackTrace();
+            }
+        return airavataAPI;
+    }
+*/
+
+    private void storeDescriptors() {
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/util/Initialize.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/util/Initialize.java b/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/util/Initialize.java
new file mode 100644
index 0000000..651f414
--- /dev/null
+++ b/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/util/Initialize.java
@@ -0,0 +1,330 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.client.util;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.persistance.registry.jpa.ResourceType;
+import org.apache.airavata.persistance.registry.jpa.resources.*;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.derby.drda.NetworkServerControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.sql.*;
+import java.util.StringTokenizer;
+
+public class Initialize {
+    private static final Logger logger = LoggerFactory.getLogger(Initialize.class);
+    public static final String DERBY_SERVER_MODE_SYS_PROPERTY = "derby.drda.startNetworkServer";
+    public  String scriptName = "registry-derby.sql";
+    private NetworkServerControl server;
+    private static final String delimiter = ";";
+    public static final String PERSISTANT_DATA = "Configuration";
+
+    public Initialize(String scriptName) {
+        this.scriptName = scriptName;
+    }
+
+    public static boolean checkStringBufferEndsWith(StringBuffer buffer, String suffix) {
+        if (suffix.length() > buffer.length()) {
+            return false;
+        }
+        // this loop is done on purpose to avoid memory allocation performance
+        // problems on various JDKs
+        // StringBuffer.lastIndexOf() was introduced in jdk 1.4 and
+        // implementation is ok though does allocation/copying
+        // StringBuffer.toString().endsWith() does massive memory
+        // allocation/copying on JDK 1.5
+        // See http://issues.apache.org/bugzilla/show_bug.cgi?id=37169
+        int endIndex = suffix.length() - 1;
+        int bufferIndex = buffer.length() - 1;
+        while (endIndex >= 0) {
+            if (buffer.charAt(bufferIndex) != suffix.charAt(endIndex)) {
+                return false;
+            }
+            bufferIndex--;
+            endIndex--;
+        }
+        return true;
+    }
+
+    private static boolean isServerStarted(NetworkServerControl server, int ntries)
+    {
+        for (int i = 1; i <= ntries; i ++)
+        {
+            try {
+                Thread.sleep(500);
+                server.ping();
+                return true;
+            }
+            catch (Exception e) {
+                if (i == ntries)
+                    return false;
+            }
+        }
+        return false;
+    }
+
+    public void initializeDB() throws SQLException{
+        String jdbcUrl = null;
+        String jdbcUser = null;
+        String jdbcPassword = null;
+        try{
+            jdbcUrl = ServerSettings.getSetting("registry.jdbc.url");
+            jdbcUser = ServerSettings.getSetting("registry.jdbc.user");
+            jdbcPassword = ServerSettings.getSetting("registry.jdbc.password");
+            jdbcUrl = jdbcUrl + "?" + "user=" + jdbcUser + "&" + "password=" + jdbcPassword;
+        } catch (ApplicationSettingsException e) {
+            logger.error("Unable to read properties", e);
+        }
+        startDerbyInServerMode();
+        if(!isServerStarted(server, 20)){
+           throw new RuntimeException("Derby server could not be started within ten seconds...");
+        }
+
+        Connection conn = null;
+        try {
+            Class.forName(Utils.getJDBCDriver()).newInstance();
+            conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword);
+            if (!isDatabaseStructureCreated(PERSISTANT_DATA, conn)) {
+                executeSQLScript(conn);
+                logger.info("New Database created for Registry");
+            } else {
+                logger.debug("Database already created for Registry!");
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException("Database failure", e);
+        } finally {
+            try {
+                if (conn != null){
+                    if (!conn.getAutoCommit()) {
+                        conn.commit();
+                    }
+                    conn.close();
+                }
+            } catch (SQLException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        try{
+            GatewayResource gatewayResource = new GatewayResource();
+            gatewayResource.setGatewayId(ServerSettings.getSetting("default.registry.gateway"));
+            gatewayResource.setGatewayName(ServerSettings.getSetting("default.registry.gateway"));
+            gatewayResource.setDomain("test-domain");
+            gatewayResource.setEmailAddress("test-email");
+            gatewayResource.save();
+            
+            UserResource userResource = new UserResource();
+            userResource.setUserName(ServerSettings.getSetting("default.registry.user"));
+            userResource.setPassword(ServerSettings.getSetting("default.registry.password"));
+            userResource.save();
+
+            WorkerResource workerResource = (WorkerResource) gatewayResource.create(ResourceType.GATEWAY_WORKER);
+            workerResource.setUser(userResource.getUserName());
+            workerResource.save();
+            
+            ProjectResource projectResource = (ProjectResource)workerResource.create(ResourceType.PROJECT);
+            projectResource.setGatewayId(gatewayResource.getGatewayId());
+            projectResource.setId("default");
+            projectResource.setName("default");
+            projectResource.setWorker(workerResource);
+            projectResource.save();
+        
+          
+        } catch (ApplicationSettingsException e) {
+            logger.error("Unable to read properties", e);
+            throw new SQLException(e.getMessage(), e);
+        } catch (RegistryException e) {
+            logger.error("Unable to save data to registry", e);
+            throw new SQLException(e.getMessage(), e);
+        }
+    }
+
+    public static boolean isDatabaseStructureCreated(String tableName, Connection conn) {
+        try {
+            System.out.println("Running a query to test the database tables existence.");
+            // check whether the tables are already created with a query
+            Statement statement = null;
+            try {
+                statement = conn.createStatement();
+                ResultSet rs = statement.executeQuery("select * from " + tableName);
+                if (rs != null) {
+                    rs.close();
+                }
+            } finally {
+                try {
+                    if (statement != null) {
+                        statement.close();
+                    }
+                } catch (SQLException e) {
+                    return false;
+                }
+            }
+        } catch (SQLException e) {
+            return false;
+        }
+
+        return true;
+    }
+
+    private void executeSQLScript(Connection conn) throws Exception {
+        StringBuffer sql = new StringBuffer();
+        BufferedReader reader = null;
+        try{
+
+        InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(scriptName);
+        reader = new BufferedReader(new InputStreamReader(inputStream));
+        String line;
+        while ((line = reader.readLine()) != null) {
+            line = line.trim();
+            if (line.startsWith("//")) {
+                continue;
+            }
+            if (line.startsWith("--")) {
+                continue;
+            }
+            StringTokenizer st = new StringTokenizer(line);
+            if (st.hasMoreTokens()) {
+                String token = st.nextToken();
+                if ("REM".equalsIgnoreCase(token)) {
+                    continue;
+                }
+            }
+            sql.append(" ").append(line);
+
+            // SQL defines "--" as a comment to EOL
+            // and in Oracle it may contain a hint
+            // so we cannot just remove it, instead we must end it
+            if (line.indexOf("--") >= 0) {
+                sql.append("\n");
+            }
+            if ((checkStringBufferEndsWith(sql, delimiter))) {
+                executeSQL(sql.substring(0, sql.length() - delimiter.length()), conn);
+                sql.replace(0, sql.length(), "");
+            }
+        }
+        // Catch any statements not followed by ;
+        if (sql.length() > 0) {
+            executeSQL(sql.toString(), conn);
+        }
+        }catch (IOException e){
+            logger.error("Error occurred while executing SQL script for creating Airavata database", e);
+            throw new Exception("Error occurred while executing SQL script for creating Airavata database", e);
+        }finally {
+            if (reader != null) {
+                reader.close();
+            }
+
+        }
+
+    }
+
+    private static void executeSQL(String sql, Connection conn) throws Exception {
+        // Check and ignore empty statements
+        if ("".equals(sql.trim())) {
+            return;
+        }
+
+        Statement statement = null;
+        try {
+            logger.debug("SQL : " + sql);
+
+            boolean ret;
+            int updateCount = 0, updateCountTotal = 0;
+            statement = conn.createStatement();
+            ret = statement.execute(sql);
+            updateCount = statement.getUpdateCount();
+            do {
+                if (!ret) {
+                    if (updateCount != -1) {
+                        updateCountTotal += updateCount;
+                    }
+                }
+                ret = statement.getMoreResults();
+                if (ret) {
+                    updateCount = statement.getUpdateCount();
+                }
+            } while (ret);
+
+            logger.debug(sql + " : " + updateCountTotal + " rows affected");
+
+            SQLWarning warning = conn.getWarnings();
+            while (warning != null) {
+                logger.warn(warning + " sql warning");
+                warning = warning.getNextWarning();
+            }
+            conn.clearWarnings();
+        } catch (SQLException e) {
+            if (e.getSQLState().equals("X0Y32")) {
+                // ignore the "table already exists" exception from the Derby
+                // database
+                logger.info("Table Already Exists", e);
+            } else {
+                throw new Exception("Error occurred while executing : " + sql, e);
+            }
+        } finally {
+            if (statement != null) {
+                try {
+                    statement.close();
+                } catch (SQLException e) {
+                    logger.error("Error occurred while closing result set.", e);
+                }
+            }
+        }
+    }
+
+    private void startDerbyInServerMode() {
+        try {
+            System.setProperty(DERBY_SERVER_MODE_SYS_PROPERTY, "true");
+            server = new NetworkServerControl(InetAddress.getByName(Utils.getHost()),
+                    20000,
+                    Utils.getJDBCUser(), Utils.getJDBCPassword());
+            java.io.PrintWriter consoleWriter = new java.io.PrintWriter(System.out, true);
+            server.start(consoleWriter);
+        } catch (IOException e) {
+            logger.error("Unable to start Apache derby in the server mode! Check whether " +
+                    "specified port is available");
+        } catch (Exception e) {
+            logger.error("Unable to start Apache derby in the server mode! Check whether " +
+                    "specified port is available");
+        }
+
+    }
+
+    public void stopDerbyServer() throws SQLException{
+        try {
+            server.shutdown();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new SQLException("Error while stopping derby server", e);
+        }
+    }
+}

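As the commented-out test above suggests, Initialize bootstraps an embedded Derby registry for the gfac-service tests. A minimal sketch of that lifecycle (the SQL script name matches the resource added later in this patch; everything else is assumed test wiring):

    import org.apache.airavata.gfac.client.util.Initialize;

    public class RegistryBootstrapSketch {
        public static void main(String[] args) throws Exception {
            // Starts Derby in network-server mode, then creates the registry schema
            // from registry-derby.sql if the Configuration table does not exist yet.
            Initialize initialize = new Initialize("registry-derby.sql");
            initialize.initializeDB();

            // ... exercise registry-backed code here ...

            // Shut the embedded Derby network server down when done.
            initialize.stopDerbyServer();
        }
    }
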
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/test/resources/gsissh.properties
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/test/resources/gsissh.properties b/modules/gfac/gfac-service/src/test/resources/gsissh.properties
new file mode 100644
index 0000000..3fdf76d
--- /dev/null
+++ b/modules/gfac/gfac-service/src/test/resources/gsissh.properties
@@ -0,0 +1,26 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+###########################################################################
+# Specifies system-level configurations as key/value pairs.
+###########################################################################
+
+StrictHostKeyChecking=no
+ssh.session.timeout=360000

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/test/resources/monitor.properties
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/test/resources/monitor.properties b/modules/gfac/gfac-service/src/test/resources/monitor.properties
new file mode 100644
index 0000000..7f0299a
--- /dev/null
+++ b/modules/gfac/gfac-service/src/test/resources/monitor.properties
@@ -0,0 +1,30 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+primaryMonitor=org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor
+secondaryMonitor=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor
+amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
+connection.name=xsede_private
+trusted.certificate.location=/Users/chathuri/dev/airavata/cert/certificates
+certificate.path=/Users/chathuri/dev/airavata/cert/certificates
+myproxy.server=myproxy.teragrid.org
+myproxy.user=ogce
+myproxy.password=
+myproxy.life=3600
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/test/resources/orchestrator.properties
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/test/resources/orchestrator.properties b/modules/gfac/gfac-service/src/test/resources/orchestrator.properties
new file mode 100644
index 0000000..35c0427
--- /dev/null
+++ b/modules/gfac/gfac-service/src/test/resources/orchestrator.properties
@@ -0,0 +1,26 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+job.submitter=org.apache.airavata.orchestrator.core.impl.EmbeddedGFACJobSubmitter
+job.validators=org.apache.airavata.orchestrator.core.validator.impl.BatchQueueValidator
+submitter.interval=10000
+threadpool.size=0
+start.submitter=true
+embedded.mode=true
+enable.validation=false

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/test/resources/registry-derby.sql
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/test/resources/registry-derby.sql b/modules/gfac/gfac-service/src/test/resources/registry-derby.sql
new file mode 100644
index 0000000..9ed5ca9
--- /dev/null
+++ b/modules/gfac/gfac-service/src/test/resources/registry-derby.sql
@@ -0,0 +1,361 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+CREATE TABLE GATEWAY
+(
+        GATEWAY_NAME VARCHAR(255),
+	      OWNER VARCHAR(255),
+        PRIMARY KEY (GATEWAY_NAME)
+);
+
+CREATE TABLE CONFIGURATION
+(
+        CONFIG_KEY VARCHAR(255),
+        CONFIG_VAL VARCHAR(255),
+        EXPIRE_DATE TIMESTAMP DEFAULT '0000-00-00 00:00:00',
+        CATEGORY_ID VARCHAR (255),
+        PRIMARY KEY(CONFIG_KEY, CONFIG_VAL, CATEGORY_ID)
+);
+
+INSERT INTO CONFIGURATION (CONFIG_KEY, CONFIG_VAL, EXPIRE_DATE, CATEGORY_ID) VALUES('registry.version', '0.12', CURRENT_TIMESTAMP ,'SYSTEM');
+
+CREATE TABLE USERS
+(
+        USER_NAME VARCHAR(255),
+        PASSWORD VARCHAR(255),
+        PRIMARY KEY(USER_NAME)
+);
+
+CREATE TABLE GATEWAY_WORKER
+(
+        GATEWAY_NAME VARCHAR(255),
+        USER_NAME VARCHAR(255),
+        PRIMARY KEY (GATEWAY_NAME, USER_NAME),
+        FOREIGN KEY (GATEWAY_NAME) REFERENCES GATEWAY(GATEWAY_NAME) ON DELETE CASCADE,
+        FOREIGN KEY (USER_NAME) REFERENCES USERS(USER_NAME) ON DELETE CASCADE
+);
+
+CREATE TABLE PROJECT
+(
+         GATEWAY_NAME VARCHAR(255),
+         USER_NAME VARCHAR(255) NOT NULL,
+         PROJECT_ID VARCHAR(255),
+         PROJECT_NAME VARCHAR(255) NOT NULL,
+         DESCRIPTION VARCHAR(255),
+         CREATION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+         PRIMARY KEY (PROJECT_ID),
+         FOREIGN KEY (GATEWAY_NAME) REFERENCES GATEWAY(GATEWAY_NAME) ON DELETE CASCADE,
+         FOREIGN KEY (USER_NAME) REFERENCES USERS(USER_NAME) ON DELETE CASCADE
+);
+
+CREATE TABLE PROJECT_USER
+(
+    PROJECT_ID VARCHAR(255),
+    USER_NAME VARCHAR(255),
+    PRIMARY KEY (PROJECT_ID,USER_NAME),
+    FOREIGN KEY (PROJECT_ID) REFERENCES PROJECT(PROJECT_ID) ON DELETE CASCADE,
+    FOREIGN KEY (USER_NAME) REFERENCES USERS(USER_NAME) ON DELETE CASCADE
+);
+
+CREATE TABLE PUBLISHED_WORKFLOW
+(
+         GATEWAY_NAME VARCHAR(255),
+         CREATED_USER VARCHAR(255),
+         PUBLISH_WORKFLOW_NAME VARCHAR(255),
+         VERSION VARCHAR(255),
+         PUBLISHED_DATE TIMESTAMP DEFAULT '0000-00-00 00:00:00',
+         PATH VARCHAR (255),
+         WORKFLOW_CONTENT BLOB,
+         PRIMARY KEY(GATEWAY_NAME, PUBLISH_WORKFLOW_NAME),
+         FOREIGN KEY (GATEWAY_NAME) REFERENCES GATEWAY(GATEWAY_NAME) ON DELETE CASCADE,
+         FOREIGN KEY (CREATED_USER) REFERENCES USERS(USER_NAME) ON DELETE CASCADE
+);
+
+CREATE TABLE USER_WORKFLOW
+(
+         GATEWAY_NAME VARCHAR(255),
+         OWNER VARCHAR(255),
+         TEMPLATE_NAME VARCHAR(255),
+         LAST_UPDATED_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+         PATH VARCHAR (255),
+         WORKFLOW_GRAPH BLOB,
+         PRIMARY KEY(GATEWAY_NAME, OWNER, TEMPLATE_NAME),
+         FOREIGN KEY (GATEWAY_NAME) REFERENCES GATEWAY(GATEWAY_NAME) ON DELETE CASCADE,
+         FOREIGN KEY (OWNER) REFERENCES USERS(USER_NAME) ON DELETE CASCADE
+);
+
+CREATE TABLE EXPERIMENT
+(
+        EXPERIMENT_ID VARCHAR(255),
+        GATEWAY_NAME VARCHAR(255),
+        EXECUTION_USER VARCHAR(255) NOT NULL,
+        PROJECT_ID VARCHAR(255) NOT NULL,
+        CREATION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+        EXPERIMENT_NAME VARCHAR(255) NOT NULL,
+        EXPERIMENT_DESCRIPTION VARCHAR(255),
+        APPLICATION_ID VARCHAR(255),
+        APPLICATION_VERSION VARCHAR(255),
+        WORKFLOW_TEMPLATE_ID VARCHAR(255),
+        WORKFLOW_TEMPLATE_VERSION VARCHAR(255),
+        WORKFLOW_EXECUTION_ID VARCHAR(255),
+        PRIMARY KEY(EXPERIMENT_ID),
+        FOREIGN KEY (GATEWAY_NAME) REFERENCES GATEWAY(GATEWAY_NAME) ON DELETE CASCADE,
+        FOREIGN KEY (EXECUTION_USER) REFERENCES USERS(USER_NAME) ON DELETE CASCADE,
+        FOREIGN KEY (PROJECT_ID) REFERENCES PROJECT(PROJECT_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE EXPERIMENT_INPUT
+(
+        EXPERIMENT_ID VARCHAR(255),
+        INPUT_KEY VARCHAR(255) NOT NULL,
+        INPUT_TYPE VARCHAR(255),
+        METADATA VARCHAR(255),
+        VALUE CLOB,
+        PRIMARY KEY(EXPERIMENT_ID,INPUT_KEY),
+        FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE EXPERIMENT_OUTPUT
+(
+        EXPERIMENT_ID VARCHAR(255),
+        OUTPUT_KEY VARCHAR(255) NOT NULL,
+        OUTPUT_KEY_TYPE VARCHAR(255),
+        METADATA VARCHAR(255),
+        VALUE CLOB,
+        PRIMARY KEY(EXPERIMENT_ID,OUTPUT_KEY),
+        FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE
+);
+
+
+CREATE TABLE WORKFLOW_NODE_DETAIL
+(
+        EXPERIMENT_ID VARCHAR(255) NOT NULL,
+        NODE_INSTANCE_ID VARCHAR(255),
+        CREATION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+        NODE_NAME VARCHAR(255) NOT NULL,
+        EXECUTION_UNIT VARCHAR(255) NOT NULL,
+        EXECUTION_UNIT_DATA VARCHAR(255),
+        PRIMARY KEY(NODE_INSTANCE_ID),
+        FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE TASK_DETAIL
+(
+        TASK_ID VARCHAR(255),
+        NODE_INSTANCE_ID VARCHAR(255),
+        CREATION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+        APPLICATION_ID VARCHAR(255),
+        APPLICATION_VERSION VARCHAR(255),
+        APPLICATION_DEPLOYMENT_ID VARCHAR(255),
+        PRIMARY KEY(TASK_ID),
+        FOREIGN KEY (NODE_INSTANCE_ID) REFERENCES WORKFLOW_NODE_DETAIL(NODE_INSTANCE_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE ERROR_DETAIL
+(
+         ERROR_ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+         EXPERIMENT_ID VARCHAR(255),
+         TASK_ID VARCHAR(255),
+         NODE_INSTANCE_ID VARCHAR(255),
+         JOB_ID VARCHAR(255),
+         CREATION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+         ACTUAL_ERROR_MESSAGE CLOB,
+         USER_FRIEDNLY_ERROR_MSG VARCHAR(255),
+         TRANSIENT_OR_PERSISTENT SMALLINT,
+         ERROR_CATEGORY VARCHAR(255),
+         CORRECTIVE_ACTION VARCHAR(255),
+         ACTIONABLE_GROUP VARCHAR(255),
+         PRIMARY KEY(ERROR_ID),
+         FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE,
+         FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE,
+         FOREIGN KEY (NODE_INSTANCE_ID) REFERENCES WORKFLOW_NODE_DETAIL(NODE_INSTANCE_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE APPLICATION_INPUT
+(
+        TASK_ID VARCHAR(255),
+        INPUT_KEY VARCHAR(255) NOT NULL,
+        INPUT_KEY_TYPE VARCHAR(255),
+        METADATA VARCHAR(255),
+        VALUE CLOB,
+        PRIMARY KEY(TASK_ID,INPUT_KEY),
+        FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE APPLICATION_OUTPUT
+(
+        TASK_ID VARCHAR(255),
+        OUTPUT_KEY VARCHAR(255) NOT NULL,
+        OUTPUT_KEY_TYPE VARCHAR(255),
+        METADATA VARCHAR(255),
+        VALUE CLOB,
+        PRIMARY KEY(TASK_ID,OUTPUT_KEY),
+        FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE NODE_INPUT
+(
+       NODE_INSTANCE_ID VARCHAR(255),
+       INPUT_KEY VARCHAR(255) NOT NULL,
+       INPUT_KEY_TYPE VARCHAR(255),
+       METADATA VARCHAR(255),
+       VALUE VARCHAR(255),
+       PRIMARY KEY(NODE_INSTANCE_ID,INPUT_KEY),
+       FOREIGN KEY (NODE_INSTANCE_ID) REFERENCES WORKFLOW_NODE_DETAIL(NODE_INSTANCE_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE NODE_OUTPUT
+(
+       NODE_INSTANCE_ID VARCHAR(255),
+       OUTPUT_KEY VARCHAR(255) NOT NULL,
+       OUTPUT_KEY_TYPE VARCHAR(255),
+       METADATA VARCHAR(255),
+       VALUE VARCHAR(255),
+       PRIMARY KEY(NODE_INSTANCE_ID,OUTPUT_KEY),
+       FOREIGN KEY (NODE_INSTANCE_ID) REFERENCES WORKFLOW_NODE_DETAIL(NODE_INSTANCE_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE JOB_DETAIL
+(
+        JOB_ID VARCHAR(255),
+        TASK_ID VARCHAR(255),
+        JOB_DESCRIPTION CLOB NOT NULL,
+        CREATION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+        COMPUTE_RESOURCE_CONSUMED VARCHAR(255),
+        PRIMARY KEY (TASK_ID, JOB_ID),
+        FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE DATA_TRANSFER_DETAIL
+(
+        TRANSFER_ID VARCHAR(255),
+        TASK_ID VARCHAR(255),
+        CREATION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+        TRANSFER_DESC CLOB NOT NULL,
+        PRIMARY KEY(TRANSFER_ID),
+        FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE STATUS
+(
+        STATUS_ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+        EXPERIMENT_ID VARCHAR(255),
+        NODE_INSTANCE_ID VARCHAR(255),
+        TRANSFER_ID VARCHAR(255),
+        TASK_ID VARCHAR(255),
+        JOB_ID VARCHAR(255),
+        STATE VARCHAR(255),
+        STATUS_UPDATE_TIME TIMESTAMP DEFAULT '0000-00-00 00:00:00',
+        STATUS_TYPE VARCHAR(255),
+        PRIMARY KEY(STATUS_ID),
+        FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE,
+        FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE,
+        FOREIGN KEY (NODE_INSTANCE_ID) REFERENCES WORKFLOW_NODE_DETAIL(NODE_INSTANCE_ID) ON DELETE CASCADE,
+        FOREIGN KEY (TRANSFER_ID) REFERENCES DATA_TRANSFER_DETAIL(TRANSFER_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE CONFIG_DATA
+(
+        EXPERIMENT_ID VARCHAR(255),
+        AIRAVATA_AUTO_SCHEDULE SMALLINT NOT NULL,
+        OVERRIDE_MANUAL_SCHEDULE_PARAMS SMALLINT NOT NULL,
+        SHARE_EXPERIMENT SMALLINT,
+        PRIMARY KEY(EXPERIMENT_ID)
+);
+
+CREATE TABLE COMPUTATIONAL_RESOURCE_SCHEDULING
+(
+        RESOURCE_SCHEDULING_ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+        EXPERIMENT_ID VARCHAR(255),
+        TASK_ID VARCHAR(255),
+        RESOURCE_HOST_ID VARCHAR(255),
+        CPU_COUNT INTEGER,
+        NODE_COUNT INTEGER,
+        NO_OF_THREADS INTEGER,
+        QUEUE_NAME VARCHAR(255),
+        WALLTIME_LIMIT INTEGER,
+        JOB_START_TIME TIMESTAMP DEFAULT '0000-00-00 00:00:00',
+        TOTAL_PHYSICAL_MEMORY INTEGER,
+        COMPUTATIONAL_PROJECT_ACCOUNT VARCHAR(255),
+        PRIMARY KEY(RESOURCE_SCHEDULING_ID),
+        FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE,
+        FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE ADVANCE_INPUT_DATA_HANDLING
+(
+       INPUT_DATA_HANDLING_ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+       EXPERIMENT_ID VARCHAR(255),
+       TASK_ID VARCHAR(255),
+       WORKING_DIR_PARENT VARCHAR(255),
+       UNIQUE_WORKING_DIR VARCHAR(255),
+       STAGE_INPUT_FILES_TO_WORKING_DIR SMALLINT,
+       CLEAN_AFTER_JOB SMALLINT,
+       PRIMARY KEY(INPUT_DATA_HANDLING_ID),
+       FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE,
+       FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE ADVANCE_OUTPUT_DATA_HANDLING
+(
+       OUTPUT_DATA_HANDLING_ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+       EXPERIMENT_ID VARCHAR(255),
+       TASK_ID VARCHAR(255),
+       OUTPUT_DATA_DIR VARCHAR(255),
+       DATA_REG_URL VARCHAR (255),
+       PERSIST_OUTPUT_DATA SMALLINT,
+       PRIMARY KEY(OUTPUT_DATA_HANDLING_ID),
+       FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE,
+       FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE QOS_PARAM
+(
+        QOS_ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+        EXPERIMENT_ID VARCHAR(255),
+        TASK_ID VARCHAR(255),
+        START_EXECUTION_AT VARCHAR(255),
+        EXECUTE_BEFORE VARCHAR(255),
+        NO_OF_RETRIES INTEGER,
+        PRIMARY KEY(QOS_ID),
+        FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE,
+        FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE
+);
+
+CREATE TABLE COMMUNITY_USER
+(
+        GATEWAY_NAME VARCHAR(256) NOT NULL,
+        COMMUNITY_USER_NAME VARCHAR(256) NOT NULL,
+        TOKEN_ID VARCHAR(256) NOT NULL,
+        COMMUNITY_USER_EMAIL VARCHAR(256) NOT NULL,
+        PRIMARY KEY (GATEWAY_NAME, COMMUNITY_USER_NAME, TOKEN_ID)
+);
+
+CREATE TABLE CREDENTIALS
+(
+        GATEWAY_ID VARCHAR(256) NOT NULL,
+        TOKEN_ID VARCHAR(256) NOT NULL,
+        CREDENTIAL BLOB NOT NULL,
+        PORTAL_USER_ID VARCHAR(256) NOT NULL,
+        TIME_PERSISTED TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+        PRIMARY KEY (GATEWAY_ID, TOKEN_ID)
+);
+
+
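
The DDL above targets the embedded Derby instance the tests run against. As a small, illustrative exercise of part of this schema over plain JDBC (table and column names are taken from the script; the in-memory database URL and everything else are assumptions):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class RegistrySchemaSketch {
        public static void main(String[] args) throws Exception {
            // In-memory Derby database, created on first connect (embedded driver on the classpath).
            try (Connection conn = DriverManager.getConnection("jdbc:derby:memory:registry;create=true")) {
                try (Statement stmt = conn.createStatement()) {
                    // Two of the tables defined in registry-derby.sql above.
                    stmt.executeUpdate("CREATE TABLE GATEWAY (GATEWAY_NAME VARCHAR(255), OWNER VARCHAR(255), PRIMARY KEY (GATEWAY_NAME))");
                    stmt.executeUpdate("CREATE TABLE USERS (USER_NAME VARCHAR(255), PASSWORD VARCHAR(255), PRIMARY KEY (USER_NAME))");
                }
                try (PreparedStatement ps = conn.prepareStatement("INSERT INTO GATEWAY (GATEWAY_NAME, OWNER) VALUES (?, ?)")) {
                    ps.setString(1, "default");
                    ps.setString(2, "admin");
                    ps.executeUpdate();
                }
                try (Statement stmt = conn.createStatement();
                     ResultSet rs = stmt.executeQuery("SELECT GATEWAY_NAME, OWNER FROM GATEWAY")) {
                    while (rs.next()) {
                        System.out.println(rs.getString(1) + " owned by " + rs.getString(2));
                    }
                }
            }
        }
    }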

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/test/resources/zoo.cfg
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/test/resources/zoo.cfg b/modules/gfac/gfac-service/src/test/resources/zoo.cfg
new file mode 100644
index 0000000..add0758
--- /dev/null
+++ b/modules/gfac/gfac-service/src/test/resources/zoo.cfg
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+tickTime=2000
+initLimit=10
+syncLimit=5
+dataDir=data
+clientPort=2181
\ No newline at end of file
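
This zoo.cfg points the test at a local ZooKeeper with a 2000 ms tick and client port 2181. A minimal, illustrative connectivity check against such a server (assuming a ZooKeeper instance is already running with this configuration and the ZooKeeper client library is on the classpath):

    import java.util.concurrent.CountDownLatch;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    public class ZkConnectSketch {
        public static void main(String[] args) throws Exception {
            final CountDownLatch connected = new CountDownLatch(1);
            // clientPort and tickTime correspond to the zoo.cfg values above.
            ZooKeeper zk = new ZooKeeper("localhost:2181", 2000, new Watcher() {
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        connected.countDown();
                    }
                }
            });
            connected.await();
            System.out.println("Connected, session id: 0x" + Long.toHexString(zk.getSessionId()));
            zk.close();
        }
    }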

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-ssh/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/pom.xml b/modules/gfac/gfac-ssh/pom.xml
deleted file mode 100644
index 316568e..0000000
--- a/modules/gfac/gfac-ssh/pom.xml
+++ /dev/null
@@ -1,114 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
-    distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under
-    the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
-    obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to
-    in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
-    ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under
-    the License. -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <groupId>org.apache.airavata</groupId>
-        <artifactId>gfac</artifactId>
-        <version>0.16-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-
-    <modelVersion>4.0.0</modelVersion>
-    <artifactId>airavata-gfac-ssh</artifactId>
-    <name>Airavata GFac SSH implementation</name>
-    <description>This is the extension of</description>
-    <url>http://airavata.apache.org/</url>
-
-    <dependencies>
-
-        <!--email monitoring-->
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-gfac-email-monitor</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <!-- Logging -->
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-        </dependency>
-
-        <!-- GFAC schemas -->
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-gfac-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <!-- Credential Store -->
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-credential-store</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-server-configuration</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-client-configuration</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-
-        <!-- Test -->
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.testng</groupId>
-            <artifactId>testng</artifactId>
-            <version>6.1.1</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>jcl-over-slf4j</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- gsi-ssh api dependencies -->
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>gsissh</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-data-models</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.jcraft</groupId>
-            <artifactId>jsch</artifactId>
-            <version>0.1.50</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.xmlbeans</groupId>
-            <artifactId>xmlbeans</artifactId>
-            <version>${xmlbeans.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>net.schmizz</groupId>
-            <artifactId>sshj</artifactId>
-            <version>0.6.1</version>
-        </dependency>
-    </dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java
deleted file mode 100644
index 1c31138..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.ssh.context;
-
-import org.apache.airavata.gsi.ssh.api.ServerInfo;
-import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-
-public class SSHAuthWrapper {
-    private ServerInfo serverInfo;
-
-    private AuthenticationInfo authenticationInfo;
-
-    private String key;
-
-    public SSHAuthWrapper(ServerInfo serverInfo, AuthenticationInfo authenticationInfo, String key) {
-        this.serverInfo = serverInfo;
-        this.authenticationInfo = authenticationInfo;
-        this.key = key;
-    }
-
-    public ServerInfo getServerInfo() {
-        return serverInfo;
-    }
-
-    public AuthenticationInfo getAuthenticationInfo() {
-        return authenticationInfo;
-    }
-
-    public String getKey() {
-        return key;
-    }
-}
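
SSHAuthWrapper (deleted in this diff) is a plain value holder, so constructing one is just a matter of pairing a server descriptor with an authentication method and a lookup key. A hedged sketch, where the ServerInfo constructor arguments are assumed from the gsissh API and all values are placeholders:

    import org.apache.airavata.gfac.ssh.context.SSHAuthWrapper;
    import org.apache.airavata.gsi.ssh.api.ServerInfo;
    import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
    import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;

    public class SshAuthWrapperSketch {
        public static void main(String[] args) {
            // Assumed ServerInfo(userName, hostName) constructor; adjust to the actual gsissh API.
            ServerInfo serverInfo = new ServerInfo("airavata", "gw98.iu.xsede.org");
            // Argument order matches the handler code below: public key, private key, passphrase.
            AuthenticationInfo auth = new DefaultPublicKeyFileAuthentication(
                    "/home/airavata/.ssh/id_rsa.pub",
                    "/home/airavata/.ssh/id_rsa",
                    "passphrase");
            String key = "airavata@gw98.iu.xsede.org:22"; // arbitrary lookup key
            SSHAuthWrapper wrapper = new SSHAuthWrapper(serverInfo, auth, key);
            System.out.println("SSH auth prepared under key " + wrapper.getKey());
        }
    }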

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
deleted file mode 100644
index 46f1dc3..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.ssh.handler;
-
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.context.MessageContext;
-import org.apache.airavata.gfac.core.handler.AbstractHandler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
-import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.workspace.experiment.*;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.*;
-
-/**
- * This handler copies input data from the gateway machine to the machine where Airavata is
- * installed; handlers that run later can copy the input files to the computing resource.
- * <Handler class="AdvancedSCPInputHandler">
- * <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
- * <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
- * <property name="userName" value="airavata"/>
- * <property name="hostName" value="gw98.iu.xsede.org"/>
- * <property name="inputPath" value="/home/airavata/inputData"/>
- */
-public class AdvancedSCPInputHandler extends AbstractHandler {
-    private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
-    public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
-    public static final int DEFAULT_SSH_PORT = 22;
-
-    private String password = null;
-
-    private String publicKeyPath;
-
-    private String passPhrase;
-
-    private String privateKeyPath;
-
-    private String userName;
-
-    private String hostName;
-
-    private String inputPath;
-
-    public void initProperties(Properties properties) throws GFacHandlerException {
-        password = (String) properties.get("password");
-        passPhrase = (String) properties.get("passPhrase");
-        privateKeyPath = (String) properties.get("privateKeyPath");
-        publicKeyPath = (String) properties.get("publicKeyPath");
-        userName = (String) properties.get("userName");
-        hostName = (String) properties.get("hostName");
-        inputPath = (String) properties.get("inputPath");
-    }
-
-    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
-        super.invoke(jobExecutionContext);
-        int index = 0;
-        int oldIndex = 0;
-        List<String> oldFiles = new ArrayList<String>();
-        MessageContext inputNew = new MessageContext();
-        StringBuffer data = new StringBuffer("|");
-        Cluster pbsCluster = null;
-
-        try {
-            String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName());
-            if (pluginData != null) {
-                try {
-                    oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim());
-                    oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(","));
-                    if (oldIndex == oldFiles.size()) {
-                        log.info("Old data looks good !!!!");
-                    } else {
-                        oldIndex = 0;
-                        oldFiles.clear();
-                    }
-                } catch (NumberFormatException e) {
-                    log.error("Previously stored data " + pluginData + " is wrong so we continue the operations");
-                }
-            }
-
-            AuthenticationInfo authenticationInfo = null;
-            if (password != null) {
-                authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
-            } else {
-                authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
-                        this.passPhrase);
-            }
-
-            // Server info
-            String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID();
-            if (index < oldIndex) {
-                parentPath = oldFiles.get(index);
-                data.append(oldFiles.get(index++)).append(","); // reuse the already transferred file and increment the index
-            } else {
-                (new File(parentPath)).mkdirs();
-                StringBuffer temp = new StringBuffer(data.append(parentPath).append(",").toString());
-                GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
-            }
-            DataTransferDetails detail = new DataTransferDetails();
-            TransferStatus status = new TransferStatus();
-            // here doesn't matter what the job manager is because we are only doing some file handling
-            // not really dealing with monitoring or job submission, so we pa
-
-            MessageContext input = jobExecutionContext.getInMessageContext();
-            Set<String> parameters = input.getParameters().keySet();
-            for (String paramName : parameters) {
-                InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName);
-                String paramValue = inputParamType.getValue();
-                // TODO: Review this with type
-                if (inputParamType.getType() == DataType.URI) {
-                    try {
-                        URL file = new URL(paramValue);
-                        String key = file.getUserInfo() + file.getHost() + DEFAULT_SSH_PORT;
-                        GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, file.getUserInfo(), file.getHost(), DEFAULT_SSH_PORT);
-                        pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster();
-                        paramValue = file.getPath();
-                    } catch (MalformedURLException e) {
-                        String key = this.userName + this.hostName + DEFAULT_SSH_PORT;
-                        GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, this.userName, this.hostName, DEFAULT_SSH_PORT);
-                        pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster();
-                        log.error(e.getLocalizedMessage(), e);
-                    }
-
-                    if (index < oldIndex) {
-                        log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
-                        inputParamType.setValue(oldFiles.get(index));
-                        data.append(oldFiles.get(index++)).append(","); // reuse the already transferred file and increment the index
-                    } else {
-                        String stageInputFile = stageInputFiles(pbsCluster, paramValue, parentPath);
-                        inputParamType.setValue(stageInputFile);
-                        StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString());
-                        status.setTransferState(TransferState.UPLOAD);
-                        detail.setTransferStatus(status);
-                        detail.setTransferDescription("Input Data Staged: " + stageInputFile);
-                        registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
-                        GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
-                    }
-                }
-                // FIXME: what is the thrift model DataType equivalent for URIArray type?
-//                else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
-//                    List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
-//                    List<String> newFiles = new ArrayList<String>();
-//                    for (String paramValueEach : split) {
-//                        try {
-//                            URL file = new URL(paramValue);
-//                            this.userName = file.getUserInfo();
-//                            this.hostName = file.getHost();
-//                            paramValueEach = file.getPath();
-//                        } catch (MalformedURLException e) {
-//                            log.error(e.getLocalizedMessage(), e);
-//                        }
-//                        if (index < oldIndex) {
-//                            log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
-//                            newFiles.add(oldFiles.get(index));
-//                            data.append(oldFiles.get(index++)).append(",");
-//                        } else {
-//                            String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath);
-//                            StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
-//                            GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
-//                            newFiles.add(stageInputFiles);
-//                        }
-//                    }
-//                    ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
-//                }
-                inputNew.getParameters().put(paramName, inputParamType);
-            }
-        } catch (Exception e) {
-            log.error(e.getMessage());
-            try {
-                StringWriter errors = new StringWriter();
-                e.printStackTrace(new PrintWriter(errors));
-                GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-            } catch (GFacException e1) {
-                log.error(e1.getLocalizedMessage());
-            }
-            throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
-        }
-        jobExecutionContext.setInMessageContext(inputNew);
-    }
-
-    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
-        this.invoke(jobExecutionContext);
-    }
-
-    private String stageInputFiles(Cluster cluster, String paramValue, String parentPath) throws GFacException {
-        try {
-            cluster.scpFrom(paramValue, parentPath);
-            return "file://" + parentPath + File.separator + (new File(paramValue)).getName();
-        } catch (SSHApiException e) {
-            log.error("Error tranfering remote file to local file, remote path: " + paramValue);
-            throw new GFacException(e);
-        }
-    }
-}
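
The handler above is configured entirely through the properties handed to initProperties(), with the keys matching the fields it reads. A hedged sketch of wiring it up directly (values are placeholders; in a real deployment the handler is declared in GFac's XML handler configuration, as in the class Javadoc, rather than instantiated by hand):

    import java.util.Properties;
    import org.apache.airavata.gfac.ssh.handler.AdvancedSCPInputHandler;

    public class AdvancedScpInputHandlerSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Keys correspond to the fields read in initProperties() above; values are placeholders.
            props.put("privateKeyPath", "/home/airavata/.ssh/id_rsa");
            props.put("publicKeyPath", "/home/airavata/.ssh/id_rsa.pub");
            props.put("passPhrase", "passphrase");
            props.put("userName", "airavata");
            props.put("hostName", "gw98.iu.xsede.org");
            props.put("inputPath", "/home/airavata/inputData");

            AdvancedSCPInputHandler handler = new AdvancedSCPInputHandler();
            handler.initProperties(props);
            // handler.invoke(jobExecutionContext) would then stage each URI-typed input over SCP.
        }
    }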