You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/01/26 17:30:50 UTC

[1/2] airavata git commit: creating gfac storm version

Repository: airavata
Updated Branches:
  refs/heads/gfac-storm [created] 69d178214


http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
deleted file mode 100644
index aeb8158..0000000
--- a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-///*
-// *
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *   http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied.  See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// *
-//*/
-//package org.apache.airavata.core.gfac.services.impl;
-//
-//import java.io.File;
-//import java.net.URL;
-//import java.util.ArrayList;
-//import java.util.List;
-//
-//import org.apache.airavata.common.utils.MonitorPublisher;
-//import org.apache.airavata.commons.gfac.type.ActualParameter;
-//import org.apache.airavata.commons.gfac.type.ApplicationDescription;
-//import org.apache.airavata.commons.gfac.type.HostDescription;
-//import org.apache.airavata.commons.gfac.type.ServiceDescription;
-//import org.apache.airavata.gfac.GFacConfiguration;
-//import org.apache.airavata.gfac.GFacException;
-//import org.apache.airavata.gfac.core.context.ApplicationContext;
-//import org.apache.airavata.gfac.core.context.JobExecutionContext;
-//import org.apache.airavata.gfac.core.context.MessageContext;
-//import org.apache.airavata.gfac.core.provider.GFacProviderException;
-//import org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler;
-//import org.apache.airavata.gfac.local.provider.impl.LocalProvider;
-//import org.apache.airavata.model.workspace.experiment.ExecutionUnit;
-//import org.apache.airavata.model.workspace.experiment.Experiment;
-//import org.apache.airavata.model.workspace.experiment.TaskDetails;
-//import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
-//import org.apache.airavata.persistance.registry.jpa.impl.LoggingRegistryImpl;
-//import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-//import org.apache.airavata.schemas.gfac.InputParameterType;
-//import org.apache.airavata.schemas.gfac.OutputParameterType;
-//import org.apache.airavata.schemas.gfac.StringParameterType;
-//import org.apache.commons.lang.SystemUtils;
-//import org.testng.annotations.BeforeTest;
-//import org.testng.annotations.Test;
-//
-//import com.google.common.eventbus.EventBus;
-//
-//public class LocalProviderTest {
-//    private JobExecutionContext jobExecutionContext;
-//    @BeforeTest
-//    public void setUp() throws Exception {
-//
-//        URL resource = this.getClass().getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
-//        File configFile = new File(resource.getPath());
-//        GFacConfiguration gFacConfiguration = GFacConfiguration.create(configFile, null);
-//        //have to set InFlwo Handlers and outFlowHandlers
-//        ApplicationContext applicationContext = new ApplicationContext();
-//        HostDescription host = new HostDescription();
-//        host.getType().setHostName("localhost");
-//        host.getType().setHostAddress("localhost");
-//        applicationContext.setHostDescription(host);
-//        /*
-//           * App
-//           */
-//        ApplicationDescription appDesc = new ApplicationDescription();
-//        ApplicationDeploymentDescriptionType app = appDesc.getType();
-//        ApplicationDeploymentDescriptionType.ApplicationName name = ApplicationDeploymentDescriptionType.ApplicationName.Factory.newInstance();
-//        name.setStringValue("EchoLocal");
-//        app.setApplicationName(name);
-//
-//        /*
-//           * Use bat file if it is compiled on Windows
-//           */
-//        if (SystemUtils.IS_OS_WINDOWS) {
-//            URL url = this.getClass().getClassLoader().getResource("echo.bat");
-//            app.setExecutableLocation(url.getFile());
-//        } else {
-//            //for unix and Mac
-//            app.setExecutableLocation("/bin/echo");
-//        }
-//
-//        /*
-//           * Default tmp location
-//           */
-//        String tempDir = System.getProperty("java.io.tmpdir");
-//        if (tempDir == null) {
-//            tempDir = "/tmp";
-//        }
-//
-//        app.setScratchWorkingDirectory(tempDir);
-//        app.setStaticWorkingDirectory(tempDir);
-//        app.setInputDataDirectory(tempDir + File.separator + "input");
-//        app.setOutputDataDirectory(tempDir + File.separator + "output");
-//        app.setStandardOutput(tempDir + File.separator + "echo.stdout");
-//        app.setStandardError(tempDir + File.separator + "echo.stderr");
-//
-//        applicationContext.setApplicationDeploymentDescription(appDesc);
-//
-//        /*
-//           * Service
-//           */
-//        ServiceDescription serv = new ServiceDescription();
-//        serv.getType().setName("SimpleEcho");
-//
-//        List<InputParameterType> inputList = new ArrayList<InputParameterType>();
-//        InputParameterType input = InputParameterType.Factory.newInstance();
-//        input.setParameterName("echo_input");
-//        input.setParameterType(StringParameterType.Factory.newInstance());
-//        inputList.add(input);
-//        InputParameterType[] inputParamList = inputList.toArray(new InputParameterType[inputList
-//                .size()]);
-//
-//        List<OutputParameterType> outputList = new ArrayList<OutputParameterType>();
-//        OutputParameterType output = OutputParameterType.Factory.newInstance();
-//        output.setParameterName("echo_output");
-//        output.setParameterType(StringParameterType.Factory.newInstance());
-//        outputList.add(output);
-//        OutputParameterType[] outputParamList = outputList
-//                .toArray(new OutputParameterType[outputList.size()]);
-//
-//        serv.getType().setInputParametersArray(inputParamList);
-//        serv.getType().setOutputParametersArray(outputParamList);
-//
-//        jobExecutionContext = new JobExecutionContext(gFacConfiguration, serv.getType().getName());
-//        jobExecutionContext.setApplicationContext(applicationContext);
-//        /*
-//        * Host
-//        */
-//        applicationContext.setServiceDescription(serv);
-//
-//        MessageContext inMessage = new MessageContext();
-//        ActualParameter echo_input = new ActualParameter();
-//        ((StringParameterType) echo_input.getType()).setValue("echo_output=hello");
-//        inMessage.addParameter("echo_input", echo_input);
-//
-//        jobExecutionContext.setInMessageContext(inMessage);
-//
-//        MessageContext outMessage = new MessageContext();
-//        ActualParameter echo_out = new ActualParameter();
-//        outMessage.addParameter("echo_output", echo_out);
-//
-//        jobExecutionContext.setOutMessageContext(outMessage);
-//
-//        jobExecutionContext.setExperimentID("test123");
-//        jobExecutionContext.setExperiment(new Experiment("test123","project1","admin","testExp"));
-//        jobExecutionContext.setTaskData(new TaskDetails(jobExecutionContext.getExperimentID()));
-//        jobExecutionContext.setRegistry(new LoggingRegistryImpl());
-//        jobExecutionContext.setWorkflowNodeDetails(new WorkflowNodeDetails(jobExecutionContext.getExperimentID(),"none", ExecutionUnit.APPLICATION));
-//
-//
-//    }
-//
-//    @Test
-//    public void testLocalDirectorySetupHandler() throws GFacException {
-//        LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler();
-//        localDirectorySetupHandler.invoke(jobExecutionContext);
-//
-//        ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
-//        ApplicationDeploymentDescriptionType app = applicationDeploymentDescription.getType();
-//        junit.framework.Assert.assertTrue(new File(app.getStaticWorkingDirectory()).exists());
-//        junit.framework.Assert.assertTrue(new File(app.getScratchWorkingDirectory()).exists());
-//        junit.framework.Assert.assertTrue(new File(app.getInputDataDirectory()).exists());
-//        junit.framework.Assert.assertTrue(new File(app.getOutputDataDirectory()).exists());
-//    }
-//
-//    @Test
-//    public void testLocalProvider() throws GFacException,GFacProviderException {
-//        LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler();
-//        localDirectorySetupHandler.invoke(jobExecutionContext);
-//        LocalProvider localProvider = new LocalProvider();
-//        localProvider.setMonitorPublisher(new MonitorPublisher(new EventBus()));
-//        localProvider.initialize(jobExecutionContext);
-//        localProvider.execute(jobExecutionContext);
-//        localProvider.dispose(jobExecutionContext);
-//    }
-//}

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/WordCountTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/WordCountTopologyTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/WordCountTopologyTest.java
new file mode 100644
index 0000000..914c9b8
--- /dev/null
+++ b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/WordCountTopologyTest.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.core.gfac.services.impl;
+
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.task.ShellBolt;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.airavata.gfac.local.RandomSentenceSpout;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This topology demonstrates Storm's stream groupings and multilang capabilities.
+ */
+public class WordCountTopologyTest {
+    public static class SplitSentence extends ShellBolt implements IRichBolt {
+
+        public SplitSentence() {
+            super("python", "splitsentence.py");
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+
+        @Override
+        public Map<String, Object> getComponentConfiguration() {
+            return null;
+        }
+    }
+
+    public static class WordCount extends BaseBasicBolt {
+        Map<String, Integer> counts = new HashMap<String, Integer>();
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            Integer count = counts.get(word);
+            if (count == null)
+                count = 0;
+            count++;
+            counts.put(word, count);
+            collector.emit(new Values(word, count));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+
+    public static void main(String[] args) {
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout("spout", new RandomSentenceSpout(), 5);
+
+        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+
+        Config conf = new Config();
+        conf.setDebug(true);
+
+
+        conf.setMaxTaskParallelism(3);
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("word-count", conf, builder.createTopology());
+
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+        }
+
+        cluster.shutdown();
+    }
+
+    @Test
+    public void testTopology() {
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout("spout", new RandomSentenceSpout(), 5);
+
+        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+
+        Config conf = new Config();
+        conf.setDebug(true);
+
+
+        conf.setMaxTaskParallelism(3);
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("word-count", conf, builder.createTopology());
+
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+        }
+
+        cluster.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/pom.xml b/modules/gfac/pom.xml
index e85ef7d..e8c5f1d 100644
--- a/modules/gfac/pom.xml
+++ b/modules/gfac/pom.xml
@@ -32,17 +32,7 @@
             </activation>
             <modules>
                 <module>gfac-core</module>
-                <module>gfac-ec2</module>
-                <module>gfac-ssh</module>
-                <module>gfac-local</module>
-                <!--<module>gfac-hadoop</module>-->
-                <!--<module>gfac-gram</module>-->
-                <module>gfac-gsissh</module>
-                <module>gfac-bes</module>
-                <module>gfac-monitor</module>
-                <module>airavata-gfac-service</module>
-                <module>airavata-gfac-stubs</module>
-                <module>gfac-application-specific-handlers</module>
+		<module>storm-starter</module>
             </modules>
         </profile>
     </profiles>

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/orchestrator/orchestrator-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml
index 691d288..a72ae7b 100644
--- a/modules/orchestrator/orchestrator-core/pom.xml
+++ b/modules/orchestrator/orchestrator-core/pom.xml
@@ -113,7 +113,7 @@ the License. -->
         <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
-            <version>6.1.1</version>
+            <version>6.8.5</version>
             <scope>test</scope>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java
----------------------------------------------------------------------
diff --git a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java
index 5768a0d..ff4402b 100644
--- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java
+++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java
@@ -38,10 +38,11 @@ import org.apache.airavata.registry.cpi.RegistryModelType;
 import org.apache.airavata.registry.cpi.utils.Constants;
 import org.apache.airavata.registry.cpi.utils.StatusType;
 
+import java.io.Serializable;
 import java.sql.Timestamp;
 import java.util.*;
 
-public class ExperimentRegistry {
+public class ExperimentRegistry implements Serializable{
     private GatewayResource gatewayResource;
     private WorkerResource workerResource;
     private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(ExperimentRegistry.class);

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ProjectRegistry.java
----------------------------------------------------------------------
diff --git a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ProjectRegistry.java b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ProjectRegistry.java
index d85cd83..f959356 100644
--- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ProjectRegistry.java
+++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ProjectRegistry.java
@@ -21,6 +21,7 @@
 
 package org.apache.airavata.persistance.registry.jpa.impl;
 
+import java.io.Serializable;
 import java.util.*;
 
 import org.apache.airavata.common.utils.AiravataUtils;
@@ -34,7 +35,7 @@ import org.apache.airavata.registry.cpi.utils.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ProjectRegistry {
+public class ProjectRegistry implements Serializable {
     private GatewayResource gatewayResource;
     private WorkerResource workerResource;
     private final static Logger logger = LoggerFactory.getLogger(ProjectRegistry.class);

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/RegistryImpl.java
----------------------------------------------------------------------
diff --git a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/RegistryImpl.java b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/RegistryImpl.java
index a76bb5d..f619e1f 100644
--- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/RegistryImpl.java
+++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/RegistryImpl.java
@@ -33,11 +33,12 @@ import org.apache.airavata.registry.cpi.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public class RegistryImpl implements Registry {
+public class RegistryImpl implements Registry,Serializable {
     private GatewayResource gatewayResource;
     private UserResource user;
     private final static Logger logger = LoggerFactory.getLogger(RegistryImpl.class);
@@ -62,6 +63,7 @@ public class RegistryImpl implements Registry {
             experimentRegistry = new ExperimentRegistry(gatewayResource, user);
             projectRegistry = new ProjectRegistry(gatewayResource, user);
         } catch (ApplicationSettingsException e) {
+            e.printStackTrace();
             logger.error("Unable to read airavata server properties..", e);
         }
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/AbstractResource.java
----------------------------------------------------------------------
diff --git a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/AbstractResource.java b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/AbstractResource.java
index 449c072..708fe35 100644
--- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/AbstractResource.java
+++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/AbstractResource.java
@@ -20,6 +20,7 @@
  */
 package org.apache.airavata.persistance.registry.jpa.resources;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -27,7 +28,7 @@ import org.apache.airavata.persistance.registry.jpa.Resource;
 import org.apache.airavata.persistance.registry.jpa.ResourceType;
 import org.apache.airavata.registry.cpi.RegistryException;
 
-public abstract class AbstractResource implements Resource {
+public abstract class AbstractResource implements Resource, Serializable {
 	// table names
 	public static final String GATEWAY = "Gateway";
 	public static final String CONFIGURATION = "Configuration";

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/GatewayResource.java
----------------------------------------------------------------------
diff --git a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/GatewayResource.java b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/GatewayResource.java
index 34aa67e..feb664c 100644
--- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/GatewayResource.java
+++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/GatewayResource.java
@@ -20,6 +20,7 @@
 */
 package org.apache.airavata.persistance.registry.jpa.resources;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -35,7 +36,7 @@ import org.apache.airavata.registry.cpi.RegistryException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class GatewayResource extends AbstractResource {
+public class GatewayResource extends AbstractResource implements Serializable {
     private final static Logger logger = LoggerFactory.getLogger(GatewayResource.class);
     private String gatewayName;
     private String owner;

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/WorkerResource.java
----------------------------------------------------------------------
diff --git a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/WorkerResource.java b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/WorkerResource.java
index 63939ca..337f964 100644
--- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/WorkerResource.java
+++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/WorkerResource.java
@@ -33,13 +33,14 @@ import org.apache.airavata.registry.cpi.RegistryException;
 
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
+import java.io.Serializable;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-public class WorkerResource extends AbstractResource {
+public class WorkerResource extends AbstractResource implements Serializable {
     private final static Logger logger = LoggerFactory.getLogger(WorkerResource.class);
     private String user;
 	private GatewayResource gateway;

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4e68213..01ae686 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,7 +71,7 @@
 		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 		<axis2.version>1.5.1</axis2.version>
 		<derby.version>10.9.1.0</derby.version>
-		<org.slf4j.version>1.7.6</org.slf4j.version>
+		<org.slf4j.version>1.6.6</org.slf4j.version>
 		<log4j.version>1.2.16</log4j.version>
 		<axiom.version>1.2.8</axiom.version>
 		<surefire.version>2.12</surefire.version>
@@ -513,20 +513,11 @@
 				<module>modules/airavata-client</module>
 				<module>modules/commons</module>
 				<module>modules/gfac</module>
-				<module>modules/workflow-model</module>
 				<module>modules/registry</module>
 				<module>modules/app-catalog</module>
 				<module>modules/security</module>
 				<module>modules/credential-store-service</module>
-				<module>modules/orchestrator</module>
-				<module>tools</module>
-				<module>modules/server</module>
-				<module>modules/test-suite</module>
-				<module>modules/distribution</module>
-				<module>modules/ws-messenger</module>
 				<module>modules/messaging</module>
-				<module>modules/integration-tests</module>
-				<module>modules/xbaya-gui</module>
 			</modules>
 		</profile>
 		<profile>
@@ -599,18 +590,10 @@
 				<module>airavata-api</module>
 				<module>modules/commons</module>
 				<module>modules/gfac</module>
-				<module>modules/ws-messenger</module>
-				<module>modules/workflow-model</module>
 				<module>modules/registry</module>
 				<module>modules/airavata-client</module>
 				<module>modules/security</module>
 				<module>modules/credential-store-service</module>
-				<module>modules/orchestrator</module>
-				<module>tools</module>
-				<module>modules/server</module>
-				<module>modules/test-suite</module>
-				<module>modules/distribution</module>
-				<module>modules/integration-tests</module>
 			</modules>
 		</profile>
 		<!--profile> <id>ec2Tests</id> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> 

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/tools/gsissh/pom.xml
----------------------------------------------------------------------
diff --git a/tools/gsissh/pom.xml b/tools/gsissh/pom.xml
index 16f3878..76f04cc 100644
--- a/tools/gsissh/pom.xml
+++ b/tools/gsissh/pom.xml
@@ -73,7 +73,7 @@
 		<dependency>
 			<groupId>org.testng</groupId>
 			<artifactId>testng</artifactId>
-			<version>6.1.1</version>
+			<version>6.8.5</version>
 			<scope>test</scope>
 		</dependency>
 		<dependency>


[2/2] airavata git commit: creating gfac storm version

Posted by la...@apache.org.
creating gfac storm version


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/69d17821
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/69d17821
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/69d17821

Branch: refs/heads/gfac-storm
Commit: 69d1782149c11a60dded61e7b9c77a9395fc4eea
Parents: cf7290a
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Mon Jan 26 11:30:10 2015 -0500
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Mon Jan 26 11:30:10 2015 -0500

----------------------------------------------------------------------
 .../java-client-samples/pom.xml                 |   5 +
 .../client/samples/CreateLaunchExperiment.java  |  48 +++-
 .../catalog/data/util/AppCatalogJPAUtils.java   |   4 +-
 .../common/utils/ApplicationSettings.java       |   8 +-
 .../main/resources/airavata-server.properties   | 234 ---------------
 modules/gfac/gfac-core/pom.xml                  |   6 -
 .../gfac/core/context/JobExecutionContext.java  |   3 +
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |   1 -
 .../airavata/gfac/core/utils/GFacUtils.java     |  14 +
 .../apache/airavata/job/GFacConfigXmlTest.java  |   4 +-
 modules/gfac/gfac-ec2/pom.xml                   |   2 +-
 modules/gfac/gfac-local/pom.xml                 |  26 +-
 .../gfac/local/AiravataRabbitMQSpout.java       | 158 +++++++++++
 .../gfac/local/RandomSentenceSpout.java         |  67 +++++
 .../local/handler/LocalDirectorySetupBolt.java  | 159 +++++++++++
 .../gfac/local/provider/impl/LocalProvider.java |   2 -
 .../local/provider/impl/LocalProviderBolt.java  | 283 +++++++++++++++++++
 .../gfac/local/utils/CustomStormDeclarator.java |  61 ++++
 .../gfac/local/utils/ExperimentLauncher.java    |  28 ++
 .../gfac/local/utils/ExperimentModelUtil.java   | 187 ++++++++++++
 .../gfac/local/utils/MessageScheme.java         |  46 +++
 .../services/impl/GfacTopologyBuilderTest.java  | 112 ++++++++
 .../gfac/services/impl/LocalProviderTest.java   | 184 ------------
 .../services/impl/WordCountTopologyTest.java    | 135 +++++++++
 modules/gfac/pom.xml                            |  12 +-
 modules/orchestrator/orchestrator-core/pom.xml  |   2 +-
 .../registry/jpa/impl/ExperimentRegistry.java   |   3 +-
 .../registry/jpa/impl/ProjectRegistry.java      |   3 +-
 .../registry/jpa/impl/RegistryImpl.java         |   4 +-
 .../jpa/resources/AbstractResource.java         |   3 +-
 .../registry/jpa/resources/GatewayResource.java |   3 +-
 .../registry/jpa/resources/WorkerResource.java  |   3 +-
 pom.xml                                         |  19 +-
 tools/gsissh/pom.xml                            |   2 +-
 34 files changed, 1343 insertions(+), 488 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/airavata-api/airavata-client-sdks/java-client-samples/pom.xml
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/pom.xml b/airavata-api/airavata-client-sdks/java-client-samples/pom.xml
index 5f12ace..fa7a398 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/pom.xml
+++ b/airavata-api/airavata-client-sdks/java-client-samples/pom.xml
@@ -56,6 +56,11 @@
             <artifactId>airavata-client-configuration</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+            <version>3.3.5</version>
+        </dependency>
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index b2c5469..8eb28b6 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -21,6 +21,9 @@
 
 package org.apache.airavata.client.samples;
 
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
 import org.apache.airavata.api.Airavata;
 import org.apache.airavata.api.client.AiravataClientFactory;
 import org.apache.airavata.client.tools.RegisterSampleApplications;
@@ -57,7 +60,7 @@ public class CreateLaunchExperiment {
     private static final String DEFAULT_GATEWAY = "default.registry.gateway";
     private static Airavata.Client airavataClient;
 
-    private static String echoAppId = "Echo_0018dc21-9359-4063-9672-6ad15a24294d";
+    private static String echoAppId = "Echo_53cd4995-1a3b-4290-9615-2592a59051b8";
     private static String mpiAppId = "HelloMPI_da45305f-5d90-4a18-8716-8dd54c3b2376";
     private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
     private static String amberAppId = "Amber_49b16f6f-93ab-4885-9971-6ab2ab5eb3d3";
@@ -94,11 +97,11 @@ public class CreateLaunchExperiment {
         try {
             for (int i = 0; i < 1; i++) {
 //                final String expId = createExperimentForSSHHost(airavata);
-                final String expId = createEchoExperimentForFSD(airavataClient);
+//                final String expId = createEchoExperimentForFSD(airavataClient);
 //                final String expId = createMPIExperimentForFSD(airavataClient);
 //                final String expId = createEchoExperimentForStampede(airavataClient);
 //                final String expId = createEchoExperimentForTrestles(airavataClient);
-//                final String expId = createExperimentEchoForLocalHost(airavataClient);
+                final String expId = createExperimentEchoForLocalHost(airavataClient);
 //                final String expId = createExperimentWRFTrestles(airavataClient);
 //                final String expId = createExperimentForBR2(airavataClient);
 //                final String expId = createExperimentForBR2Amber(airavataClient);
@@ -113,7 +116,7 @@ public class CreateLaunchExperiment {
 //                final String expId = createExperimentAUTODOCKStampede(airavataClient); // this is not working , we need to register AutoDock app on stampede
                 System.out.println("Experiment ID : " + expId);
 //                updateExperiment(airavata, expId);
-                launchExperiment(airavataClient, expId);
+//                launchExperimentPassive(expId);
             }
         } catch (Exception e) {
             logger.error("Error while connecting with server", e.getMessage());
@@ -1553,6 +1556,16 @@ public class CreateLaunchExperiment {
         return null;
     }
 
+    public static void launchExperiment(String expId)
+            throws TException {
+        try {
+            String gatewayId = "default";
+        } catch (Exception e) {
+            logger.error("Error occured while launching the experiment...", e.getMessage());
+            throw new TException(e);
+        }
+    }
+
     public static void launchExperiment(Airavata.Client client, String expId)
             throws TException {
         try {
@@ -1577,6 +1590,33 @@ public class CreateLaunchExperiment {
         }
     }
 
+    public static void launchExperimentPassive(String expId)
+            throws TException {
+        try {
+            String uri = "amqp://localhost";
+            String message = expId;
+            String exchange = "airavata_rabbitmq_exchange";
+
+            ConnectionFactory cfconn = new ConnectionFactory();
+            cfconn.setUri(uri);
+            Connection conn = cfconn.newConnection();
+
+            Channel ch = conn.createChannel();
+
+            if (exchange.equals("")) {
+                ch.queueDeclare("gfac.submit", false, false, false, null);
+            }
+            ch.basicPublish(exchange, expId, null, message.getBytes());
+            ch.close();
+            conn.close();
+        } catch (Exception e) {
+            System.err.println("Main thread caught exception: " + e);
+            e.printStackTrace();
+            System.exit(1);
+        }
+
+    }
+
     public static List<Experiment> getExperimentsForUser(Airavata.Client client, String user) {
         try {
             return client.getAllUserExperiments(user);

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogJPAUtils.java
----------------------------------------------------------------------
diff --git a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogJPAUtils.java b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogJPAUtils.java
index 6b99bd9..d8116d9 100644
--- a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogJPAUtils.java
+++ b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogJPAUtils.java
@@ -58,8 +58,8 @@ public class AppCatalogJPAUtils {
             properties.put("openjpa.ConnectionProperties", connectionProperties);
             properties.put("openjpa.DynamicEnhancementAgent", "true");
             properties.put("openjpa.RuntimeUnenhancedClasses", "unsupported");
-            properties.put("openjpa.DataCache","true(CacheSize=" + Integer.valueOf(readServerProperties(JPA_CACHE_SIZE))  + ", SoftReferenceSize=0)");
-            properties.put("openjpa.QueryCache","true(CacheSize=" + Integer.valueOf(readServerProperties(JPA_CACHE_SIZE))  + ", SoftReferenceSize=0)");
+//            properties.put("openjpa.DataCache","true(CacheSize=" + Integer.valueOf(readServerProperties(JPA_CACHE_SIZE))  + ", SoftReferenceSize=0)");
+//            properties.put("openjpa.QueryCache","true(CacheSize=" + Integer.valueOf(readServerProperties(JPA_CACHE_SIZE))  + ", SoftReferenceSize=0)");
             properties.put("openjpa.RemoteCommitProvider","sjvm");
             properties.put("openjpa.Log","DefaultLevel=INFO, Runtime=INFO, Tool=INFO, SQL=INFO");
             properties.put("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
index 9a7ad16..b8d306d 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
@@ -85,13 +85,7 @@ public class ApplicationSettings {
 	}
 
 	protected URL getPropertyFileURL() {
-		URL url;
-		if (AiravataUtils.isClient()){
-            url=ApplicationSettings.class.getClassLoader().getResource(CLIENT_PROPERTIES);
-        }else{
-            url=ApplicationSettings.class.getClassLoader().getResource(SERVER_PROPERTIES);
-        }
-		return url;
+            return ApplicationSettings.class.getClassLoader().getResource(SERVER_PROPERTIES);
 	}
 	
 	protected URL[] getExternalSettingsFileURLs(){

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
deleted file mode 100644
index fb02901..0000000
--- a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
+++ /dev/null
@@ -1,234 +0,0 @@
-#
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-###########################################################################
-#
-#  This properties file provides configuration for all Airavata Services:
-#  API Server, Registry, Workflow Interpreter, GFac, Orchestrator
-#
-###########################################################################
-
-###########################################################################
-#  API Server Registry Configuration
-###########################################################################
-
-#for derby [AiravataJPARegistry]
-registry.jdbc.driver=org.apache.derby.jdbc.ClientDriver
-registry.jdbc.url=jdbc:derby://localhost:1527/persistent_data;create=true;user=airavata;password=airavata
-# MySql database configuration
-#registry.jdbc.driver=com.mysql.jdbc.Driver
-#registry.jdbc.url=jdbc:mysql://localhost:3306/persistent_data
-registry.jdbc.user=airavata
-registry.jdbc.password=airavata
-start.derby.server.mode=true
-validationQuery=SELECT 1 from CONFIGURATION
-jpa.cache.size=5000
-#jpa.connection.properties=MaxActive=10,MaxIdle=5,MinIdle=2,MaxWait=60000,testWhileIdle=true,testOnBorrow=true
-
-# Properties for default user mode
-default.registry.user=admin
-default.registry.password=admin
-default.registry.password.hash.method=SHA
-default.registry.gateway=default
-
-#ip=127.0.0.1
-
-###########################################################################
-#  Application Catalog DB Configuration
-###########################################################################
-#for derby [AiravataJPARegistry]
-appcatalog.jdbc.driver=org.apache.derby.jdbc.ClientDriver
-appcatalog.jdbc.url=jdbc:derby://localhost:1527/app_catalog;create=true;user=airavata;password=airavata
-# MySql database configuration
-#appcatalog.jdbc.driver=com.mysql.jdbc.Driver
-#appcatalog.jdbc.url=jdbc:mysql://localhost:3306/app_catalog
-appcatalog.jdbc.user=airavata
-appcatalog.jdbc.password=airavata
-appcatalog.validationQuery=SELECT 1 from CONFIGURATION
-
-###########################################################################
-#  Server module Configuration
-###########################################################################
-
-servers=apiserver,orchestrator,gfac,workflowserver
-#shutdown.trategy=NONE
-shutdown.trategy=SELF_TERMINATE
-
-
-apiserver.server.host=localhost
-apiserver.server.port=8930
-apiserver.server.min.threads=50
-workflow.server.host=localhost
-workflow.server.port=8931
-orchestrator.server.host=localhost
-orchestrator.server.port=8940
-gfac.server.host=localhost
-gfac.server.port=8950
-orchestrator.server.min.threads=50
-
-###########################################################################
-# Credential Store module Configuration
-###########################################################################
-credential.store.keystore.url=/Users/lahirugunathilake/Downloads/airavata_sym.jks
-credential.store.keystore.alias=airavata
-credential.store.keystore.password=airavata
-credential.store.jdbc.url=jdbc:derby://localhost:1527/persistent_data;create=true;user=airavata;password=airavata
-credential.store.jdbc.user=airavata
-credential.store.jdbc.password=airavata
-credential.store.jdbc.driver=org.apache.derby.jdbc.ClientDriver
-
-notifier.enabled=false
-#period in milliseconds
-notifier.duration=5000
-
-email.server=smtp.googlemail.com
-email.server.port=465
-email.user=airavata
-email.password=xxx
-email.ssl=true
-email.from=airavata@apache.org
-
-###########################################################################
-# Airavata GFac MyProxy GSI credentials to access Grid Resources.
-###########################################################################
-#
-# Security Configuration used by Airavata Generic Factory Service
-#  to interact with Computational Resources.
-#
-gfac=org.apache.airavata.gfac.server.GfacServer
-myproxy.server=myproxy.teragrid.org
-myproxy.username=ogce
-myproxy.password=
-myproxy.life=3600
-# XSEDE Trusted certificates can be downloaded from https://software.xsede.org/security/xsede-certs.tar.gz
-trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates
-# SSH PKI key pair or ssh password can be used SSH based authentication is used.
-# if user specify both password authentication gets the higher preference
-
-################# ---------- For ssh key pair authentication ------------------- ################
-#public.ssh.key=/path to public key for ssh
-#ssh.username=username for ssh connection
-#private.ssh.key=/path to private key file for ssh
-#ssh.keypass=passphrase for the private key
-
-
-################# ---------- For ssh key pair authentication ------------------- ################
-#ssh.username=username for ssh connection
-#ssh.password=Password for ssh connection
-
-
-
-###########################################################################
-# Airavata Workflow Interpreter Configurations
-###########################################################################
-
-#runInThread=true
-#provenance=true
-#provenanceWriterThreadPoolSize=20
-#gfac.embedded=true
-#workflowserver=org.apache.airavata.api.server.WorkflowServer
-
-
-###########################################################################
-# API Server module Configuration
-###########################################################################
-apiserver=org.apache.airavata.api.server.AiravataAPIServer
-
-###########################################################################
-# Workflow Server module Configuration
-###########################################################################
-
-workflowserver=org.apache.airavata.api.server.WorkflowServer
-
-###########################################################################
-# Advance configuration to change service implementations
-###########################################################################
-# If false, disables two phase commit when submitting jobs
-TwoPhase=true
-#
-# Class which implemented HostScheduler interface. It will determine the which host to submit the request
-#
-host.scheduler=org.apache.airavata.gfac.core.scheduler.impl.SimpleHostScheduler
-
-###########################################################################
-# Monitoring module Configuration
-###########################################################################
-
-#This will be the primary monitoring tool which runs in airavata, in future there will be multiple monitoring
-#mechanisms and one would be able to start a monitor
-monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.gfac.monitor.impl.LocalJobMonitor
-
-
-###########################################################################
-# AMQP Notification Configuration
-###########################################################################
-
-
-amqp.notification.enable=1
-
-amqp.broker.host=localhost
-amqp.broker.port=5672
-amqp.broker.username=guest
-amqp.broker.password=guest
-
-amqp.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPSenderImpl
-amqp.topic.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPTopicSenderImpl
-amqp.broadcast.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPBroadcastSenderImpl
-
-#,org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor
-#This is the amqp related configuration and this lists down the Rabbitmq host, this is an xsede specific configuration
-amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
-proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
-connection.name=xsede
-#publisher
-activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
-publish.rabbitmq=false
-activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
-rabbitmq.broker.url=amqp://localhost:5672
-rabbitmq.exchange.name=airavata_rabbitmq_exchange
-
-###########################################################################
-# Orchestrator module Configuration
-###########################################################################
-
-#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
-job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
-job.validators=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
-submitter.interval=10000
-threadpool.size=10
-start.submitter=true
-embedded.mode=true
-enable.validation=true
-orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer
-
-###########################################################################
-# Zookeeper Server Configuration
-###########################################################################
-
-embedded.zk=true
-zookeeper.server.host=localhost
-zookeeper.server.port=2181
-airavata-server=/api-server
-orchestrator-server=/orchestrator-server
-gfac-server=/gfac-server
-gfac-experiments=/gfac-experiments
-gfac-server-name=gfac-node0
-orchestrator-server-name=orch-node0
-airavata-server-name=api-node0

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index 2a85282..53b60ec 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -81,12 +81,6 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.testng</groupId>
-            <artifactId>testng</artifactId>
-            <version>6.1.1</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>jcl-over-slf4j</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index c8c48ef..97c2572 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -177,6 +177,9 @@ public class JobExecutionContext extends AbstractContext implements Serializable
      */
     private Map<String, SecurityContext> securityContext = new HashMap<String, SecurityContext>();
 
+    public JobExecutionContext() {
+    }
+
     public JobExecutionContext(GFacConfiguration gFacConfiguration,String applicationName){
         this.gfacConfiguration = gFacConfiguration;
         notifier = new GFacNotifier();

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 8403f8c..c81a179 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -301,7 +301,6 @@ public class BetterGfacImpl implements GFac,Watcher {
 //        List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs();
 //        jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getInputParamMap(experimentInputs)));
         List<InputDataObjectType> taskInputs = taskData.getApplicationInputs();
-        jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getInputParamMap(taskInputs)));
 
 //        List<OutputDataObjectType> outputData = experiment.getExperimentOutputs();
         List<OutputDataObjectType> taskOutputs = taskData.getApplicationOutputs();

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index c71ed27..6286d91 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -768,6 +768,20 @@ public class GFacUtils {
 		}
 	}
 
+	public static void saveJobStatus(Registry registry,
+									 JobDetails details,String taskId, JobState state) throws GFacException {
+		try {
+			JobStatus status = new JobStatus();
+			status.setJobState(state);
+			details.setJobStatus(status);
+			registry.add(ChildDataType.JOB_DETAIL, details,
+					new CompositeIdentifier(taskId, details.getJobID()));
+		} catch (Exception e) {
+			throw new GFacException("Error persisting job status"
+					+ e.getLocalizedMessage(), e);
+		}
+	}
+
 	public static void updateJobStatus(JobExecutionContext jobExecutionContext,
 			JobDetails details, JobState state) throws GFacException {
 		try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java
index 4744772..503baa2 100644
--- a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java
+++ b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java
@@ -32,8 +32,8 @@ import org.apache.airavata.gfac.core.context.ApplicationContext;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.model.appcatalog.computeresource.*;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
+import org.junit.BeforeClass;
+import org.junit.Test;
 import org.xml.sax.SAXException;
 
 import javax.xml.parsers.ParserConfigurationException;

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-ec2/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ec2/pom.xml b/modules/gfac/gfac-ec2/pom.xml
index d51fddb..0905606 100644
--- a/modules/gfac/gfac-ec2/pom.xml
+++ b/modules/gfac/gfac-ec2/pom.xml
@@ -110,7 +110,7 @@
 		<dependency>
 			<groupId>org.testng</groupId>
 			<artifactId>testng</artifactId>
-			<version>6.1.1</version>
+			<version>6.8.5</version>
 			<scope>test</scope>
 		</dependency>
 		<dependency>

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/pom.xml b/modules/gfac/gfac-local/pom.xml
index 9776f0a..c13b189 100644
--- a/modules/gfac/gfac-local/pom.xml
+++ b/modules/gfac/gfac-local/pom.xml
@@ -8,7 +8,8 @@
     ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under 
     the License. -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.airavata</groupId>
         <artifactId>gfac</artifactId>
@@ -22,6 +23,12 @@
     <description>This is the extension of GFAC Local.</description>
     <url>http://airavata.apache.org/</url>
 
+    <repositories>
+        <repository>
+            <id>clojars.org</id>
+            <url>http://clojars.org/repo</url>
+        </repository>
+    </repositories>
     <dependencies>
 
         <!-- Logging -->
@@ -36,7 +43,6 @@
             <artifactId>airavata-gfac-core</artifactId>
             <version>${project.version}</version>
         </dependency>
-
         <!-- Test -->
         <dependency>
             <groupId>junit</groupId>
@@ -44,12 +50,6 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.testng</groupId>
-            <artifactId>testng</artifactId>
-            <version>6.1.1</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>jcl-over-slf4j</artifactId>
             <scope>test</scope>
@@ -59,7 +59,17 @@
             <artifactId>slf4j-log4j12</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>0.10.0-SNAPSHOT</version>
+        </dependency>
 
+        <dependency>
+            <groupId>io.latent</groupId>
+            <artifactId>storm-rabbitmq</artifactId>
+            <version>0.5.10</version>
+        </dependency>
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/AiravataRabbitMQSpout.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/AiravataRabbitMQSpout.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/AiravataRabbitMQSpout.java
new file mode 100644
index 0000000..985bc83
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/AiravataRabbitMQSpout.java
@@ -0,0 +1,158 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local;
+import backtype.storm.spout.Scheme;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import io.latent.storm.rabbitmq.*;
+import io.latent.storm.rabbitmq.config.ConsumerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A simple RabbitMQ spout that emits an anchored tuple stream (on the default stream). This can be used with
+ * Storm's guaranteed message processing.
+ *
+ * @author peter@latent.io
+ */
+public class AiravataRabbitMQSpout extends BaseRichSpout {
+
+    private final MessageScheme scheme;
+    private final Declarator declarator;
+
+    private transient Logger logger;
+    private transient RabbitMQConsumer consumer;
+    private transient SpoutOutputCollector collector;
+
+    public AiravataRabbitMQSpout(Scheme scheme) {
+        this(MessageScheme.Builder.from(scheme), new Declarator.NoOp());
+    }
+
+    public AiravataRabbitMQSpout(Scheme scheme, Declarator declarator) {
+        this(MessageScheme.Builder.from(scheme), declarator);
+    }
+
+    public AiravataRabbitMQSpout(MessageScheme scheme, Declarator declarator) {
+        this.scheme = scheme;
+        this.declarator = declarator;
+    }
+
+    @Override
+    public void open(final Map config,
+                     final TopologyContext context,
+                     final SpoutOutputCollector spoutOutputCollector) {
+        ConsumerConfig consumerConfig = ConsumerConfig.getFromStormConfig(config);
+
+        ErrorReporter reporter = new ErrorReporter() {
+            @Override
+            public void reportError(Throwable error) {
+                spoutOutputCollector.reportError(error);
+            }
+        };
+        consumer = loadConsumer(declarator, reporter, consumerConfig);
+        scheme.open(config, context);
+        consumer.open();
+        logger = LoggerFactory.getLogger(AiravataRabbitMQSpout.class);
+        collector = spoutOutputCollector;
+    }
+
+    protected RabbitMQConsumer loadConsumer(Declarator declarator,
+                                            ErrorReporter reporter,
+                                            ConsumerConfig config) {
+        return new RabbitMQConsumer(config.getConnectionConfig(),
+                config.getPrefetchCount(),
+                config.getQueueName(),
+                config.isRequeueOnFail(),
+                declarator,
+                reporter);
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+        scheme.close();
+        super.close();
+    }
+
+    @Override
+    public void nextTuple() {
+        Message message;
+        System.out.println("Waiting for messages !!!");
+        while ((message = consumer.nextMessage()) != Message.NONE) {
+            List<Object> tuple = extractTuple(message);
+            if (!tuple.isEmpty()) {
+                System.out.println(new String(message.getBody()));
+                emit(tuple, message, collector);
+            }
+        }
+    }
+
+    protected List<Integer> emit(List<Object> tuple,
+                                 Message message,
+                                 SpoutOutputCollector spoutOutputCollector) {
+        return spoutOutputCollector.emit(tuple, getDeliveryTag(message));
+    }
+
+    private List<Object> extractTuple(Message message) {
+        long deliveryTag = getDeliveryTag(message);
+        try {
+            List<Object> tuple = scheme.deserialize(message);
+            if (tuple != null && !tuple.isEmpty()) {
+                return tuple;
+            }
+            String errorMsg = "Deserialization error for msgId " + deliveryTag;
+            logger.warn(errorMsg);
+            collector.reportError(new Exception(errorMsg));
+        } catch (Exception e) {
+            logger.warn("Deserialization error for msgId " + deliveryTag, e);
+            collector.reportError(e);
+        }
+        // get the malformed message out of the way by dead-lettering (if dead-lettering is configured) and move on
+        consumer.deadLetter(deliveryTag);
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        if (msgId instanceof Long) consumer.ack((Long) msgId);
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        if (msgId instanceof Long) consumer.fail((Long) msgId);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(scheme.getOutputFields());
+    }
+
+    protected long getDeliveryTag(Message message) {
+        return ((Message.DeliveredMessage) message).getDeliveryTag();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/RandomSentenceSpout.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/RandomSentenceSpout.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/RandomSentenceSpout.java
new file mode 100644
index 0000000..ad53add
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/RandomSentenceSpout.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Spout that emits, roughly every 100 ms, one string picked at random from a fixed
+ * list of candidates (currently a single hard-coded experiment id).
+ *
+ * Tuples are emitted without a message id and {@link #ack(Object)} / {@link #fail(Object)}
+ * are no-ops.
+ */
+public class RandomSentenceSpout extends BaseRichSpout {
+    /** Candidate payloads; one of them is chosen for every emitted tuple. */
+    private static final String[] SENTENCES = { "echoExperiment_be59db00-020f-4d46-b157-05577ab5c065" };
+
+    SpoutOutputCollector _collector;
+    Random _rand;
+
+    /**
+     * Keeps the collector for later emits and creates the random generator.
+     */
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this._collector = collector;
+        this._rand = new Random();
+    }
+
+    /**
+     * Sleeps for 100 ms, picks one candidate, prints it to standard out and emits it
+     * as a single-field tuple.
+     */
+    @Override
+    public void nextTuple() {
+        Utils.sleep(100);
+        final int pick = _rand.nextInt(SENTENCES.length);
+        final String chosen = SENTENCES[pick];
+        System.out.println(chosen);
+        _collector.emit(new Values(chosen));
+    }
+
+    /** Intentionally empty: nothing is tracked per emitted tuple. */
+    @Override
+    public void ack(Object id) {
+    }
+
+    /** Intentionally empty: nothing is tracked per emitted tuple. */
+    @Override
+    public void fail(Object id) {
+    }
+
+    /** Declares the single output field, named "word". */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        final Fields outputFields = new Fields("word");
+        declarer.declare(outputFields);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupBolt.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupBolt.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupBolt.java
new file mode 100644
index 0000000..92b6f9d
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupBolt.java
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.handler;
+
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.FailedException;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.airavata.appcatalog.cpi.AppCatalogException;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.local.utils.ExperimentModelUtil;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Bolt that prepares the local directory layout for an experiment before it is executed.
+ *
+ * Input: the first tuple field is a comma separated string with exactly three parts, of
+ * which the first is used as the gateway id and the second as the experiment id.
+ *
+ * For every tuple a workflow node and a task are registered for the experiment, the
+ * scratch location is resolved from the gateway's compute resource preference, and the
+ * directories {@code <scratch>/<experimentId>}, its input data directory and its output
+ * data directory are created.
+ *
+ * Output: the original id string together with the id of the newly created task.
+ * Problems are signalled by throwing {@link FailedException}.
+ */
+public class LocalDirectorySetupBolt extends BaseBasicBolt {
+    private static final Logger logger = LoggerFactory.getLogger(LocalDirectorySetupBolt.class);
+
+    Registry registry;
+
+    public LocalDirectorySetupBolt() throws RegistryException {
+        registry = RegistryFactory.getDefaultRegistry();
+    }
+
+    /**
+     * Creates the task and the working directories for the experiment named in the tuple
+     * and emits {@code (ids, taskId)}.
+     *
+     * @throws FailedException if the tuple is malformed, required ids are missing, no usable
+     *                         scratch location can be resolved, a directory cannot be created,
+     *                         or the registry / application catalog reports an error
+     */
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+        try {
+            // refresh the field rather than shadowing it, so createTasks() works on the same registry
+            registry = RegistryFactory.getDefaultRegistry();
+            String ids = tuple.getString(0);
+            String[] split = ids.split(",");
+            if (split.length != 3) {
+                throw new FailedException("Wrong tuple given: " + ids);
+            }
+            String gatewayId = split[0];
+            String expId = split[1];
+            List<TaskDetails> tasks = createTasks(expId);
+            String taskId = tasks.get(0).getTaskID();
+            TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskId);
+
+            String applicationInterfaceId = taskData.getApplicationId();
+            String applicationDeploymentId = taskData.getApplicationDeploymentId();
+            if (null == applicationInterfaceId) {
+                throw new FailedException("Error executing the job. The required Application Id is missing");
+            }
+            if (null == applicationDeploymentId) {
+                throw new FailedException("Error executing the job. The required Application deployment Id is missing");
+            }
+
+            AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+            //fetch the compute resource, application interface and deployment information from app catalog
+            // NOTE(review): the interface and compute resource are not used further in this bolt;
+            // the lookups are kept so catalog errors still surface here - confirm whether they are needed
+            ApplicationInterfaceDescription applicationInterface = appCatalog.
+                    getApplicationInterface().getApplicationInterface(applicationInterfaceId);
+            ApplicationDeploymentDescription applicationDeployment = appCatalog.
+                    getApplicationDeployment().getApplicationDeployement(applicationDeploymentId);
+            ComputeResourceDescription computeResource = appCatalog.getComputeResource().
+                    getComputeResource(applicationDeployment.getComputeHostId());
+            ComputeResourcePreference gatewayResourcePreferences = appCatalog.getGatewayProfile().
+                    getComputeResourcePreference(gatewayId, applicationDeployment.getComputeHostId());
+
+            if (gatewayResourcePreferences == null) {
+                // nothing registered under the gateway id itself: try each of its profile ids in turn
+                List<String> gatewayProfileIds = appCatalog.getGatewayProfile()
+                        .getGatewayProfileIds(gatewayId);
+                for (String profileId : gatewayProfileIds) {
+                    gatewayId = profileId;
+                    gatewayResourcePreferences = appCatalog.getGatewayProfile().
+                            getComputeResourcePreference(gatewayId, applicationDeployment.getComputeHostId());
+                    if (gatewayResourcePreferences != null) {
+                        break;
+                    }
+                }
+            }
+            if (gatewayResourcePreferences == null) {
+                // fail the tuple explicitly instead of running into a NullPointerException below
+                throw new FailedException("No compute resource preference found for gateway " + split[0]
+                        + " and compute host " + applicationDeployment.getComputeHostId());
+            }
+
+            String scratchLocation = gatewayResourcePreferences.getScratchLocation();
+            if (scratchLocation == null || scratchLocation.isEmpty()) {
+                // otherwise the directories would be created under "null/..." or the filesystem root
+                throw new FailedException("No scratch location configured for gateway " + gatewayId
+                        + " and compute host " + applicationDeployment.getComputeHostId());
+            }
+            String workingDir = scratchLocation + File.separator + expId;
+            String inputDir = workingDir + File.separator + Constants.INPUT_DATA_DIR_VAR_NAME;
+            String outputDir = workingDir + File.separator + Constants.OUTPUT_DATA_DIR_VAR_NAME;
+            makeFileSystemDir(workingDir);
+            makeFileSystemDir(inputDir);
+            makeFileSystemDir(outputDir);
+
+            basicOutputCollector.emit(new Values(ids, taskId));
+
+        } catch (RegistryException e) {
+            logger.error(e.getMessage(), e);
+            throw new FailedException(e);
+        } catch (AppCatalogException e) {
+            logger.error(e.getMessage(), e);
+            throw new FailedException(e);
+        } catch (GFacHandlerException e) {
+            logger.error(e.getMessage(), e);
+            throw new FailedException(e);
+        }
+    }
+
+    /**
+     * Registers a workflow node and one task, cloned from the experiment, in the registry.
+     *
+     * @param experimentId id of the experiment the task is created for
+     * @return a list holding the single created task, with its registry-assigned task id set
+     * @throws RegistryException if reading from or writing to the registry fails
+     */
+    public List<TaskDetails> createTasks(String experimentId) throws RegistryException {
+        List<TaskDetails> tasks = new ArrayList<TaskDetails>();
+        Experiment experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentId);
+
+        WorkflowNodeDetails iDontNeedaNode = ExperimentModelUtil.createWorkflowNode("IDontNeedaNode", null);
+        String nodeID = (String) registry.add(ChildDataType.WORKFLOW_NODE_DETAIL, iDontNeedaNode, experimentId);
+
+        TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromExperiment(experiment);
+        taskDetails.setTaskID((String) registry.add(ChildDataType.TASK_DETAIL, taskDetails, nodeID));
+        tasks.add(taskDetails);
+        return tasks;
+    }
+
+
+    /**
+     * Makes sure {@code dir} exists as a directory, creating missing parent directories too.
+     *
+     * @throws GFacHandlerException if the directory does not exist afterwards
+     */
+    private void makeFileSystemDir(String dir) throws GFacHandlerException {
+        File f = new File(dir);
+        if (f.isDirectory()) {
+            return;
+        }
+        // mkdirs() also returns false when somebody else created the directory in the meantime,
+        // so only treat it as an error if the directory is still missing afterwards
+        if (!f.mkdirs() && !f.isDirectory()) {
+            throw new GFacHandlerException("Cannot create directory " + dir);
+        }
+    }
+
+    /**
+     * Declares the two output fields. Note that the field named "experimentId" carries the
+     * complete comma separated id string received by {@code execute}, not just the experiment id.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields("experimentId","taskId"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index 9f055e9..59303ec 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -23,7 +23,6 @@ package org.apache.airavata.gfac.local.provider.impl;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -36,7 +35,6 @@ import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
 import org.apache.airavata.gfac.core.provider.AbstractProvider;
 import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.gfac.core.provider.utils.ProviderUtils;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.core.utils.OutputUtils;
 import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProviderBolt.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProviderBolt.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProviderBolt.java
new file mode 100644
index 0000000..13954de
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProviderBolt.java
@@ -0,0 +1,283 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.provider.impl;
+
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.FailedException;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Tuple;
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
+import org.apache.airavata.gfac.local.utils.InputUtils;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.*;
+
+public class LocalProviderBolt extends BaseBasicBolt {
+    private final static Logger logger = LoggerFactory.getLogger(LocalProviderBolt.class);
+
+    private ProcessBuilder builder;
+    private List<String> cmdList;
+    private String jobId;
+    private Registry registry;
+
+    public static class LocalProviderJobData {
+        private String applicationName;
+        private List<String> inputParameters;
+        private String workingDir;
+        private String inputDir;
+        private String outputDir;
+
+        public String getApplicationName() {
+            return applicationName;
+        }
+
+        public void setApplicationName(String applicationName) {
+            this.applicationName = applicationName;
+        }
+
+        public List<String> getInputParameters() {
+            return inputParameters;
+        }
+
+        public void setInputParameters(List<String> inputParameters) {
+            this.inputParameters = inputParameters;
+        }
+
+        public String getWorkingDir() {
+            return workingDir;
+        }
+
+        public void setWorkingDir(String workingDir) {
+            this.workingDir = workingDir;
+        }
+
+        public String getInputDir() {
+            return inputDir;
+        }
+
+        public void setInputDir(String inputDir) {
+            this.inputDir = inputDir;
+        }
+
+        public String getOutputDir() {
+            return outputDir;
+        }
+
+        public void setOutputDir(String outputDir) {
+            this.outputDir = outputDir;
+        }
+    }
+
+    public LocalProviderBolt() throws RegistryException {
+        cmdList = new ArrayList<String>();
+        registry = RegistryFactory.getDefaultRegistry();
+    }
+
+    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+        try {
+            registry = RegistryFactory.getDefaultRegistry();
+            String ids = tuple.getString(0);
+            String taskId = tuple.getString(1);
+
+            String[] split = ids.split(",");
+            if (split.length != 3) {
+                throw new FailedException("Wrong tuple given: " + ids);
+            }
+            String gatewayId = split[0];
+            String experimentId = split[1];
+
+            TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskId);
+
+            String applicationInterfaceId = taskData.getApplicationId();
+            String applicationDeploymentId = taskData.getApplicationDeploymentId();
+            if (null == applicationInterfaceId) {
+                throw new FailedException("Error executing the job. The required Application Id is missing");
+            }
+            if (null == applicationDeploymentId) {
+                throw new FailedException("Error executing the job. The required Application deployment Id is missing");
+            }
+
+            AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+            //fetch the compute resource, application interface and deployment information from app catalog
+            ApplicationInterfaceDescription applicationInterface = appCatalog.
+                    getApplicationInterface().getApplicationInterface(applicationInterfaceId);
+            ApplicationDeploymentDescription applicationDeployment = appCatalog.
+                    getApplicationDeployment().getApplicationDeployement(applicationDeploymentId);
+            ComputeResourceDescription computeResource = appCatalog.getComputeResource().
+                    getComputeResource(applicationDeployment.getComputeHostId());
+            ComputeResourcePreference gatewayResourcePreferences = appCatalog.getGatewayProfile().
+                    getComputeResourcePreference(gatewayId, applicationDeployment.getComputeHostId());
+
+            if (gatewayResourcePreferences == null) {
+                List<String> gatewayProfileIds = appCatalog.getGatewayProfile()
+                        .getGatewayProfileIds(gatewayId);
+                for (String profileId : gatewayProfileIds) {
+                    gatewayId = profileId;
+                    gatewayResourcePreferences = appCatalog.getGatewayProfile().
+                            getComputeResourcePreference(gatewayId, applicationDeployment.getComputeHostId());
+                    if (gatewayResourcePreferences != null) {
+                        break;
+                    }
+                }
+            }
+            String executablePath = applicationDeployment.getExecutablePath();
+
+
+            buildCommand(applicationDeployment, taskId);
+            initProcessBuilder(applicationDeployment);
+
+            // extra environment variables
+            String workingDir = "/tmp";
+            workingDir = workingDir + experimentId + File.separator + org.apache.airavata.gfac.Constants.INPUT_DATA_DIR_VAR_NAME;
+            String stdOutFile = workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stdout";
+            String stdErrFile = workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stderr";
+
+            builder.environment().put(org.apache.airavata.gfac.Constants.INPUT_DATA_DIR_VAR_NAME, stdOutFile);
+            builder.environment().put(org.apache.airavata.gfac.Constants.OUTPUT_DATA_DIR_VAR_NAME, stdErrFile);
+
+            // set working directory
+            builder.directory(new File(workingDir));
+
+            // log info
+            logger.info("Command = " + InputUtils.buildCommand(cmdList));
+            logger.info("Working dir = " + builder.directory());
+            for (String key : builder.environment().keySet()) {
+                logger.info("Env[" + key + "] = " + builder.environment().get(key));
+            }
+
+
+            JobDetails jobDetails = new JobDetails();
+            jobId = taskData.getTaskID();
+            jobDetails.setJobID(jobId);
+            jobDetails.setJobDescription(applicationDeployment.getAppDeploymentDescription());
+            GFacUtils.saveJobStatus(registry, jobDetails, taskId, JobState.SETUP);
+            // running cmd
+            Process process = builder.start();
+
+            Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), stdOutFile);
+            Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), stdErrFile);
+
+            // start output threads
+            standardOutWriter.setDaemon(true);
+            standardErrorWriter.setDaemon(true);
+            standardOutWriter.start();
+            standardErrorWriter.start();
+
+            int returnValue = process.waitFor();
+
+            // make sure other two threads are done
+            standardOutWriter.join();
+            standardErrorWriter.join();
+
+            /*
+             * check return value. usually not very helpful to draw conclusions based on return values so don't bother.
+             * just provide warning in the log messages
+             */
+            if (returnValue != 0) {
+                logger.error("Process finished with non zero return value. Process may have failed");
+            } else {
+                logger.info("Process finished with return value of zero.");
+            }
+
+            StringBuffer buf = new StringBuffer();
+            buf.append("Executed ").append(InputUtils.buildCommand(cmdList))
+                    .append(" on the localHost, working directory = ").append(workingDir)
+                    .append(" tempDirectory = ").append(workingDir).append(" With the status ")
+                    .append(String.valueOf(returnValue));
+
+            logger.info(buf.toString());
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+
+    }
+
+
+    private void buildCommand(ApplicationDeploymentDescription applicationDeploymentDescription, String taskID) throws RegistryException, GFacException {
+        cmdList.add(applicationDeploymentDescription.getExecutablePath());
+        String appModuleId = applicationDeploymentDescription.getAppModuleId();
+        TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
+
+        List<InputDataObjectType> taskInputs = taskData.getApplicationInputs();
+        Map<String, Object> inputParamMap = GFacUtils.getInputParamMap(taskInputs);
+
+        // sort the inputs first and then build the command List
+        Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
+            @Override
+            public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
+                return inputDataObjectType.getInputOrder() - t1.getInputOrder();
+            }
+        };
+        Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
+        for (Object object : inputParamMap.values()) {
+            if (object instanceof InputDataObjectType) {
+                InputDataObjectType inputDOT = (InputDataObjectType) object;
+                sortedInputSet.add(inputDOT);
+            }
+        }
+        for (InputDataObjectType inputDataObjectType : sortedInputSet) {
+            if (inputDataObjectType.getApplicationArgument() != null
+                    && !inputDataObjectType.getApplicationArgument().equals("")) {
+                cmdList.add(inputDataObjectType.getApplicationArgument());
+            }
+
+            if (inputDataObjectType.getValue() != null
+                    && !inputDataObjectType.getValue().equals("")) {
+                cmdList.add(inputDataObjectType.getValue());
+            }
+        }
+
+    }
+
+    private void initProcessBuilder(ApplicationDeploymentDescription app) {
+        builder = new ProcessBuilder(cmdList);
+
+        List<SetEnvPaths> setEnvironment = app.getSetEnvironment();
+        if (setEnvironment != null) {
+            for (SetEnvPaths envPath : setEnvironment) {
+                Map<String, String> builderEnv = builder.environment();
+                builderEnv.put(envPath.getName(), envPath.getValue());
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/CustomStormDeclarator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/CustomStormDeclarator.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/CustomStormDeclarator.java
new file mode 100644
index 0000000..c5568de
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/CustomStormDeclarator.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import com.rabbitmq.client.Channel;
+import io.latent.storm.rabbitmq.Declarator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link Declarator} that declares a queue and a "topic" exchange on the given channel
+ * and binds the queue to the exchange with a routing key.
+ */
+public class CustomStormDeclarator implements Declarator {
+    private static final Logger logger = LoggerFactory.getLogger(CustomStormDeclarator.class);
+
+    private final String exchange;
+    private final String queue;
+    private final String routingKey;
+
+    /**
+     * Same as the three-argument constructor with an empty routing key.
+     */
+    public CustomStormDeclarator(String exchange, String queue) {
+        this(exchange, queue, "");
+    }
+
+    /**
+     * @param exchange   name of the exchange to declare
+     * @param queue      name of the queue to declare
+     * @param routingKey routing key used when binding the queue to the exchange
+     */
+    public CustomStormDeclarator(String exchange, String queue, String routingKey) {
+        this.routingKey = routingKey;
+        this.queue = queue;
+        this.exchange = exchange;
+    }
+
+    /**
+     * Declares the queue, then the exchange, then the binding between them.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} if any declaration fails
+     */
+    @Override
+    public void execute(Channel channel) {
+        // no extra queue arguments are supplied
+        final Map<String, Object> queueArguments = new HashMap<String,Object>();
+        try {
+            channel.queueDeclare(queue, true, false, false, queueArguments);
+            channel.exchangeDeclare(exchange, "topic", true);
+            channel.queueBind(queue, exchange, routingKey);
+        } catch (IOException e) {
+            throw new RuntimeException("Error executing rabbitmq declarations.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentLauncher.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentLauncher.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentLauncher.java
new file mode 100644
index 0000000..69466c6
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentLauncher.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placeholder: at present this class only declares its logger and has no behaviour.
+ */
+public class ExperimentLauncher {
+    private static final Logger logger = LoggerFactory.getLogger(ExperimentLauncher.class);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentModelUtil.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentModelUtil.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentModelUtil.java
new file mode 100644
index 0000000..c14ea48
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentModelUtil.java
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.List;
+
+
+/**
+ * Static factory and copy helpers for the experiment data-model objects
+ * ({@link Experiment}, {@link TaskDetails}, {@link WorkflowNodeDetails} and their
+ * configuration parts).
+ *
+ * <p>Every method here only instantiates a model object and forwards its arguments to
+ * the corresponding setters; no argument is validated.
+ */
+public class ExperimentModelUtil {
+
+    /**
+     * Creates a workflow node status for the given state, stamped with the current time.
+     *
+     * @param state state to record on the new status
+     * @return a new status whose time of state change is the current time in milliseconds
+     */
+    public static WorkflowNodeStatus createWorkflowNodeStatus(WorkflowNodeState state){
+        WorkflowNodeStatus status = new WorkflowNodeStatus();
+        status.setWorkflowNodeState(state);
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        return status;
+    }
+
+    /**
+     * Creates an experiment populated with the basic identifying fields and its inputs.
+     *
+     * @param projectID           id of the project the experiment belongs to
+     * @param userName            name of the user owning the experiment
+     * @param experimentName      name of the experiment
+     * @param expDescription      description of the experiment
+     * @param applicationId       id of the application to run
+     * @param experimentInputList inputs of the experiment; the list is handed to the
+     *                            setter as-is, not copied here
+     * @return the new experiment
+     */
+    public static Experiment createSimpleExperiment(String projectID,
+                                                    String userName,
+                                                    String experimentName,
+                                                    String expDescription,
+                                                    String applicationId,
+                                                    List<InputDataObjectType> experimentInputList) {
+        Experiment experiment = new Experiment();
+        experiment.setProjectID(projectID);
+        experiment.setUserName(userName);
+        experiment.setName(experimentName);
+        experiment.setDescription(expDescription);
+        experiment.setApplicationId(applicationId);
+        experiment.setExperimentInputs(experimentInputList);
+        return experiment;
+    }
+
+    /**
+     * Creates a computational resource scheduling object from the given values.
+     *
+     * @param resourceHostId      id of the compute resource host
+     * @param cpuCount            total CPU count
+     * @param nodeCount           node count
+     * @param numberOfThreads     number of threads
+     * @param queueName           name of the queue
+     * @param wallTimeLimit       wall time limit
+     * @param jobstartTime        job start time; narrowed to {@code int} before it is
+     *                            stored, so values outside the {@code int} range are
+     *                            truncated
+     * @param totalPhysicalMemory total physical memory
+     * @param projectAccount      computational project account
+     * @return the new scheduling object
+     */
+    public static ComputationalResourceScheduling createComputationResourceScheduling(String resourceHostId,
+                                                                                      int cpuCount,
+                                                                                      int nodeCount,
+                                                                                      int numberOfThreads,
+                                                                                      String queueName,
+                                                                                      int wallTimeLimit,
+                                                                                      long jobstartTime,
+                                                                                      int totalPhysicalMemory,
+                                                                                      String projectAccount) {
+
+        ComputationalResourceScheduling cmRS = new ComputationalResourceScheduling();
+        cmRS.setResourceHostId(resourceHostId);
+        cmRS.setTotalCPUCount(cpuCount);
+        cmRS.setNodeCount(nodeCount);
+        cmRS.setNumberOfThreads(numberOfThreads);
+        cmRS.setQueueName(queueName);
+        cmRS.setWallTimeLimit(wallTimeLimit);
+        // NOTE(review): lossy long -> int narrowing, kept for compatibility with the
+        // existing call; confirm the expected unit/range of jobstartTime with callers.
+        cmRS.setJobStartTime((int) jobstartTime);
+        cmRS.setTotalPhysicalMemory(totalPhysicalMemory);
+        cmRS.setComputationalProjectAccount(projectAccount);
+        return cmRS;
+    }
+
+    /**
+     * Creates an advanced input data handling configuration.
+     *
+     * @param stageInputFilesToWorkingDir whether input files are staged to the working directory
+     * @param parentWorkingDir            parent working directory
+     * @param uniqueWorkingDir            unique working directory
+     * @param cleanupAfterJob             whether the working directory is cleaned up after the job
+     * @return the new input handling configuration
+     */
+    public static AdvancedInputDataHandling createAdvancedInputHandling(boolean stageInputFilesToWorkingDir,
+                                                                        String parentWorkingDir,
+                                                                        String uniqueWorkingDir,
+                                                                        boolean cleanupAfterJob) {
+        AdvancedInputDataHandling inputDataHandling = new AdvancedInputDataHandling();
+        inputDataHandling.setStageInputFilesToWorkingDir(stageInputFilesToWorkingDir);
+        inputDataHandling.setParentWorkingDirectory(parentWorkingDir);
+        inputDataHandling.setUniqueWorkingDirectory(uniqueWorkingDir);
+        inputDataHandling.setCleanUpWorkingDirAfterJob(cleanupAfterJob);
+        return inputDataHandling;
+    }
+
+    /**
+     * Creates an advanced output data handling configuration.
+     *
+     * @param outputDatadir output data directory
+     * @param dataRegUrl    data registry URL
+     * @param persistOutput whether output data is persisted
+     * @return the new output handling configuration
+     */
+    public static AdvancedOutputDataHandling createAdvancedOutputDataHandling(String outputDatadir,
+                                                                              String dataRegUrl,
+                                                                              boolean persistOutput) {
+        AdvancedOutputDataHandling outputDataHandling = new AdvancedOutputDataHandling();
+        outputDataHandling.setOutputDataDir(outputDatadir);
+        outputDataHandling.setDataRegistryURL(dataRegUrl);
+        outputDataHandling.setPersistOutputData(persistOutput);
+        return outputDataHandling;
+    }
+
+    /**
+     * Creates quality-of-service parameters.
+     *
+     * @param startExecutionAt value stored as "start execution at"
+     * @param executeBefore    value stored as "execute before"
+     * @param numberOfRetires  number of retries
+     * @return the new quality-of-service parameters
+     */
+    public static QualityOfServiceParams createQOSParams(String startExecutionAt,
+                                                         String executeBefore,
+                                                         int numberOfRetires) {
+        QualityOfServiceParams qosParams = new QualityOfServiceParams();
+        qosParams.setStartExecutionAt(startExecutionAt);
+        qosParams.setExecuteBefore(executeBefore);
+        qosParams.setNumberofRetries(numberOfRetires);
+        return qosParams;
+    }
+
+    /**
+     * Builds a task from an experiment.
+     *
+     * <p>Creation time, application id and application version are taken from the
+     * experiment. Inputs, outputs and the user-configuration parts are set only when
+     * they are non-null, and the very same instances are handed to the task setters;
+     * this method does not deep-copy them.
+     *
+     * @param experiment experiment to derive the task from; must not be null
+     * @return the new task
+     */
+    public static TaskDetails cloneTaskFromExperiment (Experiment experiment){
+        TaskDetails taskDetails = new TaskDetails();
+        taskDetails.setCreationTime(experiment.getCreationTime());
+        taskDetails.setApplicationId(experiment.getApplicationId());
+        taskDetails.setApplicationVersion(experiment.getApplicationVersion());
+        copyInputsAndOutputs(taskDetails, experiment.getExperimentInputs(), experiment.getExperimentOutputs());
+        copyUserConfiguration(experiment, taskDetails);
+        return taskDetails;
+    }
+
+    /**
+     * Builds a task from a workflow node, using the owning experiment's user configuration.
+     *
+     * <p>Creation time, inputs and outputs come from the node; the application id is the
+     * node's execution unit data. The application version is not set. As with
+     * {@link #cloneTaskFromExperiment(Experiment)}, non-null parts are handed to the task
+     * setters as the same instances; this method does not deep-copy them.
+     *
+     * @param experiment  experiment supplying the user configuration; must not be null
+     * @param nodeDetails workflow node supplying creation time, application id, inputs
+     *                    and outputs; must not be null
+     * @return the new task
+     */
+    public static TaskDetails cloneTaskFromWorkflowNodeDetails(Experiment experiment, WorkflowNodeDetails nodeDetails){
+        TaskDetails taskDetails = new TaskDetails();
+        taskDetails.setCreationTime(nodeDetails.getCreationTime());
+        taskDetails.setApplicationId(nodeDetails.getExecutionUnitData());
+        copyInputsAndOutputs(taskDetails, nodeDetails.getNodeInputs(), nodeDetails.getNodeOutputs());
+        copyUserConfiguration(experiment, taskDetails);
+        return taskDetails;
+    }
+
+    /**
+     * Creates a workflow node with the given name and inputs.
+     *
+     * @param nodeName   name of the node
+     * @param nodeInputs inputs of the node; the list is handed to the setter as-is
+     * @return the new workflow node
+     */
+    public static WorkflowNodeDetails createWorkflowNode (String nodeName,
+                                                          List<InputDataObjectType> nodeInputs){
+        WorkflowNodeDetails wfnod = new WorkflowNodeDetails();
+        wfnod.setNodeName(nodeName);
+        wfnod.setNodeInputs(nodeInputs);
+        return wfnod;
+    }
+
+    /** Sets the application inputs and outputs on the task, skipping whichever list is null. */
+    private static void copyInputsAndOutputs(TaskDetails taskDetails,
+                                             List<InputDataObjectType> inputs,
+                                             List<OutputDataObjectType> outputs) {
+        if (inputs != null){
+            taskDetails.setApplicationInputs(inputs);
+        }
+        if (outputs != null){
+            taskDetails.setApplicationOutputs(outputs);
+        }
+    }
+
+    /** Copies the non-null scheduling and input/output handling parts of the experiment's user configuration to the task. */
+    private static void copyUserConfiguration(Experiment experiment, TaskDetails taskDetails) {
+        UserConfigurationData configData = experiment.getUserConfigurationData();
+        if (configData == null){
+            return;
+        }
+        ComputationalResourceScheduling scheduling = configData.getComputationalResourceScheduling();
+        if (scheduling != null){
+            taskDetails.setTaskScheduling(scheduling);
+        }
+        AdvancedInputDataHandling advanceInputDataHandling = configData.getAdvanceInputDataHandling();
+        if (advanceInputDataHandling != null){
+            taskDetails.setAdvancedInputDataHandling(advanceInputDataHandling);
+        }
+        AdvancedOutputDataHandling outputHandling = configData.getAdvanceOutputDataHandling();
+        if (outputHandling != null){
+            taskDetails.setAdvancedOutputDataHandling(outputHandling);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/MessageScheme.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/MessageScheme.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/MessageScheme.java
new file mode 100644
index 0000000..9a69a13
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/MessageScheme.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Storm {@link Scheme} that turns a raw message body into a one-field tuple.
+ *
+ * <p>The body is decoded as text and emitted under the single output field
+ * {@code "experiment"}.
+ */
+public class MessageScheme implements Scheme {
+    private final static Logger logger = LoggerFactory.getLogger(MessageScheme.class);
+
+    /**
+     * Charset used to decode message bodies. Fixed explicitly so decoding does not depend
+     * on the platform default of whichever JVM runs the spout.
+     * NOTE(review): assumes publishers encode message bodies as UTF-8 -- confirm against
+     * the producer side.
+     */
+    private static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8");
+
+    /**
+     * Decodes the message body into a single-element tuple.
+     *
+     * @param bytes raw message body; must not be null
+     * @return a list holding exactly one element, the body decoded as a UTF-8 string
+     */
+    @Override
+    public List<Object> deserialize(byte[] bytes) {
+        List<Object> message = new ArrayList<Object>(1);
+        message.add(new String(bytes, MESSAGE_CHARSET));
+        return message;
+    }
+
+    /**
+     * Declares the tuple layout produced by {@link #deserialize(byte[])}.
+     *
+     * @return a single field named {@code "experiment"}
+     */
+    @Override
+    public Fields getOutputFields() {
+        return new Fields("experiment");
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/GfacTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/GfacTopologyBuilderTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/GfacTopologyBuilderTest.java
new file mode 100644
index 0000000..32f9479
--- /dev/null
+++ b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/GfacTopologyBuilderTest.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.core.gfac.services.impl;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.topology.TopologyBuilder;
+import com.rabbitmq.client.ConnectionFactory;
+import io.latent.storm.rabbitmq.Declarator;
+import io.latent.storm.rabbitmq.config.ConnectionConfig;
+import io.latent.storm.rabbitmq.config.ConsumerConfig;
+import io.latent.storm.rabbitmq.config.ConsumerConfigBuilder;
+import org.apache.airavata.gfac.local.AiravataRabbitMQSpout;
+import org.apache.airavata.gfac.local.RandomSentenceSpout;
+import org.apache.airavata.gfac.local.handler.LocalDirectorySetupBolt;
+import org.apache.airavata.gfac.local.provider.impl.LocalProviderBolt;
+import org.apache.airavata.gfac.local.utils.CustomStormDeclarator;
+import org.apache.airavata.gfac.local.utils.MessageScheme;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manual driver that wires the GFac local topology (RabbitMQ spout, directory-setup
+ * bolt, local-provider bolt) and runs it on an in-process Storm {@link LocalCluster}.
+ *
+ * <p>The topology is configured against a RabbitMQ broker at {@code localhost:5672} with
+ * the {@code guest}/{@code guest} credentials and stays up for
+ * {@link #RUN_DURATION_MILLIS} before the cluster is shut down.
+ */
+public class GfacTopologyBuilderTest {
+    private final static Logger logger = LoggerFactory.getLogger(GfacTopologyBuilderTest.class);
+
+    /** Queue the spout consumes from; also handed to the declarator. */
+    private static final String SUBMIT_QUEUE = "gfac.submit";
+
+    /** First argument of the declarator (presumably the exchange name -- confirm in CustomStormDeclarator). */
+    private static final String EXCHANGE_NAME = "airavata_rabbitmq_exchange";
+
+    /** Name the topology is submitted under. */
+    private static final String TOPOLOGY_NAME = "word-count";
+
+    /** Used both as the consumer prefetch count and as the max number of pending spout tuples. */
+    private static final int MAX_PENDING = 200;
+
+    /** How long the local cluster is kept running, in milliseconds. */
+    private static final long RUN_DURATION_MILLIS = 10000000L;
+
+    /**
+     * Command-line entry point; builds the topology and runs it on a local cluster.
+     *
+     * @param args ignored
+     * @throws RegistryException    declared for compatibility with the original signature
+     * @throws InterruptedException if the thread is interrupted while the topology runs
+     */
+    public static void main(String[] args) throws RegistryException, InterruptedException {
+        runTopologyOnLocalCluster();
+    }
+
+    /**
+     * Runs the same topology as {@link #main(String[])}.
+     *
+     * <p>This method carries no {@code @Test} annotation, so a JUnit 4 runner does not pick
+     * it up. NOTE(review): presumably deliberate, since it blocks for
+     * {@link #RUN_DURATION_MILLIS} and is configured against a broker on localhost.
+     *
+     * @throws RegistryException    declared for compatibility with the original signature
+     * @throws InterruptedException if the thread is interrupted while the topology runs
+     */
+    public void testTopology() throws RegistryException, InterruptedException {
+        runTopologyOnLocalCluster();
+    }
+
+    /** Builds the topology, submits it to a fresh local cluster, waits, then shuts the cluster down. */
+    private static void runTopologyOnLocalCluster() throws RegistryException, InterruptedException {
+        TopologyBuilder builder = new TopologyBuilder();
+        // host, port, username, password, virtualHost, heartBeat
+        ConnectionConfig connectionConfig =
+                new ConnectionConfig("localhost", 5672, "guest", "guest", ConnectionFactory.DEFAULT_VHOST, 10);
+        ConsumerConfig spoutConfig = new ConsumerConfigBuilder().connection(connectionConfig)
+                .queue(SUBMIT_QUEUE)
+                .prefetch(MAX_PENDING)
+                .requeueOnFail()
+                .build();
+
+        MessageScheme messageScheme = new MessageScheme();
+        Declarator declarator = new CustomStormDeclarator(EXCHANGE_NAME, SUBMIT_QUEUE, "*");
+
+        builder.setSpout("spout", new AiravataRabbitMQSpout(messageScheme, declarator), 5)
+                .addConfigurations(spoutConfig.asMap())
+                .setMaxSpoutPending(MAX_PENDING);
+
+        // spout -> directoryset -> count, each hop shuffle-grouped.
+        builder.setBolt("directoryset", new LocalDirectorySetupBolt(), 8).shuffleGrouping("spout").setNumTasks(16);
+        builder.setBolt("count", new LocalProviderBolt(), 12).shuffleGrouping("directoryset");
+
+        Config conf = new Config();
+        conf.setDebug(true);
+        conf.setNumWorkers(3);
+
+        LocalCluster cluster = new LocalCluster();
+        try {
+            cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
+            Thread.sleep(RUN_DURATION_MILLIS);
+        } finally {
+            // Always release the in-process cluster, including when the sleep is interrupted
+            // or topology submission fails.
+            cluster.shutdown();
+        }
+    }
+}