You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/01/26 17:30:51 UTC

[2/2] airavata git commit: creating gfac storm version

creating gfac storm version


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/69d17821
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/69d17821
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/69d17821

Branch: refs/heads/gfac-storm
Commit: 69d1782149c11a60dded61e7b9c77a9395fc4eea
Parents: cf7290a
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Mon Jan 26 11:30:10 2015 -0500
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Mon Jan 26 11:30:10 2015 -0500

----------------------------------------------------------------------
 .../java-client-samples/pom.xml                 |   5 +
 .../client/samples/CreateLaunchExperiment.java  |  48 +++-
 .../catalog/data/util/AppCatalogJPAUtils.java   |   4 +-
 .../common/utils/ApplicationSettings.java       |   8 +-
 .../main/resources/airavata-server.properties   | 234 ---------------
 modules/gfac/gfac-core/pom.xml                  |   6 -
 .../gfac/core/context/JobExecutionContext.java  |   3 +
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |   1 -
 .../airavata/gfac/core/utils/GFacUtils.java     |  14 +
 .../apache/airavata/job/GFacConfigXmlTest.java  |   4 +-
 modules/gfac/gfac-ec2/pom.xml                   |   2 +-
 modules/gfac/gfac-local/pom.xml                 |  26 +-
 .../gfac/local/AiravataRabbitMQSpout.java       | 158 +++++++++++
 .../gfac/local/RandomSentenceSpout.java         |  67 +++++
 .../local/handler/LocalDirectorySetupBolt.java  | 159 +++++++++++
 .../gfac/local/provider/impl/LocalProvider.java |   2 -
 .../local/provider/impl/LocalProviderBolt.java  | 283 +++++++++++++++++++
 .../gfac/local/utils/CustomStormDeclarator.java |  61 ++++
 .../gfac/local/utils/ExperimentLauncher.java    |  28 ++
 .../gfac/local/utils/ExperimentModelUtil.java   | 187 ++++++++++++
 .../gfac/local/utils/MessageScheme.java         |  46 +++
 .../services/impl/GfacTopologyBuilderTest.java  | 112 ++++++++
 .../gfac/services/impl/LocalProviderTest.java   | 184 ------------
 .../services/impl/WordCountTopologyTest.java    | 135 +++++++++
 modules/gfac/pom.xml                            |  12 +-
 modules/orchestrator/orchestrator-core/pom.xml  |   2 +-
 .../registry/jpa/impl/ExperimentRegistry.java   |   3 +-
 .../registry/jpa/impl/ProjectRegistry.java      |   3 +-
 .../registry/jpa/impl/RegistryImpl.java         |   4 +-
 .../jpa/resources/AbstractResource.java         |   3 +-
 .../registry/jpa/resources/GatewayResource.java |   3 +-
 .../registry/jpa/resources/WorkerResource.java  |   3 +-
 pom.xml                                         |  19 +-
 tools/gsissh/pom.xml                            |   2 +-
 34 files changed, 1343 insertions(+), 488 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/airavata-api/airavata-client-sdks/java-client-samples/pom.xml
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/pom.xml b/airavata-api/airavata-client-sdks/java-client-samples/pom.xml
index 5f12ace..fa7a398 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/pom.xml
+++ b/airavata-api/airavata-client-sdks/java-client-samples/pom.xml
@@ -56,6 +56,11 @@
             <artifactId>airavata-client-configuration</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+            <version>3.3.5</version>
+        </dependency>
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index b2c5469..8eb28b6 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -21,6 +21,9 @@
 
 package org.apache.airavata.client.samples;
 
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
 import org.apache.airavata.api.Airavata;
 import org.apache.airavata.api.client.AiravataClientFactory;
 import org.apache.airavata.client.tools.RegisterSampleApplications;
