You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/01/26 17:30:50 UTC
[1/2] airavata git commit: creating gfac storm version
Repository: airavata
Updated Branches:
refs/heads/gfac-storm [created] 69d178214
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
deleted file mode 100644
index aeb8158..0000000
--- a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-///*
-// *
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied. See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// *
-//*/
-//package org.apache.airavata.core.gfac.services.impl;
-//
-//import java.io.File;
-//import java.net.URL;
-//import java.util.ArrayList;
-//import java.util.List;
-//
-//import org.apache.airavata.common.utils.MonitorPublisher;
-//import org.apache.airavata.commons.gfac.type.ActualParameter;
-//import org.apache.airavata.commons.gfac.type.ApplicationDescription;
-//import org.apache.airavata.commons.gfac.type.HostDescription;
-//import org.apache.airavata.commons.gfac.type.ServiceDescription;
-//import org.apache.airavata.gfac.GFacConfiguration;
-//import org.apache.airavata.gfac.GFacException;
-//import org.apache.airavata.gfac.core.context.ApplicationContext;
-//import org.apache.airavata.gfac.core.context.JobExecutionContext;
-//import org.apache.airavata.gfac.core.context.MessageContext;
-//import org.apache.airavata.gfac.core.provider.GFacProviderException;
-//import org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler;
-//import org.apache.airavata.gfac.local.provider.impl.LocalProvider;
-//import org.apache.airavata.model.workspace.experiment.ExecutionUnit;
-//import org.apache.airavata.model.workspace.experiment.Experiment;
-//import org.apache.airavata.model.workspace.experiment.TaskDetails;
-//import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
-//import org.apache.airavata.persistance.registry.jpa.impl.LoggingRegistryImpl;
-//import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-//import org.apache.airavata.schemas.gfac.InputParameterType;
-//import org.apache.airavata.schemas.gfac.OutputParameterType;
-//import org.apache.airavata.schemas.gfac.StringParameterType;
-//import org.apache.commons.lang.SystemUtils;
-//import org.testng.annotations.BeforeTest;
-//import org.testng.annotations.Test;
-//
-//import com.google.common.eventbus.EventBus;
-//
-//public class LocalProviderTest {
-// private JobExecutionContext jobExecutionContext;
-// @BeforeTest
-// public void setUp() throws Exception {
-//
-// URL resource = this.getClass().getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
-// File configFile = new File(resource.getPath());
-// GFacConfiguration gFacConfiguration = GFacConfiguration.create(configFile, null);
-// //have to set InFlwo Handlers and outFlowHandlers
-// ApplicationContext applicationContext = new ApplicationContext();
-// HostDescription host = new HostDescription();
-// host.getType().setHostName("localhost");
-// host.getType().setHostAddress("localhost");
-// applicationContext.setHostDescription(host);
-// /*
-// * App
-// */
-// ApplicationDescription appDesc = new ApplicationDescription();
-// ApplicationDeploymentDescriptionType app = appDesc.getType();
-// ApplicationDeploymentDescriptionType.ApplicationName name = ApplicationDeploymentDescriptionType.ApplicationName.Factory.newInstance();
-// name.setStringValue("EchoLocal");
-// app.setApplicationName(name);
-//
-// /*
-// * Use bat file if it is compiled on Windows
-// */
-// if (SystemUtils.IS_OS_WINDOWS) {
-// URL url = this.getClass().getClassLoader().getResource("echo.bat");
-// app.setExecutableLocation(url.getFile());
-// } else {
-// //for unix and Mac
-// app.setExecutableLocation("/bin/echo");
-// }
-//
-// /*
-// * Default tmp location
-// */
-// String tempDir = System.getProperty("java.io.tmpdir");
-// if (tempDir == null) {
-// tempDir = "/tmp";
-// }
-//
-// app.setScratchWorkingDirectory(tempDir);
-// app.setStaticWorkingDirectory(tempDir);
-// app.setInputDataDirectory(tempDir + File.separator + "input");
-// app.setOutputDataDirectory(tempDir + File.separator + "output");
-// app.setStandardOutput(tempDir + File.separator + "echo.stdout");
-// app.setStandardError(tempDir + File.separator + "echo.stderr");
-//
-// applicationContext.setApplicationDeploymentDescription(appDesc);
-//
-// /*
-// * Service
-// */
-// ServiceDescription serv = new ServiceDescription();
-// serv.getType().setName("SimpleEcho");
-//
-// List<InputParameterType> inputList = new ArrayList<InputParameterType>();
-// InputParameterType input = InputParameterType.Factory.newInstance();
-// input.setParameterName("echo_input");
-// input.setParameterType(StringParameterType.Factory.newInstance());
-// inputList.add(input);
-// InputParameterType[] inputParamList = inputList.toArray(new InputParameterType[inputList
-// .size()]);
-//
-// List<OutputParameterType> outputList = new ArrayList<OutputParameterType>();
-// OutputParameterType output = OutputParameterType.Factory.newInstance();
-// output.setParameterName("echo_output");
-// output.setParameterType(StringParameterType.Factory.newInstance());
-// outputList.add(output);
-// OutputParameterType[] outputParamList = outputList
-// .toArray(new OutputParameterType[outputList.size()]);
-//
-// serv.getType().setInputParametersArray(inputParamList);
-// serv.getType().setOutputParametersArray(outputParamList);
-//
-// jobExecutionContext = new JobExecutionContext(gFacConfiguration, serv.getType().getName());
-// jobExecutionContext.setApplicationContext(applicationContext);
-// /*
-// * Host
-// */
-// applicationContext.setServiceDescription(serv);
-//
-// MessageContext inMessage = new MessageContext();
-// ActualParameter echo_input = new ActualParameter();
-// ((StringParameterType) echo_input.getType()).setValue("echo_output=hello");
-// inMessage.addParameter("echo_input", echo_input);
-//
-// jobExecutionContext.setInMessageContext(inMessage);
-//
-// MessageContext outMessage = new MessageContext();
-// ActualParameter echo_out = new ActualParameter();
-// outMessage.addParameter("echo_output", echo_out);
-//
-// jobExecutionContext.setOutMessageContext(outMessage);
-//
-// jobExecutionContext.setExperimentID("test123");
-// jobExecutionContext.setExperiment(new Experiment("test123","project1","admin","testExp"));
-// jobExecutionContext.setTaskData(new TaskDetails(jobExecutionContext.getExperimentID()));
-// jobExecutionContext.setRegistry(new LoggingRegistryImpl());
-// jobExecutionContext.setWorkflowNodeDetails(new WorkflowNodeDetails(jobExecutionContext.getExperimentID(),"none", ExecutionUnit.APPLICATION));
-//
-//
-// }
-//
-// @Test
-// public void testLocalDirectorySetupHandler() throws GFacException {
-// LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler();
-// localDirectorySetupHandler.invoke(jobExecutionContext);
-//
-// ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
-// ApplicationDeploymentDescriptionType app = applicationDeploymentDescription.getType();
-// junit.framework.Assert.assertTrue(new File(app.getStaticWorkingDirectory()).exists());
-// junit.framework.Assert.assertTrue(new File(app.getScratchWorkingDirectory()).exists());
-// junit.framework.Assert.assertTrue(new File(app.getInputDataDirectory()).exists());
-// junit.framework.Assert.assertTrue(new File(app.getOutputDataDirectory()).exists());
-// }
-//
-// @Test
-// public void testLocalProvider() throws GFacException,GFacProviderException {
-// LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler();
-// localDirectorySetupHandler.invoke(jobExecutionContext);
-// LocalProvider localProvider = new LocalProvider();
-// localProvider.setMonitorPublisher(new MonitorPublisher(new EventBus()));
-// localProvider.initialize(jobExecutionContext);
-// localProvider.execute(jobExecutionContext);
-// localProvider.dispose(jobExecutionContext);
-// }
-//}
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/WordCountTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/WordCountTopologyTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/WordCountTopologyTest.java
new file mode 100644
index 0000000..914c9b8
--- /dev/null
+++ b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/WordCountTopologyTest.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.core.gfac.services.impl;
+
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.task.ShellBolt;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.airavata.gfac.local.RandomSentenceSpout;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This topology demonstrates Storm's stream groupings and multilang capabilities.
+ */
+public class WordCountTopologyTest {
+ public static class SplitSentence extends ShellBolt implements IRichBolt {
+
+ public SplitSentence() {
+ super("python", "splitsentence.py");
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+ }
+
+ public static class WordCount extends BaseBasicBolt {
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String word = tuple.getString(0);
+ Integer count = counts.get(word);
+ if (count == null)
+ count = 0;
+ count++;
+ counts.put(word, count);
+ collector.emit(new Values(word, count));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
+ }
+ }
+
+
+ public static void main(String[] args) {
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout("spout", new RandomSentenceSpout(), 5);
+
+ builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+
+ Config conf = new Config();
+ conf.setDebug(true);
+
+
+ conf.setMaxTaskParallelism(3);
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("word-count", conf, builder.createTopology());
+
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ }
+
+ cluster.shutdown();
+ }
+
+ @Test
+ public void testTopology() {
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout("spout", new RandomSentenceSpout(), 5);
+
+ builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
+ builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+
+ Config conf = new Config();
+ conf.setDebug(true);
+
+
+ conf.setMaxTaskParallelism(3);
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("word-count", conf, builder.createTopology());
+
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ }
+
+ cluster.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/pom.xml b/modules/gfac/pom.xml
index e85ef7d..e8c5f1d 100644
--- a/modules/gfac/pom.xml
+++ b/modules/gfac/pom.xml
@@ -32,17 +32,7 @@
</activation>
<modules>
<module>gfac-core</module>
- <module>gfac-ec2</module>
- <module>gfac-ssh</module>
- <module>gfac-local</module>
- <!--<module>gfac-hadoop</module>-->
- <!--<module>gfac-gram</module>-->
- <module>gfac-gsissh</module>
- <module>gfac-bes</module>
- <module>gfac-monitor</module>
- <module>airavata-gfac-service</module>
- <module>airavata-gfac-stubs</module>
- <module>gfac-application-specific-handlers</module>
+ <module>storm-starter</module>
</modules>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/orchestrator/orchestrator-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml
index 691d288..a72ae7b 100644
--- a/modules/orchestrator/orchestrator-core/pom.xml
+++ b/modules/orchestrator/orchestrator-core/pom.xml
@@ -113,7 +113,7 @@ the License. -->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
- <version>6.1.1</version>
+ <version>6.8.5</version>
<scope>test</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java
----------------------------------------------------------------------
diff --git a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java
index 5768a0d..ff4402b 100644
--- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java
+++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java
@@ -38,10 +38,11 @@ import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.airavata.registry.cpi.utils.Constants;
import org.apache.airavata.registry.cpi.utils.StatusType;
+import java.io.Serializable;
import java.sql.Timestamp;
import java.util.*;
-public class ExperimentRegistry {
+public class ExperimentRegistry implements Serializable{
private GatewayResource gatewayResource;
private WorkerResource workerResource;
private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(ExperimentRegistry.class);
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ProjectRegistry.java
----------------------------------------------------------------------
diff --git a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ProjectRegistry.java b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ProjectRegistry.java
index d85cd83..f959356 100644
--- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ProjectRegistry.java
+++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ProjectRegistry.java
@@ -21,6 +21,7 @@
package org.apache.airavata.persistance.registry.jpa.impl;
+import java.io.Serializable;
import java.util.*;
import org.apache.airavata.common.utils.AiravataUtils;
@@ -34,7 +35,7 @@ import org.apache.airavata.registry.cpi.utils.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ProjectRegistry {
+public class ProjectRegistry implements Serializable {
private GatewayResource gatewayResource;
private WorkerResource workerResource;
private final static Logger logger = LoggerFactory.getLogger(ProjectRegistry.class);
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/RegistryImpl.java
----------------------------------------------------------------------
diff --git a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/RegistryImpl.java b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/RegistryImpl.java
index a76bb5d..f619e1f 100644
--- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/RegistryImpl.java
+++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/RegistryImpl.java
@@ -33,11 +33,12 @@ import org.apache.airavata.registry.cpi.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-public class RegistryImpl implements Registry {
+public class RegistryImpl implements Registry,Serializable {
private GatewayResource gatewayResource;
private UserResource user;
private final static Logger logger = LoggerFactory.getLogger(RegistryImpl.class);
@@ -62,6 +63,7 @@ public class RegistryImpl implements Registry {
experimentRegistry = new ExperimentRegistry(gatewayResource, user);
projectRegistry = new ProjectRegistry(gatewayResource, user);
} catch (ApplicationSettingsException e) {
+ e.printStackTrace();
logger.error("Unable to read airavata server properties..", e);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/AbstractResource.java
----------------------------------------------------------------------
diff --git a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/AbstractResource.java b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/AbstractResource.java
index 449c072..708fe35 100644
--- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/AbstractResource.java
+++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/AbstractResource.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.persistance.registry.jpa.resources;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -27,7 +28,7 @@ import org.apache.airavata.persistance.registry.jpa.Resource;
import org.apache.airavata.persistance.registry.jpa.ResourceType;
import org.apache.airavata.registry.cpi.RegistryException;
-public abstract class AbstractResource implements Resource {
+public abstract class AbstractResource implements Resource, Serializable {
// table names
public static final String GATEWAY = "Gateway";
public static final String CONFIGURATION = "Configuration";
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/GatewayResource.java
----------------------------------------------------------------------
diff --git a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/GatewayResource.java b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/GatewayResource.java
index 34aa67e..feb664c 100644
--- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/GatewayResource.java
+++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/GatewayResource.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.persistance.registry.jpa.resources;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -35,7 +36,7 @@ import org.apache.airavata.registry.cpi.RegistryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class GatewayResource extends AbstractResource {
+public class GatewayResource extends AbstractResource implements Serializable {
private final static Logger logger = LoggerFactory.getLogger(GatewayResource.class);
private String gatewayName;
private String owner;
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/WorkerResource.java
----------------------------------------------------------------------
diff --git a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/WorkerResource.java b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/WorkerResource.java
index 63939ca..337f964 100644
--- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/WorkerResource.java
+++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/resources/WorkerResource.java
@@ -33,13 +33,14 @@ import org.apache.airavata.registry.cpi.RegistryException;
import javax.persistence.EntityManager;
import javax.persistence.Query;
+import java.io.Serializable;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-public class WorkerResource extends AbstractResource {
+public class WorkerResource extends AbstractResource implements Serializable {
private final static Logger logger = LoggerFactory.getLogger(WorkerResource.class);
private String user;
private GatewayResource gateway;
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4e68213..01ae686 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,7 +71,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<axis2.version>1.5.1</axis2.version>
<derby.version>10.9.1.0</derby.version>
- <org.slf4j.version>1.7.6</org.slf4j.version>
+ <org.slf4j.version>1.6.6</org.slf4j.version>
<log4j.version>1.2.16</log4j.version>
<axiom.version>1.2.8</axiom.version>
<surefire.version>2.12</surefire.version>
@@ -513,20 +513,11 @@
<module>modules/airavata-client</module>
<module>modules/commons</module>
<module>modules/gfac</module>
- <module>modules/workflow-model</module>
<module>modules/registry</module>
<module>modules/app-catalog</module>
<module>modules/security</module>
<module>modules/credential-store-service</module>
- <module>modules/orchestrator</module>
- <module>tools</module>
- <module>modules/server</module>
- <module>modules/test-suite</module>
- <module>modules/distribution</module>
- <module>modules/ws-messenger</module>
<module>modules/messaging</module>
- <module>modules/integration-tests</module>
- <module>modules/xbaya-gui</module>
</modules>
</profile>
<profile>
@@ -599,18 +590,10 @@
<module>airavata-api</module>
<module>modules/commons</module>
<module>modules/gfac</module>
- <module>modules/ws-messenger</module>
- <module>modules/workflow-model</module>
<module>modules/registry</module>
<module>modules/airavata-client</module>
<module>modules/security</module>
<module>modules/credential-store-service</module>
- <module>modules/orchestrator</module>
- <module>tools</module>
- <module>modules/server</module>
- <module>modules/test-suite</module>
- <module>modules/distribution</module>
- <module>modules/integration-tests</module>
</modules>
</profile>
<!--profile> <id>ec2Tests</id> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId>
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/tools/gsissh/pom.xml
----------------------------------------------------------------------
diff --git a/tools/gsissh/pom.xml b/tools/gsissh/pom.xml
index 16f3878..76f04cc 100644
--- a/tools/gsissh/pom.xml
+++ b/tools/gsissh/pom.xml
@@ -73,7 +73,7 @@
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
- <version>6.1.1</version>
+ <version>6.8.5</version>
<scope>test</scope>
</dependency>
<dependency>
[2/2] airavata git commit: creating gfac storm version
Posted by la...@apache.org.
creating gfac storm version
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/69d17821
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/69d17821
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/69d17821
Branch: refs/heads/gfac-storm
Commit: 69d1782149c11a60dded61e7b9c77a9395fc4eea
Parents: cf7290a
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Mon Jan 26 11:30:10 2015 -0500
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Mon Jan 26 11:30:10 2015 -0500
----------------------------------------------------------------------
.../java-client-samples/pom.xml | 5 +
.../client/samples/CreateLaunchExperiment.java | 48 +++-
.../catalog/data/util/AppCatalogJPAUtils.java | 4 +-
.../common/utils/ApplicationSettings.java | 8 +-
.../main/resources/airavata-server.properties | 234 ---------------
modules/gfac/gfac-core/pom.xml | 6 -
.../gfac/core/context/JobExecutionContext.java | 3 +
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 1 -
.../airavata/gfac/core/utils/GFacUtils.java | 14 +
.../apache/airavata/job/GFacConfigXmlTest.java | 4 +-
modules/gfac/gfac-ec2/pom.xml | 2 +-
modules/gfac/gfac-local/pom.xml | 26 +-
.../gfac/local/AiravataRabbitMQSpout.java | 158 +++++++++++
.../gfac/local/RandomSentenceSpout.java | 67 +++++
.../local/handler/LocalDirectorySetupBolt.java | 159 +++++++++++
.../gfac/local/provider/impl/LocalProvider.java | 2 -
.../local/provider/impl/LocalProviderBolt.java | 283 +++++++++++++++++++
.../gfac/local/utils/CustomStormDeclarator.java | 61 ++++
.../gfac/local/utils/ExperimentLauncher.java | 28 ++
.../gfac/local/utils/ExperimentModelUtil.java | 187 ++++++++++++
.../gfac/local/utils/MessageScheme.java | 46 +++
.../services/impl/GfacTopologyBuilderTest.java | 112 ++++++++
.../gfac/services/impl/LocalProviderTest.java | 184 ------------
.../services/impl/WordCountTopologyTest.java | 135 +++++++++
modules/gfac/pom.xml | 12 +-
modules/orchestrator/orchestrator-core/pom.xml | 2 +-
.../registry/jpa/impl/ExperimentRegistry.java | 3 +-
.../registry/jpa/impl/ProjectRegistry.java | 3 +-
.../registry/jpa/impl/RegistryImpl.java | 4 +-
.../jpa/resources/AbstractResource.java | 3 +-
.../registry/jpa/resources/GatewayResource.java | 3 +-
.../registry/jpa/resources/WorkerResource.java | 3 +-
pom.xml | 19 +-
tools/gsissh/pom.xml | 2 +-
34 files changed, 1343 insertions(+), 488 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/airavata-api/airavata-client-sdks/java-client-samples/pom.xml
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/pom.xml b/airavata-api/airavata-client-sdks/java-client-samples/pom.xml
index 5f12ace..fa7a398 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/pom.xml
+++ b/airavata-api/airavata-client-sdks/java-client-samples/pom.xml
@@ -56,6 +56,11 @@
<artifactId>airavata-client-configuration</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>3.3.5</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index b2c5469..8eb28b6 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -21,6 +21,9 @@
package org.apache.airavata.client.samples;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
import org.apache.airavata.api.Airavata;
import org.apache.airavata.api.client.AiravataClientFactory;
import org.apache.airavata.client.tools.RegisterSampleApplications;
@@ -57,7 +60,7 @@ public class CreateLaunchExperiment {
private static final String DEFAULT_GATEWAY = "default.registry.gateway";
private static Airavata.Client airavataClient;
- private static String echoAppId = "Echo_0018dc21-9359-4063-9672-6ad15a24294d";
+ private static String echoAppId = "Echo_53cd4995-1a3b-4290-9615-2592a59051b8";
private static String mpiAppId = "HelloMPI_da45305f-5d90-4a18-8716-8dd54c3b2376";
private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
private static String amberAppId = "Amber_49b16f6f-93ab-4885-9971-6ab2ab5eb3d3";
@@ -94,11 +97,11 @@ public class CreateLaunchExperiment {
try {
for (int i = 0; i < 1; i++) {
// final String expId = createExperimentForSSHHost(airavata);
- final String expId = createEchoExperimentForFSD(airavataClient);
+// final String expId = createEchoExperimentForFSD(airavataClient);
// final String expId = createMPIExperimentForFSD(airavataClient);
// final String expId = createEchoExperimentForStampede(airavataClient);
// final String expId = createEchoExperimentForTrestles(airavataClient);
-// final String expId = createExperimentEchoForLocalHost(airavataClient);
+ final String expId = createExperimentEchoForLocalHost(airavataClient);
// final String expId = createExperimentWRFTrestles(airavataClient);
// final String expId = createExperimentForBR2(airavataClient);
// final String expId = createExperimentForBR2Amber(airavataClient);
@@ -113,7 +116,7 @@ public class CreateLaunchExperiment {
// final String expId = createExperimentAUTODOCKStampede(airavataClient); // this is not working , we need to register AutoDock app on stampede
System.out.println("Experiment ID : " + expId);
// updateExperiment(airavata, expId);
- launchExperiment(airavataClient, expId);
+// launchExperimentPassive(expId);
}
} catch (Exception e) {
logger.error("Error while connecting with server", e.getMessage());
@@ -1553,6 +1556,16 @@ public class CreateLaunchExperiment {
return null;
}
+ public static void launchExperiment(String expId)
+ throws TException {
+ try {
+ String gatewayId = "default";
+ } catch (Exception e) {
+ logger.error("Error occured while launching the experiment...", e.getMessage());
+ throw new TException(e);
+ }
+ }
+
public static void launchExperiment(Airavata.Client client, String expId)
throws TException {
try {
@@ -1577,6 +1590,33 @@ public class CreateLaunchExperiment {
}
}
+ public static void launchExperimentPassive(String expId)
+ throws TException {
+ try {
+ String uri = "amqp://localhost";
+ String message = expId;
+ String exchange = "airavata_rabbitmq_exchange";
+
+ ConnectionFactory cfconn = new ConnectionFactory();
+ cfconn.setUri(uri);
+ Connection conn = cfconn.newConnection();
+
+ Channel ch = conn.createChannel();
+
+ if (exchange.equals("")) {
+ ch.queueDeclare("gfac.submit", false, false, false, null);
+ }
+ ch.basicPublish(exchange, expId, null, message.getBytes());
+ ch.close();
+ conn.close();
+ } catch (Exception e) {
+ System.err.println("Main thread caught exception: " + e);
+ e.printStackTrace();
+ System.exit(1);
+ }
+
+ }
+
public static List<Experiment> getExperimentsForUser(Airavata.Client client, String user) {
try {
return client.getAllUserExperiments(user);
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogJPAUtils.java
----------------------------------------------------------------------
diff --git a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogJPAUtils.java b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogJPAUtils.java
index 6b99bd9..d8116d9 100644
--- a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogJPAUtils.java
+++ b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogJPAUtils.java
@@ -58,8 +58,8 @@ public class AppCatalogJPAUtils {
properties.put("openjpa.ConnectionProperties", connectionProperties);
properties.put("openjpa.DynamicEnhancementAgent", "true");
properties.put("openjpa.RuntimeUnenhancedClasses", "unsupported");
- properties.put("openjpa.DataCache","true(CacheSize=" + Integer.valueOf(readServerProperties(JPA_CACHE_SIZE)) + ", SoftReferenceSize=0)");
- properties.put("openjpa.QueryCache","true(CacheSize=" + Integer.valueOf(readServerProperties(JPA_CACHE_SIZE)) + ", SoftReferenceSize=0)");
+// properties.put("openjpa.DataCache","true(CacheSize=" + Integer.valueOf(readServerProperties(JPA_CACHE_SIZE)) + ", SoftReferenceSize=0)");
+// properties.put("openjpa.QueryCache","true(CacheSize=" + Integer.valueOf(readServerProperties(JPA_CACHE_SIZE)) + ", SoftReferenceSize=0)");
properties.put("openjpa.RemoteCommitProvider","sjvm");
properties.put("openjpa.Log","DefaultLevel=INFO, Runtime=INFO, Tool=INFO, SQL=INFO");
properties.put("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
index 9a7ad16..b8d306d 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
@@ -85,13 +85,7 @@ public class ApplicationSettings {
}
protected URL getPropertyFileURL() {
- URL url;
- if (AiravataUtils.isClient()){
- url=ApplicationSettings.class.getClassLoader().getResource(CLIENT_PROPERTIES);
- }else{
- url=ApplicationSettings.class.getClassLoader().getResource(SERVER_PROPERTIES);
- }
- return url;
+ return ApplicationSettings.class.getClassLoader().getResource(SERVER_PROPERTIES);
}
protected URL[] getExternalSettingsFileURLs(){
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
deleted file mode 100644
index fb02901..0000000
--- a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
+++ /dev/null
@@ -1,234 +0,0 @@
-#
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-###########################################################################
-#
-# This properties file provides configuration for all Airavata Services:
-# API Server, Registry, Workflow Interpreter, GFac, Orchestrator
-#
-###########################################################################
-
-###########################################################################
-# API Server Registry Configuration
-###########################################################################
-
-#for derby [AiravataJPARegistry]
-registry.jdbc.driver=org.apache.derby.jdbc.ClientDriver
-registry.jdbc.url=jdbc:derby://localhost:1527/persistent_data;create=true;user=airavata;password=airavata
-# MySql database configuration
-#registry.jdbc.driver=com.mysql.jdbc.Driver
-#registry.jdbc.url=jdbc:mysql://localhost:3306/persistent_data
-registry.jdbc.user=airavata
-registry.jdbc.password=airavata
-start.derby.server.mode=true
-validationQuery=SELECT 1 from CONFIGURATION
-jpa.cache.size=5000
-#jpa.connection.properties=MaxActive=10,MaxIdle=5,MinIdle=2,MaxWait=60000,testWhileIdle=true,testOnBorrow=true
-
-# Properties for default user mode
-default.registry.user=admin
-default.registry.password=admin
-default.registry.password.hash.method=SHA
-default.registry.gateway=default
-
-#ip=127.0.0.1
-
-###########################################################################
-# Application Catalog DB Configuration
-###########################################################################
-#for derby [AiravataJPARegistry]
-appcatalog.jdbc.driver=org.apache.derby.jdbc.ClientDriver
-appcatalog.jdbc.url=jdbc:derby://localhost:1527/app_catalog;create=true;user=airavata;password=airavata
-# MySql database configuration
-#appcatalog.jdbc.driver=com.mysql.jdbc.Driver
-#appcatalog.jdbc.url=jdbc:mysql://localhost:3306/app_catalog
-appcatalog.jdbc.user=airavata
-appcatalog.jdbc.password=airavata
-appcatalog.validationQuery=SELECT 1 from CONFIGURATION
-
-###########################################################################
-# Server module Configuration
-###########################################################################
-
-servers=apiserver,orchestrator,gfac,workflowserver
-#shutdown.trategy=NONE
-shutdown.trategy=SELF_TERMINATE
-
-
-apiserver.server.host=localhost
-apiserver.server.port=8930
-apiserver.server.min.threads=50
-workflow.server.host=localhost
-workflow.server.port=8931
-orchestrator.server.host=localhost
-orchestrator.server.port=8940
-gfac.server.host=localhost
-gfac.server.port=8950
-orchestrator.server.min.threads=50
-
-###########################################################################
-# Credential Store module Configuration
-###########################################################################
-credential.store.keystore.url=/Users/lahirugunathilake/Downloads/airavata_sym.jks
-credential.store.keystore.alias=airavata
-credential.store.keystore.password=airavata
-credential.store.jdbc.url=jdbc:derby://localhost:1527/persistent_data;create=true;user=airavata;password=airavata
-credential.store.jdbc.user=airavata
-credential.store.jdbc.password=airavata
-credential.store.jdbc.driver=org.apache.derby.jdbc.ClientDriver
-
-notifier.enabled=false
-#period in milliseconds
-notifier.duration=5000
-
-email.server=smtp.googlemail.com
-email.server.port=465
-email.user=airavata
-email.password=xxx
-email.ssl=true
-email.from=airavata@apache.org
-
-###########################################################################
-# Airavata GFac MyProxy GSI credentials to access Grid Resources.
-###########################################################################
-#
-# Security Configuration used by Airavata Generic Factory Service
-# to interact with Computational Resources.
-#
-gfac=org.apache.airavata.gfac.server.GfacServer
-myproxy.server=myproxy.teragrid.org
-myproxy.username=ogce
-myproxy.password=
-myproxy.life=3600
-# XSEDE Trusted certificates can be downloaded from https://software.xsede.org/security/xsede-certs.tar.gz
-trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates
-# SSH PKI key pair or ssh password can be used SSH based authentication is used.
-# if user specify both password authentication gets the higher preference
-
-################# ---------- For ssh key pair authentication ------------------- ################
-#public.ssh.key=/path to public key for ssh
-#ssh.username=username for ssh connection
-#private.ssh.key=/path to private key file for ssh
-#ssh.keypass=passphrase for the private key
-
-
-################# ---------- For ssh key pair authentication ------------------- ################
-#ssh.username=username for ssh connection
-#ssh.password=Password for ssh connection
-
-
-
-###########################################################################
-# Airavata Workflow Interpreter Configurations
-###########################################################################
-
-#runInThread=true
-#provenance=true
-#provenanceWriterThreadPoolSize=20
-#gfac.embedded=true
-#workflowserver=org.apache.airavata.api.server.WorkflowServer
-
-
-###########################################################################
-# API Server module Configuration
-###########################################################################
-apiserver=org.apache.airavata.api.server.AiravataAPIServer
-
-###########################################################################
-# Workflow Server module Configuration
-###########################################################################
-
-workflowserver=org.apache.airavata.api.server.WorkflowServer
-
-###########################################################################
-# Advance configuration to change service implementations
-###########################################################################
-# If false, disables two phase commit when submitting jobs
-TwoPhase=true
-#
-# Class which implemented HostScheduler interface. It will determine the which host to submit the request
-#
-host.scheduler=org.apache.airavata.gfac.core.scheduler.impl.SimpleHostScheduler
-
-###########################################################################
-# Monitoring module Configuration
-###########################################################################
-
-#This will be the primary monitoring tool which runs in airavata, in future there will be multiple monitoring
-#mechanisms and one would be able to start a monitor
-monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.gfac.monitor.impl.LocalJobMonitor
-
-
-###########################################################################
-# AMQP Notification Configuration
-###########################################################################
-
-
-amqp.notification.enable=1
-
-amqp.broker.host=localhost
-amqp.broker.port=5672
-amqp.broker.username=guest
-amqp.broker.password=guest
-
-amqp.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPSenderImpl
-amqp.topic.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPTopicSenderImpl
-amqp.broadcast.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPBroadcastSenderImpl
-
-#,org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor
-#This is the amqp related configuration and this lists down the Rabbitmq host, this is an xsede specific configuration
-amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
-proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
-connection.name=xsede
-#publisher
-activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
-publish.rabbitmq=false
-activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
-rabbitmq.broker.url=amqp://localhost:5672
-rabbitmq.exchange.name=airavata_rabbitmq_exchange
-
-###########################################################################
-# Orchestrator module Configuration
-###########################################################################
-
-#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
-job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
-job.validators=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
-submitter.interval=10000
-threadpool.size=10
-start.submitter=true
-embedded.mode=true
-enable.validation=true
-orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer
-
-###########################################################################
-# Zookeeper Server Configuration
-###########################################################################
-
-embedded.zk=true
-zookeeper.server.host=localhost
-zookeeper.server.port=2181
-airavata-server=/api-server
-orchestrator-server=/orchestrator-server
-gfac-server=/gfac-server
-gfac-experiments=/gfac-experiments
-gfac-server-name=gfac-node0
-orchestrator-server-name=orch-node0
-airavata-server-name=api-node0
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index 2a85282..53b60ec 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -81,12 +81,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.testng</groupId>
- <artifactId>testng</artifactId>
- <version>6.1.1</version>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index c8c48ef..97c2572 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -177,6 +177,9 @@ public class JobExecutionContext extends AbstractContext implements Serializable
*/
private Map<String, SecurityContext> securityContext = new HashMap<String, SecurityContext>();
+ public JobExecutionContext() {
+ }
+
public JobExecutionContext(GFacConfiguration gFacConfiguration,String applicationName){
this.gfacConfiguration = gFacConfiguration;
notifier = new GFacNotifier();
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 8403f8c..c81a179 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -301,7 +301,6 @@ public class BetterGfacImpl implements GFac,Watcher {
// List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs();
// jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getInputParamMap(experimentInputs)));
List<InputDataObjectType> taskInputs = taskData.getApplicationInputs();
- jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getInputParamMap(taskInputs)));
// List<OutputDataObjectType> outputData = experiment.getExperimentOutputs();
List<OutputDataObjectType> taskOutputs = taskData.getApplicationOutputs();
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index c71ed27..6286d91 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -768,6 +768,20 @@ public class GFacUtils {
}
}
+ public static void saveJobStatus(Registry registry,
+ JobDetails details,String taskId, JobState state) throws GFacException {
+ try {
+ JobStatus status = new JobStatus();
+ status.setJobState(state);
+ details.setJobStatus(status);
+ registry.add(ChildDataType.JOB_DETAIL, details,
+ new CompositeIdentifier(taskId, details.getJobID()));
+ } catch (Exception e) {
+ throw new GFacException("Error persisting job status"
+ + e.getLocalizedMessage(), e);
+ }
+ }
+
public static void updateJobStatus(JobExecutionContext jobExecutionContext,
JobDetails details, JobState state) throws GFacException {
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java
index 4744772..503baa2 100644
--- a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java
+++ b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java
@@ -32,8 +32,8 @@ import org.apache.airavata.gfac.core.context.ApplicationContext;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.model.appcatalog.computeresource.*;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
+import org.junit.BeforeClass;
+import org.junit.Test;
import org.xml.sax.SAXException;
import javax.xml.parsers.ParserConfigurationException;
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-ec2/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ec2/pom.xml b/modules/gfac/gfac-ec2/pom.xml
index d51fddb..0905606 100644
--- a/modules/gfac/gfac-ec2/pom.xml
+++ b/modules/gfac/gfac-ec2/pom.xml
@@ -110,7 +110,7 @@
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
- <version>6.1.1</version>
+ <version>6.8.5</version>
<scope>test</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/pom.xml b/modules/gfac/gfac-local/pom.xml
index 9776f0a..c13b189 100644
--- a/modules/gfac/gfac-local/pom.xml
+++ b/modules/gfac/gfac-local/pom.xml
@@ -8,7 +8,8 @@
ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under
the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.airavata</groupId>
<artifactId>gfac</artifactId>
@@ -22,6 +23,12 @@
<description>This is the extension of GFAC Local.</description>
<url>http://airavata.apache.org/</url>
+ <repositories>
+ <repository>
+ <id>clojars.org</id>
+ <url>http://clojars.org/repo</url>
+ </repository>
+ </repositories>
<dependencies>
<!-- Logging -->
@@ -36,7 +43,6 @@
<artifactId>airavata-gfac-core</artifactId>
<version>${project.version}</version>
</dependency>
-
<!-- Test -->
<dependency>
<groupId>junit</groupId>
@@ -44,12 +50,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.testng</groupId>
- <artifactId>testng</artifactId>
- <version>6.1.1</version>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<scope>test</scope>
@@ -59,7 +59,17 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>0.10.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>io.latent</groupId>
+ <artifactId>storm-rabbitmq</artifactId>
+ <version>0.5.10</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/AiravataRabbitMQSpout.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/AiravataRabbitMQSpout.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/AiravataRabbitMQSpout.java
new file mode 100644
index 0000000..985bc83
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/AiravataRabbitMQSpout.java
@@ -0,0 +1,158 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local;
+import backtype.storm.spout.Scheme;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import io.latent.storm.rabbitmq.*;
+import io.latent.storm.rabbitmq.config.ConsumerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A simple RabbitMQ spout that emits an anchored tuple stream (on the default stream). This can be used with
+ * Storm's guaranteed message processing.
+ *
+ * @author peter@latent.io
+ */
+public class AiravataRabbitMQSpout extends BaseRichSpout {
+
+ private final MessageScheme scheme;
+ private final Declarator declarator;
+
+ private transient Logger logger;
+ private transient RabbitMQConsumer consumer;
+ private transient SpoutOutputCollector collector;
+
+ public AiravataRabbitMQSpout(Scheme scheme) {
+ this(MessageScheme.Builder.from(scheme), new Declarator.NoOp());
+ }
+
+ public AiravataRabbitMQSpout(Scheme scheme, Declarator declarator) {
+ this(MessageScheme.Builder.from(scheme), declarator);
+ }
+
+ public AiravataRabbitMQSpout(MessageScheme scheme, Declarator declarator) {
+ this.scheme = scheme;
+ this.declarator = declarator;
+ }
+
+ @Override
+ public void open(final Map config,
+ final TopologyContext context,
+ final SpoutOutputCollector spoutOutputCollector) {
+ ConsumerConfig consumerConfig = ConsumerConfig.getFromStormConfig(config);
+
+ ErrorReporter reporter = new ErrorReporter() {
+ @Override
+ public void reportError(Throwable error) {
+ spoutOutputCollector.reportError(error);
+ }
+ };
+ consumer = loadConsumer(declarator, reporter, consumerConfig);
+ scheme.open(config, context);
+ consumer.open();
+ logger = LoggerFactory.getLogger(AiravataRabbitMQSpout.class);
+ collector = spoutOutputCollector;
+ }
+
+ protected RabbitMQConsumer loadConsumer(Declarator declarator,
+ ErrorReporter reporter,
+ ConsumerConfig config) {
+ return new RabbitMQConsumer(config.getConnectionConfig(),
+ config.getPrefetchCount(),
+ config.getQueueName(),
+ config.isRequeueOnFail(),
+ declarator,
+ reporter);
+ }
+
+ @Override
+ public void close() {
+ consumer.close();
+ scheme.close();
+ super.close();
+ }
+
+ @Override
+ public void nextTuple() {
+ Message message;
+ System.out.println("Waiting for messages !!!");
+ while ((message = consumer.nextMessage()) != Message.NONE) {
+ List<Object> tuple = extractTuple(message);
+ if (!tuple.isEmpty()) {
+ System.out.println(new String(message.getBody()));
+ emit(tuple, message, collector);
+ }
+ }
+ }
+
+ protected List<Integer> emit(List<Object> tuple,
+ Message message,
+ SpoutOutputCollector spoutOutputCollector) {
+ return spoutOutputCollector.emit(tuple, getDeliveryTag(message));
+ }
+
+ private List<Object> extractTuple(Message message) {
+ long deliveryTag = getDeliveryTag(message);
+ try {
+ List<Object> tuple = scheme.deserialize(message);
+ if (tuple != null && !tuple.isEmpty()) {
+ return tuple;
+ }
+ String errorMsg = "Deserialization error for msgId " + deliveryTag;
+ logger.warn(errorMsg);
+ collector.reportError(new Exception(errorMsg));
+ } catch (Exception e) {
+ logger.warn("Deserialization error for msgId " + deliveryTag, e);
+ collector.reportError(e);
+ }
+ // get the malformed message out of the way by dead-lettering (if dead-lettering is configured) and move on
+ consumer.deadLetter(deliveryTag);
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ if (msgId instanceof Long) consumer.ack((Long) msgId);
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ if (msgId instanceof Long) consumer.fail((Long) msgId);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(scheme.getOutputFields());
+ }
+
+ protected long getDeliveryTag(Message message) {
+ return ((Message.DeliveredMessage) message).getDeliveryTag();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/RandomSentenceSpout.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/RandomSentenceSpout.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/RandomSentenceSpout.java
new file mode 100644
index 0000000..ad53add
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/RandomSentenceSpout.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.Random;
+
+public class RandomSentenceSpout extends BaseRichSpout {
+ SpoutOutputCollector _collector;
+ Random _rand;
+
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ _collector = collector;
+ _rand = new Random();
+ }
+
+ @Override
+ public void nextTuple() {
+ Utils.sleep(100);
+ String[] sentences = new String[]{ "echoExperiment_be59db00-020f-4d46-b157-05577ab5c065" };
+ String sentence = sentences[_rand.nextInt(sentences.length)];
+ System.out.println(sentence);
+ _collector.emit(new Values(sentence));
+ }
+
+ @Override
+ public void ack(Object id) {
+ }
+
+ @Override
+ public void fail(Object id) {
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupBolt.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupBolt.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupBolt.java
new file mode 100644
index 0000000..92b6f9d
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupBolt.java
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.handler;
+
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.FailedException;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.airavata.appcatalog.cpi.AppCatalogException;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.local.utils.ExperimentModelUtil;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class LocalDirectorySetupBolt extends BaseBasicBolt {
+ private final static Logger logger = LoggerFactory.getLogger(LocalDirectorySetupBolt.class);
+
+ Registry registry;
+
+ public LocalDirectorySetupBolt() throws RegistryException {
+ registry = RegistryFactory.getDefaultRegistry();
+ }
+
+ public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+ try {
+ Registry registry = RegistryFactory.getDefaultRegistry();
+ String Ids = tuple.getString(0);
+ String[] split = Ids.split(",");
+ if (split.length != 3) {
+ throw new FailedException("Wrong tuple given: " + Ids);
+ }
+ String gatewayId = split[0];
+ String expId = split[1];
+ List<TaskDetails> tasks = createTasks(expId);
+ String taskId = tasks.get(0).getTaskID();
+ TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskId);
+
+ String applicationInterfaceId = taskData.getApplicationId();
+ String applicationDeploymentId = taskData.getApplicationDeploymentId();
+ if (null == applicationInterfaceId) {
+ throw new FailedException("Error executing the job. The required Application Id is missing");
+ }
+ if (null == applicationDeploymentId) {
+ throw new FailedException("Error executing the job. The required Application deployment Id is missing");
+ }
+
+ AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+ //fetch the compute resource, application interface and deployment information from app catalog
+ ApplicationInterfaceDescription applicationInterface = appCatalog.
+ getApplicationInterface().getApplicationInterface(applicationInterfaceId);
+ ApplicationDeploymentDescription applicationDeployment = appCatalog.
+ getApplicationDeployment().getApplicationDeployement(applicationDeploymentId);
+ ComputeResourceDescription computeResource = appCatalog.getComputeResource().
+ getComputeResource(applicationDeployment.getComputeHostId());
+ ComputeResourcePreference gatewayResourcePreferences = appCatalog.getGatewayProfile().
+ getComputeResourcePreference(gatewayId, applicationDeployment.getComputeHostId());
+
+ if (gatewayResourcePreferences == null) {
+ List<String> gatewayProfileIds = appCatalog.getGatewayProfile()
+ .getGatewayProfileIds(gatewayId);
+ for (String profileId : gatewayProfileIds) {
+ gatewayId = profileId;
+ gatewayResourcePreferences = appCatalog.getGatewayProfile().
+ getComputeResourcePreference(gatewayId, applicationDeployment.getComputeHostId());
+ if (gatewayResourcePreferences != null) {
+ break;
+ }
+ }
+ }
+
+ String scratchLocation = gatewayResourcePreferences.getScratchLocation();
+ String workingDir = scratchLocation + File.separator + expId;
+ String inputDir = workingDir + File.separator + Constants.INPUT_DATA_DIR_VAR_NAME;
+ String outputDir = workingDir + File.separator + Constants.OUTPUT_DATA_DIR_VAR_NAME;
+ makeFileSystemDir(workingDir);
+ makeFileSystemDir(inputDir);
+ makeFileSystemDir(outputDir);
+
+ basicOutputCollector.emit(new Values(Ids, taskId));
+
+ } catch (RegistryException e) {
+ logger.error(e.getMessage(), e);
+ throw new FailedException(e);
+ } catch (AppCatalogException e) {
+ logger.error(e.getMessage(), e);
+ throw new FailedException(e);
+ } catch (GFacHandlerException e) {
+ logger.error(e.getMessage(), e);
+ throw new FailedException(e);
+ }
+ }
+
+ public List<TaskDetails> createTasks(String experimentId) throws RegistryException {
+ Experiment experiment = null;
+ List<TaskDetails> tasks = new ArrayList<TaskDetails>();
+ experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentId);
+
+
+ WorkflowNodeDetails iDontNeedaNode = ExperimentModelUtil.createWorkflowNode("IDontNeedaNode", null);
+ String nodeID = (String) registry.add(ChildDataType.WORKFLOW_NODE_DETAIL, iDontNeedaNode, experimentId);
+
+ TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromExperiment(experiment);
+ taskDetails.setTaskID((String) registry.add(ChildDataType.TASK_DETAIL, taskDetails, nodeID));
+ tasks.add(taskDetails);
+ return tasks;
+ }
+
+
+ private void makeFileSystemDir(String dir) throws GFacHandlerException {
+ File f = new File(dir);
+ if (f.isDirectory() && f.exists()) {
+ return;
+ } else if (!new File(dir).mkdir()) {
+ throw new GFacHandlerException("Cannot create directory " + dir);
+ }
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("experimentId","taskId"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index 9f055e9..59303ec 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -23,7 +23,6 @@ package org.apache.airavata.gfac.local.provider.impl;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -36,7 +35,6 @@ import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
import org.apache.airavata.gfac.core.provider.AbstractProvider;
import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.gfac.core.provider.utils.ProviderUtils;
import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.core.utils.OutputUtils;
import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProviderBolt.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProviderBolt.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProviderBolt.java
new file mode 100644
index 0000000..13954de
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProviderBolt.java
@@ -0,0 +1,283 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.provider.impl;
+
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.FailedException;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Tuple;
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
+import org.apache.airavata.gfac.local.utils.InputUtils;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.*;
+
+public class LocalProviderBolt extends BaseBasicBolt {
+ private final static Logger logger = LoggerFactory.getLogger(LocalProviderBolt.class);
+
+ private ProcessBuilder builder;
+ private List<String> cmdList;
+ private String jobId;
+ private Registry registry;
+
+ public static class LocalProviderJobData {
+ private String applicationName;
+ private List<String> inputParameters;
+ private String workingDir;
+ private String inputDir;
+ private String outputDir;
+
+ public String getApplicationName() {
+ return applicationName;
+ }
+
+ public void setApplicationName(String applicationName) {
+ this.applicationName = applicationName;
+ }
+
+ public List<String> getInputParameters() {
+ return inputParameters;
+ }
+
+ public void setInputParameters(List<String> inputParameters) {
+ this.inputParameters = inputParameters;
+ }
+
+ public String getWorkingDir() {
+ return workingDir;
+ }
+
+ public void setWorkingDir(String workingDir) {
+ this.workingDir = workingDir;
+ }
+
+ public String getInputDir() {
+ return inputDir;
+ }
+
+ public void setInputDir(String inputDir) {
+ this.inputDir = inputDir;
+ }
+
+ public String getOutputDir() {
+ return outputDir;
+ }
+
+ public void setOutputDir(String outputDir) {
+ this.outputDir = outputDir;
+ }
+ }
+
+ public LocalProviderBolt() throws RegistryException {
+ cmdList = new ArrayList<String>();
+ registry = RegistryFactory.getDefaultRegistry();
+ }
+
+ public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+ try {
+ registry = RegistryFactory.getDefaultRegistry();
+ String ids = tuple.getString(0);
+ String taskId = tuple.getString(1);
+
+ String[] split = ids.split(",");
+ if (split.length != 3) {
+ throw new FailedException("Wrong tuple given: " + ids);
+ }
+ String gatewayId = split[0];
+ String experimentId = split[1];
+
+ TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskId);
+
+ String applicationInterfaceId = taskData.getApplicationId();
+ String applicationDeploymentId = taskData.getApplicationDeploymentId();
+ if (null == applicationInterfaceId) {
+ throw new FailedException("Error executing the job. The required Application Id is missing");
+ }
+ if (null == applicationDeploymentId) {
+ throw new FailedException("Error executing the job. The required Application deployment Id is missing");
+ }
+
+ AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+ //fetch the compute resource, application interface and deployment information from app catalog
+ ApplicationInterfaceDescription applicationInterface = appCatalog.
+ getApplicationInterface().getApplicationInterface(applicationInterfaceId);
+ ApplicationDeploymentDescription applicationDeployment = appCatalog.
+ getApplicationDeployment().getApplicationDeployement(applicationDeploymentId);
+ ComputeResourceDescription computeResource = appCatalog.getComputeResource().
+ getComputeResource(applicationDeployment.getComputeHostId());
+ ComputeResourcePreference gatewayResourcePreferences = appCatalog.getGatewayProfile().
+ getComputeResourcePreference(gatewayId, applicationDeployment.getComputeHostId());
+
+ if (gatewayResourcePreferences == null) {
+ List<String> gatewayProfileIds = appCatalog.getGatewayProfile()
+ .getGatewayProfileIds(gatewayId);
+ for (String profileId : gatewayProfileIds) {
+ gatewayId = profileId;
+ gatewayResourcePreferences = appCatalog.getGatewayProfile().
+ getComputeResourcePreference(gatewayId, applicationDeployment.getComputeHostId());
+ if (gatewayResourcePreferences != null) {
+ break;
+ }
+ }
+ }
+ String executablePath = applicationDeployment.getExecutablePath();
+
+
+ buildCommand(applicationDeployment, taskId);
+ initProcessBuilder(applicationDeployment);
+
+ // extra environment variables
+ String workingDir = "/tmp";
+ workingDir = workingDir + experimentId + File.separator + org.apache.airavata.gfac.Constants.INPUT_DATA_DIR_VAR_NAME;
+ String stdOutFile = workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stdout";
+ String stdErrFile = workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stderr";
+
+ builder.environment().put(org.apache.airavata.gfac.Constants.INPUT_DATA_DIR_VAR_NAME, stdOutFile);
+ builder.environment().put(org.apache.airavata.gfac.Constants.OUTPUT_DATA_DIR_VAR_NAME, stdErrFile);
+
+ // set working directory
+ builder.directory(new File(workingDir));
+
+ // log info
+ logger.info("Command = " + InputUtils.buildCommand(cmdList));
+ logger.info("Working dir = " + builder.directory());
+ for (String key : builder.environment().keySet()) {
+ logger.info("Env[" + key + "] = " + builder.environment().get(key));
+ }
+
+
+ JobDetails jobDetails = new JobDetails();
+ jobId = taskData.getTaskID();
+ jobDetails.setJobID(jobId);
+ jobDetails.setJobDescription(applicationDeployment.getAppDeploymentDescription());
+ GFacUtils.saveJobStatus(registry, jobDetails, taskId, JobState.SETUP);
+ // running cmd
+ Process process = builder.start();
+
+ Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), stdOutFile);
+ Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), stdErrFile);
+
+ // start output threads
+ standardOutWriter.setDaemon(true);
+ standardErrorWriter.setDaemon(true);
+ standardOutWriter.start();
+ standardErrorWriter.start();
+
+ int returnValue = process.waitFor();
+
+ // make sure other two threads are done
+ standardOutWriter.join();
+ standardErrorWriter.join();
+
+ /*
+ * check return value. usually not very helpful to draw conclusions based on return values so don't bother.
+ * just provide warning in the log messages
+ */
+ if (returnValue != 0) {
+ logger.error("Process finished with non zero return value. Process may have failed");
+ } else {
+ logger.info("Process finished with return value of zero.");
+ }
+
+ StringBuffer buf = new StringBuffer();
+ buf.append("Executed ").append(InputUtils.buildCommand(cmdList))
+ .append(" on the localHost, working directory = ").append(workingDir)
+ .append(" tempDirectory = ").append(workingDir).append(" With the status ")
+ .append(String.valueOf(returnValue));
+
+ logger.info(buf.toString());
+ }catch (Exception e){
+ e.printStackTrace();
+ }
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+
+ }
+
+
+ private void buildCommand(ApplicationDeploymentDescription applicationDeploymentDescription, String taskID) throws RegistryException, GFacException {
+ cmdList.add(applicationDeploymentDescription.getExecutablePath());
+ String appModuleId = applicationDeploymentDescription.getAppModuleId();
+ TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
+
+ List<InputDataObjectType> taskInputs = taskData.getApplicationInputs();
+ Map<String, Object> inputParamMap = GFacUtils.getInputParamMap(taskInputs);
+
+ // sort the inputs first and then build the command List
+ Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
+ @Override
+ public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
+ return inputDataObjectType.getInputOrder() - t1.getInputOrder();
+ }
+ };
+ Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
+ for (Object object : inputParamMap.values()) {
+ if (object instanceof InputDataObjectType) {
+ InputDataObjectType inputDOT = (InputDataObjectType) object;
+ sortedInputSet.add(inputDOT);
+ }
+ }
+ for (InputDataObjectType inputDataObjectType : sortedInputSet) {
+ if (inputDataObjectType.getApplicationArgument() != null
+ && !inputDataObjectType.getApplicationArgument().equals("")) {
+ cmdList.add(inputDataObjectType.getApplicationArgument());
+ }
+
+ if (inputDataObjectType.getValue() != null
+ && !inputDataObjectType.getValue().equals("")) {
+ cmdList.add(inputDataObjectType.getValue());
+ }
+ }
+
+ }
+
+ private void initProcessBuilder(ApplicationDeploymentDescription app) {
+ builder = new ProcessBuilder(cmdList);
+
+ List<SetEnvPaths> setEnvironment = app.getSetEnvironment();
+ if (setEnvironment != null) {
+ for (SetEnvPaths envPath : setEnvironment) {
+ Map<String, String> builderEnv = builder.environment();
+ builderEnv.put(envPath.getName(), envPath.getValue());
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/CustomStormDeclarator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/CustomStormDeclarator.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/CustomStormDeclarator.java
new file mode 100644
index 0000000..c5568de
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/CustomStormDeclarator.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import com.rabbitmq.client.Channel;
+import io.latent.storm.rabbitmq.Declarator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CustomStormDeclarator implements Declarator {
+ private final static Logger logger = LoggerFactory.getLogger(CustomStormDeclarator.class);
+
+ private final String exchange;
+ private final String queue;
+ private final String routingKey;
+
+ public CustomStormDeclarator(String exchange, String queue) {
+ this(exchange, queue, "");
+ }
+
+ public CustomStormDeclarator(String exchange, String queue, String routingKey) {
+ this.exchange = exchange;
+ this.queue = queue;
+ this.routingKey = routingKey;
+ }
+
+ @Override
+ public void execute(Channel channel) {
+ // you're given a RabbitMQ Channel so you're free to wire up your exchange/queue bindings as you see fit
+ try {
+ Map<String, Object> args = new HashMap<String,Object>();
+ channel.queueDeclare(queue, true, false, false, args);
+ channel.exchangeDeclare(exchange, "topic", true);
+ channel.queueBind(queue, exchange, routingKey);
+ } catch (IOException e) {
+ throw new RuntimeException("Error executing rabbitmq declarations.", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentLauncher.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentLauncher.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentLauncher.java
new file mode 100644
index 0000000..69466c6
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentLauncher.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExperimentLauncher {
+ private final static Logger logger = LoggerFactory.getLogger(ExperimentLauncher.class);
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentModelUtil.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentModelUtil.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentModelUtil.java
new file mode 100644
index 0000000..c14ea48
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/ExperimentModelUtil.java
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.List;
+
+
+public class ExperimentModelUtil {
+
+ public static WorkflowNodeStatus createWorkflowNodeStatus(WorkflowNodeState state){
+ WorkflowNodeStatus status = new WorkflowNodeStatus();
+ status.setWorkflowNodeState(state);
+ status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+ return status;
+ }
+
+ public static Experiment createSimpleExperiment(String projectID,
+ String userName,
+ String experimentName,
+ String expDescription,
+ String applicationId,
+ List<InputDataObjectType> experimentInputList) {
+ Experiment experiment = new Experiment();
+ experiment.setProjectID(projectID);
+ experiment.setUserName(userName);
+ experiment.setName(experimentName);
+ experiment.setDescription(expDescription);
+ experiment.setApplicationId(applicationId);
+ experiment.setExperimentInputs(experimentInputList);
+ return experiment;
+ }
+
+
+ public static ComputationalResourceScheduling createComputationResourceScheduling(String resourceHostId,
+ int cpuCount,
+ int nodeCount,
+ int numberOfThreads,
+ String queueName,
+ int wallTimeLimit,
+ long jobstartTime,
+ int totalPhysicalMemory,
+ String projectAccount) {
+
+ ComputationalResourceScheduling cmRS = new ComputationalResourceScheduling();
+ cmRS.setResourceHostId(resourceHostId);
+ cmRS.setTotalCPUCount(cpuCount);
+ cmRS.setNodeCount(nodeCount);
+ cmRS.setNumberOfThreads(numberOfThreads);
+ cmRS.setQueueName(queueName);
+ cmRS.setWallTimeLimit(wallTimeLimit);
+ cmRS.setJobStartTime((int) jobstartTime);
+ cmRS.setTotalPhysicalMemory(totalPhysicalMemory);
+ cmRS.setComputationalProjectAccount(projectAccount);
+ return cmRS;
+ }
+
+ public static AdvancedInputDataHandling createAdvancedInputHandling(boolean stageInputFilesToWorkingDir,
+ String parentWorkingDir,
+ String uniqueWorkingDir,
+ boolean cleanupAfterJob) {
+ AdvancedInputDataHandling inputDataHandling = new AdvancedInputDataHandling();
+ inputDataHandling.setStageInputFilesToWorkingDir(stageInputFilesToWorkingDir);
+ inputDataHandling.setParentWorkingDirectory(parentWorkingDir);
+ inputDataHandling.setUniqueWorkingDirectory(uniqueWorkingDir);
+ inputDataHandling.setCleanUpWorkingDirAfterJob(cleanupAfterJob);
+ return inputDataHandling;
+ }
+
+ public static AdvancedOutputDataHandling createAdvancedOutputDataHandling(String outputDatadir,
+ String dataRegUrl,
+ boolean persistOutput) {
+ AdvancedOutputDataHandling outputDataHandling = new AdvancedOutputDataHandling();
+ outputDataHandling.setOutputDataDir(outputDatadir);
+ outputDataHandling.setDataRegistryURL(dataRegUrl);
+ outputDataHandling.setPersistOutputData(persistOutput);
+ return outputDataHandling;
+ }
+
+ public static QualityOfServiceParams createQOSParams(String startExecutionAt,
+ String executeBefore,
+ int numberOfRetires) {
+ QualityOfServiceParams qosParams = new QualityOfServiceParams();
+ qosParams.setStartExecutionAt(startExecutionAt);
+ qosParams.setExecuteBefore(executeBefore);
+ qosParams.setNumberofRetries(numberOfRetires);
+ return qosParams;
+ }
+
+ public static TaskDetails cloneTaskFromExperiment (Experiment experiment){
+ TaskDetails taskDetails = new TaskDetails();
+ taskDetails.setCreationTime(experiment.getCreationTime());
+ taskDetails.setApplicationId(experiment.getApplicationId());
+ taskDetails.setApplicationVersion(experiment.getApplicationVersion());
+ List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs();
+ if (experimentInputs != null){
+ taskDetails.setApplicationInputs(experimentInputs);
+ }
+
+ List<OutputDataObjectType> experimentOutputs = experiment.getExperimentOutputs();
+ if (experimentOutputs != null){
+ taskDetails.setApplicationOutputs(experimentOutputs);
+ }
+
+ UserConfigurationData configData = experiment.getUserConfigurationData();
+ if (configData != null){
+ ComputationalResourceScheduling scheduling = configData.getComputationalResourceScheduling();
+ if (scheduling != null){
+ taskDetails.setTaskScheduling(scheduling);
+ }
+ AdvancedInputDataHandling advanceInputDataHandling = configData.getAdvanceInputDataHandling();
+ if (advanceInputDataHandling != null){
+ taskDetails.setAdvancedInputDataHandling(advanceInputDataHandling);
+ }
+ AdvancedOutputDataHandling outputHandling = configData.getAdvanceOutputDataHandling();
+ if (outputHandling != null){
+ taskDetails.setAdvancedOutputDataHandling(outputHandling);
+ }
+ }
+ return taskDetails;
+ }
+
+ public static TaskDetails cloneTaskFromWorkflowNodeDetails(Experiment experiment, WorkflowNodeDetails nodeDetails){
+ TaskDetails taskDetails = new TaskDetails();
+ taskDetails.setCreationTime(nodeDetails.getCreationTime());
+// String[] split = ;
+ taskDetails.setApplicationId(nodeDetails.getExecutionUnitData());
+// taskDetails.setApplicationVersion(split[1]);
+ List<InputDataObjectType> experimentInputs = nodeDetails.getNodeInputs();
+ if (experimentInputs != null){
+ taskDetails.setApplicationInputs(experimentInputs);
+ }
+
+ List<OutputDataObjectType> experimentOutputs = nodeDetails.getNodeOutputs();
+ if (experimentOutputs != null){
+ taskDetails.setApplicationOutputs(experimentOutputs);
+ }
+
+ UserConfigurationData configData = experiment.getUserConfigurationData();
+ if (configData != null){
+ ComputationalResourceScheduling scheduling = configData.getComputationalResourceScheduling();
+ if (scheduling != null){
+ taskDetails.setTaskScheduling(scheduling);
+ }
+ AdvancedInputDataHandling advanceInputDataHandling = configData.getAdvanceInputDataHandling();
+ if (advanceInputDataHandling != null){
+ taskDetails.setAdvancedInputDataHandling(advanceInputDataHandling);
+ }
+ AdvancedOutputDataHandling outputHandling = configData.getAdvanceOutputDataHandling();
+ if (outputHandling != null){
+ taskDetails.setAdvancedOutputDataHandling(outputHandling);
+ }
+ }
+ return taskDetails;
+ }
+ public static WorkflowNodeDetails createWorkflowNode (String nodeName,
+ List<InputDataObjectType> nodeInputs){
+ WorkflowNodeDetails wfnod = new WorkflowNodeDetails();
+ wfnod.setNodeName(nodeName);
+ wfnod.setNodeInputs(nodeInputs);
+ return wfnod;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/MessageScheme.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/MessageScheme.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/MessageScheme.java
new file mode 100644
index 0000000..9a69a13
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/MessageScheme.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MessageScheme implements Scheme {
+ private final static Logger logger = LoggerFactory.getLogger(MessageScheme.class);
+
+ @Override
+ public List<Object> deserialize(byte[] bytes) {
+ ArrayList<Object> message = new ArrayList<Object>();
+ message.add(new String(bytes));
+
+ return message;
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return new Fields("experiment");
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/69d17821/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/GfacTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/GfacTopologyBuilderTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/GfacTopologyBuilderTest.java
new file mode 100644
index 0000000..32f9479
--- /dev/null
+++ b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/GfacTopologyBuilderTest.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.core.gfac.services.impl;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.topology.TopologyBuilder;
+import com.rabbitmq.client.ConnectionFactory;
+import io.latent.storm.rabbitmq.Declarator;
+import io.latent.storm.rabbitmq.config.ConnectionConfig;
+import io.latent.storm.rabbitmq.config.ConsumerConfig;
+import io.latent.storm.rabbitmq.config.ConsumerConfigBuilder;
+import org.apache.airavata.gfac.local.AiravataRabbitMQSpout;
+import org.apache.airavata.gfac.local.RandomSentenceSpout;
+import org.apache.airavata.gfac.local.handler.LocalDirectorySetupBolt;
+import org.apache.airavata.gfac.local.provider.impl.LocalProviderBolt;
+import org.apache.airavata.gfac.local.utils.CustomStormDeclarator;
+import org.apache.airavata.gfac.local.utils.MessageScheme;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GfacTopologyBuilderTest {
+ private final static Logger logger = LoggerFactory.getLogger(GfacTopologyBuilderTest.class);
+
+
+ public static void main(String[] args) throws RegistryException, InterruptedException {
+ TopologyBuilder builder = new TopologyBuilder();
+ ConnectionConfig connectionConfig = new ConnectionConfig("localhost", 5672, "guest", "guest", ConnectionFactory.DEFAULT_VHOST, 10); // host, port, username, password, virtualHost, heartBeat
+ ConsumerConfig spoutConfig = new ConsumerConfigBuilder().connection(connectionConfig)
+ .queue("gfac.submit")
+ .prefetch(200)
+ .requeueOnFail()
+ .build();
+
+//
+ MessageScheme messageScheme = new MessageScheme();
+ Declarator declarator = new CustomStormDeclarator("airavata_rabbitmq_exchange", "gfac.submit", "*");
+
+ builder.setSpout("spout", new AiravataRabbitMQSpout(messageScheme,declarator), 5).addConfigurations(spoutConfig.asMap())
+ .setMaxSpoutPending(200);;
+// builder.setSpout("spout", new RandomSentenceSpout(), 5);
+
+ builder.setBolt("directoryset", new LocalDirectorySetupBolt(), 8).shuffleGrouping("spout").setNumTasks(16);
+ builder.setBolt("count", new LocalProviderBolt(), 12).shuffleGrouping("directoryset");
+ Config conf = new Config();
+ conf.setDebug(true);
+
+
+ conf.setNumWorkers(3);
+
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("word-count", conf, builder.createTopology());
+
+ Thread.sleep(10000000);
+
+// cluster.shutdown();
+ }
+
+ public void testTopology() throws RegistryException, InterruptedException {
+ TopologyBuilder builder = new TopologyBuilder();
+ ConnectionConfig connectionConfig = new ConnectionConfig("localhost", 5672, "guest", "guest", ConnectionFactory.DEFAULT_VHOST, 10); // host, port, username, password, virtualHost, heartBeat
+ ConsumerConfig spoutConfig = new ConsumerConfigBuilder().connection(connectionConfig)
+ .queue("gfac.submit")
+ .prefetch(200)
+ .requeueOnFail()
+ .build();
+
+//
+ MessageScheme messageScheme = new MessageScheme();
+ Declarator declarator = new CustomStormDeclarator("airavata_rabbitmq_exchange", "gfac.submit", "*");
+
+ builder.setSpout("spout", new AiravataRabbitMQSpout(messageScheme,declarator), 5).addConfigurations(spoutConfig.asMap())
+ .setMaxSpoutPending(200);;
+
+ builder.setBolt("directoryset", new LocalDirectorySetupBolt(), 8).shuffleGrouping("spout").setNumTasks(16);
+ builder.setBolt("count", new LocalProviderBolt(), 12).shuffleGrouping("directoryset");
+ Config conf = new Config();
+ conf.setDebug(true);
+
+
+ conf.setNumWorkers(3);
+
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("word-count", conf, builder.createTopology());
+
+ Thread.sleep(10000000);
+
+// cluster.shutdown();
+ }
+}