You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/03 06:46:10 UTC
[2/7] airavata git commit: Renamed orchestrator component nested
modules and artifact ids
http://git-wip-us.apache.org/repos/asf/airavata/blob/82773c73/modules/orchestrator/orchestrator-client/src/main/java/org/apache/airavata/orchestrator/cpi/orchestrator_cpi_serviceConstants.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-client/src/main/java/org/apache/airavata/orchestrator/cpi/orchestrator_cpi_serviceConstants.java b/modules/orchestrator/orchestrator-client/src/main/java/org/apache/airavata/orchestrator/cpi/orchestrator_cpi_serviceConstants.java
new file mode 100644
index 0000000..3fa2946
--- /dev/null
+++ b/modules/orchestrator/orchestrator-client/src/main/java/org/apache/airavata/orchestrator/cpi/orchestrator_cpi_serviceConstants.java
@@ -0,0 +1,55 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.airavata.orchestrator.cpi;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("all") public class orchestrator_cpi_serviceConstants {
+
+ public static final String ORCHESTRATOR_CPI_VERSION = "0.13.0";
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/82773c73/modules/orchestrator/orchestrator-client/src/main/java/org/apache/airavata/orchestrator/sample/OrchestratorClientSample.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-client/src/main/java/org/apache/airavata/orchestrator/sample/OrchestratorClientSample.java b/modules/orchestrator/orchestrator-client/src/main/java/org/apache/airavata/orchestrator/sample/OrchestratorClientSample.java
new file mode 100644
index 0000000..009d110
--- /dev/null
+++ b/modules/orchestrator/orchestrator-client/src/main/java/org/apache/airavata/orchestrator/sample/OrchestratorClientSample.java
@@ -0,0 +1,136 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.orchestrator.sample;
+
+//import org.apache.airavata.client.AiravataAPIFactory;
+//import org.apache.airavata.client.api.AiravataAPI;
+//import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
+//import org.apache.airavata.client.tools.DocumentCreator;
+
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.util.ExperimentModelUtil;
+import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.UserConfigurationData;
+import org.apache.airavata.orchestrator.cpi.OrchestratorService;
+import org.apache.thrift.TException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class OrchestratorClientSample {
+// private static DocumentCreator documentCreator;
+ private static OrchestratorService.Client orchestratorClient;
+// private static Registry registry;
+ private static int NUM_CONCURRENT_REQUESTS = 1;
+ private static final String DEFAULT_USER = "default.registry.user";
+ private static final String DEFAULT_USER_PASSWORD = "default.registry.password";
+ private static final String DEFAULT_GATEWAY = "default.registry.gateway";
+ private static String sysUser;
+ private static String sysUserPwd;
+ private static String gateway;
+/*
+
+ public static void main(String[] args) {
+ try {
+ AiravataUtils.setExecutionAsClient();
+ sysUser = ClientSettings.getSetting(DEFAULT_USER);
+ sysUserPwd = ClientSettings.getSetting(DEFAULT_USER_PASSWORD);
+ gateway = ClientSettings.getSetting(DEFAULT_GATEWAY);
+ orchestratorClient = OrchestratorClientFactory.createOrchestratorClient("localhost", 8940);
+ registry = RegistryFactory.getRegistry(gateway, sysUser, sysUserPwd);
+ documentCreator = new DocumentCreator(getAiravataAPI());
+ documentCreator.createLocalHostDocs();
+ documentCreator.createGramDocs();
+ documentCreator.createPBSDocsForOGCE();
+ storeExperimentDetail();
+ } catch (ApplicationSettingsException e) {
+ e.printStackTrace();
+ } catch (RegistryException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ private static AiravataAPI getAiravataAPI() {
+ AiravataAPI airavataAPI = null;
+ try {
+ airavataAPI = AiravataAPIFactory.getAPI(gateway, sysUser);
+ } catch (AiravataAPIInvocationException e) {
+ e.printStackTrace();
+ }
+ return airavataAPI;
+ }
+*/
+
+ public static void storeExperimentDetail() {
+ for (int i = 0; i < NUM_CONCURRENT_REQUESTS; i++) {
+ Thread thread = new Thread() {
+ public void run() {
+ List<InputDataObjectType> exInputs = new ArrayList<InputDataObjectType>();
+ InputDataObjectType input = new InputDataObjectType();
+ input.setName("echo_input");
+ input.setType(DataType.STRING);
+ input.setValue("echo_output=Hello World");
+ exInputs.add(input);
+
+
+ List<OutputDataObjectType> exOut = new ArrayList<OutputDataObjectType>();
+ OutputDataObjectType output = new OutputDataObjectType();
+ output.setName("echo_output");
+ output.setType(DataType.STRING);
+ output.setValue("");
+ exOut.add(output);
+
+ Experiment simpleExperiment = ExperimentModelUtil.createSimpleExperiment("default", "admin", "echoExperiment", "SimpleEcho2", "SimpleEcho2", exInputs);
+ simpleExperiment.setExperimentOutputs(exOut);
+
+ ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling("trestles.sdsc.edu", 1, 1, 1, "normal", 0, 0, 1, "sds128");
+ scheduling.setResourceHostId("gsissh-trestles");
+ UserConfigurationData userConfigurationData = new UserConfigurationData();
+ userConfigurationData.setComputationalResourceScheduling(scheduling);
+ simpleExperiment.setUserConfigurationData(userConfigurationData);
+ String expId = null;
+ try {
+// expId = (String) registry.add(ParentDataType.EXPERIMENT, simpleExperiment);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ try {
+ orchestratorClient.launchExperiment(expId, "airavataToken");
+ } catch (TException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ thread.start();
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/82773c73/modules/orchestrator/orchestrator-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml
index e7984b4..80543df 100644
--- a/modules/orchestrator/orchestrator-core/pom.xml
+++ b/modules/orchestrator/orchestrator-core/pom.xml
@@ -18,7 +18,7 @@ the License. -->
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>airavata-orchestrator-core</artifactId>
+ <artifactId>orchestrator-core</artifactId>
<packaging>jar</packaging>
<name>Airavata Orchestrator Core</name>
<url>http://airavata.apache.org/</url>
http://git-wip-us.apache.org/repos/asf/airavata/blob/82773c73/modules/orchestrator/orchestrator-service/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/pom.xml b/modules/orchestrator/orchestrator-service/pom.xml
new file mode 100644
index 0000000..a3f47c7
--- /dev/null
+++ b/modules/orchestrator/orchestrator-service/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under
+ the Apache License, Version 2.0 (theĆ "License"); you may not use this file except in compliance with the License. You may
+ obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to
+ in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+ ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under
+ the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>orchestrator</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>0.16-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <name>Airavata Orchestrator Service</name>
+ <artifactId>orchestrator-service</artifactId>
+ <packaging>jar</packaging>
+ <url>http://airavata.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-credential-store</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>${thrift.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${org.slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>orchestrator-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>orchestrator-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-workflow-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>app-catalog-data</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>app-catalog-cpi</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-model-utils</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-server-configuration</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ </properties>
+
+</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/82773c73/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
new file mode 100644
index 0000000..78957ac
--- /dev/null
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.orchestrator.server;
+
+import java.net.InetSocketAddress;
+
+import org.apache.airavata.common.utils.IServer;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.orchestrator.cpi.OrchestratorService;
+import org.apache.airavata.orchestrator.util.Constants;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OrchestratorServer implements IServer{
+
+ private final static Logger logger = LoggerFactory.getLogger(OrchestratorServer.class);
+ private static final String SERVER_NAME = "Orchestrator Server";
+ private static final String SERVER_VERSION = "1.0";
+
+ private ServerStatus status;
+
+ private TServer server;
+
+ public OrchestratorServer() {
+ setStatus(ServerStatus.STOPPED);
+ }
+
+ public void StartOrchestratorServer(OrchestratorService.Processor<OrchestratorServerHandler> orchestratorServerHandlerProcessor)
+ throws Exception {
+ try {
+ final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.ORCHESTRATOT_SERVER_PORT,"8940"));
+
+ final String serverHost = ServerSettings.getSetting(Constants.ORCHESTRATOT_SERVER_HOST, null);
+
+ TServerTransport serverTransport;
+
+ if(serverHost == null){
+ serverTransport = new TServerSocket(serverPort);
+ }else{
+ InetSocketAddress inetSocketAddress = new InetSocketAddress(serverHost, serverPort);
+ serverTransport = new TServerSocket(inetSocketAddress);
+ }
+
+ //server = new TSimpleServer(
+ // new TServer.Args(serverTransport).processor(orchestratorServerHandlerProcessor));
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(serverTransport);
+ options.minWorkerThreads = Integer.parseInt(ServerSettings.getSetting(Constants.ORCHESTRATOT_SERVER_MIN_THREADS, "30"));
+ server = new TThreadPoolServer(options.processor(orchestratorServerHandlerProcessor));
+
+ new Thread() {
+ public void run() {
+ server.serve();
+ setStatus(ServerStatus.STOPPED);
+ logger.info("Orchestrator Server Stopped.");
+ }
+ }.start();
+ new Thread() {
+ public void run() {
+ while(!server.isServing()){
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ if (server.isServing()){
+ setStatus(ServerStatus.STARTED);
+ logger.info("Starting Orchestrator Server on Port " + serverPort);
+ logger.info("Listening to Orchestrator Clients ....");
+ }
+ }
+ }.start();
+ } catch (TTransportException e) {
+ logger.error(e.getMessage());
+ setStatus(ServerStatus.FAILED);
+ }
+ }
+
+ public static void main(String[] args) {
+ try {
+ new OrchestratorServer().start();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void start() throws Exception {
+ setStatus(ServerStatus.STARTING);
+ OrchestratorService.Processor<OrchestratorServerHandler> orchestratorService =
+ new OrchestratorService.Processor<OrchestratorServerHandler>(new OrchestratorServerHandler());
+ StartOrchestratorServer(orchestratorService);
+ }
+
+ @Override
+ public void stop() throws Exception {
+ if (server!=null && server.isServing()){
+ setStatus(ServerStatus.STOPING);
+ server.stop();
+ }
+
+ }
+
+ @Override
+ public void restart() throws Exception {
+ stop();
+ start();
+ }
+
+ @Override
+ public void configure() throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public ServerStatus getStatus() throws Exception {
+ return status;
+ }
+
+ private void setStatus(ServerStatus stat){
+ status=stat;
+ status.updateTime();
+ }
+
+ @Override
+ public String getName() {
+ return SERVER_NAME;
+ }
+
+ @Override
+ public String getVersion() {
+ return SERVER_VERSION;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/82773c73/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
new file mode 100644
index 0000000..4ef9dbc
--- /dev/null
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -0,0 +1,643 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.orchestrator.server;
+
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.airavata.appcatalog.cpi.AppCatalogException;
+import org.airavata.appcatalog.cpi.ComputeResource;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.aiaravata.application.catalog.data.resources.AbstractResource;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.gfac.core.scheduler.HostScheduler;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.PublisherFactory;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.error.LaunchValidationException;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.airavata.model.util.ExecutionType;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.model.workspace.experiment.ExperimentStatus;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.model.workspace.experiment.TaskStatus;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.cpi.OrchestratorService;
+import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl;
+import org.apache.airavata.orchestrator.cpi.orchestrator_cpi_serviceConstants;
+import org.apache.airavata.orchestrator.util.DataModelUtils;
+import org.apache.airavata.orchestrator.util.OrchestratorServerThreadPoolExecutor;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.TaskDetailConstants;
+import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants;
+import org.apache.airavata.workflow.core.WorkflowEnactmentService;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class OrchestratorServerHandler implements OrchestratorService.Iface {
+ private static AiravataLogger log = AiravataLoggerFactory .getLogger(OrchestratorServerHandler.class);
+ private SimpleOrchestratorImpl orchestrator = null;
+ private Registry registry;
+ private static Integer mutex = new Integer(-1);
+ private String airavataUserName;
+ private String gatewayName;
+ private Publisher publisher;
+ private RabbitMQProcessConsumer rabbitMQProcessConsumer;
+ private RabbitMQProcessPublisher rabbitMQProcessPublisher;
+
+ /**
+ * Query orchestrator server to fetch the CPI version
+ */
+ public String getOrchestratorCPIVersion() throws TException {
+ return orchestrator_cpi_serviceConstants.ORCHESTRATOR_CPI_VERSION;
+ }
+
+ public OrchestratorServerHandler() throws OrchestratorException{
+ // registering with zk
+ try {
+ publisher = PublisherFactory.createActivityPublisher();
+ String zkhostPort = AiravataZKUtils.getZKhostPort();
+ String airavataServerHostPort = ServerSettings
+ .getSetting(Constants.ORCHESTRATOR_SERVER_HOST)
+ + ":"
+ + ServerSettings
+ .getSetting(Constants.ORCHESTRATOR_SERVER_PORT);
+
+// setGatewayName(ServerSettings.getDefaultUserGateway());
+ setAiravataUserName(ServerSettings.getDefaultUser());
+ } catch (AiravataException e) {
+ log.error(e.getMessage(), e);
+ throw new OrchestratorException("Error while initializing orchestrator service", e);
+ }
+ // orchestrator init
+ try {
+ // first constructing the monitorManager and orchestrator, then fill
+ // the required properties
+ orchestrator = new SimpleOrchestratorImpl();
+ registry = RegistryFactory.getDefaultRegistry();
+ orchestrator.initialize();
+ orchestrator.getOrchestratorContext().setPublisher(this.publisher);
+ startProcessConsumer();
+ } catch (OrchestratorException e) {
+ log.error(e.getMessage(), e);
+ throw new OrchestratorException("Error while initializing orchestrator service", e);
+ } catch (RegistryException e) {
+ log.error(e.getMessage(), e);
+ throw new OrchestratorException("Error while initializing orchestrator service", e);
+ }
+ }
+
+ private void startProcessConsumer() throws OrchestratorException {
+ try {
+ rabbitMQProcessConsumer = new RabbitMQProcessConsumer();
+ ProcessConsumer processConsumer = new ProcessConsumer();
+ Thread thread = new Thread(processConsumer);
+ thread.start();
+
+ } catch (AiravataException e) {
+ throw new OrchestratorException("Error while starting process consumer", e);
+ }
+
+ }
+
+ /**
+ * * After creating the experiment Data user have the * experimentID as the
+ * handler to the experiment, during the launchExperiment * We just have to
+ * give the experimentID * * @param experimentID * @return sucess/failure *
+ * *
+ *
+ * @param experimentId
+ */
+ public boolean launchExperiment(String experimentId, String token) throws TException {
+ Experiment experiment = null; // this will inside the bottom catch statement
+ try {
+ experiment = (Experiment) registry.get(
+ RegistryModelType.EXPERIMENT, experimentId);
+ if (experiment == null) {
+ log.errorId(experimentId, "Error retrieving the Experiment by the given experimentID: {} ", experimentId);
+ return false;
+ }
+ CredentialReader credentialReader = GFacUtils.getCredentialReader();
+ String gatewayId = null;
+ if (credentialReader != null) {
+ try {
+ gatewayId = credentialReader.getGatewayID(token);
+ } catch (Exception e) {
+ log.error(e.getLocalizedMessage());
+ }
+ }
+ if (gatewayId == null) {
+ gatewayId = ServerSettings.getDefaultUserGateway();
+ log.info("Couldn't identify the gateway Id using the credential token, Use default gateway Id");
+// throw new AiravataException("Couldn't identify the gateway Id using the credential token");
+ }
+ ExecutionType executionType = DataModelUtils.getExecutionType(gatewayId, experiment);
+ if (executionType == ExecutionType.SINGLE_APP) {
+ //its an single application execution experiment
+ log.debugId(experimentId, "Launching single application experiment {}.", experimentId);
+ OrchestratorServerThreadPoolExecutor.getCachedThreadPool().execute(new SingleAppExperimentRunner(experimentId, token));
+ } else if (executionType == ExecutionType.WORKFLOW) {
+ //its a workflow execution experiment
+ log.debugId(experimentId, "Launching workflow experiment {}.", experimentId);
+ launchWorkflowExperiment(experimentId, token);
+ } else {
+ log.errorId(experimentId, "Couldn't identify experiment type, experiment {} is neither single application nor workflow.", experimentId);
+ throw new TException("Experiment '" + experimentId + "' launch failed. Unable to figureout execution type for application " + experiment.getApplicationId());
+ }
+ } catch (Exception e) {
+ throw new TException("Experiment '" + experimentId + "' launch failed. Unable to figureout execution type for application " + experiment.getApplicationId(), e);
+ }
+ return true;
+ }
+
+ /**
+ * This method will validate the experiment before launching, if is failed
+ * we do not run the launch in airavata thrift service (only if validation
+ * is enabled
+ *
+ * @param experimentId
+ * @return
+ * @throws TException
+ */
+ public boolean validateExperiment(String experimentId) throws TException,
+ LaunchValidationException {
+ // TODO: Write the Orchestrator implementaion
+ try {
+ List<TaskDetails> tasks = orchestrator.createTasks(experimentId);
+ if (tasks.size() > 1) {
+ log.info("There are multiple tasks for this experiment, So Orchestrator will launch multiple Jobs");
+ }
+ List<String> ids = registry.getIds(
+ RegistryModelType.WORKFLOW_NODE_DETAIL,
+ WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
+ for (String workflowNodeId : ids) {
+ WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry
+ .get(RegistryModelType.WORKFLOW_NODE_DETAIL,
+ workflowNodeId);
+ List<Object> taskDetailList = registry.get(
+ RegistryModelType.TASK_DETAIL,
+ TaskDetailConstants.NODE_ID, workflowNodeId);
+ for (Object o : taskDetailList) {
+ TaskDetails taskID = (TaskDetails) o;
+ // iterate through all the generated tasks and performs the
+ // job submisssion+monitoring
+ Experiment experiment = (Experiment) registry.get(
+ RegistryModelType.EXPERIMENT, experimentId);
+ if (experiment == null) {
+ log.errorId(experimentId, "Error retrieving the Experiment by the given experimentID: {}.",
+ experimentId);
+ return false;
+ }
+ return orchestrator.validateExperiment(experiment,
+ workflowNodeDetail, taskID).isSetValidationState();
+ }
+ }
+
+ } catch (OrchestratorException e) {
+ log.errorId(experimentId, "Error while validating experiment", e);
+ throw new TException(e);
+ } catch (RegistryException e) {
+ log.errorId(experimentId, "Error while validating experiment", e);
+ throw new TException(e);
+ }
+ return false;
+ }
+
+ /**
+ * This can be used to cancel a running experiment and store the status to
+ * terminated in registry
+ *
+ * @param experimentId
+ * @return
+ * @throws TException
+ */
+ public boolean terminateExperiment(String experimentId, String tokenId) throws TException {
+ log.infoId(experimentId, "Experiment: {} is cancelling !!!!!", experimentId);
+ return validateStatesAndCancel(experimentId, tokenId);
+ }
+
+ private String getAiravataUserName() {
+ return airavataUserName;
+ }
+
+ private String getGatewayName() {
+ return gatewayName;
+ }
+
+ public void setAiravataUserName(String airavataUserName) {
+ this.airavataUserName = airavataUserName;
+ }
+
+ public void setGatewayName(String gatewayName) {
+ this.gatewayName = gatewayName;
+ }
+
+ @Override
+ public boolean launchTask(String taskId, String airavataCredStoreToken) throws TException {
+ try {
+ TaskDetails taskData = (TaskDetails) registry.get(
+ RegistryModelType.TASK_DETAIL, taskId);
+ String applicationId = taskData.getApplicationId();
+ if (applicationId == null) {
+ log.errorId(taskId, "Application id shouldn't be null.");
+ throw new OrchestratorException("Error executing the job, application id shouldn't be null.");
+ }
+ ApplicationDeploymentDescription applicationDeploymentDescription = getAppDeployment(taskData, applicationId);
+ taskData.setApplicationDeploymentId(applicationDeploymentDescription.getAppDeploymentId());
+ registry.update(RegistryModelType.TASK_DETAIL, taskData,taskData.getTaskID());
+ List<Object> workflowNodeDetailList = registry.get(RegistryModelType.WORKFLOW_NODE_DETAIL,
+ org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants.TASK_LIST, taskData);
+ if (workflowNodeDetailList != null
+ && workflowNodeDetailList.size() > 0) {
+ List<Object> experimentList = registry.get(RegistryModelType.EXPERIMENT,
+ org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.ExperimentConstants.WORKFLOW_NODE_LIST,
+ (WorkflowNodeDetails) workflowNodeDetailList.get(0));
+ if (experimentList != null && experimentList.size() > 0) {
+ return orchestrator
+ .launchExperiment(
+ (Experiment) experimentList.get(0),
+ (WorkflowNodeDetails) workflowNodeDetailList
+ .get(0), taskData,airavataCredStoreToken);
+ }
+ }
+ } catch (Exception e) {
+ log.errorId(taskId, "Error while launching task ", e);
+ throw new TException(e);
+ }
+ log.infoId(taskId, "No experiment found associated in task {}", taskId);
+ return false;
+ }
+
+ private ApplicationDeploymentDescription getAppDeployment(
+ TaskDetails taskData, String applicationId)
+ throws AppCatalogException, OrchestratorException,
+ ClassNotFoundException, ApplicationSettingsException,
+ InstantiationException, IllegalAccessException {
+ AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+ String selectedModuleId = getModuleId(appCatalog, applicationId);
+ ApplicationDeploymentDescription applicationDeploymentDescription = getAppDeployment(
+ appCatalog, taskData, selectedModuleId);
+ return applicationDeploymentDescription;
+ }
+
+ private ApplicationDeploymentDescription getAppDeployment(
+ AppCatalog appCatalog, TaskDetails taskData, String selectedModuleId)
+ throws AppCatalogException, ClassNotFoundException,
+ ApplicationSettingsException, InstantiationException,
+ IllegalAccessException {
+ Map<String, String> moduleIdFilter = new HashMap<String, String>();
+ moduleIdFilter.put(AbstractResource.ApplicationDeploymentConstants.APP_MODULE_ID, selectedModuleId);
+ if (taskData.getTaskScheduling()!=null && taskData.getTaskScheduling().getResourceHostId() != null) {
+ moduleIdFilter.put(AbstractResource.ApplicationDeploymentConstants.COMPUTE_HOST_ID, taskData.getTaskScheduling().getResourceHostId());
+ }
+ List<ApplicationDeploymentDescription> applicationDeployements = appCatalog.getApplicationDeployment().getApplicationDeployements(moduleIdFilter);
+ Map<ComputeResourceDescription, ApplicationDeploymentDescription> deploymentMap = new HashMap<ComputeResourceDescription, ApplicationDeploymentDescription>();
+ ComputeResource computeResource = appCatalog.getComputeResource();
+ for (ApplicationDeploymentDescription deploymentDescription : applicationDeployements) {
+ deploymentMap.put(computeResource.getComputeResource(deploymentDescription.getComputeHostId()),deploymentDescription);
+ }
+ List<ComputeResourceDescription> computeHostList = Arrays.asList(deploymentMap.keySet().toArray(new ComputeResourceDescription[]{}));
+ Class<? extends HostScheduler> aClass = Class.forName(
+ ServerSettings.getHostScheduler()).asSubclass(
+ HostScheduler.class);
+ HostScheduler hostScheduler = aClass.newInstance();
+ ComputeResourceDescription ComputeResourceDescription = hostScheduler.schedule(computeHostList);
+ ApplicationDeploymentDescription applicationDeploymentDescription = deploymentMap.get(ComputeResourceDescription);
+ return applicationDeploymentDescription;
+ }
+
+ private String getModuleId(AppCatalog appCatalog, String applicationId)
+ throws AppCatalogException, OrchestratorException {
+ ApplicationInterfaceDescription applicationInterface = appCatalog.getApplicationInterface().getApplicationInterface(applicationId);
+ List<String> applicationModules = applicationInterface.getApplicationModules();
+ if (applicationModules.size()==0){
+ throw new OrchestratorException(
+ "No modules defined for application "
+ + applicationId);
+ }
+// AiravataAPI airavataAPI = getAiravataAPI();
+ String selectedModuleId=applicationModules.get(0);
+ return selectedModuleId;
+ }
+
+ private boolean validateStatesAndCancel(String experimentId, String tokenId)throws TException{
+ try {
+ Experiment experiment = (Experiment) registry.get(
+ RegistryModelType.EXPERIMENT, experimentId);
+ log.info("Waiting for zookeeper to connect to the server");
+ synchronized (mutex){
+ mutex.wait(5000);
+ }
+ if (experiment == null) {
+ log.errorId(experimentId, "Error retrieving the Experiment by the given experimentID: {}.", experimentId);
+ throw new OrchestratorException("Error retrieving the Experiment by the given experimentID: " + experimentId);
+ }
+ ExperimentState experimentState = experiment.getExperimentStatus().getExperimentState();
+ if (isCancelValid(experimentState)){
+ ExperimentStatus status = new ExperimentStatus();
+ status.setExperimentState(ExperimentState.CANCELING);
+ status.setTimeOfStateChange(Calendar.getInstance()
+ .getTimeInMillis());
+ experiment.setExperimentStatus(status);
+ registry.update(RegistryModelType.EXPERIMENT, experiment,
+ experimentId);
+
+ List<String> ids = registry.getIds(
+ RegistryModelType.WORKFLOW_NODE_DETAIL,
+ WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
+ for (String workflowNodeId : ids) {
+ WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry
+ .get(RegistryModelType.WORKFLOW_NODE_DETAIL,
+ workflowNodeId);
+ int value = workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().getValue();
+ if ( value> 1 && value < 7) { // we skip the unknown state
+ log.error(workflowNodeDetail.getNodeName() + " Workflow Node status cannot mark as cancelled, because " +
+ "current status is " + workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().toString());
+ continue; // this continue is very useful not to process deeper loops if the upper layers have non-cancel states
+ } else {
+ WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
+ workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELING);
+ workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
+ .getTimeInMillis());
+ workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
+ registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
+ workflowNodeId);
+ }
+ List<Object> taskDetailList = registry.get(
+ RegistryModelType.TASK_DETAIL,
+ TaskDetailConstants.NODE_ID, workflowNodeId);
+ for (Object o : taskDetailList) {
+ TaskDetails taskDetails = (TaskDetails) o;
+ TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
+ if (taskStatus.getExecutionState().getValue() > 7 && taskStatus.getExecutionState().getValue()<12) {
+ log.error(((TaskDetails) o).getTaskID() + " Task status cannot mark as cancelled, because " +
+ "current task state is " + ((TaskDetails) o).getTaskStatus().getExecutionState().toString());
+ continue;// this continue is very useful not to process deeper loops if the upper layers have non-cancel states
+ } else {
+ taskStatus.setExecutionState(TaskState.CANCELING);
+ taskStatus.setTimeOfStateChange(Calendar.getInstance()
+ .getTimeInMillis());
+ taskDetails.setTaskStatus(taskStatus);
+ registry.update(RegistryModelType.TASK_DETAIL, o,
+ taskDetails.getTaskID());
+ }
+ orchestrator.cancelExperiment(experiment,
+ workflowNodeDetail, taskDetails, tokenId);
+ // Status update should be done at the monitor
+ }
+ }
+ }else {
+ if (isCancelAllowed(experimentState)){
+ // when experiment status is < 3 no jobDetails object is created,
+ // so we don't have to worry, we simply have to change the status and stop the execution
+ ExperimentStatus status = new ExperimentStatus();
+ status.setExperimentState(ExperimentState.CANCELED);
+ status.setTimeOfStateChange(Calendar.getInstance()
+ .getTimeInMillis());
+ experiment.setExperimentStatus(status);
+ registry.update(RegistryModelType.EXPERIMENT, experiment,
+ experimentId);
+ List<String> ids = registry.getIds(
+ RegistryModelType.WORKFLOW_NODE_DETAIL,
+ WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
+ for (String workflowNodeId : ids) {
+ WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry
+ .get(RegistryModelType.WORKFLOW_NODE_DETAIL,
+ workflowNodeId);
+ WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
+ workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
+ workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
+ .getTimeInMillis());
+ workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
+ registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
+ workflowNodeId);
+ List<Object> taskDetailList = registry.get(
+ RegistryModelType.TASK_DETAIL,
+ TaskDetailConstants.NODE_ID, workflowNodeId);
+ for (Object o : taskDetailList) {
+ TaskDetails taskDetails = (TaskDetails) o;
+ TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
+ taskStatus.setExecutionState(TaskState.CANCELED);
+ taskStatus.setTimeOfStateChange(Calendar.getInstance()
+ .getTimeInMillis());
+ taskDetails.setTaskStatus(taskStatus);
+ registry.update(RegistryModelType.TASK_DETAIL, o,
+ taskDetails);
+ }
+ }
+ }else {
+ log.errorId(experimentId, "Unable to mark experiment as Cancelled, current state {} doesn't allow to cancel the experiment {}.",
+ experiment.getExperimentStatus().getExperimentState().toString(), experimentId);
+ throw new OrchestratorException("Unable to mark experiment as Cancelled, because current state is: "
+ + experiment.getExperimentStatus().getExperimentState().toString());
+ }
+ }
+ log.info("Experiment: " + experimentId + " is cancelled !!!!!");
+ } catch (Exception e) {
+ throw new TException(e);
+ }
+ return true;
+ }
+
+ private boolean isCancelValid(ExperimentState state){
+ switch (state) {
+ case LAUNCHED:
+ case EXECUTING:
+ case CANCELING:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ private boolean isCancelAllowed(ExperimentState state){
+ switch (state) {
+ case CREATED:
+ case VALIDATED:
+ case SCHEDULED:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException {
+ try {
+ WorkflowEnactmentService.getInstance().
+ submitWorkflow(experimentId, airavataCredStoreToken, getGatewayName(), getRabbitMQProcessPublisher());
+ } catch (Exception e) {
+ log.error("Error while launching workflow", e);
+ }
+ }
+
+ public synchronized RabbitMQProcessPublisher getRabbitMQProcessPublisher() throws Exception {
+ if (rabbitMQProcessPublisher == null) {
+ rabbitMQProcessPublisher = new RabbitMQProcessPublisher();
+ }
+ return rabbitMQProcessPublisher;
+ }
+
+
+ private class SingleAppExperimentRunner implements Runnable {
+
+ String experimentId;
+ String airavataCredStoreToken;
+ public SingleAppExperimentRunner(String experimentId,String airavataCredStoreToken){
+ this.experimentId = experimentId;
+ this.airavataCredStoreToken = airavataCredStoreToken;
+ }
+ @Override
+ public void run() {
+ try {
+ launchSingleAppExperiment();
+ } catch (TException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private boolean launchSingleAppExperiment() throws TException {
+ Experiment experiment = null;
+ try {
+ List<String> ids = registry.getIds(RegistryModelType.WORKFLOW_NODE_DETAIL, WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
+ for (String workflowNodeId : ids) {
+// WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeId);
+ List<Object> taskDetailList = registry.get(RegistryModelType.TASK_DETAIL, TaskDetailConstants.NODE_ID, workflowNodeId);
+ for (Object o : taskDetailList) {
+ TaskDetails taskData = (TaskDetails) o;
+ //iterate through all the generated tasks and performs the job submisssion+monitoring
+ experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentId);
+ if (experiment == null) {
+ log.errorId(experimentId, "Error retrieving the Experiment by the given experimentID: {}", experimentId);
+ return false;
+ }
+ String gatewayId = null;
+ CredentialReader credentialReader = GFacUtils.getCredentialReader();
+ if (credentialReader != null) {
+ try {
+ gatewayId = credentialReader.getGatewayID(airavataCredStoreToken);
+ } catch (Exception e) {
+ log.error(e.getLocalizedMessage());
+ }
+ }
+ if (gatewayId == null || gatewayId.isEmpty()) {
+ gatewayId = ServerSettings.getDefaultUserGateway();
+ }
+ ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.LAUNCHED,
+ experimentId,
+ gatewayId);
+ String messageId = AiravataUtils.getId("EXPERIMENT");
+ MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, messageId, gatewayId);
+ messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ publisher.publish(messageContext);
+ registry.update(RegistryModelType.TASK_DETAIL, taskData, taskData.getTaskID());
+ //launching the experiment
+ launchTask(taskData.getTaskID(), airavataCredStoreToken);
+ }
+ }
+
+ } catch (Exception e) {
+ // Here we really do not have to do much because only potential failure can happen
+ // is in gfac, if there are errors in gfac, it will handle the experiment/task/job statuses
+ // We might get failures in registry access before submitting the jobs to gfac, in that case we
+ // leave the status of these as created.
+ ExperimentStatus status = new ExperimentStatus();
+ status.setExperimentState(ExperimentState.FAILED);
+ status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+ experiment.setExperimentStatus(status);
+ try {
+ registry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId);
+ } catch (RegistryException e1) {
+ log.errorId(experimentId, "Error while updating experiment status to " + status.toString(), e);
+ throw new TException(e);
+ }
+ log.errorId(experimentId, "Error while updating task status, hence updated experiment status to " + status.toString(), e);
+ throw new TException(e);
+ }
+ return true;
+ }
+ }
+
+ private class ProcessConsumer implements Runnable, MessageHandler{
+
+
+ @Override
+ public void run() {
+ try {
+ rabbitMQProcessConsumer.listen(this);
+ } catch (AiravataException e) {
+ log.error("Error while listen to the RabbitMQProcessConsumer");
+ }
+ }
+
+ @Override
+ public Map<String, Object> getProperties() {
+ Map<String, Object> props = new HashMap<String, Object>();
+ props.put(MessagingConstants.RABBIT_QUEUE, RabbitMQProcessPublisher.PROCESS);
+ props.put(MessagingConstants.RABBIT_ROUTING_KEY, RabbitMQProcessPublisher.PROCESS);
+ return props;
+ }
+
+ @Override
+ public void onMessage(MessageContext msgCtx) {
+ TBase event = msgCtx.getEvent();
+ if (event instanceof ProcessSubmitEvent) {
+ ProcessSubmitEvent processSubmitEvent = (ProcessSubmitEvent) event;
+ try {
+ launchTask(processSubmitEvent.getTaskId(), processSubmitEvent.getCredentialToken());
+ } catch (TException e) {
+ log.error("Error while launching task : " + processSubmitEvent.getTaskId());
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/82773c73/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/Constants.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/Constants.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/Constants.java
new file mode 100644
index 0000000..f9833e9
--- /dev/null
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/Constants.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.orchestrator.util;
+
+public class Constants {
+ public static final String ORCHESTRATOT_SERVER_PORT = "orchestrator.server.port";
+ public static final String ORCHESTRATOT_SERVER_HOST = "orchestrator.server.host";
+ public static final String ORCHESTRATOT_SERVER_MIN_THREADS = "orchestrator.server.min.threads";
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/82773c73/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/DataModelUtils.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/DataModelUtils.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/DataModelUtils.java
new file mode 100644
index 0000000..da11a59
--- /dev/null
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/DataModelUtils.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.orchestrator.util;
+
+import java.util.List;
+
+import org.airavata.appcatalog.cpi.AppCatalogException;
+import org.airavata.appcatalog.cpi.ApplicationInterface;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.model.util.ExecutionType;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataModelUtils {
+
+ private final static Logger logger = LoggerFactory.getLogger(DataModelUtils.class);
+ public static ExecutionType getExecutionType(String gatewayId, Experiment experiment){
+ try {
+ ApplicationInterface applicationInterface = AppCatalogFactory.getAppCatalog().getApplicationInterface();
+ List<String> allApplicationInterfaceIds = applicationInterface.getAllApplicationInterfaceIds();
+ String applicationId = experiment.getApplicationId();
+ if (allApplicationInterfaceIds.contains(applicationId)){
+ return ExecutionType.SINGLE_APP;
+ } else {
+ List<String> allWorkflows = AppCatalogFactory.getAppCatalog().getWorkflowCatalog().getAllWorkflows(gatewayId);
+ if (allWorkflows.contains(applicationId)){
+ return ExecutionType.WORKFLOW;
+ }
+ }
+ } catch (AppCatalogException e) {
+ logger.error(e.getMessage(), e);
+ }
+ return ExecutionType.UNKNOWN;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/82773c73/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java
new file mode 100644
index 0000000..1730998
--- /dev/null
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.orchestrator.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
+import org.apache.airavata.common.utils.ServerSettings;
+
+public class OrchestratorServerThreadPoolExecutor {
+ private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(OrchestratorServerThreadPoolExecutor.class);
+ public static final String AIRAVATA_SERVER_THREAD_POOL_SIZE = "airavata.server.thread.pool.size";
+
+ private static ExecutorService threadPool;
+
+ public static ExecutorService getCachedThreadPool() {
+ if(threadPool ==null){
+ threadPool = Executors.newCachedThreadPool();
+ }
+ return threadPool;
+ }
+
+ public static ExecutorService getFixedThreadPool() {
+ if(threadPool ==null){
+ try {
+ threadPool = Executors.newFixedThreadPool(Integer.parseInt(ServerSettings.getSetting(AIRAVATA_SERVER_THREAD_POOL_SIZE)));
+ } catch (ApplicationSettingsException e) {
+ logger.error("Error reading " + AIRAVATA_SERVER_THREAD_POOL_SIZE+ " property");
+ }
+ }
+ return threadPool;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/82773c73/modules/orchestrator/orchestrator-service/src/main/resources/gsissh.properties
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/resources/gsissh.properties b/modules/orchestrator/orchestrator-service/src/main/resources/gsissh.properties
new file mode 100644
index 0000000..3fdf76d
--- /dev/null
+++ b/modules/orchestrator/orchestrator-service/src/main/resources/gsissh.properties
@@ -0,0 +1,26 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+###########################################################################
+# Specifies system level configurations as a key/value pairs.
+###########################################################################
+
+StrictHostKeyChecking=no
+ssh.session.timeout=360000
http://git-wip-us.apache.org/repos/asf/airavata/blob/82773c73/modules/orchestrator/orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/OrchestratorClientFactoryTest.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/OrchestratorClientFactoryTest.java b/modules/orchestrator/orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/OrchestratorClientFactoryTest.java
new file mode 100644
index 0000000..18168c7
--- /dev/null
+++ b/modules/orchestrator/orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/OrchestratorClientFactoryTest.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.orchestrator.client;
+
+//import org.apache.airavata.client.AiravataAPIFactory;
+//import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
+//import org.apache.airavata.client.tools.DocumentCreator;
+//import org.apache.airavata.client.tools.DocumentCreatorNew;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.model.error.AiravataClientConnectException;
+import org.apache.airavata.orchestrator.client.util.Initialize;
+import org.apache.airavata.orchestrator.cpi.OrchestratorService;
+import org.apache.airavata.orchestrator.server.OrchestratorServer;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.junit.Test;
+
+public class OrchestratorClientFactoryTest {
+/* private DocumentCreatorNew documentCreator;
+ private OrchestratorService.Client orchestratorClient;
+ private Registry registry;
+ private int NUM_CONCURRENT_REQUESTS = 1;
+ Initialize initialize;
+ OrchestratorServer service;
+ private static ServerCnxnFactory cnxnFactory;
+
+ @Test
+ public void setUp() {
+ AiravataUtils.setExecutionAsServer();
+ initialize = new Initialize("registry-derby.sql");
+ initialize.initializeDB();
+ System.setProperty(Constants.ZOOKEEPER_SERVER_PORT,"2185");
+ AiravataZKUtils.startEmbeddedZK(cnxnFactory);
+
+ try {
+ service = (new OrchestratorServer());
+ service.start();
+ registry = RegistryFactory.getDefaultRegistry();
+ documentCreator = new DocumentCreatorNew(getAiravataClient());
+ documentCreator.createLocalHostDocs();
+ } catch (Exception e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ AiravataUtils.setExecutionAsServer();
+ try {
+ service.stop();
+ } catch (Exception e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+
+ }
+
+ private Airavata.Client getAiravataClient() {
+ Airavata.Client client = null;
+ try {
+ client = AiravataClientFactory.createAiravataClient("localhost", 8930);
+ } catch (AiravataClientConnectException e) {
+ e.printStackTrace();
+ }
+ return client;
+ }
+
+ private void storeDescriptors() {
+
+ }*/
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/82773c73/modules/orchestrator/orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/util/Initialize.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/util/Initialize.java b/modules/orchestrator/orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/util/Initialize.java
new file mode 100644
index 0000000..c827fc4
--- /dev/null
+++ b/modules/orchestrator/orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/util/Initialize.java
@@ -0,0 +1,330 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.orchestrator.client.util;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.persistance.registry.jpa.ResourceType;
+import org.apache.airavata.persistance.registry.jpa.resources.*;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.derby.drda.NetworkServerControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.sql.*;
+import java.util.StringTokenizer;
+
+public class Initialize {
+ private static final Logger logger = LoggerFactory.getLogger(Initialize.class);
+ public static final String DERBY_SERVER_MODE_SYS_PROPERTY = "derby.drda.startNetworkServer";
+ public String scriptName = "registry-derby.sql";
+ private NetworkServerControl server;
+ private static final String delimiter = ";";
+ public static final String PERSISTANT_DATA = "Configuration";
+
+ public Initialize(String scriptName) {
+ this.scriptName = scriptName;
+ }
+
+ public static boolean checkStringBufferEndsWith(StringBuffer buffer, String suffix) {
+ if (suffix.length() > buffer.length()) {
+ return false;
+ }
+ // this loop is done on purpose to avoid memory allocation performance
+ // problems on various JDKs
+ // StringBuffer.lastIndexOf() was introduced in jdk 1.4 and
+ // implementation is ok though does allocation/copying
+ // StringBuffer.toString().endsWith() does massive memory
+ // allocation/copying on JDK 1.5
+ // See http://issues.apache.org/bugzilla/show_bug.cgi?id=37169
+ int endIndex = suffix.length() - 1;
+ int bufferIndex = buffer.length() - 1;
+ while (endIndex >= 0) {
+ if (buffer.charAt(bufferIndex) != suffix.charAt(endIndex)) {
+ return false;
+ }
+ bufferIndex--;
+ endIndex--;
+ }
+ return true;
+ }
+
+ private static boolean isServerStarted(NetworkServerControl server, int ntries)
+ {
+ for (int i = 1; i <= ntries; i ++)
+ {
+ try {
+ Thread.sleep(500);
+ server.ping();
+ return true;
+ }
+ catch (Exception e) {
+ if (i == ntries)
+ return false;
+ }
+ }
+ return false;
+ }
+
+ public void initializeDB() throws SQLException{
+ String jdbcUrl = null;
+ String jdbcUser = null;
+ String jdbcPassword = null;
+ try{
+ jdbcUrl = ServerSettings.getSetting("registry.jdbc.url");
+ jdbcUser = ServerSettings.getSetting("registry.jdbc.user");
+ jdbcPassword = ServerSettings.getSetting("registry.jdbc.password");
+ jdbcUrl = jdbcUrl + "?" + "user=" + jdbcUser + "&" + "password=" + jdbcPassword;
+ } catch (ApplicationSettingsException e) {
+ logger.error("Unable to read properties", e);
+ }
+ startDerbyInServerMode();
+ if(!isServerStarted(server, 20)){
+ throw new RuntimeException("Derby server cound not started within five seconds...");
+ }
+
+ Connection conn = null;
+ try {
+ Class.forName(Utils.getJDBCDriver()).newInstance();
+ conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword);
+ if (!isDatabaseStructureCreated(PERSISTANT_DATA, conn)) {
+ executeSQLScript(conn);
+ logger.info("New Database created for Registry");
+ } else {
+ logger.debug("Database already created for Registry!");
+ }
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new RuntimeException("Database failure", e);
+ } finally {
+ try {
+ if (conn != null){
+ if (!conn.getAutoCommit()) {
+ conn.commit();
+ }
+ conn.close();
+ }
+ } catch (SQLException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
+ try{
+ GatewayResource gatewayResource = new GatewayResource();
+ gatewayResource.setGatewayId(ServerSettings.getSetting("default.registry.gateway"));
+ gatewayResource.setGatewayName(ServerSettings.getSetting("default.registry.gateway"));
+ gatewayResource.setDomain("test-domain");
+ gatewayResource.setEmailAddress("test-email");
+ gatewayResource.save();
+
+ UserResource userResource = new UserResource();
+ userResource.setUserName(ServerSettings.getSetting("default.registry.user"));
+ userResource.setPassword(ServerSettings.getSetting("default.registry.password"));
+ userResource.save();
+
+ WorkerResource workerResource = (WorkerResource) gatewayResource.create(ResourceType.GATEWAY_WORKER);
+ workerResource.setUser(userResource.getUserName());
+ workerResource.save();
+
+ ProjectResource projectResource = (ProjectResource)workerResource.create(ResourceType.PROJECT);
+ projectResource.setGatewayId(gatewayResource.getGatewayId());
+ projectResource.setId("default");
+ projectResource.setName("default");
+ projectResource.setWorker(workerResource);
+ projectResource.save();
+
+
+ } catch (ApplicationSettingsException e) {
+ logger.error("Unable to read properties", e);
+ throw new SQLException(e.getMessage(), e);
+ } catch (RegistryException e) {
+ logger.error("Unable to save data to registry", e);
+ throw new SQLException(e.getMessage(), e);
+ }
+ }
+
+ public static boolean isDatabaseStructureCreated(String tableName, Connection conn) {
+ try {
+ System.out.println("Running a query to test the database tables existence.");
+ // check whether the tables are already created with a query
+ Statement statement = null;
+ try {
+ statement = conn.createStatement();
+ ResultSet rs = statement.executeQuery("select * from " + tableName);
+ if (rs != null) {
+ rs.close();
+ }
+ } finally {
+ try {
+ if (statement != null) {
+ statement.close();
+ }
+ } catch (SQLException e) {
+ return false;
+ }
+ }
+ } catch (SQLException e) {
+ return false;
+ }
+
+ return true;
+ }
+
+ private void executeSQLScript(Connection conn) throws Exception {
+ StringBuffer sql = new StringBuffer();
+ BufferedReader reader = null;
+ try{
+
+ InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(scriptName);
+ reader = new BufferedReader(new InputStreamReader(inputStream));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ line = line.trim();
+ if (line.startsWith("//")) {
+ continue;
+ }
+ if (line.startsWith("--")) {
+ continue;
+ }
+ StringTokenizer st = new StringTokenizer(line);
+ if (st.hasMoreTokens()) {
+ String token = st.nextToken();
+ if ("REM".equalsIgnoreCase(token)) {
+ continue;
+ }
+ }
+ sql.append(" ").append(line);
+
+ // SQL defines "--" as a comment to EOL
+ // and in Oracle it may contain a hint
+ // so we cannot just remove it, instead we must end it
+ if (line.indexOf("--") >= 0) {
+ sql.append("\n");
+ }
+ if ((checkStringBufferEndsWith(sql, delimiter))) {
+ executeSQL(sql.substring(0, sql.length() - delimiter.length()), conn);
+ sql.replace(0, sql.length(), "");
+ }
+ }
+ // Catch any statements not followed by ;
+ if (sql.length() > 0) {
+ executeSQL(sql.toString(), conn);
+ }
+ }catch (IOException e){
+ logger.error("Error occurred while executing SQL script for creating Airavata database", e);
+ throw new Exception("Error occurred while executing SQL script for creating Airavata database", e);
+ }finally {
+ if (reader != null) {
+ reader.close();
+ }
+
+ }
+
+ }
+
+ private static void executeSQL(String sql, Connection conn) throws Exception {
+ // Check and ignore empty statements
+ if ("".equals(sql.trim())) {
+ return;
+ }
+
+ Statement statement = null;
+ try {
+ logger.debug("SQL : " + sql);
+
+ boolean ret;
+ int updateCount = 0, updateCountTotal = 0;
+ statement = conn.createStatement();
+ ret = statement.execute(sql);
+ updateCount = statement.getUpdateCount();
+ do {
+ if (!ret) {
+ if (updateCount != -1) {
+ updateCountTotal += updateCount;
+ }
+ }
+ ret = statement.getMoreResults();
+ if (ret) {
+ updateCount = statement.getUpdateCount();
+ }
+ } while (ret);
+
+ logger.debug(sql + " : " + updateCountTotal + " rows affected");
+
+ SQLWarning warning = conn.getWarnings();
+ while (warning != null) {
+ logger.warn(warning + " sql warning");
+ warning = warning.getNextWarning();
+ }
+ conn.clearWarnings();
+ } catch (SQLException e) {
+ if (e.getSQLState().equals("X0Y32")) {
+ // eliminating the table already exception for the derby
+ // database
+ logger.info("Table Already Exists", e);
+ } else {
+ throw new Exception("Error occurred while executing : " + sql, e);
+ }
+ } finally {
+ if (statement != null) {
+ try {
+ statement.close();
+ } catch (SQLException e) {
+ logger.error("Error occurred while closing result set.", e);
+ }
+ }
+ }
+ }
+
+ private void startDerbyInServerMode() {
+ try {
+ System.setProperty(DERBY_SERVER_MODE_SYS_PROPERTY, "true");
+ server = new NetworkServerControl(InetAddress.getByName(Utils.getHost()),
+ 20000,
+ Utils.getJDBCUser(), Utils.getJDBCPassword());
+ java.io.PrintWriter consoleWriter = new java.io.PrintWriter(System.out, true);
+ server.start(consoleWriter);
+ } catch (IOException e) {
+ logger.error("Unable to start Apache derby in the server mode! Check whether " +
+ "specified port is available");
+ } catch (Exception e) {
+ logger.error("Unable to start Apache derby in the server mode! Check whether " +
+ "specified port is available");
+ }
+
+ }
+
+ public void stopDerbyServer() throws SQLException{
+ try {
+ server.shutdown();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new SQLException("Error while stopping derby server", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/82773c73/modules/orchestrator/orchestrator-service/src/test/resources/gsissh.properties
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/test/resources/gsissh.properties b/modules/orchestrator/orchestrator-service/src/test/resources/gsissh.properties
new file mode 100644
index 0000000..3fdf76d
--- /dev/null
+++ b/modules/orchestrator/orchestrator-service/src/test/resources/gsissh.properties
@@ -0,0 +1,26 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+###########################################################################
+# Specifies system level configurations as a key/value pairs.
+###########################################################################
+
+StrictHostKeyChecking=no
+ssh.session.timeout=360000
http://git-wip-us.apache.org/repos/asf/airavata/blob/82773c73/modules/orchestrator/orchestrator-service/src/test/resources/monitor.properties
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/test/resources/monitor.properties b/modules/orchestrator/orchestrator-service/src/test/resources/monitor.properties
new file mode 100644
index 0000000..7f0299a
--- /dev/null
+++ b/modules/orchestrator/orchestrator-service/src/test/resources/monitor.properties
@@ -0,0 +1,30 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+primaryMonitor=org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor
+secondaryMonitor=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor
+amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
+connection.name=xsede_private
+trusted.certificate.location=/Users/chathuri/dev/airavata/cert/certificates
+certificate.path=/Users/chathuri/dev/airavata/cert/certificates
+myproxy.server=myproxy.teragrid.org
+myproxy.user=ogce
+myproxy.password=
+myproxy.life=3600
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/82773c73/modules/orchestrator/orchestrator-service/src/test/resources/orchestrator.properties
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/test/resources/orchestrator.properties b/modules/orchestrator/orchestrator-service/src/test/resources/orchestrator.properties
new file mode 100644
index 0000000..e84c429
--- /dev/null
+++ b/modules/orchestrator/orchestrator-service/src/test/resources/orchestrator.properties
@@ -0,0 +1,26 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
+job.validators=org.apache.airavata.orchestrator.core.validator.impl.BatchQueueValidator
+submitter.interval=10000
+threadpool.size=0
+start.submitter=true
+embedded.mode=true
+enable.validation=false