You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sc...@apache.org on 2016/03/23 19:37:50 UTC

airavata git commit: resolving replicas at the orchestrator

Repository: airavata
Updated Branches:
  refs/heads/develop faabd9713 -> 37487421c


resolving replicas at the orchestrator


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/37487421
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/37487421
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/37487421

Branch: refs/heads/develop
Commit: 37487421cf150718b810e2aed2b42c72f1293526
Parents: faabd97
Author: scnakandala <su...@gmail.com>
Authored: Wed Mar 23 14:37:44 2016 -0400
Committer: scnakandala <su...@gmail.com>
Committed: Wed Mar 23 14:37:44 2016 -0400

----------------------------------------------------------------------
 .../server/OrchestratorServerHandler.java       | 37 +++++++++++++-------
 .../airavata/registry/cpi/DataCatalog.java      | 10 +++---
 2 files changed, 30 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/37487421/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 7c192f5..25b3404 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -29,17 +29,16 @@ import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.scheduler.HostScheduler;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.PublisherFactory;
+import org.apache.airavata.messaging.core.*;
 import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
 import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
 import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.data.product.DataProductModel;
+import org.apache.airavata.model.data.product.ReplicaLocationCategory;
 import org.apache.airavata.model.error.LaunchValidationException;
 import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.experiment.ExperimentType;
@@ -71,11 +70,7 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class OrchestratorServerHandler implements OrchestratorService.Iface {
 	private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class);
@@ -156,7 +151,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
             }
             // still the token is empty, then we fail the experiment
             if (token == null || token.isEmpty()){
-                log.error("You have not configured credential store token at gateway profile or compute resource preference. Please provide the correct token at gateway profile or compute resource preference.");
+                log.error("You have not configured credential store token at gateway profile or compute resource preference." +
+						" Please provide the correct token at gateway profile or compute resource preference.");
                 return false;
             }
             ExperimentType executionType = experiment.getExperimentType();
@@ -165,9 +161,26 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
                 List<ProcessModel> processes = orchestrator.createProcesses(experimentId, gatewayId);
 
 				for (ProcessModel processModel : processes){
+					//FIXME Resolving replica if available. This is a very crude way of resolving input replicas. A full featured
+					//FIXME replica resolving logic should come here
+					DataCatalog dataCatalog = RegistryFactory.getDataCatalog();
+					processModel.getProcessInputs().stream().forEach(pi -> {
+						if (pi.getType().equals(DataType.URI) && pi.getValue().startsWith("airavata-dp://")) {
+							try {
+								DataProductModel dataProductModel = dataCatalog.getDataProduct(pi.getValue());
+								dataProductModel.getReplicaLocations().stream().filter(rpModel -> rpModel.getReplicaLocationCategory()
+										.equals(ReplicaLocationCategory.GATEWAY_DATA_STORE)).forEach(rpModel -> {
+									//TODO Should set storage resource id specific to each of these inputs
+									pi.setValue(rpModel.getFilePath());
+								});
+							} catch (DataCatalogException e) {
+								log.error(e.getMessage(), e);
+							}
+						}
+					});
 					String taskDag = orchestrator.createAndSaveTasks(gatewayId, processModel, experiment.getUserConfigurationData().isAiravataAutoSchedule());
 					processModel.setTaskDag(taskDag);
-					experimentCatalog.update(ExperimentCatalogModelType.PROCESS,processModel, processModel.getProcessId());
+					experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processModel.getProcessId());
 				}
 
 				if (!validateProcess(experimentId, processes)) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/37487421/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/DataCatalog.java
----------------------------------------------------------------------
diff --git a/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/DataCatalog.java b/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/DataCatalog.java
index 89b0cc8..2ddda7c 100644
--- a/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/DataCatalog.java
+++ b/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/DataCatalog.java
@@ -28,13 +28,13 @@ import java.util.List;
 public interface DataCatalog {
     String schema = "airavata-dp";
 
-    String registerDataProduct(DataProductModel resource) throws DataCatalogException;
+    String registerDataProduct(DataProductModel product) throws DataCatalogException;
 
-    boolean removeDataProduct(String resourceId) throws DataCatalogException;
+    boolean removeDataProduct(String productUri) throws DataCatalogException;
 
-    boolean updateDataProduct(DataProductModel resource) throws DataCatalogException;
+    boolean updateDataProduct(DataProductModel product) throws DataCatalogException;
 
-    DataProductModel getDataProduct(String resourceId) throws DataCatalogException;
+    DataProductModel getDataProduct(String productUri) throws DataCatalogException;
 
     String registerReplicaLocation(DataReplicaLocationModel dataReplicaLocationModel) throws DataCatalogException;
 
@@ -44,5 +44,5 @@ public interface DataCatalog {
 
     DataReplicaLocationModel getReplicaLocation(String replicaId) throws DataCatalogException;
 
-    List<DataReplicaLocationModel> getAllReplicaLocations(String resourceId) throws DataCatalogException;
+    List<DataReplicaLocationModel> getAllReplicaLocations(String productUri) throws DataCatalogException;
 }