You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by is...@apache.org on 2021/06/01 17:59:26 UTC

[airavata-data-lake] 43/46: custos data synchronizer

This is an automated email from the ASF dual-hosted git repository.

isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git

commit 229140eedd92d1c162741ff1f891a2c45dfe3603
Author: Isuru Ranawaka <ir...@gmail.com>
AuthorDate: Tue Jun 1 10:46:44 2021 -0400

    custos data synchronizer
---
 .../apache/airavata/drms/core/Neo4JConnector.java  |  32 ++++-
 .../drms/core/constants/UserAndGroupConstants.java |   6 +
 .../deserializer/UserAndGroupDeserializer.java     |  51 +++++++
 .../drms-custos-synchronizer/pom.xml               |  77 ++++++++++
 .../drms/custos/synchronizer/Configuration.java    | 139 ++++++++++++++++++
 .../custos/synchronizer/CustosSynchronizer.java    |  60 ++++++++
 .../airavata/drms/custos/synchronizer/Utils.java   |  62 ++++++++
 .../datafetcher/CustosDataFetchingJob.java         |  43 ++++++
 .../synchronizer/handlers/SharingHandler.java      | 140 ++++++++++++++++++
 .../synchronizer/handlers/UserAndGroupHandler.java | 156 +++++++++++++++++++++
 .../src/main/resources/config.yml                  |  12 ++
 .../src/main/resources/logback.xml                 |  45 ++++++
 data-resource-management-service/pom.xml           |   1 +
 13 files changed, 823 insertions(+), 1 deletion(-)

diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/Neo4JConnector.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/Neo4JConnector.java
index 1d39ce4..96a283b 100644
--- a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/Neo4JConnector.java
+++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/Neo4JConnector.java
@@ -28,12 +28,24 @@ public class Neo4JConnector {
     private String userName;
     private String password;
 
+    private Driver driver;
+
+    public Neo4JConnector() {
+    }
+
     public Neo4JConnector(String uri, String userName, String password) {
         this.uri = uri;
         this.userName = userName;
         this.password = password;
     }
 
+    public void init(String uri, String userName, String password) {
+        this.uri = uri;
+        this.userName = userName;
+        this.password = password;
+        this.driver = GraphDatabase.driver(uri, AuthTokens.basic(userName, password));
+    }
+
     public List<Record> searchNodes(String query) {
         Driver driver = GraphDatabase.driver(uri, AuthTokens.basic(userName, password));
         Session session = driver.session();
@@ -52,6 +64,22 @@ public class Neo4JConnector {
         tx.close();
     }
 
+    public void runTransactionalQuery(Map<String, Object> parameters, String query) {
+        // try-with-resources closes the transaction and then the session, even when the query fails
+        try (Session session = driver.session(); Transaction tx = session.beginTransaction()) {
+            tx.run(query, parameters);
+            tx.commit();
+        }
+    }
+
+    public void runTransactionalQuery(String query) {
+        // try-with-resources closes the transaction and then the session, even when the query fails
+        try (Session session = driver.session(); Transaction tx = session.beginTransaction()) {
+            tx.run(query);
+            tx.commit();
+        }
+    }
+
     public void createMetadataNode(String parentLabel, String parentIdName, String parentIdValue,
                                    String userId, String key, String value) {
         Driver driver = GraphDatabase.driver(uri, AuthTokens.basic(userName, password));
@@ -59,8 +87,10 @@ public class Neo4JConnector {
         Transaction tx = session.beginTransaction();
         tx.run("match (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:" + parentLabel + ") where u.userId='" + userId +
                 "' and s." + parentIdName + "='" + parentIdValue +
-                        "' merge (m:Metadata)<-[r3:HAS_METADATA]-(s) set m." + key + "='" + value + "' return m");
+                "' merge (m:Metadata)<-[r3:HAS_METADATA]-(s) set m." + key + "='" + value + "' return m");
         tx.commit();
         tx.close();
     }
+
+
 }
diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/UserAndGroupConstants.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/UserAndGroupConstants.java
new file mode 100644
index 0000000..a65b053
--- /dev/null
+++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/UserAndGroupConstants.java
@@ -0,0 +1,6 @@
+package org.apache.airavata.drms.core.constants;
+
+public class UserAndGroupConstants {
+
+    public static final String USER_LABEL = "User";
+}
diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/UserAndGroupDeserializer.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/UserAndGroupDeserializer.java
new file mode 100644
index 0000000..aea14d8
--- /dev/null
+++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/UserAndGroupDeserializer.java
@@ -0,0 +1,51 @@
+package org.apache.airavata.drms.core.deserializer;
+
+import org.apache.airavata.datalake.drms.groups.User;
+import org.apache.airavata.drms.core.constants.UserAndGroupConstants;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.internal.InternalRecord;
+import org.neo4j.driver.types.Node;
+import org.springframework.beans.BeanWrapper;
+import org.springframework.beans.PropertyAccessorFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class UserAndGroupDeserializer {
+
+
+    public static List<User> deserializeUserList(List<Record> neo4jRecords) throws Exception {
+        List<User> userList = new ArrayList<>();
+        for (Record record : neo4jRecords) {
+            // Record#values() is public driver API, so no cast to the internal InternalRecord class is needed
+            List<Value> values = record.values();
+            for (Value value : values) {
+                Node node = value.asNode();
+                if (node.hasLabel(UserAndGroupConstants.USER_LABEL)) {
+                    userList.add(deriveUserFromMap(node.asMap()));
+                }
+            }
+        }
+        return userList;
+    }
+
+    public static User deriveUserFromMap(Map<String, Object> fixedMap) throws Exception {
+
+        Map<String, Object> asMap = new HashMap<>(fixedMap);
+        User.Builder builder = User.newBuilder();
+        asMap.remove(UserAndGroupConstants.USER_LABEL);
+        setObjectFieldsUsingMap(builder, asMap);
+        return builder.build();
+    }
+
+
+    private static void setObjectFieldsUsingMap(Object target, Map<String, Object> values) {
+        BeanWrapper beanWrapper = PropertyAccessorFactory.forBeanPropertyAccess(target); // one wrapper for all fields
+        for (Map.Entry<String, Object> entry : values.entrySet()) {
+            beanWrapper.setPropertyValue(entry.getKey(), entry.getValue());
+        }
+    }
+}
diff --git a/data-resource-management-service/drms-custos-synchronizer/pom.xml b/data-resource-management-service/drms-custos-synchronizer/pom.xml
new file mode 100644
index 0000000..166cf3b
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>data-resource-management-service</artifactId>
+        <groupId>org.apache.airavata.data.lake</groupId>
+        <version>0.01-SNAPSHOT</version>
+    </parent>
+    <artifactId>drms-custos-synchronizer</artifactId>
+
+    <properties>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+            <version>${spring.boot.data.jpa}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>net.sf.dozer</groupId>
+            <artifactId>dozer</artifactId>
+            <version>5.5.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.java}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-stub</artifactId>
+            <version>${io.grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+            <version>${io.grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-netty</artifactId>
+            <version>${io.grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <version>${log4j.over.slf4j}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>${yaml.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.quartz-scheduler</groupId>
+            <artifactId>quartz</artifactId>
+            <version>2.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata.data.lake</groupId>
+            <artifactId>drms-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.custos</groupId>
+            <artifactId>custos-java-sdk</artifactId>
+            <version>${custos.clients.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Configuration.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Configuration.java
new file mode 100644
index 0000000..f302a98
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Configuration.java
@@ -0,0 +1,139 @@
+package org.apache.airavata.drms.custos.synchronizer;
+
+public class Configuration {
+
+    private long pollingInterval;
+    private Custos custos;
+    private DataResourceManagementService dataResourceManagementService;
+
+    public Configuration() {
+
+    }
+
+    public Custos getCustos() {
+        return custos;
+    }
+
+    public void setCustos(Custos custos) {
+        this.custos = custos;
+    }
+
+    public DataResourceManagementService getDataResourceManagementService() {
+        return dataResourceManagementService;
+    }
+
+    public void setDataResourceManagementService(DataResourceManagementService dataResourceManagementService) {
+        this.dataResourceManagementService = dataResourceManagementService;
+    }
+
+    public long getPollingInterval() {
+        return pollingInterval;
+    }
+
+    public void setPollingInterval(long pollingInterval) {
+        this.pollingInterval = pollingInterval;
+    }
+
+    public static class Custos {
+
+        private String host;
+        private int port;
+        private String custosId;
+        private String custosSec;
+        private String[] tenantsToBeSynced;
+
+        public Custos(String host, int port, String custosId, String custosSec, String[] tenantsToBeSynced) {
+            this.host = host;
+            this.port = port;
+            this.custosId = custosId;
+            this.custosSec = custosSec;
+            this.tenantsToBeSynced = tenantsToBeSynced;
+        }
+
+        public Custos() {
+
+        }
+
+        public String getHost() {
+            return host;
+        }
+
+        public void setHost(String host) {
+            this.host = host;
+        }
+
+        public int getPort() {
+            return port;
+        }
+
+        public void setPort(int port) {
+            this.port = port;
+        }
+
+        public String getCustosId() {
+            return custosId;
+        }
+
+        public void setCustosId(String custosId) {
+            this.custosId = custosId;
+        }
+
+        public String getCustosSec() {
+            return custosSec;
+        }
+
+        public void setCustosSec(String custosSec) {
+            this.custosSec = custosSec;
+        }
+
+        public String[] getTenantsToBeSynced() {
+            return tenantsToBeSynced;
+        }
+
+        public void setTenantsToBeSynced(String[] tenantsToBeSynced) {
+            this.tenantsToBeSynced = tenantsToBeSynced;
+        }
+    }
+
+    public static class DataResourceManagementService {
+
+        private String dbURI;
+        private String dbUser;
+        private String dbPassword;
+
+        public DataResourceManagementService(String dbURI, String dbUser, String dbPassword) {
+            this.dbURI = dbURI;
+            this.dbUser = dbUser;
+            this.dbPassword = dbPassword;
+        }
+
+        public DataResourceManagementService() {
+        }
+
+        public String getDbURI() {
+            return dbURI;
+        }
+
+        public void setDbURI(String dbURI) {
+            this.dbURI = dbURI;
+        }
+
+        public String getDbUser() {
+            return dbUser;
+        }
+
+        public void setDbUser(String dbUser) {
+            this.dbUser = dbUser;
+        }
+
+        public String getDbPassword() {
+            return dbPassword;
+        }
+
+        public void setDbPassword(String dbPassword) {
+            this.dbPassword = dbPassword;
+        }
+    }
+
+
+}
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/CustosSynchronizer.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/CustosSynchronizer.java
new file mode 100644
index 0000000..b19dbf3
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/CustosSynchronizer.java
@@ -0,0 +1,60 @@
+package org.apache.airavata.drms.custos.synchronizer;
+
+import org.apache.airavata.drms.custos.synchronizer.datafetcher.CustosDataFetchingJob;
+import org.quartz.*;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class CustosSynchronizer implements CommandLineRunner {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CustosSynchronizer.class);
+    private static String configFilePath;
+
+
+    public static void main(String[] args) {
+        SpringApplication.run(CustosSynchronizer.class, args);
+    }
+
+    @Override
+    public void run(String... args) throws Exception {
+        LOGGER.info("Starting Custos synchronizer ...");
+        if (args.length > 0) {
+            configFilePath = args[0];
+        } else { // fall back to the built-in path only when no argument is supplied
+            configFilePath = "/Users/isururanawaka/Documents/Airavata_Repository/airavata-data-lake" +
+                    "/data-resource-management-service/drms-custos-synchronizer/src/main/resources/config.yml";
+        }
+        LOGGER.info("Configuring scheduler ...");
+        Utils.initializeConnectors(Utils.loadConfiguration(configFilePath));
+        configureScheduler(configFilePath);
+
+    }
+
+
+    private void configureScheduler(String configPath) throws SchedulerException {
+        SchedulerFactory schedulerFactory = new StdSchedulerFactory();
+        Scheduler scheduler = schedulerFactory.getScheduler();
+        Configuration configuration = Utils.loadConfiguration(configPath); // throws instead of returning null
+        Trigger trigger = TriggerBuilder.newTrigger()
+                .withIdentity("custosDataFetcher", "synchronizer1")
+                .startNow()
+                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
+                        .withIntervalInSeconds((int) configuration.getPollingInterval())
+                        .repeatForever())
+                .build();
+
+        JobDetail job = JobBuilder.newJob(CustosDataFetchingJob.class)
+                .withIdentity("myJob", "group1")
+                .usingJobData("configurationPath", configPath)
+                .build();
+        scheduler.start();
+        scheduler.scheduleJob(job, trigger);
+    }
+
+
+}
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Utils.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Utils.java
new file mode 100644
index 0000000..ee664ef
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Utils.java
@@ -0,0 +1,62 @@
+package org.apache.airavata.drms.custos.synchronizer;
+
+import org.apache.airavata.drms.core.Neo4JConnector;
+import org.apache.custos.clients.CustosClientProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Optional;
+
+public class Utils {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);
+
+    private static final Neo4JConnector neo4JConnector = new Neo4JConnector();
+    private static CustosClientProvider custosClientProvider = null;
+
+
+    public static Configuration loadConfiguration(String path) {
+        return Optional.ofNullable(path).map(Utils::loadConfig)
+                .orElseThrow(() -> {
+                    // Also reached when loadConfig returned null because the file could not be read or parsed.
+                    String msg = "Configuration path is null or configuration could not be loaded: " + path;
+                    LOGGER.error(msg);
+                    return new RuntimeException(msg);
+                });
+    }
+
+    public static Configuration loadConfig(String filePath) {
+        try (InputStream in = new FileInputStream(filePath)) {
+            Yaml yaml = new Yaml();
+            return yaml.loadAs(in, Configuration.class);
+        } catch (Exception exception) {
+            LOGGER.error("Error loading config file", exception);
+        }
+        return null;
+    }
+
+    public static void initializeConnectors(Configuration configuration) {
+        neo4JConnector.init(configuration.getDataResourceManagementService().getDbURI(),
+                configuration.getDataResourceManagementService().getDbUser(),
+                configuration.getDataResourceManagementService().getDbPassword());
+        LOGGER.info(configuration.getCustos().getCustosId());
+        // The client secret is intentionally not logged: credentials must never reach log files.
+        custosClientProvider = new CustosClientProvider.Builder()
+                .setClientId(configuration.getCustos().getCustosId())
+                .setClientSec(configuration.getCustos().getCustosSec())
+                .setServerHost(configuration.getCustos().getHost())
+                .setServerPort(configuration.getCustos().getPort())
+                .build();
+    }
+
+    public static Neo4JConnector getNeo4JConnector() {
+        return neo4JConnector;
+    }
+
+    public static CustosClientProvider getCustosClientProvider() {
+        return custosClientProvider;
+    }
+
+}
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/datafetcher/CustosDataFetchingJob.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/datafetcher/CustosDataFetchingJob.java
new file mode 100644
index 0000000..b6960b7
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/datafetcher/CustosDataFetchingJob.java
@@ -0,0 +1,43 @@
+package org.apache.airavata.drms.custos.synchronizer.datafetcher;
+
+import org.apache.airavata.drms.custos.synchronizer.Configuration;
+import org.apache.airavata.drms.custos.synchronizer.handlers.SharingHandler;
+import org.apache.airavata.drms.custos.synchronizer.handlers.UserAndGroupHandler;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.airavata.drms.custos.synchronizer.Utils.loadConfiguration;
+
+/**
+ * Custos data fetching job
+ */
+
+public class CustosDataFetchingJob implements Job {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CustosDataFetchingJob.class);
+
+
+    @Override
+    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+        try {
+            LOGGER.debug("Executing CustosDataFetchingJob ....... ");
+            JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
+            String path = jobDataMap.getString("configurationPath");
+            Configuration configuration = loadConfiguration(path);
+            UserAndGroupHandler userAndGroupHandler = new UserAndGroupHandler();
+            userAndGroupHandler.mergeUserAndGroups(configuration);
+            SharingHandler sharingHandler = new SharingHandler();
+            sharingHandler.mergeSharings(configuration);
+        } catch (Exception ex) {
+            String msg = "Error occurred while executing job" + ex.getMessage();
+            LOGGER.error(msg, ex);
+        }
+
+
+    }
+
+
+}
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/SharingHandler.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/SharingHandler.java
new file mode 100644
index 0000000..332aa8d
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/SharingHandler.java
@@ -0,0 +1,140 @@
+package org.apache.airavata.drms.custos.synchronizer.handlers;
+
+import org.apache.airavata.drms.core.Neo4JConnector;
+import org.apache.airavata.drms.custos.synchronizer.Configuration;
+import org.apache.airavata.drms.custos.synchronizer.Utils;
+import org.apache.custos.clients.CustosClientProvider;
+import org.apache.custos.sharing.management.client.SharingManagementClient;
+import org.apache.custos.sharing.service.Entity;
+import org.apache.custos.sharing.service.GetAllDirectSharingsResponse;
+import org.apache.custos.sharing.service.SharingMetadata;
+import org.apache.custos.sharing.service.SharingRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SharingHandler {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SharingHandler.class);
+
+    private final Neo4JConnector neo4JConnector;
+    private CustosClientProvider custosClientProvider;
+
+    public SharingHandler() {
+        this.neo4JConnector = Utils.getNeo4JConnector();
+        this.custosClientProvider = Utils.getCustosClientProvider();
+    }
+
+    public void mergeSharings(Configuration configuration) {
+        try {
+            LOGGER.debug("Merging sharings for custos client with id "+ configuration.getCustos().getCustosId());
+            SharingManagementClient sharingManagementClient = custosClientProvider.getSharingManagementClient();
+            mergeSharings(sharingManagementClient, configuration.getCustos().getTenantsToBeSynced());
+
+        } catch (Exception ex) {
+            String msg = "Exception occurred while merging sharings: " + ex.getMessage(); // was mislabelled "merging user"
+            LOGGER.error(msg, ex);
+        }
+
+    }
+
+
+    private void mergeSharings(SharingManagementClient sharingManagementClient, String[] clientIds) {
+        try {
+            SharingRequest sharingRequest = SharingRequest.newBuilder().build();
+            Arrays.stream(clientIds).forEach(clientId -> {
+                GetAllDirectSharingsResponse response = sharingManagementClient
+                        .getAllDirectSharings(clientId, sharingRequest);
+                List<SharingMetadata> metadataList = response.getSharedDataList();
+                metadataList.forEach(metadata -> {
+                    mergeEntities(metadata.getEntity(), clientId);
+
+                });
+                metadataList.forEach(metadata -> {
+                    mergeEntityParentChildRelationShips(sharingManagementClient, metadata.getEntity(), clientId);
+                    mergeEntitySharings(metadata, clientId);
+                });
+            });
+
+        } catch (Exception ex) {
+            String msg = "Error occurred while merging sharings from Custos ";
+            LOGGER.error(msg, ex);
+        }
+
+
+    }
+
+    private void mergeEntities(Entity entity, String clientId) {
+        // Identifying values are bound as query parameters; only the label has to be concatenated.
+        String query = "Merge (u:" + entity.getType() + "{entityId: $entityId, custosClientId: $custosClientId})"
+                + " SET u = $props return u ";
+        Map<String, Object> map = new HashMap<>();
+        map.put("description", entity.getDescription());
+        map.put("name", entity.getName());
+        map.put("createdTime", entity.getCreatedAt());
+        map.put("custosClientId", clientId);
+        map.put("entityId", entity.getId());
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("props", map);
+        parameters.put("entityId", entity.getId());
+        parameters.put("custosClientId", clientId);
+        try {
+            this.neo4JConnector.runTransactionalQuery(parameters, query);
+        } catch (Exception ex) {
+            LOGGER.error("Error occurred while merging entities ", ex);
+        }
+    }
+
+    private void mergeEntityParentChildRelationShips(SharingManagementClient sharingManagementClient, Entity entity,
+                                                     String clientId) {
+
+        if (!entity.getParentId().trim().isEmpty()) {
+            Entity parentEntity = Entity.newBuilder().setId(entity.getParentId()).build();
+            Entity fullParentEntity = sharingManagementClient.getEntity(clientId, parentEntity);
+            String query = "MATCH (a:" + entity.getType() + "), (b:" + fullParentEntity.getType() + ") WHERE a.entityId = '"
+                    + entity.getId() + "' AND a.custosClientId = '"
+                    + clientId + "' AND " + "b.entityId ='" + fullParentEntity.getId() + "' AND b.custosClientId ='" + clientId +
+                    "' MERGE (a)-[r:CHILD_OF]->(b) RETURN a, b";
+            try {
+                this.neo4JConnector.runTransactionalQuery(query);
+            } catch (Exception ex) {
+                String msg = "Error occurred while merging parent child relationships ";
+                LOGGER.error(msg, ex);
+            }
+        }
+    }
+
+    private void mergeEntitySharings(SharingMetadata metadata, String clientId) {
+        Entity entity = metadata.getEntity();
+        String sourceId = metadata.getEntity().getId();
+        String permissionId = metadata.getPermission().getId();
+        String userId = metadata.getOwnerId();
+        String type = metadata.getOwnerType();
+        String query = null;
+        if (type.equalsIgnoreCase("USER")) {
+            query = "MATCH (a:" + entity.getType() + "), (b:User) WHERE a.entityId = '"
+                    + sourceId + "' AND a.custosClientId = '"
+                    + clientId + "' AND " + "b.username ='" + userId + "' AND b.custosClientId ='" + clientId +
+                    "' MERGE (a)-[r:SHARED_WITH]->(b) SET r.permission='" + permissionId + "' RETURN a, b";
+
+        } else if (type.equalsIgnoreCase("GROUP")) {
+            query = "MATCH (a:" + entity.getType() + "), (b:Group) WHERE a.entityId = '"
+                    + sourceId + "' AND a.custosClientId = '"
+                    + clientId + "' AND " + "b.groupId ='" + userId + "' AND b.custosClientId ='" + clientId +
+                    "' MERGE (a)-[r:SHARED_WITH]->(b) SET r.permission='" + permissionId + "' RETURN a, b";
+        }
+        if (query != null) {
+            try {
+                this.neo4JConnector.runTransactionalQuery(query);
+            } catch (Exception ex) {
+                String msg = "Error occurred while merging sharings ";
+                LOGGER.error(msg, ex);
+            }
+        }
+
+    }
+}
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/UserAndGroupHandler.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/UserAndGroupHandler.java
new file mode 100644
index 0000000..625cfcb
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/UserAndGroupHandler.java
@@ -0,0 +1,156 @@
+package org.apache.airavata.drms.custos.synchronizer.handlers;
+
+import org.apache.airavata.drms.core.Neo4JConnector;
+import org.apache.airavata.drms.custos.synchronizer.Configuration;
+import org.apache.airavata.drms.custos.synchronizer.Utils;
+import org.apache.custos.clients.CustosClientProvider;
+import org.apache.custos.group.management.client.GroupManagementClient;
+import org.apache.custos.user.management.client.UserManagementClient;
+import org.apache.custos.user.profile.service.GetAllGroupsResponse;
+import org.apache.custos.user.profile.service.GetAllUserProfilesResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class UserAndGroupHandler {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(UserAndGroupHandler.class);
+
+    private final Neo4JConnector neo4JConnector;
+    private CustosClientProvider custosClientProvider;
+
+    public UserAndGroupHandler() {
+        this.neo4JConnector = Utils.getNeo4JConnector();
+        this.custosClientProvider = Utils.getCustosClientProvider();
+    }
+
+    public void mergeUserAndGroups(Configuration configuration) {
+        try {
+            LOGGER.debug("Merging groups for custos client with id " + configuration.getCustos().getCustosId());
+            String[] clientIds = configuration.getCustos().getTenantsToBeSynced();
+            UserManagementClient userManagementClient = this.custosClientProvider.getUserManagementClient();
+            GroupManagementClient groupManagementClient = this.custosClientProvider.getGroupManagementClient();
+            mergeUsers(userManagementClient, clientIds);
+            mergeGroups(groupManagementClient, clientIds);
+            mergeUserAndGroupMemberships(groupManagementClient, userManagementClient, clientIds);
+        } catch (Exception ex) {
+            String msg = "Exception occurred while merging user" + ex.getMessage();
+            LOGGER.error(msg, ex);
+        }
+
+
+    }
+
+    private void mergeUsers(UserManagementClient userManagementClient, String[] clientIds) {
+        try {
+            Arrays.stream(clientIds).forEach(val -> {
+                GetAllUserProfilesResponse response = userManagementClient.getAllUserProfiles(val);
+                response.getProfilesList().forEach(userProfile -> {
+                    String query = "Merge (u:User{username: '" + userProfile.getUsername() + "',"
+                            + "custosClientId:'" + val + "'}" + ")"
+                            + " SET u = $props return u ";
+                    Map<String, Object> map = new HashMap<>();
+                    map.put("firstName", userProfile.getFirstName());
+                    map.put("name", userProfile.getUsername());
+                    map.put("lastName", userProfile.getLastName());
+                    map.put("email", userProfile.getEmail());
+                    map.put("username", userProfile.getUsername());
+                    map.put("custosClientId", val);
+                    Map<String, Object> parameters = new HashMap<>();
+                    parameters.put("props", map);
+                    this.neo4JConnector.runTransactionalQuery(parameters, query);
+                });
+
+            });
+
+
+        } catch (Exception ex) {
+            LOGGER.error("Error occurred while merging user ", ex);
+        }
+    }
+
+
+    private void mergeGroups(GroupManagementClient groupManagementClient, String[] clientIds) {
+        try {
+            Arrays.stream(clientIds).forEach(val -> {
+                GetAllGroupsResponse response = groupManagementClient.getAllGroups(val);
+                response.getGroupsList().forEach(gr -> {
+                    String id = gr.getId().replaceAll("'", "");
+                    String query = "Merge (u:Group{groupId: '" + id + "',"
+                            + "custosClientId:'" + val + "'}" + ")"
+                            + " SET u = $props return u ";
+                    Map<String, Object> map = new HashMap<>();
+                    map.put("description", gr.getDescription());
+                    map.put("name", gr.getName());
+                    map.put("groupId", id);
+                    map.put("createdTime", gr.getCreatedTime());
+                    map.put("lastModifiedTime", gr.getLastModifiedTime());
+                    map.put("custosClientId", val);
+                    Map<String, Object> parameters = new HashMap<>();
+                    parameters.put("props", map);
+                    try {
+                        this.neo4JConnector.runTransactionalQuery(parameters, query);
+                    } catch (Exception ex) {
+                        LOGGER.error("Error occurred while merging groups ", ex);
+                    }
+                });
+            });
+        } catch (Exception ex) {
+            LOGGER.error("Error occurred while merging groups ", ex);
+        }
+    }
+
+    private void mergeUserAndGroupMemberships(GroupManagementClient groupManagementClient, UserManagementClient userManagementClient,
+                                              String[] clientIds) {
+        try {
+            Arrays.stream(clientIds).forEach(val -> {
+                GetAllGroupsResponse response = groupManagementClient.getAllGroups(val);
+                response.getGroupsList().forEach(gr -> {
+                    String id = gr.getId().replaceAll("'", "");
+                    GetAllUserProfilesResponse userProfilesResponse = groupManagementClient.getAllChildUsers(val, gr.getId());
+                    userProfilesResponse.getProfilesList().forEach(prof -> {
+                        String memberShipType = prof.getMembershipType();
+                        String userId = prof.getUsername();
+                        mergeUserMemberShip(userId, id, val, memberShipType);
+                    });
+                    GetAllGroupsResponse getAllGroupsResponse = groupManagementClient.getAllChildGroups(val, gr.getId());
+                    getAllGroupsResponse.getGroupsList().forEach(grMem -> {
+                        String childId = gr.getId().replaceAll("'", "");
+                        mergeGroupMemberShip(id, childId, val);
+                    });
+                });
+            });
+        } catch (Exception ex) {
+            LOGGER.error("Error occurred while merging groups ", ex);
+        }
+    }
+
+    private void mergeUserMemberShip(String username, String groupId, String custosClientId, String role) {
+        String query = "MATCH (a:User), (b:Group) WHERE a.username = '" + username + "' AND a.custosClientId = '"
+                + custosClientId + "' AND " + "b.groupId ='" + groupId + "' AND b.custosClientId ='" + custosClientId +
+                "' MERGE (a)-[r:MEMBER_OF]->(b) SET r.role='" + role + "' RETURN a, b";
+        try {
+            this.neo4JConnector.runTransactionalQuery(query);
+        } catch (Exception ex) {
+            LOGGER.error("Error occurred while merging UserGroupMembership ", ex);
+        }
+
+    }
+
+    private void mergeGroupMemberShip(String parentGroupId, String childGroupId, String custosClientId) {
+        String query = "MATCH (a:Group), (b:Group) WHERE a.groupId = '" + parentGroupId + "' AND a.custosClientId = '"
+                + custosClientId + "' AND " + "b.groupId ='" + childGroupId + "' AND b.custosClientId ='"
+                + custosClientId + "' MERGE (a)<-[r:CHILD_OF]-(b)  RETURN a, b";
+        try {
+            this.neo4JConnector.runTransactionalQuery(query);
+        } catch (Exception ex) {
+            LOGGER.error("Error occurred while merging Group memberships ", ex);
+        }
+
+    }
+
+}
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/resources/config.yml b/data-resource-management-service/drms-custos-synchronizer/src/main/resources/config.yml
new file mode 100644
index 0000000..f3d467f
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/resources/config.yml
@@ -0,0 +1,12 @@
+# Configuration for the DRMS Custos synchronizer.
+# NOTE(review): the unit of pollingInterval is not visible here (presumably seconds) - confirm
+# against the scheduling code.
+pollingInterval: 60
+# Connection settings for the graph database the synchronizer writes to (bolt URI and credentials).
+# NOTE(review): dbPassword is committed in plain text - rotate it and load it from the
+# environment or a secrets store instead of the repository.
+dataResourceManagementService:
+  dbURI: "bolt://149.165.156.173:7687"
+  dbUser: "neo4j"
+  dbPassword: "blastcovid19"
+# Custos endpoint and the credentials the synchronizer uses to call it.
+# NOTE(review): custosSec looks like a client secret committed in plain text - rotate it and
+# supply it from outside the repository.
+custos:
+  host: "custos.scigap.org"
+  port: 31499
+  custosId: "custos-2zuomcugra3ebgsqtzmf-10000514"
+  custosSec: "mupUhF4JL0S3IFHBjfhiTfLJS1NgSWfvkCj3l6c7"
+  # Ids of the Custos tenants whose users and groups are merged into the graph.
+  tenantsToBeSynced:
+    -  "custos-cmcdclbywlxmc2ktzv0d-10000702"
diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/resources/logback.xml b/data-resource-management-service/drms-custos-synchronizer/src/main/resources/logback.xml
new file mode 100644
index 0000000..270e6f7
--- /dev/null
+++ b/data-resource-management-service/drms-custos-synchronizer/src/main/resources/logback.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<!-- Logback configuration for the Custos synchronizer: logs go to the console and to a rolling file. -->
+<configuration>
+
+    <!-- Console output: timestamp, thread, level, logger name (abbreviated to 30 chars), message, MDC. -->
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+        </encoder>
+    </appender>
+
+    <!-- File output, same pattern as the console. The path is relative to the process working directory. -->
+    <appender name="LOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <File>../logs/custos_synchronizer.log</File>
+        <Append>true</Append>
+        <encoder>
+            <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+        </encoder>
+        <!-- Rolls over daily (date-stamped file name); keeps at most 30 periods and 1GB of archives. -->
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>../logs/custos_synchronizer.log.%d{yyyy-MM-dd}</fileNamePattern>
+            <maxHistory>30</maxHistory>
+            <totalSizeCap>1GB</totalSizeCap>
+        </rollingPolicy>
+    </appender>
+
+    <!-- Logback's own loggers are limited to WARN; project and root loggers log at INFO. -->
+    <logger name="ch.qos.logback" level="WARN"/>
+    <logger name="org.apache.airavata" level="INFO"/>
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
+        <appender-ref ref="LOGFILE"/>
+    </root>
+</configuration>
\ No newline at end of file
diff --git a/data-resource-management-service/pom.xml b/data-resource-management-service/pom.xml
index d1c7685..f8eb041 100644
--- a/data-resource-management-service/pom.xml
+++ b/data-resource-management-service/pom.xml
@@ -31,6 +31,7 @@
         <module>drms-stubs</module>
         <module>drms-core</module>
         <module>drms-api</module>
+        <module>drms-custos-synchronizer</module>
     </modules>
 
     <dependencies>