You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datalab.apache.org by of...@apache.org on 2020/11/24 16:06:12 UTC
[incubator-datalab] branch develop updated: [DATALAB-2152] -- added cluster instances to scheduler status sync (#989)
This is an automated email from the ASF dual-hosted git repository.
ofuks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-datalab.git
The following commit(s) were added to refs/heads/develop by this push:
new 5cea7a1 [DATALAB-2152] -- added cluster instances to scheduler status sync (#989)
5cea7a1 is described below
commit 5cea7a15892f375ad51e4e7ee1092467a9a48c83
Author: Kinash Yurii <ur...@gmail.com>
AuthorDate: Tue Nov 24 18:02:57 2020 +0200
[DATALAB-2152] -- added cluster instances to scheduler status sync (#989)
[DATALAB-2152] -- added cluster instances to scheduler status sync
---
.../com/epam/datalab/dto/base/DataEngineType.java | 5 +-
.../handlers/ResourcesStatusCallbackHandler.java | 2 +-
.../CheckInfrastructureStatusScheduler.java | 207 ++++++++++++---------
3 files changed, 118 insertions(+), 96 deletions(-)
diff --git a/services/datalab-model/src/main/java/com/epam/datalab/dto/base/DataEngineType.java b/services/datalab-model/src/main/java/com/epam/datalab/dto/base/DataEngineType.java
index adeb5dc..14b13eb 100644
--- a/services/datalab-model/src/main/java/com/epam/datalab/dto/base/DataEngineType.java
+++ b/services/datalab-model/src/main/java/com/epam/datalab/dto/base/DataEngineType.java
@@ -25,7 +25,8 @@ import java.util.HashMap;
import java.util.Map;
public enum DataEngineType {
- CLOUD_SERVICE("dataengine-service"), SPARK_STANDALONE("dataengine");
+ CLOUD_SERVICE("dataengine-service"),
+ SPARK_STANDALONE("dataengine");
private static final String DOCKER_IMAGE_PREFIX = "docker.datalab-";
@@ -37,7 +38,7 @@ public enum DataEngineType {
}
}
- private String name;
+ private final String name;
DataEngineType(String name) {
this.name = name;
diff --git a/services/provisioning-service/src/main/java/com/epam/datalab/backendapi/core/response/handlers/ResourcesStatusCallbackHandler.java b/services/provisioning-service/src/main/java/com/epam/datalab/backendapi/core/response/handlers/ResourcesStatusCallbackHandler.java
index 347d650..04fe951 100644
--- a/services/provisioning-service/src/main/java/com/epam/datalab/backendapi/core/response/handlers/ResourcesStatusCallbackHandler.java
+++ b/services/provisioning-service/src/main/java/com/epam/datalab/backendapi/core/response/handlers/ResourcesStatusCallbackHandler.java
@@ -47,7 +47,7 @@ import static com.epam.datalab.rest.contracts.ApiCallbacks.STATUS_URI;
@Slf4j
public class ResourcesStatusCallbackHandler extends ResourceCallbackHandler<EnvStatusDTO> {
- private Map<String, EnvResource> datalabHostResources;
+ private final Map<String, EnvResource> datalabHostResources;
@JsonCreator
public ResourcesStatusCallbackHandler(@JacksonInject RESTService selfService, @JsonProperty("action") DockerAction action,
diff --git a/services/self-service/src/main/java/com/epam/datalab/backendapi/schedulers/CheckInfrastructureStatusScheduler.java b/services/self-service/src/main/java/com/epam/datalab/backendapi/schedulers/CheckInfrastructureStatusScheduler.java
index 3bd194c..c081ab2 100644
--- a/services/self-service/src/main/java/com/epam/datalab/backendapi/schedulers/CheckInfrastructureStatusScheduler.java
+++ b/services/self-service/src/main/java/com/epam/datalab/backendapi/schedulers/CheckInfrastructureStatusScheduler.java
@@ -40,7 +40,6 @@ import org.quartz.JobExecutionContext;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -50,100 +49,122 @@ import java.util.stream.Stream;
@Scheduled("checkInfrastructureStatusScheduler")
public class CheckInfrastructureStatusScheduler implements Job {
- private static final List<UserInstanceStatus> statusesToCheck = Arrays.asList(UserInstanceStatus.RUNNING, UserInstanceStatus.STOPPED);
-
- private final InfrastructureInfoService infrastructureInfoService;
- private final SecurityService securityService;
- private final EndpointService endpointService;
- private final ExploratoryDAO exploratoryDAO;
- private final ProjectService projectService;
-
- @Inject
- public CheckInfrastructureStatusScheduler(InfrastructureInfoService infrastructureInfoService, SecurityService securityService,
- EndpointService endpointService, ExploratoryDAO exploratoryDAO, ProjectService projectService) {
- this.infrastructureInfoService = infrastructureInfoService;
- this.securityService = securityService;
- this.endpointService = endpointService;
- this.exploratoryDAO = exploratoryDAO;
- this.projectService = projectService;
- }
-
- @Override
- public void execute(JobExecutionContext context) {
- UserInfo serviceUser = securityService.getServiceAccountInfo("admin");
-
- List<String> activeEndpoints = endpointService.getEndpointsWithStatus(EndpointDTO.EndpointStatus.ACTIVE)
- .stream()
- .map(EndpointDTO::getName)
- .collect(Collectors.toList());
-
- List<UserInstanceDTO> userInstanceDTOS = exploratoryDAO.fetchExploratoriesByEndpointWhereStatusIn(activeEndpoints, statusesToCheck, Boolean.TRUE);
-
- Map<String, List<EnvResource>> exploratoryAndSparkInstances = userInstanceDTOS
- .stream()
- .map(this::getExploratoryAndSparkInstances)
+ private static final List<UserInstanceStatus> statusesToCheck = Arrays.asList(UserInstanceStatus.RUNNING, UserInstanceStatus.STOPPED);
+
+ private final InfrastructureInfoService infrastructureInfoService;
+ private final SecurityService securityService;
+ private final EndpointService endpointService;
+ private final ExploratoryDAO exploratoryDAO;
+ private final ProjectService projectService;
+
+ @Inject
+ public CheckInfrastructureStatusScheduler(InfrastructureInfoService infrastructureInfoService, SecurityService securityService,
+ EndpointService endpointService, ExploratoryDAO exploratoryDAO, ProjectService projectService) {
+ this.infrastructureInfoService = infrastructureInfoService;
+ this.securityService = securityService;
+ this.endpointService = endpointService;
+ this.exploratoryDAO = exploratoryDAO;
+ this.projectService = projectService;
+ }
+
+ @Override
+ public void execute(JobExecutionContext context) {
+ UserInfo serviceUser = securityService.getServiceAccountInfo("admin");
+
+ List<String> activeEndpoints = endpointService.getEndpointsWithStatus(EndpointDTO.EndpointStatus.ACTIVE)
+ .stream()
+ .map(EndpointDTO::getName)
+ .collect(Collectors.toList());
+
+ List<UserInstanceDTO> userInstanceDTOS = exploratoryDAO.fetchExploratoriesByEndpointWhereStatusIn(activeEndpoints, statusesToCheck, Boolean.TRUE);
+
+ Map<String, List<EnvResource>> exploratoryAndSparkInstances = userInstanceDTOS
+ .stream()
+ .map(this::getExploratoryAndSparkInstances)
.flatMap(Collection::stream)
.collect(Collectors.groupingBy(EnvResource::getEndpoint));
- activeEndpoints.forEach(e -> {
- List<EnvResource> hostInstances = Stream.of(getEdgeInstances(e), exploratoryAndSparkInstances.get(e))
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
- infrastructureInfoService.updateInfrastructureStatuses(serviceUser, e, hostInstances, Collections.emptyList());
- }
- );
- }
-
- private List<EnvResource> getExploratoryAndSparkInstances(UserInstanceDTO userInstanceDTO) {
- List<EnvResource> instances = userInstanceDTO.getResources()
- .stream()
- .filter(c -> DataEngineType.SPARK_STANDALONE == DataEngineType.fromDockerImageName(c.getImageName()))
- .filter(c -> statusesToCheck.contains(UserInstanceStatus.of(c.getStatus())))
- .map(r -> new EnvResource()
- .withId(r.getInstanceId())
- .withName(r.getComputationalName())
- .withProject(userInstanceDTO.getProject())
- .withEndpoint(userInstanceDTO.getEndpoint())
- .withStatus(r.getStatus())
- .withResourceType(ResourceType.COMPUTATIONAL))
- .collect(Collectors.toList());
-
- instances.add(new EnvResource()
- .withId(userInstanceDTO.getInstanceId())
- .withName(userInstanceDTO.getExploratoryName())
- .withProject(userInstanceDTO.getProject())
- .withEndpoint(userInstanceDTO.getEndpoint())
- .withStatus(userInstanceDTO.getStatus())
- .withResourceType(ResourceType.EXPLORATORY));
-
- return instances;
- }
-
- private List<EnvResource> getEdgeInstances(String endpoint) {
- return projectService.getProjectsByEndpoint(endpoint)
- .stream()
- .collect(Collectors.toMap(ProjectDTO::getName, ProjectDTO::getEndpoints))
- .entrySet()
- .stream()
- .map(entry -> getEdgeInstances(endpoint, entry))
+ Map<String, List<EnvResource>> clusterInstances = userInstanceDTOS
+ .stream()
+ .map(this::getCloudInstances)
.flatMap(Collection::stream)
- .collect(Collectors.toList());
- }
-
- private List<EnvResource> getEdgeInstances(String endpoint, Map.Entry<String, List<ProjectEndpointDTO>> entry) {
- return entry.getValue()
- .stream()
- .filter(e -> statusesToCheck.contains(e.getStatus()))
- .filter(e -> e.getName().equals(endpoint))
- .filter(e -> Objects.nonNull(e.getEdgeInfo()))
- .map(e -> new EnvResource()
- .withId(e.getEdgeInfo().getInstanceId())
- .withName(e.getName())
- .withProject(entry.getKey())
- .withEndpoint(endpoint)
- .withStatus(e.getStatus().toString())
- .withResourceType(ResourceType.EDGE)
- )
- .collect(Collectors.toList());
- }
+ .collect(Collectors.groupingBy(EnvResource::getEndpoint));
+
+ activeEndpoints.forEach(e -> {
+ List<EnvResource> hostInstances = Stream.of(getEdgeInstances(e), exploratoryAndSparkInstances.get(e))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+
+ infrastructureInfoService.updateInfrastructureStatuses(serviceUser, e, hostInstances, clusterInstances.get(e));
+ }
+ );
+ }
+
+ private List<EnvResource> getExploratoryAndSparkInstances(UserInstanceDTO userInstanceDTO) {
+ List<EnvResource> instances = userInstanceDTO.getResources()
+ .stream()
+ .filter(c -> DataEngineType.SPARK_STANDALONE == DataEngineType.fromDockerImageName(c.getImageName()))
+ .filter(c -> statusesToCheck.contains(UserInstanceStatus.of(c.getStatus())))
+ .map(r -> new EnvResource()
+ .withId(r.getInstanceId())
+ .withName(r.getComputationalName())
+ .withProject(userInstanceDTO.getProject())
+ .withEndpoint(userInstanceDTO.getEndpoint())
+ .withStatus(r.getStatus())
+ .withResourceType(ResourceType.COMPUTATIONAL))
+ .collect(Collectors.toList());
+
+ instances.add(new EnvResource()
+ .withId(userInstanceDTO.getInstanceId())
+ .withName(userInstanceDTO.getExploratoryName())
+ .withProject(userInstanceDTO.getProject())
+ .withEndpoint(userInstanceDTO.getEndpoint())
+ .withStatus(userInstanceDTO.getStatus())
+ .withResourceType(ResourceType.EXPLORATORY));
+
+ return instances;
+ }
+
+ private List<EnvResource> getCloudInstances(UserInstanceDTO userInstanceDTO) {
+ return userInstanceDTO.getResources().stream()
+ .filter(c -> DataEngineType.CLOUD_SERVICE == DataEngineType.fromDockerImageName(c.getImageName()))
+ .filter(c -> statusesToCheck.contains(UserInstanceStatus.of(c.getStatus())))
+ .map(r -> new EnvResource()
+ .withId(r.getInstanceId())
+ .withName(r.getComputationalName())
+ .withProject(userInstanceDTO.getProject())
+ .withEndpoint(userInstanceDTO.getEndpoint())
+ .withStatus(r.getStatus())
+ .withResourceType(ResourceType.COMPUTATIONAL))
+ .collect(Collectors.toList());
+ }
+
+
+ private List<EnvResource> getEdgeInstances(String endpoint) {
+ return projectService.getProjectsByEndpoint(endpoint)
+ .stream()
+ .collect(Collectors.toMap(ProjectDTO::getName, ProjectDTO::getEndpoints))
+ .entrySet()
+ .stream()
+ .map(entry -> getEdgeInstances(endpoint, entry))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
+
+ private List<EnvResource> getEdgeInstances(String endpoint, Map.Entry<String, List<ProjectEndpointDTO>> entry) {
+ return entry.getValue()
+ .stream()
+ .filter(e -> statusesToCheck.contains(e.getStatus()))
+ .filter(e -> e.getName().equals(endpoint))
+ .filter(e -> Objects.nonNull(e.getEdgeInfo()))
+ .map(e -> new EnvResource()
+ .withId(e.getEdgeInfo().getInstanceId())
+ .withName(e.getName())
+ .withProject(entry.getKey())
+ .withEndpoint(endpoint)
+ .withStatus(e.getStatus().toString())
+ .withResourceType(ResourceType.EDGE)
+ )
+ .collect(Collectors.toList());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datalab.apache.org
For additional commands, e-mail: commits-help@datalab.apache.org