You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datalab.apache.org by of...@apache.org on 2020/11/24 16:06:12 UTC

[incubator-datalab] branch develop updated: [DATALAB-2152] -- added cluster instances to scheduler status sync (#989)

This is an automated email from the ASF dual-hosted git repository.

ofuks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-datalab.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5cea7a1  [DATALAB-2152] -- added cluster instances to scheduler status sync (#989)
5cea7a1 is described below

commit 5cea7a15892f375ad51e4e7ee1092467a9a48c83
Author: Kinash Yurii <ur...@gmail.com>
AuthorDate: Tue Nov 24 18:02:57 2020 +0200

    [DATALAB-2152] -- added cluster instances to scheduler status sync (#989)
    
    [DATALAB-2152] -- added cluster instances to scheduler status sync
---
 .../com/epam/datalab/dto/base/DataEngineType.java  |   5 +-
 .../handlers/ResourcesStatusCallbackHandler.java   |   2 +-
 .../CheckInfrastructureStatusScheduler.java        | 207 ++++++++++++---------
 3 files changed, 118 insertions(+), 96 deletions(-)

diff --git a/services/datalab-model/src/main/java/com/epam/datalab/dto/base/DataEngineType.java b/services/datalab-model/src/main/java/com/epam/datalab/dto/base/DataEngineType.java
index adeb5dc..14b13eb 100644
--- a/services/datalab-model/src/main/java/com/epam/datalab/dto/base/DataEngineType.java
+++ b/services/datalab-model/src/main/java/com/epam/datalab/dto/base/DataEngineType.java
@@ -25,7 +25,8 @@ import java.util.HashMap;
 import java.util.Map;
 
 public enum DataEngineType {
-    CLOUD_SERVICE("dataengine-service"), SPARK_STANDALONE("dataengine");
+    CLOUD_SERVICE("dataengine-service"),
+    SPARK_STANDALONE("dataengine");
 
     private static final String DOCKER_IMAGE_PREFIX = "docker.datalab-";
 
@@ -37,7 +38,7 @@ public enum DataEngineType {
         }
     }
 
-    private String name;
+    private final String name;
 
     DataEngineType(String name) {
         this.name = name;
diff --git a/services/provisioning-service/src/main/java/com/epam/datalab/backendapi/core/response/handlers/ResourcesStatusCallbackHandler.java b/services/provisioning-service/src/main/java/com/epam/datalab/backendapi/core/response/handlers/ResourcesStatusCallbackHandler.java
index 347d650..04fe951 100644
--- a/services/provisioning-service/src/main/java/com/epam/datalab/backendapi/core/response/handlers/ResourcesStatusCallbackHandler.java
+++ b/services/provisioning-service/src/main/java/com/epam/datalab/backendapi/core/response/handlers/ResourcesStatusCallbackHandler.java
@@ -47,7 +47,7 @@ import static com.epam.datalab.rest.contracts.ApiCallbacks.STATUS_URI;
 @Slf4j
 public class ResourcesStatusCallbackHandler extends ResourceCallbackHandler<EnvStatusDTO> {
 
-    private Map<String, EnvResource> datalabHostResources;
+    private final Map<String, EnvResource> datalabHostResources;
 
     @JsonCreator
     public ResourcesStatusCallbackHandler(@JacksonInject RESTService selfService, @JsonProperty("action") DockerAction action,
diff --git a/services/self-service/src/main/java/com/epam/datalab/backendapi/schedulers/CheckInfrastructureStatusScheduler.java b/services/self-service/src/main/java/com/epam/datalab/backendapi/schedulers/CheckInfrastructureStatusScheduler.java
index 3bd194c..c081ab2 100644
--- a/services/self-service/src/main/java/com/epam/datalab/backendapi/schedulers/CheckInfrastructureStatusScheduler.java
+++ b/services/self-service/src/main/java/com/epam/datalab/backendapi/schedulers/CheckInfrastructureStatusScheduler.java
@@ -40,7 +40,6 @@ import org.quartz.JobExecutionContext;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -50,100 +49,122 @@ import java.util.stream.Stream;
 @Scheduled("checkInfrastructureStatusScheduler")
 public class CheckInfrastructureStatusScheduler implements Job {
 
-	private static final List<UserInstanceStatus> statusesToCheck = Arrays.asList(UserInstanceStatus.RUNNING, UserInstanceStatus.STOPPED);
-
-	private final InfrastructureInfoService infrastructureInfoService;
-	private final SecurityService securityService;
-	private final EndpointService endpointService;
-	private final ExploratoryDAO exploratoryDAO;
-	private final ProjectService projectService;
-
-	@Inject
-	public CheckInfrastructureStatusScheduler(InfrastructureInfoService infrastructureInfoService, SecurityService securityService,
-	                                          EndpointService endpointService, ExploratoryDAO exploratoryDAO, ProjectService projectService) {
-		this.infrastructureInfoService = infrastructureInfoService;
-		this.securityService = securityService;
-		this.endpointService = endpointService;
-		this.exploratoryDAO = exploratoryDAO;
-		this.projectService = projectService;
-	}
-
-	@Override
-	public void execute(JobExecutionContext context) {
-		UserInfo serviceUser = securityService.getServiceAccountInfo("admin");
-
-		List<String> activeEndpoints = endpointService.getEndpointsWithStatus(EndpointDTO.EndpointStatus.ACTIVE)
-				.stream()
-				.map(EndpointDTO::getName)
-				.collect(Collectors.toList());
-
-		List<UserInstanceDTO> userInstanceDTOS = exploratoryDAO.fetchExploratoriesByEndpointWhereStatusIn(activeEndpoints, statusesToCheck, Boolean.TRUE);
-
-		Map<String, List<EnvResource>> exploratoryAndSparkInstances = userInstanceDTOS
-				.stream()
-				.map(this::getExploratoryAndSparkInstances)
+    private static final List<UserInstanceStatus> statusesToCheck = Arrays.asList(UserInstanceStatus.RUNNING, UserInstanceStatus.STOPPED);
+
+    private final InfrastructureInfoService infrastructureInfoService;
+    private final SecurityService securityService;
+    private final EndpointService endpointService;
+    private final ExploratoryDAO exploratoryDAO;
+    private final ProjectService projectService;
+
+    @Inject
+    public CheckInfrastructureStatusScheduler(InfrastructureInfoService infrastructureInfoService, SecurityService securityService,
+                                              EndpointService endpointService, ExploratoryDAO exploratoryDAO, ProjectService projectService) {
+        this.infrastructureInfoService = infrastructureInfoService;
+        this.securityService = securityService;
+        this.endpointService = endpointService;
+        this.exploratoryDAO = exploratoryDAO;
+        this.projectService = projectService;
+    }
+
+    @Override
+    public void execute(JobExecutionContext context) {
+        UserInfo serviceUser = securityService.getServiceAccountInfo("admin");
+
+        List<String> activeEndpoints = endpointService.getEndpointsWithStatus(EndpointDTO.EndpointStatus.ACTIVE)
+                .stream()
+                .map(EndpointDTO::getName)
+                .collect(Collectors.toList());
+
+        List<UserInstanceDTO> userInstanceDTOS = exploratoryDAO.fetchExploratoriesByEndpointWhereStatusIn(activeEndpoints, statusesToCheck, Boolean.TRUE);
+
+        Map<String, List<EnvResource>> exploratoryAndSparkInstances = userInstanceDTOS
+                .stream()
+                .map(this::getExploratoryAndSparkInstances)
 				.flatMap(Collection::stream)
 				.collect(Collectors.groupingBy(EnvResource::getEndpoint));
 
-		activeEndpoints.forEach(e -> {
-					List<EnvResource> hostInstances = Stream.of(getEdgeInstances(e), exploratoryAndSparkInstances.get(e))
-							.flatMap(Collection::stream)
-							.collect(Collectors.toList());
-					infrastructureInfoService.updateInfrastructureStatuses(serviceUser, e, hostInstances, Collections.emptyList());
-				}
-		);
-	}
-
-	private List<EnvResource> getExploratoryAndSparkInstances(UserInstanceDTO userInstanceDTO) {
-		List<EnvResource> instances = userInstanceDTO.getResources()
-				.stream()
-				.filter(c -> DataEngineType.SPARK_STANDALONE == DataEngineType.fromDockerImageName(c.getImageName()))
-				.filter(c -> statusesToCheck.contains(UserInstanceStatus.of(c.getStatus())))
-				.map(r -> new EnvResource()
-						.withId(r.getInstanceId())
-						.withName(r.getComputationalName())
-						.withProject(userInstanceDTO.getProject())
-						.withEndpoint(userInstanceDTO.getEndpoint())
-						.withStatus(r.getStatus())
-						.withResourceType(ResourceType.COMPUTATIONAL))
-				.collect(Collectors.toList());
-
-		instances.add(new EnvResource()
-				.withId(userInstanceDTO.getInstanceId())
-				.withName(userInstanceDTO.getExploratoryName())
-				.withProject(userInstanceDTO.getProject())
-				.withEndpoint(userInstanceDTO.getEndpoint())
-				.withStatus(userInstanceDTO.getStatus())
-				.withResourceType(ResourceType.EXPLORATORY));
-
-		return instances;
-	}
-
-	private List<EnvResource> getEdgeInstances(String endpoint) {
-		return projectService.getProjectsByEndpoint(endpoint)
-				.stream()
-				.collect(Collectors.toMap(ProjectDTO::getName, ProjectDTO::getEndpoints))
-				.entrySet()
-				.stream()
-				.map(entry -> getEdgeInstances(endpoint, entry))
+        Map<String, List<EnvResource>> clusterInstances = userInstanceDTOS
+                .stream()
+                .map(this::getCloudInstances)
 				.flatMap(Collection::stream)
-				.collect(Collectors.toList());
-	}
-
-	private List<EnvResource> getEdgeInstances(String endpoint, Map.Entry<String, List<ProjectEndpointDTO>> entry) {
-		return entry.getValue()
-				.stream()
-				.filter(e -> statusesToCheck.contains(e.getStatus()))
-				.filter(e -> e.getName().equals(endpoint))
-				.filter(e -> Objects.nonNull(e.getEdgeInfo()))
-				.map(e -> new EnvResource()
-						.withId(e.getEdgeInfo().getInstanceId())
-						.withName(e.getName())
-						.withProject(entry.getKey())
-						.withEndpoint(endpoint)
-						.withStatus(e.getStatus().toString())
-						.withResourceType(ResourceType.EDGE)
-				)
-				.collect(Collectors.toList());
-	}
+				.collect(Collectors.groupingBy(EnvResource::getEndpoint));
+
+        activeEndpoints.forEach(e -> {
+                    List<EnvResource> hostInstances = Stream.of(getEdgeInstances(e), exploratoryAndSparkInstances.get(e))
+                            .flatMap(Collection::stream)
+                            .collect(Collectors.toList());
+
+                    infrastructureInfoService.updateInfrastructureStatuses(serviceUser, e, hostInstances, clusterInstances.get(e));
+                }
+        );
+    }
+
+    private List<EnvResource> getExploratoryAndSparkInstances(UserInstanceDTO userInstanceDTO) {
+        List<EnvResource> instances = userInstanceDTO.getResources()
+                .stream()
+                .filter(c -> DataEngineType.SPARK_STANDALONE == DataEngineType.fromDockerImageName(c.getImageName()))
+                .filter(c -> statusesToCheck.contains(UserInstanceStatus.of(c.getStatus())))
+                .map(r -> new EnvResource()
+                        .withId(r.getInstanceId())
+                        .withName(r.getComputationalName())
+                        .withProject(userInstanceDTO.getProject())
+                        .withEndpoint(userInstanceDTO.getEndpoint())
+                        .withStatus(r.getStatus())
+                        .withResourceType(ResourceType.COMPUTATIONAL))
+                .collect(Collectors.toList());
+
+        instances.add(new EnvResource()
+                .withId(userInstanceDTO.getInstanceId())
+                .withName(userInstanceDTO.getExploratoryName())
+                .withProject(userInstanceDTO.getProject())
+                .withEndpoint(userInstanceDTO.getEndpoint())
+                .withStatus(userInstanceDTO.getStatus())
+                .withResourceType(ResourceType.EXPLORATORY));
+
+        return instances;
+    }
+
+    private List<EnvResource> getCloudInstances(UserInstanceDTO userInstanceDTO) {
+        return userInstanceDTO.getResources().stream()
+                .filter(c -> DataEngineType.CLOUD_SERVICE == DataEngineType.fromDockerImageName(c.getImageName()))
+                .filter(c -> statusesToCheck.contains(UserInstanceStatus.of(c.getStatus())))
+                .map(r -> new EnvResource()
+                        .withId(r.getInstanceId())
+                        .withName(r.getComputationalName())
+                        .withProject(userInstanceDTO.getProject())
+                        .withEndpoint(userInstanceDTO.getEndpoint())
+                        .withStatus(r.getStatus())
+                        .withResourceType(ResourceType.COMPUTATIONAL))
+                .collect(Collectors.toList());
+    }
+
+
+    private List<EnvResource> getEdgeInstances(String endpoint) {
+        return projectService.getProjectsByEndpoint(endpoint)
+                .stream()
+                .collect(Collectors.toMap(ProjectDTO::getName, ProjectDTO::getEndpoints))
+                .entrySet()
+                .stream()
+                .map(entry -> getEdgeInstances(endpoint, entry))
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+    }
+
+    private List<EnvResource> getEdgeInstances(String endpoint, Map.Entry<String, List<ProjectEndpointDTO>> entry) {
+        return entry.getValue()
+                .stream()
+                .filter(e -> statusesToCheck.contains(e.getStatus()))
+                .filter(e -> e.getName().equals(endpoint))
+                .filter(e -> Objects.nonNull(e.getEdgeInfo()))
+                .map(e -> new EnvResource()
+                        .withId(e.getEdgeInfo().getInstanceId())
+                        .withName(e.getName())
+                        .withProject(entry.getKey())
+                        .withEndpoint(endpoint)
+                        .withStatus(e.getStatus().toString())
+                        .withResourceType(ResourceType.EDGE)
+                )
+                .collect(Collectors.toList());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datalab.apache.org
For additional commands, e-mail: commits-help@datalab.apache.org