You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by is...@apache.org on 2021/06/01 17:59:26 UTC
[airavata-data-lake] 43/46: custos data synchronizer
This is an automated email from the ASF dual-hosted git repository.
isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
commit 229140eedd92d1c162741ff1f891a2c45dfe3603
Author: Isuru Ranawaka <ir...@gmail.com>
AuthorDate: Tue Jun 1 10:46:44 2021 -0400
custos data synchronizer
---
.../apache/airavata/drms/core/Neo4JConnector.java | 32 ++++-
.../drms/core/constants/UserAndGroupConstants.java | 6 +
.../deserializer/UserAndGroupDeserializer.java | 51 +++++++
.../drms-custos-synchronizer/pom.xml | 77 ++++++++++
.../drms/custos/synchronizer/Configuration.java | 139 ++++++++++++++++++
.../custos/synchronizer/CustosSynchronizer.java | 60 ++++++++
.../airavata/drms/custos/synchronizer/Utils.java | 62 ++++++++
.../datafetcher/CustosDataFetchingJob.java | 43 ++++++
.../synchronizer/handlers/SharingHandler.java | 140 ++++++++++++++++++
.../synchronizer/handlers/UserAndGroupHandler.java | 156 +++++++++++++++++++++
.../src/main/resources/config.yml | 12 ++
.../src/main/resources/logback.xml | 45 ++++++
data-resource-management-service/pom.xml | 1 +
13 files changed, 823 insertions(+), 1 deletion(-)
diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/Neo4JConnector.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/Neo4JConnector.java
index 1d39ce4..96a283b 100644
--- a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/Neo4JConnector.java
+++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/Neo4JConnector.java
@@ -28,12 +28,24 @@ public class Neo4JConnector {
private String userName;
private String password;
+ private Driver driver;
+
+ public Neo4JConnector() {
+ }
+
public Neo4JConnector(String uri, String userName, String password) {
this.uri = uri;
this.userName = userName;
this.password = password;
}
+ public void init(String uri, String userName, String password) {
+ this.uri = uri;
+ this.userName = userName;
+ this.password = password;
+ this.driver = GraphDatabase.driver(uri, AuthTokens.basic(userName, password));
+ }
+
public List<Record> searchNodes(String query) {
Driver driver = GraphDatabase.driver(uri, AuthTokens.basic(userName, password));
Session session = driver.session();
@@ -52,6 +64,22 @@ public class Neo4JConnector {
tx.close();
}
+ public void runTransactionalQuery(Map<String, Object> parameters, String query) {
+ Session session = driver.session();
+ Transaction tx = session.beginTransaction();
+ Result result = tx.run(query, parameters);
+ tx.commit();
+ tx.close();
+ }
+
+ public void runTransactionalQuery(String query) {
+ Session session = driver.session();
+ Transaction tx = session.beginTransaction();
+ Result result = tx.run(query);
+ tx.commit();
+ tx.close();
+ }
+
public void createMetadataNode(String parentLabel, String parentIdName, String parentIdValue,
String userId, String key, String value) {
Driver driver = GraphDatabase.driver(uri, AuthTokens.basic(userName, password));
@@ -59,8 +87,10 @@ public class Neo4JConnector {
Transaction tx = session.beginTransaction();
tx.run("match (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:" + parentLabel + ") where u.userId='" + userId +
"' and s." + parentIdName + "='" + parentIdValue +
- "' merge (m:Metadata)<-[r3:HAS_METADATA]-(s) set m." + key + "='" + value + "' return m");
+ "' merge (m:Metadata)<-[r3:HAS_METADATA]-(s) set m." + key + "='" + value + "' return m");
tx.commit();
tx.close();
}
+
+
}
diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/UserAndGroupConstants.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/UserAndGroupConstants.java
new file mode 100644
index 0000000..a65b053
--- /dev/null
+++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/UserAndGroupConstants.java
@@ -0,0 +1,6 @@
+package org.apache.airavata.drms.core.constants;
+
+public class UserAndGroupConstants {
+
+ public static final String USER_LABEL = "User";
+}
diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/UserAndGroupDeserializer.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/UserAndGroupDeserializer.java
new file mode 100644
index 0000000..aea14d8
--- /dev/null
+++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/UserAndGroupDeserializer.java
@@ -0,0 +1,51 @@
+package org.apache.airavata.drms.core.deserializer;
+
+import org.apache.airavata.datalake.drms.groups.User;
+import org.apache.airavata.drms.core.constants.UserAndGroupConstants;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.internal.InternalRecord;
+import org.neo4j.driver.types.Node;
+import org.springframework.beans.BeanWrapper;
+import org.springframework.beans.PropertyAccessorFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class UserAndGroupDeserializer {
+
+
+ public static List<User> deserializeUserList(List<Record> neo4jRecords) throws Exception {
+ List<User> userList = new ArrayList<>();
+ for (Record record : neo4jRecords) {
+ InternalRecord internalRecord = (InternalRecord) record;
+ List<Value> values = internalRecord.values();
+ for (Value value : values) {
+ Node node = value.asNode();
+ if (node.hasLabel(UserAndGroupConstants.USER_LABEL)) {
+ userList.add(deriveUserFromMap(node.asMap()));
+ }
+ }
+ }
+ return userList;
+ }
+
+ public static User deriveUserFromMap(Map<String, Object> fixedMap) throws Exception {
+
+ Map<String, Object> asMap = new HashMap<>(fixedMap);
+ User.Builder builder = User.newBuilder();
+ asMap.remove(UserAndGroupConstants.USER_LABEL);
+ setObjectFieldsUsingMap(builder, asMap);
+ return builder.build();
+ }
+
+
+ private static void setObjectFieldsUsingMap(Object target, Map<String, Object> values) {
+ for (String field : values.keySet()) {
+ BeanWrapper beanWrapper = PropertyAccessorFactory.forBeanPropertyAccess(target);
+ beanWrapper.setPropertyValue(field, values.get(field));
+ }
+ }
+}
diff --git a/data-resource-management-service/drms-custos-synchronizer/pom.xml b/data-resource-management-service/drms-custos-synchronizer/pom.xml
new file mode 100644
index 0000000..166cf3b
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>data-resource-management-service</artifactId>
+ <groupId>org.apache.airavata.data.lake</groupId>
+ <version>0.01-SNAPSHOT</version>
+ </parent>
+ <artifactId>drms-custos-synchronizer</artifactId>
+
+ <properties>
+ <maven.compiler.source>11</maven.compiler.source>
+ <maven.compiler.target>11</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ <version>${spring.boot.data.jpa}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>net.sf.dozer</groupId>
+ <artifactId>dozer</artifactId>
+ <version>5.5.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.java}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${io.grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${io.grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ <version>${io.grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${log4j.over.slf4j}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>${yaml.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.quartz-scheduler</groupId>
+ <artifactId>quartz</artifactId>
+ <version>2.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata.data.lake</groupId>
+ <artifactId>drms-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.custos</groupId>
+ <artifactId>custos-java-sdk</artifactId>
+ <version>${custos.clients.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Configuration.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Configuration.java
new file mode 100644
index 0000000..f302a98
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Configuration.java
@@ -0,0 +1,139 @@
+package org.apache.airavata.drms.custos.synchronizer;
+
+public class Configuration {
+
+ private long pollingInterval;
+ private Custos custos;
+ private DataResourceManagementService dataResourceManagementService;
+
+ public Configuration() {
+
+ }
+
+ public Custos getCustos() {
+ return custos;
+ }
+
+ public void setCustos(Custos custos) {
+ this.custos = custos;
+ }
+
+ public DataResourceManagementService getDataResourceManagementService() {
+ return dataResourceManagementService;
+ }
+
+ public void setDataResourceManagementService(DataResourceManagementService dataResourceManagementService) {
+ this.dataResourceManagementService = dataResourceManagementService;
+ }
+
+ public long getPollingInterval() {
+ return pollingInterval;
+ }
+
+ public void setPollingInterval(long pollingInterval) {
+ this.pollingInterval = pollingInterval;
+ }
+
+ public static class Custos {
+
+ private String host;
+ private int port;
+ private String custosId;
+ private String custosSec;
+ private String[] tenantsToBeSynced;
+
+ public Custos(String host, int port, String custosId, String custosSec, String[] tenantsToBeSynced) {
+ this.host = host;
+ this.port = port;
+ this.custosId = custosId;
+ this.custosSec = custosSec;
+ this.tenantsToBeSynced = tenantsToBeSynced;
+ }
+
+ public Custos() {
+
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String getCustosId() {
+ return custosId;
+ }
+
+ public void setCustosId(String custosId) {
+ this.custosId = custosId;
+ }
+
+ public String getCustosSec() {
+ return custosSec;
+ }
+
+ public void setCustosSec(String custosSec) {
+ this.custosSec = custosSec;
+ }
+
+ public String[] getTenantsToBeSynced() {
+ return tenantsToBeSynced;
+ }
+
+ public void setTenantsToBeSynced(String[] tenantsToBeSynced) {
+ this.tenantsToBeSynced = tenantsToBeSynced;
+ }
+ }
+
+ public static class DataResourceManagementService {
+
+ private String dbURI;
+ private String dbUser;
+ private String dbPassword;
+
+ public DataResourceManagementService(String dbURI, String dbUser, String dbPassword) {
+ this.dbURI = dbURI;
+ this.dbUser = dbUser;
+ this.dbPassword = dbPassword;
+ }
+
+ public DataResourceManagementService() {
+ }
+
+ public String getDbURI() {
+ return dbURI;
+ }
+
+ public void setDbURI(String dbURI) {
+ this.dbURI = dbURI;
+ }
+
+ public String getDbUser() {
+ return dbUser;
+ }
+
+ public void setDbUser(String dbUser) {
+ this.dbUser = dbUser;
+ }
+
+ public String getDbPassword() {
+ return dbPassword;
+ }
+
+ public void setDbPassword(String dbPassword) {
+ this.dbPassword = dbPassword;
+ }
+ }
+
+
+}
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/CustosSynchronizer.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/CustosSynchronizer.java
new file mode 100644
index 0000000..b19dbf3
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/CustosSynchronizer.java
@@ -0,0 +1,60 @@
+package org.apache.airavata.drms.custos.synchronizer;
+
+import org.apache.airavata.drms.custos.synchronizer.datafetcher.CustosDataFetchingJob;
+import org.quartz.*;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class CustosSynchronizer implements CommandLineRunner {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CustosSynchronizer.class);
+ private static String configFilePath;
+
+
+ public static void main(String[] args) {
+ SpringApplication.run(CustosSynchronizer.class, args);
+ }
+
+ @Override
+ public void run(String... args) throws Exception {
+ LOGGER.info("Starting Custos synchronizer ...");
+ if (args.length > 0) {
+ configFilePath = args[0];
+ }
+ configFilePath = "/Users/isururanawaka/Documents/Airavata_Repository/airavata-data-lake" +
+ "/data-resource-management-service/drms-custos-synchronizer/src/main/resources/config.yml";
+
+ LOGGER.info("Configuring scheduler ...");
+ Utils.initializeConnectors(Utils.loadConfiguration(configFilePath));
+ configureScheduler(configFilePath);
+
+ }
+
+
+ private void configureScheduler(String configPath) throws SchedulerException {
+ SchedulerFactory schedulerFactory = new StdSchedulerFactory();
+ Scheduler scheduler = schedulerFactory.getScheduler();
+ Configuration configuration = Utils.loadConfig(configPath);
+ Trigger trigger = TriggerBuilder.newTrigger()
+ .withIdentity("custosDataFetcher", "synchronizer1")
+ .startNow()
+ .withSchedule(SimpleScheduleBuilder.simpleSchedule()
+ .withIntervalInSeconds((int) configuration.getPollingInterval())
+ .repeatForever())
+ .build();
+
+ JobDetail job = JobBuilder.newJob(CustosDataFetchingJob.class)
+ .withIdentity("myJob", "group1")
+ .usingJobData("configurationPath", configPath)
+ .build();
+ scheduler.start();
+ scheduler.scheduleJob(job, trigger);
+ }
+
+
+}
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Utils.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Utils.java
new file mode 100644
index 0000000..ee664ef
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Utils.java
@@ -0,0 +1,62 @@
+package org.apache.airavata.drms.custos.synchronizer;
+
+import org.apache.airavata.drms.core.Neo4JConnector;
+import org.apache.custos.clients.CustosClientProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Optional;
+
+public class Utils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);
+
+ private static final Neo4JConnector neo4JConnector = new Neo4JConnector();
+ private static CustosClientProvider custosClientProvider = null;
+
+
+ public static Configuration loadConfiguration(String path) {
+ return Optional.ofNullable(path).
+ map(Utils::loadConfig)
+ .orElseThrow(() -> {
+ String msg = "Configuration path cannot be null";
+ LOGGER.error(msg);
+ throw new RuntimeException(msg);
+ });
+ }
+
+ public static Configuration loadConfig(String filePath) {
+ try (InputStream in = new FileInputStream(filePath)) {
+ Yaml yaml = new Yaml();
+ return yaml.loadAs(in, Configuration.class);
+ } catch (Exception exception) {
+ LOGGER.error("Error loading config file", exception);
+ }
+ return null;
+ }
+
+ public static void initializeConnectors(Configuration configuration) {
+ neo4JConnector.init(configuration.getDataResourceManagementService().getDbURI(),
+ configuration.getDataResourceManagementService().getDbUser(),
+ configuration.getDataResourceManagementService().getDbPassword());
+ LOGGER.info(configuration.getCustos().getCustosId());
+ LOGGER.info(configuration.getCustos().getCustosSec());
+ custosClientProvider = new CustosClientProvider.Builder()
+ .setClientId(configuration.getCustos().getCustosId())
+ .setClientSec(configuration.getCustos().getCustosSec())
+ .setServerHost(configuration.getCustos().getHost())
+ .setServerPort(configuration.getCustos().getPort())
+ .build();
+ }
+
+ public static Neo4JConnector getNeo4JConnector() {
+ return neo4JConnector;
+ }
+
+ public static CustosClientProvider getCustosClientProvider() {
+ return custosClientProvider;
+ }
+
+}
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/datafetcher/CustosDataFetchingJob.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/datafetcher/CustosDataFetchingJob.java
new file mode 100644
index 0000000..b6960b7
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/datafetcher/CustosDataFetchingJob.java
@@ -0,0 +1,43 @@
+package org.apache.airavata.drms.custos.synchronizer.datafetcher;
+
+import org.apache.airavata.drms.custos.synchronizer.Configuration;
+import org.apache.airavata.drms.custos.synchronizer.handlers.SharingHandler;
+import org.apache.airavata.drms.custos.synchronizer.handlers.UserAndGroupHandler;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.airavata.drms.custos.synchronizer.Utils.loadConfiguration;
+
+/**
+ * Custos data fetching job
+ */
+
+public class CustosDataFetchingJob implements Job {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CustosDataFetchingJob.class);
+
+
+ @Override
+ public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+ try {
+ LOGGER.debug("Executing CustosDataFetchingJob ....... ");
+ JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
+ String path = jobDataMap.getString("configurationPath");
+ Configuration configuration = loadConfiguration(path);
+ UserAndGroupHandler userAndGroupHandler = new UserAndGroupHandler();
+ userAndGroupHandler.mergeUserAndGroups(configuration);
+ SharingHandler sharingHandler = new SharingHandler();
+ sharingHandler.mergeSharings(configuration);
+ } catch (Exception ex) {
+ String msg = "Error occurred while executing job" + ex.getMessage();
+ LOGGER.error(msg, ex);
+ }
+
+
+ }
+
+
+}
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/SharingHandler.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/SharingHandler.java
new file mode 100644
index 0000000..332aa8d
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/SharingHandler.java
@@ -0,0 +1,140 @@
+package org.apache.airavata.drms.custos.synchronizer.handlers;
+
+import org.apache.airavata.drms.core.Neo4JConnector;
+import org.apache.airavata.drms.custos.synchronizer.Configuration;
+import org.apache.airavata.drms.custos.synchronizer.Utils;
+import org.apache.custos.clients.CustosClientProvider;
+import org.apache.custos.sharing.management.client.SharingManagementClient;
+import org.apache.custos.sharing.service.Entity;
+import org.apache.custos.sharing.service.GetAllDirectSharingsResponse;
+import org.apache.custos.sharing.service.SharingMetadata;
+import org.apache.custos.sharing.service.SharingRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SharingHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SharingHandler.class);
+
+ private final Neo4JConnector neo4JConnector;
+ private CustosClientProvider custosClientProvider;
+
+ public SharingHandler() {
+ this.neo4JConnector = Utils.getNeo4JConnector();
+ this.custosClientProvider = Utils.getCustosClientProvider();
+ }
+
+ public void mergeSharings(Configuration configuration) {
+ try {
+ LOGGER.debug("Merging sharings for custos client with id "+ configuration.getCustos().getCustosId());
+ SharingManagementClient sharingManagementClient = custosClientProvider.getSharingManagementClient();
+ mergeSharings(sharingManagementClient, configuration.getCustos().getTenantsToBeSynced());
+
+ } catch (Exception ex) {
+ String msg = "Exception occurred while merging user" + ex.getMessage();
+ LOGGER.error(msg, ex);
+ }
+
+ }
+
+
+ private void mergeSharings(SharingManagementClient sharingManagementClient, String[] clientIds) {
+ try {
+ SharingRequest sharingRequest = SharingRequest.newBuilder().build();
+ Arrays.stream(clientIds).forEach(clientId -> {
+ GetAllDirectSharingsResponse response = sharingManagementClient
+ .getAllDirectSharings(clientId, sharingRequest);
+ List<SharingMetadata> metadataList = response.getSharedDataList();
+ metadataList.forEach(metadata -> {
+ mergeEntities(metadata.getEntity(), clientId);
+
+ });
+ metadataList.forEach(metadata -> {
+ mergeEntityParentChildRelationShips(sharingManagementClient, metadata.getEntity(), clientId);
+ mergeEntitySharings(metadata, clientId);
+ });
+ });
+
+ } catch (Exception ex) {
+ String msg = "Error occurred while merging sharings from Custos ";
+ LOGGER.error(msg, ex);
+ }
+
+
+ }
+
+ private void mergeEntities(Entity entity, String clientId) {
+ String query = "Merge (u:" + entity.getType() + "{entityId: '" + entity.getId() + "',"
+ + "custosClientId:'" + clientId + "'}" + ")"
+ + " SET u = $props return u ";
+ Map<String, Object> map = new HashMap<>();
+ map.put("description", entity.getDescription());
+ map.put("name", entity.getName());
+ map.put("createdTime", entity.getCreatedAt());
+ map.put("custosClientId", clientId);
+ map.put("entityId", entity.getId());
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("props", map);
+ try {
+ this.neo4JConnector.runTransactionalQuery(parameters, query);
+ } catch (Exception ex) {
+ String msg = "Error occurred while merging entities ";
+ LOGGER.error(msg, ex);
+ }
+
+ }
+
+ private void mergeEntityParentChildRelationShips(SharingManagementClient sharingManagementClient, Entity entity,
+ String clientId) {
+
+ if (!entity.getParentId().trim().isEmpty()) {
+ Entity parentEntity = Entity.newBuilder().setId(entity.getParentId()).build();
+ Entity fullParentEntity = sharingManagementClient.getEntity(clientId, parentEntity);
+ String query = "MATCH (a:" + entity.getType() + "), (b:" + fullParentEntity.getType() + ") WHERE a.entityId = '"
+ + entity.getId() + "' AND a.custosClientId = '"
+ + clientId + "' AND " + "b.entityId ='" + fullParentEntity.getId() + "' AND b.custosClientId ='" + clientId +
+ "' MERGE (a)-[r:CHILD_OF]->(b) RETURN a, b";
+ try {
+ this.neo4JConnector.runTransactionalQuery(query);
+ } catch (Exception ex) {
+ String msg = "Error occurred while merging parent child relationships ";
+ LOGGER.error(msg, ex);
+ }
+ }
+ }
+
+ private void mergeEntitySharings(SharingMetadata metadata, String clientId) {
+ Entity entity = metadata.getEntity();
+ String sourceId = metadata.getEntity().getId();
+ String permissionId = metadata.getPermission().getId();
+ String userId = metadata.getOwnerId();
+ String type = metadata.getOwnerType();
+ String query = null;
+ if (type.equalsIgnoreCase("USER")) {
+ query = "MATCH (a:" + entity.getType() + "), (b:User) WHERE a.entityId = '"
+ + sourceId + "' AND a.custosClientId = '"
+ + clientId + "' AND " + "b.username ='" + userId + "' AND b.custosClientId ='" + clientId +
+ "' MERGE (a)-[r:SHARED_WITH]->(b) SET r.permission='" + permissionId + "' RETURN a, b";
+
+ } else if (type.equalsIgnoreCase("GROUP")) {
+ query = "MATCH (a:" + entity.getType() + "), (b:Group) WHERE a.entityId = '"
+ + sourceId + "' AND a.custosClientId = '"
+ + clientId + "' AND " + "b.groupId ='" + userId + "' AND b.custosClientId ='" + clientId +
+ "' MERGE (a)-[r:SHARED_WITH]->(b) SET r.permission='" + permissionId + "' RETURN a, b";
+ }
+ if (query != null) {
+ try {
+ this.neo4JConnector.runTransactionalQuery(query);
+ } catch (Exception ex) {
+ String msg = "Error occurred while merging sharings ";
+ LOGGER.error(msg, ex);
+ }
+ }
+
+ }
+}
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/UserAndGroupHandler.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/UserAndGroupHandler.java
new file mode 100644
index 0000000..625cfcb
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/UserAndGroupHandler.java
@@ -0,0 +1,156 @@
+package org.apache.airavata.drms.custos.synchronizer.handlers;
+
+import org.apache.airavata.drms.core.Neo4JConnector;
+import org.apache.airavata.drms.custos.synchronizer.Configuration;
+import org.apache.airavata.drms.custos.synchronizer.Utils;
+import org.apache.custos.clients.CustosClientProvider;
+import org.apache.custos.group.management.client.GroupManagementClient;
+import org.apache.custos.user.management.client.UserManagementClient;
+import org.apache.custos.user.profile.service.GetAllGroupsResponse;
+import org.apache.custos.user.profile.service.GetAllUserProfilesResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class UserAndGroupHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(UserAndGroupHandler.class);
+
+ private final Neo4JConnector neo4JConnector;
+ private CustosClientProvider custosClientProvider;
+
+ public UserAndGroupHandler() {
+ this.neo4JConnector = Utils.getNeo4JConnector();
+ this.custosClientProvider = Utils.getCustosClientProvider();
+ }
+
+ public void mergeUserAndGroups(Configuration configuration) {
+ try {
+ LOGGER.debug("Merging groups for custos client with id " + configuration.getCustos().getCustosId());
+ String[] clientIds = configuration.getCustos().getTenantsToBeSynced();
+ UserManagementClient userManagementClient = this.custosClientProvider.getUserManagementClient();
+ GroupManagementClient groupManagementClient = this.custosClientProvider.getGroupManagementClient();
+ mergeUsers(userManagementClient, clientIds);
+ mergeGroups(groupManagementClient, clientIds);
+ mergeUserAndGroupMemberships(groupManagementClient, userManagementClient, clientIds);
+ } catch (Exception ex) {
+ String msg = "Exception occurred while merging user" + ex.getMessage();
+ LOGGER.error(msg, ex);
+ }
+
+
+ }
+
+ private void mergeUsers(UserManagementClient userManagementClient, String[] clientIds) {
+ try {
+ Arrays.stream(clientIds).forEach(val -> {
+ GetAllUserProfilesResponse response = userManagementClient.getAllUserProfiles(val);
+ response.getProfilesList().forEach(userProfile -> {
+ String query = "Merge (u:User{username: '" + userProfile.getUsername() + "',"
+ + "custosClientId:'" + val + "'}" + ")"
+ + " SET u = $props return u ";
+ Map<String, Object> map = new HashMap<>();
+ map.put("firstName", userProfile.getFirstName());
+ map.put("name", userProfile.getUsername());
+ map.put("lastName", userProfile.getLastName());
+ map.put("email", userProfile.getEmail());
+ map.put("username", userProfile.getUsername());
+ map.put("custosClientId", val);
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("props", map);
+ this.neo4JConnector.runTransactionalQuery(parameters, query);
+ });
+
+ });
+
+
+ } catch (Exception ex) {
+ LOGGER.error("Error occurred while merging user ", ex);
+ }
+ }
+
+
+ private void mergeGroups(GroupManagementClient groupManagementClient, String[] clientIds) {
+ try {
+ Arrays.stream(clientIds).forEach(val -> {
+ GetAllGroupsResponse response = groupManagementClient.getAllGroups(val);
+ response.getGroupsList().forEach(gr -> {
+ String id = gr.getId().replaceAll("'", "");
+ String query = "Merge (u:Group{groupId: '" + id + "',"
+ + "custosClientId:'" + val + "'}" + ")"
+ + " SET u = $props return u ";
+ Map<String, Object> map = new HashMap<>();
+ map.put("description", gr.getDescription());
+ map.put("name", gr.getName());
+ map.put("groupId", id);
+ map.put("createdTime", gr.getCreatedTime());
+ map.put("lastModifiedTime", gr.getLastModifiedTime());
+ map.put("custosClientId", val);
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("props", map);
+ try {
+ this.neo4JConnector.runTransactionalQuery(parameters, query);
+ } catch (Exception ex) {
+ LOGGER.error("Error occurred while merging groups ", ex);
+ }
+ });
+ });
+ } catch (Exception ex) {
+ LOGGER.error("Error occurred while merging groups ", ex);
+ }
+ }
+
+ private void mergeUserAndGroupMemberships(GroupManagementClient groupManagementClient, UserManagementClient userManagementClient,
+ String[] clientIds) {
+ try {
+ Arrays.stream(clientIds).forEach(val -> {
+ GetAllGroupsResponse response = groupManagementClient.getAllGroups(val);
+ response.getGroupsList().forEach(gr -> {
+ String id = gr.getId().replaceAll("'", "");
+ GetAllUserProfilesResponse userProfilesResponse = groupManagementClient.getAllChildUsers(val, gr.getId());
+ userProfilesResponse.getProfilesList().forEach(prof -> {
+ String memberShipType = prof.getMembershipType();
+ String userId = prof.getUsername();
+ mergeUserMemberShip(userId, id, val, memberShipType);
+ });
+ GetAllGroupsResponse getAllGroupsResponse = groupManagementClient.getAllChildGroups(val, gr.getId());
+ getAllGroupsResponse.getGroupsList().forEach(grMem -> {
+ String childId = gr.getId().replaceAll("'", "");
+ mergeGroupMemberShip(id, childId, val);
+ });
+ });
+ });
+ } catch (Exception ex) {
+ LOGGER.error("Error occurred while merging groups ", ex);
+ }
+ }
+
+ private void mergeUserMemberShip(String username, String groupId, String custosClientId, String role) {
+ String query = "MATCH (a:User), (b:Group) WHERE a.username = '" + username + "' AND a.custosClientId = '"
+ + custosClientId + "' AND " + "b.groupId ='" + groupId + "' AND b.custosClientId ='" + custosClientId +
+ "' MERGE (a)-[r:MEMBER_OF]->(b) SET r.role='" + role + "' RETURN a, b";
+ try {
+ this.neo4JConnector.runTransactionalQuery(query);
+ } catch (Exception ex) {
+ LOGGER.error("Error occurred while merging UserGroupMembership ", ex);
+ }
+
+ }
+
+ private void mergeGroupMemberShip(String parentGroupId, String childGroupId, String custosClientId) {
+ String query = "MATCH (a:Group), (b:Group) WHERE a.groupId = '" + parentGroupId + "' AND a.custosClientId = '"
+ + custosClientId + "' AND " + "b.groupId ='" + childGroupId + "' AND b.custosClientId ='"
+ + custosClientId + "' MERGE (a)<-[r:CHILD_OF]-(b) RETURN a, b";
+ try {
+ this.neo4JConnector.runTransactionalQuery(query);
+ } catch (Exception ex) {
+ LOGGER.error("Error occurred while merging Group memberships ", ex);
+ }
+
+ }
+
+}
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/resources/config.yml b/data-resource-management-service/drms-custos-synchronizer/src/main/resources/config.yml
new file mode 100644
index 0000000..f3d467f
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/resources/config.yml
@@ -0,0 +1,12 @@
+pollingInterval: 60
+dataResourceManagementService:
+ dbURI: "bolt://149.165.156.173:7687"
+ dbUser: "neo4j"
+ dbPassword: "blastcovid19"
+custos:
+ host: "custos.scigap.org"
+ port: 31499
+ custosId: "custos-2zuomcugra3ebgsqtzmf-10000514"
+ custosSec: "mupUhF4JL0S3IFHBjfhiTfLJS1NgSWfvkCj3l6c7"
+ tenantsToBeSynced:
+ - "custos-cmcdclbywlxmc2ktzv0d-10000702"
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/resources/logback.xml b/data-resource-management-service/drms-custos-synchronizer/src/main/resources/logback.xml
new file mode 100644
index 0000000..270e6f7
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/resources/logback.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<configuration>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="LOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <File>../logs/custos_synchronizer.log</File>
+ <Append>true</Append>
+ <encoder>
+ <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+ </encoder>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>../logs/custos_synchronizer.log.%d{yyyy-MM-dd}</fileNamePattern>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>1GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="ch.qos.logback" level="WARN"/>
+ <logger name="org.apache.airavata" level="INFO"/>
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="LOGFILE"/>
+ </root>
+</configuration>
\ No newline at end of file
diff --git a/data-resource-management-service/pom.xml b/data-resource-management-service/pom.xml
index d1c7685..f8eb041 100644
--- a/data-resource-management-service/pom.xml
+++ b/data-resource-management-service/pom.xml
@@ -31,6 +31,7 @@
<module>drms-stubs</module>
<module>drms-core</module>
<module>drms-api</module>
+ <module>drms-custos-synchronizer</module>
</modules>
<dependencies>