@@ -57,7 +60,7 @@ public class CreateLaunchExperiment {
     private static final String DEFAULT_GATEWAY = "default.registry.gateway";
     private static Airavata.Client airavataClient;
 
-    private static String echoAppId = "Echo_0018dc21-9359-4063-9672-6ad15a24294d";
+    private static String echoAppId = "Echo_53cd4995-1a3b-4290-9615-2592a59051b8";
     private static String mpiAppId = "HelloMPI_da45305f-5d90-4a18-8716-8dd54c3b2376";
     private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
     private static String amberAppId = "Amber_49b16f6f-93ab-4885-9971-6ab2ab5eb3d3";
@@ -94,11 +97,11 @@ public class CreateLaunchExperiment {
         try {
             for (int i = 0; i < 1; i++) {
 //                final String expId = createExperimentForSSHHost(airavata);
-                final String expId = createEchoExperimentForFSD(airavataClient);
+//                final String expId = createEchoExperimentForFSD(airavataClient);
 //                final String expId = createMPIExperimentForFSD(airavataClient);
 //                final String expId = createEchoExperimentForStampede(airavataClient);
 //                final String expId = createEchoExperimentForTrestles(airavataClient);
-//                final String expId = createExperimentEchoForLocalHost(airavataClient);
+                final String expId = createExperimentEchoForLocalHost(airavataClient);
 //                final String expId = createExperimentWRFTrestles(airavataClient);
 //                final String expId = createExperimentForBR2(airavataClient);
 //                final String expId = createExperimentForBR2Amber(airavataClient);
@@ -113,7 +116,7 @@ public class CreateLaunchExperiment {
 //                final String expId = createExperimentAUTODOCKStampede(airavataClient); // this is not working , we need to register AutoDock app on stampede
                 System.out.println("Experiment ID : " + expId);
 //                updateExperiment(airavata, expId);
-                launchExperiment(airavataClient, expId);
+//                launchExperimentPassive(expId);
             }
         } catch (Exception e) {
             logger.error("Error while connecting with server", e.getMessage());
@@ -1553,6 +1556,16 @@ public class CreateLaunchExperiment {
         return null;
     }
 
+    public static void launchExperiment(String expId)
+            throws TException {
+        try {
+            String gatewayId = "default";
+        } catch (Exception e) {
+            logger.error("Error occured while launching the experiment...", e.getMessage());
+            throw new TException(e);
+        }
+    }
+
     public static void launchExperiment(Airavata.Client client, String expId)
             throws TException {
         try {
@@ -1577,6 +1590,33 @@ public class CreateLaunchExperiment {
         }
     }
 
+    public static void launchExperimentPassive(String expId)
+            throws TException {
+        try {
+            String uri = "amqp://localhost";
+            String message = expId;
+            String exchange = "airavata_rabbitmq_exchange";
+
+            ConnectionFactory cfconn = new ConnectionFactory();
+            cfconn.setUri(uri);
+            Connection conn = cfconn.newConnection();
+
+            Channel ch = conn.createChannel();
+
+            if (exchange.equals("")) {
+                ch.queueDeclare("gfac.submit", false, false, false, null);
+            }
+            ch.basicPublish(exchange, expId, null, message.getBytes());
+            ch.close();
+            conn.close();
+        } catch (Exception e) {
+            System.err.println("Main thread caught exception: " + e);
+            e.printStackTrace();
+            System.exit(1);
+        }
+
+    }
+
     public static List<Experiment> getExperimentsForUser(Airavata.Client client, String user) {
         try {
             return client.getAllUserExperiments(user);

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogJPAUtils.java
----------------------------------------------------------------------
diff --git a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogJPAUtils.java b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogJPAUtils.java
index 6b99bd9..d8116d9 100644
--- a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogJPAUtils.java
+++ b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogJPAUtils.java
@@ -58,8 +58,8 @@ public class AppCatalogJPAUtils {
             properties.put("openjpa.ConnectionProperties", connectionProperties);
             properties.put("openjpa.DynamicEnhancementAgent", "true");
             properties.put("openjpa.RuntimeUnenhancedClasses", "unsupported");
-            properties.put("openjpa.DataCache","true(CacheSize=" + Integer.valueOf(readServerProperties(JPA_CACHE_SIZE))  + ", SoftReferenceSize=0)");
-            properties.put("openjpa.QueryCache","true(CacheSize=" + Integer.valueOf(readServerProperties(JPA_CACHE_SIZE))  + ", SoftReferenceSize=0)");
+//            properties.put("openjpa.DataCache","true(CacheSize=" + Integer.valueOf(readServerProperties(JPA_CACHE_SIZE))  + ", SoftReferenceSize=0)");
+//            properties.put("openjpa.QueryCache","true(CacheSize=" + Integer.valueOf(readServerProperties(JPA_CACHE_SIZE))  + ", SoftReferenceSize=0)");
             properties.put("openjpa.RemoteCommitProvider","sjvm");
             properties.put("openjpa.Log","DefaultLevel=INFO, Runtime=INFO, Tool=INFO, SQL=INFO");
             properties.put("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
index 9a7ad16..b8d306d 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
@@ -85,13 +85,7 @@ public class ApplicationSettings {
 	}
 
 	protected URL getPropertyFileURL() {
-		URL url;
-		if (AiravataUtils.isClient()){
-            url=ApplicationSettings.class.getClassLoader().getResource(CLIENT_PROPERTIES);
-        }else{
-            url=ApplicationSettings.class.getClassLoader().getResource(SERVER_PROPERTIES);
-        }
-		return url;
+            return ApplicationSettings.class.getClassLoader().getResource(SERVER_PROPERTIES);
 	}
 	
 	protected URL[] getExternalSettingsFileURLs(){

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
deleted file mode 100644
index fb02901..0000000
--- a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
+++ /dev/null
@@ -1,234 +0,0 @@
-#
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-###########################################################################
-#
-#  This properties file provides configuration for all Airavata Services:
-#  API Server, Registry, Workflow Interpreter, GFac, Orchestrator
-#
-###########################################################################
-
-###########################################################################
-#  API Server Registry Configuration
-###########################################################################
-
-#for derby [AiravataJPARegistry]
-registry.jdbc.driver=org.apache.derby.jdbc.ClientDriver
-registry.jdbc.url=jdbc:derby://localhost:1527/persistent_data;create=true;user=airavata;password=airavata
-# MySql database configuration
-#registry.jdbc.driver=com.mysql.jdbc.Driver
-#registry.jdbc.url=jdbc:mysql://localhost:3306/persistent_data
-registry.jdbc.user=airavata
-registry.jdbc.password=airavata
-start.derby.server.mode=true
-validationQuery=SELECT 1 from CONFIGURATION
-jpa.cache.size=5000
-#jpa.connection.properties=MaxActive=10,MaxIdle=5,MinIdle=2,MaxWait=60000,testWhileIdle=true,testOnBorrow=true
-
-# Properties for default user mode
-default.registry.user=admin
-default.registry.password=admin
-default.registry.password.hash.method=SHA
-default.registry.gateway=default
-
-#ip=127.0.0.1
-
-###########################################################################
-#  Application Catalog DB Configuration
-###########################################################################
-#for derby [AiravataJPARegistry]
-appcatalog.jdbc.driver=org.apache.derby.jdbc.ClientDriver
-appcatalog.jdbc.url=jdbc:derby://localhost:1527/app_catalog;create=true;user=airavata;password=airavata
-# MySql database configuration
-#appcatalog.jdbc.driver=com.mysql.jdbc.Driver
-#appcatalog.jdbc.url=jdbc:mysql://localhost:3306/app_catalog
-appcatalog.jdbc.user=airavata
-appcatalog.jdbc.password=airavata
-appcatalog.validationQuery=SELECT 1 from CONFIGURATION
-
-###########################################################################
-#  Server module Configuration
-###########################################################################
-
-servers=apiserver,orchestrator,gfac,workflowserver
-#shutdown.trategy=NONE
-shutdown.trategy=SELF_TERMINATE
-
-
-apiserver.server.host=localhost
-apiserver.server.port=8930
-apiserver.server.min.threads=50
-workflow.server.host=localhost
-workflow.server.port=8931
-orchestrator.server.host=localhost
-orchestrator.server.port=8940
-gfac.server.host=localhost
-gfac.server.port=8950
-orchestrator.server.min.threads=50
-
-###########################################################################
-# Credential Store module Configuration
-###########################################################################
-credential.store.keystore.url=/Users/lahirugunathilake/Downloads/airavata_sym.jks
-credential.store.keystore.alias=airavata
-credential.store.keystore.password=airavata
-credential.store.jdbc.url=jdbc:derby://localhost:1527/persistent_data;create=true;user=airavata;password=airavata
-credential.store.jdbc.user=airavata
-credential.store.jdbc.password=airavata
-credential.store.jdbc.driver=org.apache.derby.jdbc.ClientDriver
-
-notifier.enabled=false
-#period in milliseconds
-notifier.duration=5000
-
-email.server=smtp.googlemail.com
-email.server.port=465
-email.user=airavata
-email.password=xxx
-email.ssl=true
-email.from=airavata@apache.org
-
-###########################################################################
-# Airavata GFac MyProxy GSI credentials to access Grid Resources.
-###########################################################################
-#
-# Security Configuration used by Airavata Generic Factory Service
-#  to interact with Computational Resources.
-#
-gfac=org.apache.airavata.gfac.server.GfacServer
-myproxy.server=myproxy.teragrid.org
-myproxy.username=ogce
-myproxy.password=
-myproxy.life=3600
-# XSEDE Trusted certificates can be downloaded from https://software.xsede.org/security/xsede-certs.tar.gz
-trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates
-# SSH PKI key pair or ssh password can be used SSH based authentication is used.
-# if user specify both password authentication gets the higher preference
-
-################# ---------- For ssh key pair authentication ------------------- ################
-#public.ssh.key=/path to public key for ssh
-#ssh.username=username for ssh connection
-#private.ssh.key=/path to private key file for ssh
-#ssh.keypass=passphrase for the private key
-
-
-################# ---------- For ssh key pair authentication ------------------- ################
-#ssh.username=username for ssh connection
-#ssh.password=Password for ssh connection
-
-
-
-###########################################################################
-# Airavata Workflow Interpreter Configurations
-###########################################################################
-
-#runInThread=true
-#provenance=true
-#provenanceWriterThreadPoolSize=20
-#gfac.embedded=true
-#workflowserver=org.apache.airavata.api.server.WorkflowServer
-
-
-###########################################################################
-# API Server module Configuration
-###########################################################################
-apiserver=org.apache.airavata.api.server.AiravataAPIServer
-
-###########################################################################
-# Workflow Server module Configuration
-###########################################################################
-
-workflowserver=org.apache.airavata.api.server.WorkflowServer
-
-###########################################################################
-# Advance configuration to change service implementations
-###########################################################################
-# If false, disables two phase commit when submitting jobs
-TwoPhase=true
-#
-# Class which implemented HostScheduler interface. It will determine the which host to submit the request
-#
-host.scheduler=org.apache.airavata.gfac.core.scheduler.impl.SimpleHostScheduler
-
-###########################################################################
-# Monitoring module Configuration
-###########################################################################
-
-#This will be the primary monitoring tool which runs in airavata, in future there will be multiple monitoring
-#mechanisms and one would be able to start a monitor
-monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.gfac.monitor.impl.LocalJobMonitor
-
-
-###########################################################################
-# AMQP Notification Configuration
-###########################################################################
-
-
-amqp.notification.enable=1
-
-amqp.broker.host=localhost
-amqp.broker.port=5672
-amqp.broker.username=guest
-amqp.broker.password=guest
-
-amqp.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPSenderImpl
-amqp.topic.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPTopicSenderImpl
-amqp.broadcast.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPBroadcastSenderImpl
-
-#,org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor
-#This is the amqp related configuration and this lists down the Rabbitmq host, this is an xsede specific configuration
-amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
-proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
-connection.name=xsede
-#publisher
-activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
-publish.rabbitmq=false
-activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
-rabbitmq.broker.url=amqp://localhost:5672
-rabbitmq.exchange.name=airavata_rabbitmq_exchange
-
-###########################################################################
-# Orchestrator module Configuration
-###########################################################################
-
-#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
-job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
-job.validators=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
-submitter.interval=10000
-threadpool.size=10
-start.submitter=true
-embedded.mode=true
-enable.validation=true
-orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer
-
-###########################################################################
-# Zookeeper Server Configuration
-###########################################################################
-
-embedded.zk=true
-zookeeper.server.host=localhost
-zookeeper.server.port=2181
-airavata-server=/api-server
-orchestrator-server=/orchestrator-server
-gfac-server=/gfac-server
-gfac-experiments=/gfac-experiments
-gfac-server-name=gfac-node0
-orchestrator-server-name=orch-node0
-airavata-server-name=api-node0

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index 2a85282..53b60ec 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -81,12 +81,6 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.testng</groupId>
-            <artifactId>testng</artifactId>
-            <version>6.1.1</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>jcl-over-slf4j</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index c8c48ef..97c2572 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -177,6 +177,9 @@ public class JobExecutionContext extends AbstractContext implements Serializable
      */
     private Map<String, SecurityContext> securityContext = new HashMap<String, SecurityContext>();
 
+    public JobExecutionContext() {
+    }
+
     public JobExecutionContext(GFacConfiguration gFacConfiguration,String applicationName){
         this.gfacConfiguration = gFacConfiguration;
         notifier = new GFacNotifier();

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 8403f8c..c81a179 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -301,7 +301,6 @@ public class BetterGfacImpl implements GFac,Watcher {
 //        List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs();
 //        jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getInputParamMap(experimentInputs)));
         List<InputDataObjectType> taskInputs = taskData.getApplicationInputs();
-        jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getInputParamMap(taskInputs)));
 
 //        List<OutputDataObjectType> outputData = experiment.getExperimentOutputs();
         List<OutputDataObjectType> taskOutputs = taskData.getApplicationOutputs();

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index c71ed27..6286d91 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -768,6 +768,20 @@ public class GFacUtils {
 		}
 	}
 
+	public static void saveJobStatus(Registry registry,
+									 JobDetails details,String taskId, JobState state) throws GFacException {
+		try {
+			JobStatus status = new JobStatus();
+			status.setJobState(state);
+			details.setJobStatus(status);
+			registry.add(ChildDataType.JOB_DETAIL, details,
+					new CompositeIdentifier(taskId, details.getJobID()));
+		} catch (Exception e) {
+			throw new GFacException("Error persisting job status"
+					+ e.getLocalizedMessage(), e);
+		}
+	}
+
 	public static void updateJobStatus(JobExecutionContext jobExecutionContext,
 			JobDetails details, JobState state) throws GFacException {
 		try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java
index 4744772..503baa2 100644
--- a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java
+++ b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java
@@ -32,8 +32,8 @@ import org.apache.airavata.gfac.core.context.ApplicationContext;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.model.appcatalog.computeresource.*;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
+import org.junit.BeforeClass;
+import org.junit.Test;
 import org.xml.sax.SAXException;
 
 import javax.xml.parsers.ParserConfigurationException;

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-ec2/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ec2/pom.xml b/modules/gfac/gfac-ec2/pom.xml
index d51fddb..0905606 100644
--- a/modules/gfac/gfac-ec2/pom.xml
+++ b/modules/gfac/gfac-ec2/pom.xml
@@ -110,7 +110,7 @@
 		<dependency>
 			<groupId>org.testng</groupId>
 			<artifactId>testng</artifactId>
-			<version>6.1.1</version>
+			<version>6.8.5</version>
 			<scope>test</scope>
 		</dependency>
 		<dependency>

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/pom.xml b/modules/gfac/gfac-local/pom.xml
index 9776f0a..c13b189 100644
--- a/modules/gfac/gfac-local/pom.xml
+++ b/modules/gfac/gfac-local/pom.xml
@@ -8,7 +8,8 @@
     ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under 
     the License. -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.airavata</groupId>
         <artifactId>gfac</artifactId>
@@ -22,6 +23,12 @@
     <description>This is the extension of GFAC Local.</description>
     <url>http://airavata.apache.org/</url>
 
+    <repositories>
+        <repository>
+            <id>clojars.org</id>
+            <url>http://clojars.org/repo</url>
+        </repository>
+    </repositories>
     <dependencies>
 
         <!-- Logging -->
@@ -36,7 +43,6 @@
             <artifactId>airavata-gfac-core</artifactId>
             <version>${project.version}</version>
         </dependency>
-
         <!-- Test -->
         <dependency>
             <groupId>junit</groupId>
@@ -44,12 +50,6 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.testng</groupId>
-            <artifactId>testng</artifactId>
-            <version>6.1.1</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>jcl-over-slf4j</artifactId>
             <scope>test</scope>
@@ -59,7 +59,17 @@
             <artifactId>slf4j-log4j12</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>0.10.0-SNAPSHOT</version>
+        </dependency>
 
+        <dependency>
+            <groupId>io.latent</groupId>
+            <artifactId>storm-rabbitmq</artifactId>
+            <version>0.5.10</version>
+        </dependency>
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/AiravataRabbitMQSpout.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/AiravataRabbitMQSpout.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/AiravataRabbitMQSpout.java
new file mode 100644
index 0000000..985bc83
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/AiravataRabbitMQSpout.java
@@ -0,0 +1,158 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local;
+import backtype.storm.spout.Scheme;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import io.latent.storm.rabbitmq.*;
+import io.latent.storm.rabbitmq.config.ConsumerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A simple RabbitMQ spout that emits an anchored tuple stream (on the default stream). This can be used with
+ * Storm's guaranteed message processing.
+ *
+ * @author peter@latent.io
+ */
+public class AiravataRabbitMQSpout extends BaseRichSpout {
+
+    private final MessageScheme scheme;
+    private final Declarator declarator;
+
+    private transient Logger logger;
+    private transient RabbitMQConsumer consumer;
+    private transient SpoutOutputCollector collector;
+
+    public AiravataRabbitMQSpout(Scheme scheme) {
+        this(MessageScheme.Builder.from(scheme), new Declarator.NoOp());
+    }
+
+    public AiravataRabbitMQSpout(Scheme scheme, Declarator declarator) {
+        this(MessageScheme.Builder.from(scheme), declarator);
+    }
+
+    public AiravataRabbitMQSpout(MessageScheme scheme, Declarator declarator) {
+        this.scheme = scheme;
+        this.declarator = declarator;
+    }
+
+    @Override
+    public void open(final Map config,
+                     final TopologyContext context,
+                     final SpoutOutputCollector spoutOutputCollector) {
+        ConsumerConfig consumerConfig = ConsumerConfig.getFromStormConfig(config);
+
+        ErrorReporter reporter = new ErrorReporter() {
+            @Override
+            public void reportError(Throwable error) {
+                spoutOutputCollector.reportError(error);
+            }
+        };
+        consumer = loadConsumer(declarator, reporter, consumerConfig);
+        scheme.open(config, context);
+        consumer.open();
+        logger = LoggerFactory.getLogger(AiravataRabbitMQSpout.class);
+        collector = spoutOutputCollector;
+    }
+
+    protected RabbitMQConsumer loadConsumer(Declarator declarator,
+                                            ErrorReporter reporter,
+                                            ConsumerConfig config) {
+        return new RabbitMQConsumer(config.getConnectionConfig(),
+                config.getPrefetchCount(),
+                config.getQueueName(),
+                config.isRequeueOnFail(),
+                declarator,
+                reporter);
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+        scheme.close();
+        super.close();
+    }
+
+    @Override
+    public void nextTuple() {
+        Message message;
+        System.out.println("Waiting for messages !!!");
+        while ((message = consumer.nextMessage()) != Message.NONE) {
+            List<Object> tuple = extractTuple(message);
+            if (!tuple.isEmpty()) {
+                System.out.println(new String(message.getBody()));
+                emit(tuple, message, collector);
+            }
+        }
+    }
+
+    protected List<Integer> emit(List<Object> tuple,
+                                 Message message,
+                                 SpoutOutputCollector spoutOutputCollector) {
+        return spoutOutputCollector.emit(tuple, getDeliveryTag(message));
+    }
+
+    private List<Object> extractTuple(Message message) {
+        long deliveryTag = getDeliveryTag(message);
+        try {
+            List<Object> tuple = scheme.deserialize(message);
+            if (tuple != null && !tuple.isEmpty()) {
+                return tuple;
+            }
+            String errorMsg = "Deserialization error for msgId " + deliveryTag;
+            logger.warn(errorMsg);
+            collector.reportError(new Exception(errorMsg));
+        } catch (Exception e) {
+            logger.warn("Deserialization error for msgId " + deliveryTag, e);
+            collector.reportError(e);
+        }
+        // get the malformed message out of the way by dead-lettering (if dead-lettering is configured) and move on
+        consumer.deadLetter(deliveryTag);
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        if (msgId instanceof Long) consumer.ack((Long) msgId);
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        if (msgId instanceof Long) consumer.fail((Long) msgId);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(scheme.getOutputFields());
+    }
+
+    protected long getDeliveryTag(Message message) {
+        return ((Message.DeliveredMessage) message).getDeliveryTag();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/RandomSentenceSpout.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/RandomSentenceSpout.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/RandomSentenceSpout.java
new file mode 100644
index 0000000..ad53add
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/RandomSentenceSpout.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.Random;
+
+public class RandomSentenceSpout extends BaseRichSpout {
+    SpoutOutputCollector _collector;
+    Random _rand;
+
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        _collector = collector;
+        _rand = new Random();
+    }
+
+    @Override
+    public void nextTuple() {
+        Utils.sleep(100);
+        String[] sentences = new String[]{ "echoExperiment_be59db00-020f-4d46-b157-05577ab5c065" };
+        String sentence = sentences[_rand.nextInt(sentences.length)];
+        System.out.println(sentence);
+        _collector.emit(new Values(sentence));
+    }
+
+    @Override
+    public void ack(Object id) {
+    }
+
+    @Override
+    public void fail(Object id) {
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word"));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupBolt.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupBolt.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupBolt.java
new file mode 100644
index 0000000..92b6f9d
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupBolt.java
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.handler;
+
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.FailedException;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.airavata.appcatalog.cpi.AppCatalogException;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.local.utils.ExperimentModelUtil;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class LocalDirectorySetupBolt extends BaseBasicBolt {
+    private final static Logger logger = LoggerFactory.getLogger(LocalDirectorySetupBolt.class);
+
+    Registry registry;
+
+    public LocalDirectorySetupBolt() throws RegistryException {
+        registry = RegistryFactory.getDefaultRegistry();
+    }
+
+    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+        try {
+            Registry registry = RegistryFactory.getDefaultRegistry();
+            String Ids = tuple.getString(0);
+            String[] split = Ids.split(",");
+            if (split.length != 3) {
+                throw new FailedException("Wrong tuple given: " + Ids);
+            }
+            String gatewayId = split[0];
+            String expId = split[1];
+            List<TaskDetails> tasks = createTasks(expId);
+            String taskId = tasks.get(0).getTaskID();
+            TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskId);
+
+            String applicationInterfaceId = taskData.getApplicationId();
+            String applicationDeploymentId = taskData.getApplicationDeploymentId();
+            if (null == applicationInterfaceId) {
+                throw new FailedException("Error executing the job. The required Application Id is missing");
+            }
+            if (null == applicationDeploymentId) {
+                throw new FailedException("Error executing the job. The required Application deployment Id is missing");
+            }
+
+            AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+            //fetch the compute resource, application interface and deployment information from app catalog
+            ApplicationInterfaceDescription applicationInterface = appCatalog.
+                    getApplicationInterface().getApplicationInterface(applicationInterfaceId);
+            ApplicationDeploymentDescription applicationDeployment = appCatalog.
+                    getApplicationDeployment().getApplicationDeployement(applicationDeploymentId);
+            ComputeResourceDescription computeResource = appCatalog.getComputeResource().
+                    getComputeResource(applicationDeployment.getComputeHostId());
+            ComputeResourcePreference gatewayResourcePreferences = appCatalog.getGatewayProfile().
+                    getComputeResourcePreference(gatewayId, applicationDeployment.getComputeHostId());
+
+            if (gatewayResourcePreferences == null) {
+                List<String> gatewayProfileIds = appCatalog.getGatewayProfile()
+                        .getGatewayProfileIds(gatewayId);
+                for (String profileId : gatewayProfileIds) {
+                    gatewayId = profileId;
+                    gatewayResourcePreferences = appCatalog.getGatewayProfile().
+                            getComputeResourcePreference(gatewayId, applicationDeployment.getComputeHostId());
+                    if (gatewayResourcePreferences != null) {
+                        break;
+                    }
+                }
+            }
+
+            String scratchLocation = gatewayResourcePreferences.getScratchLocation();
+            String workingDir = scratchLocation + File.separator + expId;
+            String inputDir = workingDir + File.separator + Constants.INPUT_DATA_DIR_VAR_NAME;
+            String outputDir = workingDir + File.separator + Constants.OUTPUT_DATA_DIR_VAR_NAME;
+            makeFileSystemDir(workingDir);
+            makeFileSystemDir(inputDir);
+            makeFileSystemDir(outputDir);
+
+            basicOutputCollector.emit(new Values(Ids, taskId));
+
+        } catch (RegistryException e) {
+            logger.error(e.getMessage(), e);
+            throw new FailedException(e);
+        } catch (AppCatalogException e) {
+            logger.error(e.getMessage(), e);
+            throw new FailedException(e);
+        } catch (GFacHandlerException e) {
+            logger.error(e.getMessage(), e);
+            throw new FailedException(e);
+        }
+    }
+
+    public List<TaskDetails> createTasks(String experimentId) throws RegistryException {
+        Experiment experiment = null;
+        List<TaskDetails> tasks = new ArrayList<TaskDetails>();
+        experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentId);
+
+
+        WorkflowNodeDetails iDontNeedaNode = ExperimentModelUtil.createWorkflowNode("IDontNeedaNode", null);
+        String nodeID = (String) registry.add(ChildDataType.WORKFLOW_NODE_DETAIL, iDontNeedaNode, experimentId);
+
+        TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromExperiment(experiment);
+        taskDetails.setTaskID((String) registry.add(ChildDataType.TASK_DETAIL, taskDetails, nodeID));
+        tasks.add(taskDetails);
+        return tasks;
+    }
+
+
+    private void makeFileSystemDir(String dir) throws GFacHandlerException {
+        File f = new File(dir);
+        if (f.isDirectory() && f.exists()) {
+            return;
+        } else if (!new File(dir).mkdir()) {
+            throw new GFacHandlerException("Cannot create directory " + dir);
+        }
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields("experimentId","taskId"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index 9f055e9..59303ec 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -23,7 +23,6 @@ package org.apache.airavata.gfac.local.provider.impl;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -36,7 +35,6 @@ import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
 import org.apache.airavata.gfac.core.provider.AbstractProvider;
 import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.gfac.core.provider.utils.ProviderUtils;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.core.utils.OutputUtils;
 import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProviderBolt.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProviderBolt.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProviderBolt.java
new file mode 100644
index 0000000..13954de
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProviderBolt.java
@@ -0,0 +1,283 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.provider.impl;
+
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.FailedException;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Tuple;
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
+import org.apache.airavata.gfac.local.utils.InputUtils;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.*;
+
+public class LocalProviderBolt extends BaseBasicBolt {
+    private final static Logger logger = LoggerFactory.getLogger(LocalProviderBolt.class);
+
+    private ProcessBuilder builder;
+    private List<String> cmdList;
+    private String jobId;
+    private Registry registry;
+
+    public static class LocalProviderJobData {
+        private String applicationName;
+        private List<String> inputParameters;
+        private String workingDir;
+        private String inputDir;
+        private String outputDir;
+
+        public String getApplicationName() {
+            return applicationName;
+        }
+
+        public void setApplicationName(String applicationName) {
+            this.applicationName = applicationName;
+        }
+
+        public List<String> getInputParameters() {
+            return inputParameters;
+        }
+
+        public void setInputParameters(List<String> inputParameters) {
+            this.inputParameters = inputParameters;
+        }
+
+        public String getWorkingDir() {
+            return workingDir;
+        }
+
+        public void setWorkingDir(String workingDir) {
+            this.workingDir = workingDir;
+        }
+
+        public String getInputDir() {
+            return inputDir;
+        }
+
+        public void setInputDir(String inputDir) {
+            this.inputDir = inputDir;
+        }
+
+        public String getOutputDir() {
+            return outputDir;
+        }
+
+        public void setOutputDir(String outputDir) {
+            this.outputDir = outputDir;
+        }
+    }
+
+    public LocalProviderBolt() throws RegistryException {
+        cmdList = new ArrayList<String>();
+        registry = RegistryFactory.getDefaultRegistry();
+    }
+
+    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+        try {
+            registry = RegistryFactory.getDefaultRegistry();
+            String ids = tuple.getString(0);
+            String taskId = tuple.getString(1);
+
+            String[] split = ids.split(",");
+            if (split.length != 3) {
+                throw new FailedException("Wrong tuple given: " + ids);
+            }
+            String gatewayId = split[0];
+            String experimentId = split[1];
+
+            TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskId);
+
+            String applicationInterfaceId = taskData.getApplicationId();
+            String applicationDeploymentId = taskData.getApplicationDeploymentId();
+            if (null == applicationInterfaceId) {
+                throw new FailedException("Error executing the job. The required Application Id is missing");
+            }
+            if (null == applicationDeploymentId) {
+                throw new FailedException("Error executing the job. The required Application deployment Id is missing");
+            }
+
+            AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+            //fetch the compute resource, application interface and deployment information from app catalog
+            ApplicationInterfaceDescription applicationInterface = appCatalog.
+                    getApplicationInterface().getApplicationInterface(applicationInterfaceId);
+            ApplicationDeploymentDescription applicationDeployment = appCatalog.
+                    getApplicationDeployment().getApplicationDeployement(applicationDeploymentId);
+            ComputeResourceDescription computeResource = appCatalog.getComputeResource().
+                    getComputeResource(applicationDeployment.getComputeHostId());
+            ComputeResourcePreference gatewayResourcePreferences = appCatalog.getGatewayProfile().
+                    getComputeResourcePreference(gatewayId, applicationDeployment.getComputeHostId());
+
+            if (gatewayResourcePreferences == null) {
+                List<String> gatewayProfileIds = appCatalog.getGatewayProfile()
+                        .getGatewayProfileIds(gatewayId);
+                for (String profileId : gatewayProfileIds) {
+                    gatewayId = profileId;
+                    gatewayResourcePreferences = appCatalog.getGatewayProfile().
+                            getComputeResourcePreference(gatewayId, applicationDeployment.getComputeHostId());
+                    if (gatewayResourcePreferences != null) {
+                        break;
+                    }
+                }
+            }
+            String executablePath = applicationDeployment.getExecutablePath();
+
+
+            buildCommand(applicationDeployment, taskId);
+            initProcessBuilder(applicationDeployment);
+
+            // extra environment variables
+            String workingDir = "/tmp";
+            workingDir = workingDir + experimentId + File.separator + org.apache.airavata.gfac.Constants.INPUT_DATA_DIR_VAR_NAME;
+            String stdOutFile = workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stdout";
+            String stdErrFile = workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stderr";
+
+            builder.environment().put(org.apache.airavata.gfac.Constants.INPUT_DATA_DIR_VAR_NAME, stdOutFile);
+            builder.environment().put(org.apache.airavata.gfac.Constants.OUTPUT_DATA_DIR_VAR_NAME, stdErrFile);
+
+            // set working directory
+            builder.directory(new File(workingDir));
+
+            // log info
+            logger.info("Command = " + InputUtils.buildCommand(cmdList));
+            logger.info("Working dir = " + builder.directory());
+            for (String key : builder.environment().keySet()) {
+                logger.info("Env[" + key + "] = " + builder.environment().get(key));
+            }
+
+
+            JobDetails jobDetails = new JobDetails();
+            jobId = taskData.getTaskID();
+            jobDetails.setJobID(jobId);
+            jobDetails.setJobDescription(applicationDeployment.getAppDeploymentDescription());
+            GFacUtils.saveJobStatus(registry, jobDetails, taskId, JobState.SETUP);
+            // running cmd
+            Process process = builder.start();
+
+            Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), stdOutFile);
+            Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), stdErrFile);
+
+            // start output threads
+            standardOutWriter.setDaemon(true);
+            standardErrorWriter.setDaemon(true);
+            standardOutWriter.start();
+            standardErrorWriter.start();
+
+            int returnValue = process.waitFor();
+
+            // make sure other two threads are done
+            standardOutWriter.join();
+            standardErrorWriter.join();
+
+            /*
+             * check return value. usually not very helpful to draw conclusions based on return values so don't bother.
+             * just provide warning in the log messages
+             */
+            if (returnValue != 0) {
+                logger.error("Process finished with non zero return value. Process may have failed");
+            } else {
+                logger.info("Process finished with return value of zero.");
+            }
+
+            StringBuffer buf = new StringBuffer();
+            buf.append("Executed ").append(InputUtils.buildCommand(cmdList))
+                    .append(" on the localHost, working directory = ").append(workingDir)
+                    .append(" tempDirectory = ").append(workingDir).append(" With the status ")
+                    .append(String.valueOf(returnValue));
+
+            logger.info(buf.toString());
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+
+    }
+
+
+    private void buildCommand(ApplicationDeploymentDescription applicationDeploymentDescription, String taskID) throws RegistryException, GFacException {
+        cmdList.add(applicationDeploymentDescription.getExecutablePath());
+        String appModuleId = applicationDeploymentDescription.getAppModuleId();
+        TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
+
+        List<InputDataObjectType> taskInputs = taskData.getApplicationInputs();
+        Map<String, Object> inputParamMap = GFacUtils.getInputParamMap(taskInputs);
+
+        // sort the inputs first and then build the command List
+        Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
+            @Override
+            public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
+                return inputDataObjectType.getInputOrder() - t1.getInputOrder();
+            }
+        };
+        Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
+        for (Object object : inputParamMap.values()) {
+            if (object instanceof InputDataObjectType) {
+                InputDataObjectType inputDOT = (InputDataObjectType) object;
+                sortedInputSet.add(inputDOT);
+            }
+        }
+        for (InputDataObjectType inputDataObjectType : sortedInputSet) {
+            if (inputDataObjectType.getApplicationArgument() != null
+                    && !inputDataObjectType.getApplicationArgument().equals("")) {
+                cmdList.add(inputDataObjectType.getApplicationArgument());
+            }
+
+            if (inputDataObjectType.getValue() != null
+                    && !inputDataObjectType.getValue().equals("")) {
+                cmdList.add(inputDataObjectType.getValue());
+            }
+        }
+
+    }
+
+    private void initProcessBuilder(ApplicationDeploymentDescription app) {
+        builder = new ProcessBuilder(cmdList);
+
+        List<SetEnvPaths> setEnvironment = app.getSetEnvironment();
+        if (setEnvironment != null) {
+            for (SetEnvPaths envPath : setEnvironment) {
+                Map<String, String> builderEnv = builder.environment();
+                builderEnv.put(envPath.getName(), envPath.getValue());
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/CustomStormDeclarator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/CustomStormDeclarator.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/CustomStormDeclarator.java
new file mode 100644
index 0000000..c5568de
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/CustomStormDeclarator.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import com.rabbitmq.client.Channel;
+import io.latent.storm.rabbitmq.Declarator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CustomStormDeclarator implements Declarator {
+    private final static Logger logger = LoggerFactory.getLogger(CustomStormDeclarator.class);
+
+    private final String exchange;
+    private final String queue;
+    private final String routingKey;
+
+    public CustomStormDeclarator(String exchange, String queue) {
+        this(exchange, queue, "");
+    }
+
+    public CustomStormDeclarator(String exchange, String queue, String routingKey) {
+        this.exchange = exchange;
+        this.queue = queue;
+        this.routingKey = routingKey;
+    }
+
+    @Override
+    public void execute(Channel channel) {
+        // you're given a RabbitMQ Channel so you're free to wire up your exchange/queue bindings as you see fit
+        try {
+            Map<String, Object> args = new HashMap<String,Object>();
+            channel.queueDeclare(queue, true, false, false, args);
+            channel.exchangeDeclare(exchange, "topic", true);
+            channel.queueBind(queue, exchange, routingKey);
+        } catch (IOException e) {
+            throw new RuntimeException("Error executing rabbitmq declarations.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentLauncher.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentLauncher.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentLauncher.java
new file mode 100644
index 0000000..69466c6
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentLauncher.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExperimentLauncher {
+    private final static Logger logger = LoggerFactory.getLogger(ExperimentLauncher.class);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentModelUtil.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentModelUtil.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentModelUtil.java
new file mode 100644
index 0000000..c14ea48
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentModelUtil.java
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.List;
+
+
+public class ExperimentModelUtil {
+
+    public static WorkflowNodeStatus createWorkflowNodeStatus(WorkflowNodeState state){
+        WorkflowNodeStatus status = new WorkflowNodeStatus();
+        status.setWorkflowNodeState(state);
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        return status;
+    }
+
+    public static Experiment createSimpleExperiment(String projectID,
+                                                    String userName,
+                                                    String experimentName,
+                                                    String expDescription,
+                                                    String applicationId,
+                                                    List<InputDataObjectType> experimentInputList) {
+        Experiment experiment = new Experiment();
+        experiment.setProjectID(projectID);
+        experiment.setUserName(userName);
+        experiment.setName(experimentName);
+        experiment.setDescription(expDescription);
+        experiment.setApplicationId(applicationId);
+        experiment.setExperimentInputs(experimentInputList);
+        return experiment;
+    }
+
+
+    public static ComputationalResourceScheduling createComputationResourceScheduling(String resourceHostId,
+                                                                                      int cpuCount,
+                                                                                      int nodeCount,
+                                                                                      int numberOfThreads,
+                                                                                      String queueName,
+                                                                                      int wallTimeLimit,
+                                                                                      long jobstartTime,
+                                                                                      int totalPhysicalMemory,
+                                                                                      String projectAccount) {
+
+        ComputationalResourceScheduling cmRS = new ComputationalResourceScheduling();
+        cmRS.setResourceHostId(resourceHostId);
+        cmRS.setTotalCPUCount(cpuCount);
+        cmRS.setNodeCount(nodeCount);
+        cmRS.setNumberOfThreads(numberOfThreads);
+        cmRS.setQueueName(queueName);
+        cmRS.setWallTimeLimit(wallTimeLimit);
+        cmRS.setJobStartTime((int) jobstartTime);
+        cmRS.setTotalPhysicalMemory(totalPhysicalMemory);
+        cmRS.setComputationalProjectAccount(projectAccount);
+        return cmRS;
+    }
+
+    public static AdvancedInputDataHandling createAdvancedInputHandling(boolean stageInputFilesToWorkingDir,
+                                                                        String parentWorkingDir,
+                                                                        String uniqueWorkingDir,
+                                                                        boolean cleanupAfterJob) {
+        AdvancedInputDataHandling inputDataHandling = new AdvancedInputDataHandling();
+        inputDataHandling.setStageInputFilesToWorkingDir(stageInputFilesToWorkingDir);
+        inputDataHandling.setParentWorkingDirectory(parentWorkingDir);
+        inputDataHandling.setUniqueWorkingDirectory(uniqueWorkingDir);
+        inputDataHandling.setCleanUpWorkingDirAfterJob(cleanupAfterJob);
+        return inputDataHandling;
+    }
+
+    public static AdvancedOutputDataHandling createAdvancedOutputDataHandling(String outputDatadir,
+                                                                              String dataRegUrl,
+                                                                              boolean persistOutput) {
+        AdvancedOutputDataHandling outputDataHandling = new AdvancedOutputDataHandling();
+        outputDataHandling.setOutputDataDir(outputDatadir);
+        outputDataHandling.setDataRegistryURL(dataRegUrl);
+        outputDataHandling.setPersistOutputData(persistOutput);
+        return outputDataHandling;
+    }
+
+    public static QualityOfServiceParams createQOSParams(String startExecutionAt,
+                                                         String executeBefore,
+                                                         int numberOfRetires) {
+        QualityOfServiceParams qosParams = new QualityOfServiceParams();
+        qosParams.setStartExecutionAt(startExecutionAt);
+        qosParams.setExecuteBefore(executeBefore);
+        qosParams.setNumberofRetries(numberOfRetires);
+        return qosParams;
+    }
+
+    public static TaskDetails cloneTaskFromExperiment (Experiment experiment){
+        TaskDetails taskDetails = new TaskDetails();
+        taskDetails.setCreationTime(experiment.getCreationTime());
+        taskDetails.setApplicationId(experiment.getApplicationId());
+        taskDetails.setApplicationVersion(experiment.getApplicationVersion());
+        List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs();
+        if (experimentInputs != null){
+            taskDetails.setApplicationInputs(experimentInputs);
+        }
+
+        List<OutputDataObjectType> experimentOutputs = experiment.getExperimentOutputs();
+        if (experimentOutputs != null){
+            taskDetails.setApplicationOutputs(experimentOutputs);
+        }
+
+        UserConfigurationData configData = experiment.getUserConfigurationData();
+        if (configData != null){
+            ComputationalResourceScheduling scheduling = configData.getComputationalResourceScheduling();
+            if (scheduling != null){
+                taskDetails.setTaskScheduling(scheduling);
+            }
+            AdvancedInputDataHandling advanceInputDataHandling = configData.getAdvanceInputDataHandling();
+            if (advanceInputDataHandling != null){
+                taskDetails.setAdvancedInputDataHandling(advanceInputDataHandling);
+            }
+            AdvancedOutputDataHandling outputHandling = configData.getAdvanceOutputDataHandling();
+            if (outputHandling != null){
+                taskDetails.setAdvancedOutputDataHandling(outputHandling);
+            }
+        }
+        return taskDetails;
+    }
+
+    public static TaskDetails cloneTaskFromWorkflowNodeDetails(Experiment experiment, WorkflowNodeDetails nodeDetails){
+        TaskDetails taskDetails = new TaskDetails();
+        taskDetails.setCreationTime(nodeDetails.getCreationTime());
+//        String[] split = ;
+        taskDetails.setApplicationId(nodeDetails.getExecutionUnitData());
+//        taskDetails.setApplicationVersion(split[1]);
+        List<InputDataObjectType> experimentInputs = nodeDetails.getNodeInputs();
+        if (experimentInputs != null){
+            taskDetails.setApplicationInputs(experimentInputs);
+        }
+
+        List<OutputDataObjectType> experimentOutputs = nodeDetails.getNodeOutputs();
+        if (experimentOutputs != null){
+            taskDetails.setApplicationOutputs(experimentOutputs);
+        }
+
+        UserConfigurationData configData = experiment.getUserConfigurationData();
+        if (configData != null){
+            ComputationalResourceScheduling scheduling = configData.getComputationalResourceScheduling();
+            if (scheduling != null){
+                taskDetails.setTaskScheduling(scheduling);
+            }
+            AdvancedInputDataHandling advanceInputDataHandling = configData.getAdvanceInputDataHandling();
+            if (advanceInputDataHandling != null){
+                taskDetails.setAdvancedInputDataHandling(advanceInputDataHandling);
+            }
+            AdvancedOutputDataHandling outputHandling = configData.getAdvanceOutputDataHandling();
+            if (outputHandling != null){
+                taskDetails.setAdvancedOutputDataHandling(outputHandling);
+            }
+        }
+        return taskDetails;
+    }
+    public static WorkflowNodeDetails createWorkflowNode (String nodeName,
+                                                          List<InputDataObjectType> nodeInputs){
+        WorkflowNodeDetails wfnod = new WorkflowNodeDetails();
+        wfnod.setNodeName(nodeName);
+        wfnod.setNodeInputs(nodeInputs);
+        return wfnod;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/MessageScheme.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/MessageScheme.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/MessageScheme.java
new file mode 100644
index 0000000..9a69a13
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/MessageScheme.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MessageScheme implements Scheme {
+    private final static Logger logger = LoggerFactory.getLogger(MessageScheme.class);
+
+    @Override
+    public List<Object> deserialize(byte[] bytes) {
+        ArrayList<Object> message = new ArrayList<Object>();
+        message.add(new String(bytes));
+
+        return message;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return new Fields("experiment");
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/GfacTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/GfacTopologyBuilderTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/GfacTopologyBuilderTest.java
new file mode 100644
index 0000000..32f9479
--- /dev/null
+++ b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/GfacTopologyBuilderTest.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.core.gfac.services.impl;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.topology.TopologyBuilder;
+import com.rabbitmq.client.ConnectionFactory;
+import io.latent.storm.rabbitmq.Declarator;
+import io.latent.storm.rabbitmq.config.ConnectionConfig;
+import io.latent.storm.rabbitmq.config.ConsumerConfig;
+import io.latent.storm.rabbitmq.config.ConsumerConfigBuilder;
+import org.apache.airavata.gfac.local.AiravataRabbitMQSpout;
+import org.apache.airavata.gfac.local.RandomSentenceSpout;
+import org.apache.airavata.gfac.local.handler.LocalDirectorySetupBolt;
+import org.apache.airavata.gfac.local.provider.impl.LocalProviderBolt;
+import org.apache.airavata.gfac.local.utils.CustomStormDeclarator;
+import org.apache.airavata.gfac.local.utils.MessageScheme;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GfacTopologyBuilderTest {
+    private final static Logger logger = LoggerFactory.getLogger(GfacTopologyBuilderTest.class);
+
+
+    public static void main(String[] args) throws RegistryException, InterruptedException {
+        TopologyBuilder builder = new TopologyBuilder();
+        ConnectionConfig connectionConfig = new ConnectionConfig("localhost", 5672, "guest", "guest", ConnectionFactory.DEFAULT_VHOST, 10); // host, port, username, password, virtualHost, heartBeat
+        ConsumerConfig spoutConfig = new ConsumerConfigBuilder().connection(connectionConfig)
+                .queue("gfac.submit")
+                .prefetch(200)
+                .requeueOnFail()
+                .build();
+
+//
+        MessageScheme messageScheme = new MessageScheme();
+        Declarator declarator = new CustomStormDeclarator("airavata_rabbitmq_exchange", "gfac.submit", "*");
+
+        builder.setSpout("spout", new AiravataRabbitMQSpout(messageScheme,declarator), 5).addConfigurations(spoutConfig.asMap())
+                .setMaxSpoutPending(200);;
+//        builder.setSpout("spout", new RandomSentenceSpout(), 5);
+
+        builder.setBolt("directoryset", new LocalDirectorySetupBolt(), 8).shuffleGrouping("spout").setNumTasks(16);
+        builder.setBolt("count", new LocalProviderBolt(), 12).shuffleGrouping("directoryset");
+        Config conf = new Config();
+        conf.setDebug(true);
+
+
+        conf.setNumWorkers(3);
+
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("word-count", conf, builder.createTopology());
+
+        Thread.sleep(10000000);
+
+//        cluster.shutdown();
+    }
+
+    public void testTopology() throws RegistryException, InterruptedException {
+        TopologyBuilder builder = new TopologyBuilder();
+        ConnectionConfig connectionConfig = new ConnectionConfig("localhost", 5672, "guest", "guest", ConnectionFactory.DEFAULT_VHOST, 10); // host, port, username, password, virtualHost, heartBeat
+        ConsumerConfig spoutConfig = new ConsumerConfigBuilder().connection(connectionConfig)
+                .queue("gfac.submit")
+                .prefetch(200)
+                .requeueOnFail()
+                .build();
+
+//
+        MessageScheme messageScheme = new MessageScheme();
+        Declarator declarator = new CustomStormDeclarator("airavata_rabbitmq_exchange", "gfac.submit", "*");
+
+        builder.setSpout("spout", new AiravataRabbitMQSpout(messageScheme,declarator), 5).addConfigurations(spoutConfig.asMap())
+                .setMaxSpoutPending(200);;
+
+        builder.setBolt("directoryset", new LocalDirectorySetupBolt(), 8).shuffleGrouping("spout").setNumTasks(16);
+        builder.setBolt("count", new LocalProviderBolt(), 12).shuffleGrouping("directoryset");
+        Config conf = new Config();
+        conf.setDebug(true);
+
+
+        conf.setNumWorkers(3);
+
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("word-count", conf, builder.createTopology());
+
+        Thread.sleep(10000000);
+
+//        cluster.shutdown();
+    }
+}