You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/03/25 22:22:35 UTC
[12/55] [abbrv] airavata git commit: resolving replicas at the
orchestrator
resolving replicas at the orchestrator
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/37487421
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/37487421
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/37487421
Branch: refs/heads/master
Commit: 37487421cf150718b810e2aed2b42c72f1293526
Parents: faabd97
Author: scnakandala <su...@gmail.com>
Authored: Wed Mar 23 14:37:44 2016 -0400
Committer: scnakandala <su...@gmail.com>
Committed: Wed Mar 23 14:37:44 2016 -0400
----------------------------------------------------------------------
.../server/OrchestratorServerHandler.java | 37 +++++++++++++-------
.../airavata/registry/cpi/DataCatalog.java | 10 +++---
2 files changed, 30 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/37487421/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 7c192f5..25b3404 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -29,17 +29,16 @@ import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.common.utils.ZkConstants;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.scheduler.HostScheduler;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.PublisherFactory;
+import org.apache.airavata.messaging.core.*;
import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.data.product.DataProductModel;
+import org.apache.airavata.model.data.product.ReplicaLocationCategory;
import org.apache.airavata.model.error.LaunchValidationException;
import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.experiment.ExperimentType;
@@ -71,11 +70,7 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
public class OrchestratorServerHandler implements OrchestratorService.Iface {
private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class);
@@ -156,7 +151,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
}
// still the token is empty, then we fail the experiment
if (token == null || token.isEmpty()){
- log.error("You have not configured credential store token at gateway profile or compute resource preference. Please provide the correct token at gateway profile or compute resource preference.");
+ log.error("You have not configured credential store token at gateway profile or compute resource preference." +
+ " Please provide the correct token at gateway profile or compute resource preference.");
return false;
}
ExperimentType executionType = experiment.getExperimentType();
@@ -165,9 +161,26 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
List<ProcessModel> processes = orchestrator.createProcesses(experimentId, gatewayId);
for (ProcessModel processModel : processes){
+ //FIXME Resolving replica if available. This is a very crude way of resolving input replicas. A full featured
+ //FIXME replica resolving logic should come here
+ DataCatalog dataCatalog = RegistryFactory.getDataCatalog();
+ processModel.getProcessInputs().stream().forEach(pi -> {
+ if (pi.getType().equals(DataType.URI) && pi.getValue().startsWith("airavata-dp://")) {
+ try {
+ DataProductModel dataProductModel = dataCatalog.getDataProduct(pi.getValue());
+ dataProductModel.getReplicaLocations().stream().filter(rpModel -> rpModel.getReplicaLocationCategory()
+ .equals(ReplicaLocationCategory.GATEWAY_DATA_STORE)).forEach(rpModel -> {
+ //TODO Should set storage resource id specific to each of these inputs
+ pi.setValue(rpModel.getFilePath());
+ });
+ } catch (DataCatalogException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ });
String taskDag = orchestrator.createAndSaveTasks(gatewayId, processModel, experiment.getUserConfigurationData().isAiravataAutoSchedule());
processModel.setTaskDag(taskDag);
- experimentCatalog.update(ExperimentCatalogModelType.PROCESS,processModel, processModel.getProcessId());
+ experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processModel.getProcessId());
}
if (!validateProcess(experimentId, processes)) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/37487421/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/DataCatalog.java
----------------------------------------------------------------------
diff --git a/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/DataCatalog.java b/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/DataCatalog.java
index 89b0cc8..2ddda7c 100644
--- a/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/DataCatalog.java
+++ b/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/DataCatalog.java
@@ -28,13 +28,13 @@ import java.util.List;
public interface DataCatalog {
String schema = "airavata-dp";
- String registerDataProduct(DataProductModel resource) throws DataCatalogException;
+ String registerDataProduct(DataProductModel product) throws DataCatalogException;
- boolean removeDataProduct(String resourceId) throws DataCatalogException;
+ boolean removeDataProduct(String productUri) throws DataCatalogException;
- boolean updateDataProduct(DataProductModel resource) throws DataCatalogException;
+ boolean updateDataProduct(DataProductModel product) throws DataCatalogException;
- DataProductModel getDataProduct(String resourceId) throws DataCatalogException;
+ DataProductModel getDataProduct(String productUri) throws DataCatalogException;
String registerReplicaLocation(DataReplicaLocationModel dataReplicaLocationModel) throws DataCatalogException;
@@ -44,5 +44,5 @@ public interface DataCatalog {
DataReplicaLocationModel getReplicaLocation(String replicaId) throws DataCatalogException;
- List<DataReplicaLocationModel> getAllReplicaLocations(String resourceId) throws DataCatalogException;
+ List<DataReplicaLocationModel> getAllReplicaLocations(String productUri) throws DataCatalogException;
}