You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dlab.apache.org by bh...@apache.org on 2019/04/11 09:11:56 UTC
[incubator-dlab] branch inactivity_integration updated: DLAB-000
fixed bug connected with inactivity scheduler
This is an automated email from the ASF dual-hosted git repository.
bhliva pushed a commit to branch inactivity_integration
in repository https://gitbox.apache.org/repos/asf/incubator-dlab.git
The following commit(s) were added to refs/heads/inactivity_integration by this push:
new f738195 DLAB-000 fixed bug connected with inactivity scheduler
f738195 is described below
commit f7381958420713062f350e4af6e6cb045edcc620
Author: bhliva <bo...@epam.com>
AuthorDate: Thu Apr 11 12:11:47 2019 +0300
DLAB-000 fixed bug connected with inactivity scheduler
---
.../java/com/epam/dlab/dto/SchedulerJobDTO.java | 5 +
.../service/impl/SchedulerJobServiceImpl.java | 781 +++++++++++----------
2 files changed, 396 insertions(+), 390 deletions(-)
diff --git a/services/dlab-model/src/main/java/com/epam/dlab/dto/SchedulerJobDTO.java b/services/dlab-model/src/main/java/com/epam/dlab/dto/SchedulerJobDTO.java
index b5db38c..8e15a59 100644
--- a/services/dlab-model/src/main/java/com/epam/dlab/dto/SchedulerJobDTO.java
+++ b/services/dlab-model/src/main/java/com/epam/dlab/dto/SchedulerJobDTO.java
@@ -27,6 +27,7 @@ import lombok.Data;
import java.time.*;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
/**
* Stores info about a scheduler job (general duration, days to repeat, time to start and finish).
@@ -74,4 +75,8 @@ public class SchedulerJobDTO {
@JsonProperty("consider_inactivity")
private boolean considerInactivity = true;
+ public boolean inactivityScheduler() {
+ return Objects.nonNull(maxInactivity);
+ }
+
}
\ No newline at end of file
diff --git a/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java b/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java
index d31eb39..940b716 100644
--- a/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java
+++ b/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java
@@ -60,396 +60,397 @@ import static java.util.Date.from;
@Singleton
public class SchedulerJobServiceImpl implements SchedulerJobService {
- private static final String SCHEDULER_NOT_FOUND_MSG =
- "Scheduler job data not found for user %s with exploratory %s";
- private static final long ALLOWED_INACTIVITY_MINUTES = 1L;
-
- @Inject
- private SchedulerJobDAO schedulerJobDAO;
-
- @Inject
- private ExploratoryDAO exploratoryDAO;
-
- @Inject
- private ComputationalDAO computationalDAO;
-
- @Inject
- private ExploratoryService exploratoryService;
-
- @Inject
- private ComputationalService computationalService;
-
- @Inject
- private SystemUserInfoService systemUserService;
-
- @Override
- public SchedulerJobDTO fetchSchedulerJobForUserAndExploratory(String user, String exploratoryName) {
- return schedulerJobDAO.fetchSingleSchedulerJobByUserAndExploratory(user, exploratoryName)
- .orElseThrow(() -> new ResourceNotFoundException(String.format(SCHEDULER_NOT_FOUND_MSG, user,
- exploratoryName)));
- }
-
- @Override
- public SchedulerJobDTO fetchSchedulerJobForComputationalResource(String user, String exploratoryName,
- String computationalName) {
- return schedulerJobDAO.fetchSingleSchedulerJobForCluster(user, exploratoryName, computationalName)
- .orElseThrow(() -> new ResourceNotFoundException(String.format(SCHEDULER_NOT_FOUND_MSG, user,
- exploratoryName) + " with computational resource " + computationalName));
- }
-
- @Override
- public void updateExploratorySchedulerData(String user, String exploratoryName, SchedulerJobDTO dto) {
- validateExploratoryStatus(user, exploratoryName);
- populateDefaultSchedulerValues(dto);
- log.debug("Updating exploratory {} for user {} with new scheduler job data: {}...", exploratoryName, user,
- dto);
- exploratoryDAO.updateSchedulerDataForUserAndExploratory(user, exploratoryName, dto);
- if (dto.isSyncStartRequired()) {
- shareSchedulerJobDataToSparkClusters(user, exploratoryName, dto);
- } else {
- computationalDAO.updateSchedulerSyncFlag(user, exploratoryName, dto.isSyncStartRequired());
- }
- }
-
- @Override
- public void updateComputationalSchedulerData(String user, String exploratoryName, String computationalName,
- SchedulerJobDTO dto) {
- validateExploratoryStatus(user, exploratoryName);
- validateComputationalStatus(user, exploratoryName, computationalName);
- populateDefaultSchedulerValues(dto);
- log.debug("Updating computational resource {} affiliated with exploratory {} for user {} with new scheduler " +
- "job data {}...", computationalName, exploratoryName, user, dto);
- computationalDAO.updateSchedulerDataForComputationalResource(user, exploratoryName, computationalName, dto);
- }
-
- @Override
- public void stopComputationalByScheduler() {
- getComputationalSchedulersForStopping(OffsetDateTime.now(), true)
- .forEach(this::stopComputational);
- }
-
- @Override
- public void stopExploratoryByScheduler() {
- getExploratorySchedulersForStopping(OffsetDateTime.now(), true)
- .forEach(this::stopExploratory);
- }
-
- @Override
- public void startExploratoryByScheduler() {
- getExploratorySchedulersForStarting(OffsetDateTime.now())
- .forEach(this::startExploratory);
- }
-
- @Override
- public void startComputationalByScheduler() {
- getComputationalSchedulersForStarting(OffsetDateTime.now())
- .forEach(job -> startSpark(job.getUser(), job.getExploratoryName(), job.getComputationalName()));
- }
-
- @Override
- public void terminateExploratoryByScheduler() {
- getExploratorySchedulersForTerminating(OffsetDateTime.now())
- .forEach(this::terminateExploratory);
-
- }
-
- @Override
- public void terminateComputationalByScheduler() {
- getComputationalSchedulersForTerminating(OffsetDateTime.now()).forEach(this::terminateComputational);
-
- }
-
- @Override
- public void removeScheduler(String user, String exploratoryName) {
- schedulerJobDAO.removeScheduler(user, exploratoryName);
- }
-
- @Override
- public void removeScheduler(String user, String exploratoryName, String computationalName) {
- schedulerJobDAO.removeScheduler(user, exploratoryName, computationalName);
- }
-
- @Override
- public List<SchedulerJobData> getActiveSchedulers(String user, long minutesOffset) {
- final OffsetDateTime desiredDateTime = OffsetDateTime.now().plusMinutes(minutesOffset);
- final Predicate<SchedulerJobData> userPredicate = s -> user.equals(s.getUser());
- final Stream<SchedulerJobData> computationalSchedulersStream =
- getComputationalSchedulersForStopping(desiredDateTime, false)
- .stream()
- .filter(userPredicate);
- final Stream<SchedulerJobData> exploratorySchedulersStream =
- getExploratorySchedulersForStopping(desiredDateTime, false)
- .stream()
- .filter(userPredicate);
- return Stream.concat(computationalSchedulersStream, exploratorySchedulersStream)
- .collect(Collectors.toList());
- }
-
- private void stopComputational(SchedulerJobData job) {
- final String expName = job.getExploratoryName();
- final String compName = job.getComputationalName();
- final String user = job.getUser();
- log.debug("Stopping exploratory {} computational {} for user {} by scheduler", expName, compName, user);
- computationalService.stopSparkCluster(systemUserService.create(user), expName, compName);
- }
-
- private void terminateComputational(SchedulerJobData job) {
- final String user = job.getUser();
- final String expName = job.getExploratoryName();
- final String compName = job.getComputationalName();
- final UserInfo userInfo = systemUserService.create(user);
- log.debug("Terminating exploratory {} computational {} for user {} by scheduler", expName, compName, user);
- computationalService.terminateComputational(userInfo, expName, compName);
- }
-
- private void stopExploratory(SchedulerJobData job) {
- final String expName = job.getExploratoryName();
- final String user = job.getUser();
- log.debug("Stopping exploratory {} for user {} by scheduler", expName, user);
- exploratoryService.stop(systemUserService.create(user), expName);
- }
-
- private List<SchedulerJobData> getExploratorySchedulersForTerminating(OffsetDateTime now) {
- return schedulerJobDAO.getExploratorySchedulerDataWithOneOfStatus(RUNNING, STOPPED)
- .stream()
- .filter(canSchedulerForTerminatingBeApplied(now))
- .collect(Collectors.toList());
- }
-
- private List<SchedulerJobData> getComputationalSchedulersForTerminating(OffsetDateTime now) {
- return schedulerJobDAO.getComputationalSchedulerDataWithOneOfStatus(RUNNING,
- DataEngineType.SPARK_STANDALONE, STOPPED, RUNNING)
- .stream()
- .filter(canSchedulerForTerminatingBeApplied(now))
- .collect(Collectors.toList());
- }
-
- private void startExploratory(SchedulerJobData schedulerJobData) {
- final String user = schedulerJobData.getUser();
- final String exploratoryName = schedulerJobData.getExploratoryName();
- log.debug("Starting exploratory {} for user {} by scheduler", exploratoryName, user);
- exploratoryService.start(systemUserService.create(user), exploratoryName);
- if (schedulerJobData.getJobDTO().isSyncStartRequired()) {
- log.trace("Starting computational for exploratory {} for user {} by scheduler", exploratoryName, user);
- final DataEngineType sparkCluster = DataEngineType.SPARK_STANDALONE;
- final List<UserComputationalResource> compToBeStarted =
- computationalDAO.findComputationalResourcesWithStatus(user, exploratoryName, STOPPED);
-
- compToBeStarted
- .stream()
- .filter(compResource -> shouldClusterBeStarted(sparkCluster, compResource))
- .forEach(comp -> startSpark(user, exploratoryName, comp.getComputationalName()));
- }
- }
-
- private void terminateExploratory(SchedulerJobData job) {
- final String user = job.getUser();
- final String expName = job.getExploratoryName();
- log.debug("Terminating exploratory {} for user {} by scheduler", expName, user);
- exploratoryService.terminate(systemUserService.create(user), expName);
- }
-
- private void startSpark(String user, String expName, String compName) {
- log.debug("Starting exploratory {} computational {} for user {} by scheduler", expName, compName, user);
- computationalService.startSparkCluster(systemUserService.create(user), expName, compName);
- }
-
- private boolean shouldClusterBeStarted(DataEngineType sparkCluster, UserComputationalResource compResource) {
- return Objects.nonNull(compResource.getSchedulerData()) && compResource.getSchedulerData().isSyncStartRequired()
- && compResource.getImageName().equals(getDockerImageName(sparkCluster));
- }
-
- /**
- * Performs bulk updating operation with scheduler data for corresponding to exploratory Spark clusters.
- * All these resources will obtain data which is equal to exploratory's except 'stopping' operation (it will be
- * performed automatically with notebook stopping since Spark resources have such feature).
- *
- * @param user user's name
- * @param exploratoryName name of exploratory resource
- * @param dto scheduler job data.
- */
- private void shareSchedulerJobDataToSparkClusters(String user, String exploratoryName, SchedulerJobDTO dto) {
- List<String> correspondingSparkClusters = computationalDAO.getComputationalResourcesWhereStatusIn(user,
- singletonList(DataEngineType.SPARK_STANDALONE), exploratoryName,
- STARTING, RUNNING, STOPPING, STOPPED);
- SchedulerJobDTO dtoWithoutStopData = getSchedulerJobWithoutStopData(dto);
- for (String sparkName : correspondingSparkClusters) {
- log.debug("Updating computational resource {} affiliated with exploratory {} for user {} with new " +
- "scheduler job data {}...", sparkName, exploratoryName, user, dtoWithoutStopData);
- computationalDAO.updateSchedulerDataForComputationalResource(user, exploratoryName, sparkName,
- dtoWithoutStopData);
- }
- }
-
- private List<SchedulerJobData> getExploratorySchedulersForStopping(OffsetDateTime currentDateTime,
- boolean checkInactivity) {
-
- final Date clusterMaxInactivityAllowedDate =
- from(LocalDateTime.now().minusMinutes(ALLOWED_INACTIVITY_MINUTES).atZone(systemDefault()).toInstant());
- return schedulerJobDAO.getExploratorySchedulerWithStatusAndClusterLastActivityLessThan(RUNNING,
- clusterMaxInactivityAllowedDate)
- .stream()
- .filter(schedulerJobData -> shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(),
- currentDateTime, schedulerJobData.getJobDTO().getStopDaysRepeat(),
- schedulerJobData.getJobDTO().getEndTime()) ||
- (checkInactivity && exploratoryInactivityCondition(schedulerJobData)))
- .collect(Collectors.toList());
- }
-
- private List<SchedulerJobData> getExploratorySchedulersForStarting(OffsetDateTime currentDateTime) {
- return schedulerJobDAO.getExploratorySchedulerDataWithStatus(STOPPED)
- .stream()
- .filter(canSchedulerForStartingBeApplied(currentDateTime))
- .collect(Collectors.toList());
- }
-
- private List<SchedulerJobData> getComputationalSchedulersForStarting(OffsetDateTime currentDateTime) {
- return schedulerJobDAO
- .getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE, STOPPED)
- .stream()
- .filter(canSchedulerForStartingBeApplied(currentDateTime))
- .collect(Collectors.toList());
- }
-
-
- private Predicate<SchedulerJobData> canSchedulerForStartingBeApplied(OffsetDateTime currentDateTime) {
- return schedulerJobData -> shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(),
- currentDateTime, schedulerJobData.getJobDTO().getStartDaysRepeat(),
- schedulerJobData.getJobDTO().getStartTime());
- }
-
- private Predicate<SchedulerJobData> canSchedulerForTerminatingBeApplied(OffsetDateTime currentDateTime) {
- return schedulerJobData -> shouldBeTerminated(currentDateTime, schedulerJobData);
- }
-
- private boolean shouldBeTerminated(OffsetDateTime currentDateTime, SchedulerJobData schedulerJobData) {
- final SchedulerJobDTO jobDTO = schedulerJobData.getJobDTO();
- final LocalDateTime convertedCurrentTime = schedulerExecutionDate(jobDTO, currentDateTime);
- return isSchedulerActive(schedulerJobData.getJobDTO(), convertedCurrentTime) && Objects.nonNull(jobDTO.getTerminateDateTime()) &&
- convertedCurrentTime.equals(jobDTO.getTerminateDateTime());
- }
-
- private List<SchedulerJobData> getComputationalSchedulersForStopping(OffsetDateTime currentDateTime,
- boolean checkInactivity) {
- return schedulerJobDAO
- .getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE, RUNNING)
- .stream()
- .filter(schedulerJobData -> shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(),
- currentDateTime, schedulerJobData.getJobDTO().getStopDaysRepeat(),
- schedulerJobData.getJobDTO().getEndTime()) ||
- (checkInactivity && computationalInactivityCondition(schedulerJobData)))
- .collect(Collectors.toList());
- }
-
- private boolean computationalInactivityCondition(SchedulerJobData jobData) {
- final SchedulerJobDTO schedulerData = jobData.getJobDTO();
- return schedulerData.isCheckInactivityRequired() && computationalInactivityExceed(jobData, schedulerData);
- }
-
- private boolean computationalInactivityExceed(SchedulerJobData schedulerJobData, SchedulerJobDTO schedulerData) {
- final String explName = schedulerJobData.getExploratoryName();
- final String compName = schedulerJobData.getComputationalName();
- final String user = schedulerJobData.getUser();
- final UserComputationalResource c = computationalDAO.fetchComputationalFields(user, explName, compName);
- final Long maxInactivity = schedulerData.getMaxInactivity();
- return inactivityCondition(maxInactivity, c.getStatus(), c.getLastActivity());
- }
-
- private boolean exploratoryInactivityCondition(SchedulerJobData jobData) {
- final SchedulerJobDTO schedulerData = jobData.getJobDTO();
- return schedulerData.isCheckInactivityRequired() && exploratoryInactivityExceed(jobData, schedulerData);
- }
-
- private boolean exploratoryInactivityExceed(SchedulerJobData schedulerJobData, SchedulerJobDTO schedulerData) {
- final String expName = schedulerJobData.getExploratoryName();
- final String user = schedulerJobData.getUser();
- final UserInstanceDTO userInstanceDTO = exploratoryDAO.fetchExploratoryFields(user, expName, true);
- final boolean canBeStopped = userInstanceDTO.getResources()
- .stream()
- .map(UserComputationalResource::getStatus)
- .map(UserInstanceStatus::of)
- .noneMatch(status -> status.in(TERMINATING, CONFIGURING, CREATING, CREATING));
- return canBeStopped && inactivityCondition(schedulerData.getMaxInactivity(), userInstanceDTO.getStatus(),
- userInstanceDTO.getLastActivity());
- }
-
- private boolean inactivityCondition(Long maxInactivity, String status, LocalDateTime lastActivity) {
- return UserInstanceStatus.RUNNING.toString().equals(status) &&
- Optional.ofNullable(lastActivity)
- .map(la -> la.plusMinutes(maxInactivity).isBefore(LocalDateTime.now()))
- .orElse(Boolean.FALSE);
- }
-
- private void populateDefaultSchedulerValues(SchedulerJobDTO dto) {
- if (Objects.isNull(dto.getBeginDate()) || StringUtils.isBlank(dto.getBeginDate().toString())) {
- dto.setBeginDate(LocalDate.now());
- }
- if (Objects.isNull(dto.getTimeZoneOffset()) || StringUtils.isBlank(dto.getTimeZoneOffset().toString())) {
- dto.setTimeZoneOffset(OffsetDateTime.now(systemDefault()).getOffset());
- }
- }
-
- private void validateExploratoryStatus(String user, String exploratoryName) {
- final UserInstanceDTO userInstance = exploratoryDAO.fetchExploratoryFields(user, exploratoryName);
- validateResourceStatus(userInstance.getStatus());
- }
-
- private void validateComputationalStatus(String user, String exploratoryName, String computationalName) {
- final UserComputationalResource computationalResource =
- computationalDAO.fetchComputationalFields(user, exploratoryName, computationalName);
- final String computationalStatus = computationalResource.getStatus();
- validateResourceStatus(computationalStatus);
- }
-
- private void validateResourceStatus(String resourceStatus) {
- final UserInstanceStatus status = UserInstanceStatus.of(resourceStatus);
- if (Objects.isNull(status) || status.in(UserInstanceStatus.TERMINATED, TERMINATING,
- UserInstanceStatus.FAILED)) {
- throw new ResourceInappropriateStateException(String.format("Can not create/update scheduler for user " +
- "instance with status: %s", status));
- }
- }
-
- private boolean shouldSchedulerBeExecuted(SchedulerJobDTO dto, OffsetDateTime dateTime, List<DayOfWeek> daysRepeat,
- LocalTime time) {
- LocalDateTime convertedDateTime = schedulerExecutionDate(dto, dateTime);
-
- return isSchedulerActive(dto, convertedDateTime)
- && daysRepeat.contains(convertedDateTime.toLocalDate().getDayOfWeek())
- && convertedDateTime.toLocalTime().equals(time);
- }
-
- private boolean isSchedulerActive(SchedulerJobDTO dto, LocalDateTime convertedDateTime) {
- return !convertedDateTime.toLocalDate().isBefore(dto.getBeginDate())
- && finishDateAfterCurrentDate(dto, convertedDateTime);
- }
-
- private LocalDateTime schedulerExecutionDate(SchedulerJobDTO dto, OffsetDateTime dateTime) {
- ZoneOffset zOffset = dto.getTimeZoneOffset();
- OffsetDateTime roundedDateTime = OffsetDateTime.of(
- dateTime.toLocalDate(),
- LocalTime.of(dateTime.toLocalTime().getHour(), dateTime.toLocalTime().getMinute()),
- dateTime.getOffset());
-
- return ZonedDateTime.ofInstant(roundedDateTime.toInstant(),
- ZoneId.ofOffset(TIMEZONE_PREFIX, zOffset)).toLocalDateTime();
- }
-
- private boolean finishDateAfterCurrentDate(SchedulerJobDTO dto, LocalDateTime currentDateTime) {
- return Objects.isNull(dto.getFinishDate()) || !currentDateTime.toLocalDate().isAfter(dto.getFinishDate());
- }
-
- private SchedulerJobDTO getSchedulerJobWithoutStopData(SchedulerJobDTO dto) {
- SchedulerJobDTO convertedDto = new SchedulerJobDTO();
- convertedDto.setBeginDate(dto.getBeginDate());
- convertedDto.setFinishDate(dto.getFinishDate());
- convertedDto.setStartTime(dto.getStartTime());
- convertedDto.setStartDaysRepeat(dto.getStartDaysRepeat());
- convertedDto.setTerminateDateTime(dto.getTerminateDateTime());
- convertedDto.setTimeZoneOffset(dto.getTimeZoneOffset());
- convertedDto.setSyncStartRequired(dto.isSyncStartRequired());
- return convertedDto;
- }
+ private static final String SCHEDULER_NOT_FOUND_MSG =
+ "Scheduler job data not found for user %s with exploratory %s";
+ private static final long ALLOWED_INACTIVITY_MINUTES = 1L;
+
+ @Inject
+ private SchedulerJobDAO schedulerJobDAO;
+
+ @Inject
+ private ExploratoryDAO exploratoryDAO;
+
+ @Inject
+ private ComputationalDAO computationalDAO;
+
+ @Inject
+ private ExploratoryService exploratoryService;
+
+ @Inject
+ private ComputationalService computationalService;
+
+ @Inject
+ private SystemUserInfoService systemUserService;
+
+ @Override
+ public SchedulerJobDTO fetchSchedulerJobForUserAndExploratory(String user, String exploratoryName) {
+ return schedulerJobDAO.fetchSingleSchedulerJobByUserAndExploratory(user, exploratoryName)
+ .orElseThrow(() -> new ResourceNotFoundException(String.format(SCHEDULER_NOT_FOUND_MSG, user,
+ exploratoryName)));
+ }
+
+ @Override
+ public SchedulerJobDTO fetchSchedulerJobForComputationalResource(String user, String exploratoryName,
+ String computationalName) {
+ return schedulerJobDAO.fetchSingleSchedulerJobForCluster(user, exploratoryName, computationalName)
+ .orElseThrow(() -> new ResourceNotFoundException(String.format(SCHEDULER_NOT_FOUND_MSG, user,
+ exploratoryName) + " with computational resource " + computationalName));
+ }
+
+ @Override
+ public void updateExploratorySchedulerData(String user, String exploratoryName, SchedulerJobDTO dto) {
+ validateExploratoryStatus(user, exploratoryName);
+ populateDefaultSchedulerValues(dto);
+ log.debug("Updating exploratory {} for user {} with new scheduler job data: {}...", exploratoryName, user,
+ dto);
+ exploratoryDAO.updateSchedulerDataForUserAndExploratory(user, exploratoryName, dto);
+
+ if (!dto.inactivityScheduler() && dto.isSyncStartRequired()) {
+ shareSchedulerJobDataToSparkClusters(user, exploratoryName, dto);
+ } else if (!dto.inactivityScheduler()) {
+ computationalDAO.updateSchedulerSyncFlag(user, exploratoryName, dto.isSyncStartRequired());
+ }
+ }
+
+ @Override
+ public void updateComputationalSchedulerData(String user, String exploratoryName, String computationalName,
+ SchedulerJobDTO dto) {
+ validateExploratoryStatus(user, exploratoryName);
+ validateComputationalStatus(user, exploratoryName, computationalName);
+ populateDefaultSchedulerValues(dto);
+ log.debug("Updating computational resource {} affiliated with exploratory {} for user {} with new scheduler " +
+ "job data {}...", computationalName, exploratoryName, user, dto);
+ computationalDAO.updateSchedulerDataForComputationalResource(user, exploratoryName, computationalName, dto);
+ }
+
+ @Override
+ public void stopComputationalByScheduler() {
+ getComputationalSchedulersForStopping(OffsetDateTime.now(), true)
+ .forEach(this::stopComputational);
+ }
+
+ @Override
+ public void stopExploratoryByScheduler() {
+ getExploratorySchedulersForStopping(OffsetDateTime.now(), true)
+ .forEach(this::stopExploratory);
+ }
+
+ @Override
+ public void startExploratoryByScheduler() {
+ getExploratorySchedulersForStarting(OffsetDateTime.now())
+ .forEach(this::startExploratory);
+ }
+
+ @Override
+ public void startComputationalByScheduler() {
+ getComputationalSchedulersForStarting(OffsetDateTime.now())
+ .forEach(job -> startSpark(job.getUser(), job.getExploratoryName(), job.getComputationalName()));
+ }
+
+ @Override
+ public void terminateExploratoryByScheduler() {
+ getExploratorySchedulersForTerminating(OffsetDateTime.now())
+ .forEach(this::terminateExploratory);
+
+ }
+
+ @Override
+ public void terminateComputationalByScheduler() {
+ getComputationalSchedulersForTerminating(OffsetDateTime.now()).forEach(this::terminateComputational);
+
+ }
+
+ @Override
+ public void removeScheduler(String user, String exploratoryName) {
+ schedulerJobDAO.removeScheduler(user, exploratoryName);
+ }
+
+ @Override
+ public void removeScheduler(String user, String exploratoryName, String computationalName) {
+ schedulerJobDAO.removeScheduler(user, exploratoryName, computationalName);
+ }
+
+ @Override
+ public List<SchedulerJobData> getActiveSchedulers(String user, long minutesOffset) {
+ final OffsetDateTime desiredDateTime = OffsetDateTime.now().plusMinutes(minutesOffset);
+ final Predicate<SchedulerJobData> userPredicate = s -> user.equals(s.getUser());
+ final Stream<SchedulerJobData> computationalSchedulersStream =
+ getComputationalSchedulersForStopping(desiredDateTime, false)
+ .stream()
+ .filter(userPredicate);
+ final Stream<SchedulerJobData> exploratorySchedulersStream =
+ getExploratorySchedulersForStopping(desiredDateTime, false)
+ .stream()
+ .filter(userPredicate);
+ return Stream.concat(computationalSchedulersStream, exploratorySchedulersStream)
+ .collect(Collectors.toList());
+ }
+
+ private void stopComputational(SchedulerJobData job) {
+ final String expName = job.getExploratoryName();
+ final String compName = job.getComputationalName();
+ final String user = job.getUser();
+ log.debug("Stopping exploratory {} computational {} for user {} by scheduler", expName, compName, user);
+ computationalService.stopSparkCluster(systemUserService.create(user), expName, compName);
+ }
+
+ private void terminateComputational(SchedulerJobData job) {
+ final String user = job.getUser();
+ final String expName = job.getExploratoryName();
+ final String compName = job.getComputationalName();
+ final UserInfo userInfo = systemUserService.create(user);
+ log.debug("Terminating exploratory {} computational {} for user {} by scheduler", expName, compName, user);
+ computationalService.terminateComputational(userInfo, expName, compName);
+ }
+
+ private void stopExploratory(SchedulerJobData job) {
+ final String expName = job.getExploratoryName();
+ final String user = job.getUser();
+ log.debug("Stopping exploratory {} for user {} by scheduler", expName, user);
+ exploratoryService.stop(systemUserService.create(user), expName);
+ }
+
+ private List<SchedulerJobData> getExploratorySchedulersForTerminating(OffsetDateTime now) {
+ return schedulerJobDAO.getExploratorySchedulerDataWithOneOfStatus(RUNNING, STOPPED)
+ .stream()
+ .filter(canSchedulerForTerminatingBeApplied(now))
+ .collect(Collectors.toList());
+ }
+
+ private List<SchedulerJobData> getComputationalSchedulersForTerminating(OffsetDateTime now) {
+ return schedulerJobDAO.getComputationalSchedulerDataWithOneOfStatus(RUNNING,
+ DataEngineType.SPARK_STANDALONE, STOPPED, RUNNING)
+ .stream()
+ .filter(canSchedulerForTerminatingBeApplied(now))
+ .collect(Collectors.toList());
+ }
+
+ private void startExploratory(SchedulerJobData schedulerJobData) {
+ final String user = schedulerJobData.getUser();
+ final String exploratoryName = schedulerJobData.getExploratoryName();
+ log.debug("Starting exploratory {} for user {} by scheduler", exploratoryName, user);
+ exploratoryService.start(systemUserService.create(user), exploratoryName);
+ if (schedulerJobData.getJobDTO().isSyncStartRequired()) {
+ log.trace("Starting computational for exploratory {} for user {} by scheduler", exploratoryName, user);
+ final DataEngineType sparkCluster = DataEngineType.SPARK_STANDALONE;
+ final List<UserComputationalResource> compToBeStarted =
+ computationalDAO.findComputationalResourcesWithStatus(user, exploratoryName, STOPPED);
+
+ compToBeStarted
+ .stream()
+ .filter(compResource -> shouldClusterBeStarted(sparkCluster, compResource))
+ .forEach(comp -> startSpark(user, exploratoryName, comp.getComputationalName()));
+ }
+ }
+
+ private void terminateExploratory(SchedulerJobData job) {
+ final String user = job.getUser();
+ final String expName = job.getExploratoryName();
+ log.debug("Terminating exploratory {} for user {} by scheduler", expName, user);
+ exploratoryService.terminate(systemUserService.create(user), expName);
+ }
+
+ private void startSpark(String user, String expName, String compName) {
+ log.debug("Starting exploratory {} computational {} for user {} by scheduler", expName, compName, user);
+ computationalService.startSparkCluster(systemUserService.create(user), expName, compName);
+ }
+
+ private boolean shouldClusterBeStarted(DataEngineType sparkCluster, UserComputationalResource compResource) {
+ return Objects.nonNull(compResource.getSchedulerData()) && compResource.getSchedulerData().isSyncStartRequired()
+ && compResource.getImageName().equals(getDockerImageName(sparkCluster));
+ }
+
+ /**
+ * Performs bulk updating operation with scheduler data for corresponding to exploratory Spark clusters.
+ * All these resources will obtain data which is equal to exploratory's except 'stopping' operation (it will be
+ * performed automatically with notebook stopping since Spark resources have such feature).
+ *
+ * @param user user's name
+ * @param exploratoryName name of exploratory resource
+ * @param dto scheduler job data.
+ */
+ private void shareSchedulerJobDataToSparkClusters(String user, String exploratoryName, SchedulerJobDTO dto) {
+ List<String> correspondingSparkClusters = computationalDAO.getComputationalResourcesWhereStatusIn(user,
+ singletonList(DataEngineType.SPARK_STANDALONE), exploratoryName,
+ STARTING, RUNNING, STOPPING, STOPPED);
+ SchedulerJobDTO dtoWithoutStopData = getSchedulerJobWithoutStopData(dto);
+ for (String sparkName : correspondingSparkClusters) {
+ log.debug("Updating computational resource {} affiliated with exploratory {} for user {} with new " +
+ "scheduler job data {}...", sparkName, exploratoryName, user, dtoWithoutStopData);
+ computationalDAO.updateSchedulerDataForComputationalResource(user, exploratoryName, sparkName,
+ dtoWithoutStopData);
+ }
+ }
+
+ private List<SchedulerJobData> getExploratorySchedulersForStopping(OffsetDateTime currentDateTime,
+ boolean checkInactivity) {
+
+ final Date clusterMaxInactivityAllowedDate =
+ from(LocalDateTime.now().minusMinutes(ALLOWED_INACTIVITY_MINUTES).atZone(systemDefault()).toInstant());
+ return schedulerJobDAO.getExploratorySchedulerWithStatusAndClusterLastActivityLessThan(RUNNING,
+ clusterMaxInactivityAllowedDate)
+ .stream()
+ .filter(schedulerJobData -> shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(),
+ currentDateTime, schedulerJobData.getJobDTO().getStopDaysRepeat(),
+ schedulerJobData.getJobDTO().getEndTime()) ||
+ (checkInactivity && exploratoryInactivityCondition(schedulerJobData)))
+ .collect(Collectors.toList());
+ }
+
+ private List<SchedulerJobData> getExploratorySchedulersForStarting(OffsetDateTime currentDateTime) {
+ return schedulerJobDAO.getExploratorySchedulerDataWithStatus(STOPPED)
+ .stream()
+ .filter(canSchedulerForStartingBeApplied(currentDateTime))
+ .collect(Collectors.toList());
+ }
+
+ private List<SchedulerJobData> getComputationalSchedulersForStarting(OffsetDateTime currentDateTime) {
+ return schedulerJobDAO
+ .getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE, STOPPED)
+ .stream()
+ .filter(canSchedulerForStartingBeApplied(currentDateTime))
+ .collect(Collectors.toList());
+ }
+
+
+ private Predicate<SchedulerJobData> canSchedulerForStartingBeApplied(OffsetDateTime currentDateTime) {
+ return schedulerJobData -> shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(),
+ currentDateTime, schedulerJobData.getJobDTO().getStartDaysRepeat(),
+ schedulerJobData.getJobDTO().getStartTime());
+ }
+
+ private Predicate<SchedulerJobData> canSchedulerForTerminatingBeApplied(OffsetDateTime currentDateTime) {
+ return schedulerJobData -> shouldBeTerminated(currentDateTime, schedulerJobData);
+ }
+
+ private boolean shouldBeTerminated(OffsetDateTime currentDateTime, SchedulerJobData schedulerJobData) {
+ final SchedulerJobDTO jobDTO = schedulerJobData.getJobDTO();
+ final LocalDateTime convertedCurrentTime = schedulerExecutionDate(jobDTO, currentDateTime);
+ return isSchedulerActive(schedulerJobData.getJobDTO(), convertedCurrentTime) && Objects.nonNull(jobDTO.getTerminateDateTime()) &&
+ convertedCurrentTime.equals(jobDTO.getTerminateDateTime());
+ }
+
+ private List<SchedulerJobData> getComputationalSchedulersForStopping(OffsetDateTime currentDateTime,
+ boolean checkInactivity) {
+ return schedulerJobDAO
+ .getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE, RUNNING)
+ .stream()
+ .filter(schedulerJobData -> shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(),
+ currentDateTime, schedulerJobData.getJobDTO().getStopDaysRepeat(),
+ schedulerJobData.getJobDTO().getEndTime()) ||
+ (checkInactivity && computationalInactivityCondition(schedulerJobData)))
+ .collect(Collectors.toList());
+ }
+
+ private boolean computationalInactivityCondition(SchedulerJobData jobData) {
+ final SchedulerJobDTO schedulerData = jobData.getJobDTO();
+ return schedulerData.isCheckInactivityRequired() && computationalInactivityExceed(jobData, schedulerData);
+ }
+
+ private boolean computationalInactivityExceed(SchedulerJobData schedulerJobData, SchedulerJobDTO schedulerData) {
+ final String explName = schedulerJobData.getExploratoryName();
+ final String compName = schedulerJobData.getComputationalName();
+ final String user = schedulerJobData.getUser();
+ final UserComputationalResource c = computationalDAO.fetchComputationalFields(user, explName, compName);
+ final Long maxInactivity = schedulerData.getMaxInactivity();
+ return inactivityCondition(maxInactivity, c.getStatus(), c.getLastActivity());
+ }
+
+ private boolean exploratoryInactivityCondition(SchedulerJobData jobData) {
+ final SchedulerJobDTO schedulerData = jobData.getJobDTO();
+ return schedulerData.isCheckInactivityRequired() && exploratoryInactivityExceed(jobData, schedulerData);
+ }
+
+ private boolean exploratoryInactivityExceed(SchedulerJobData schedulerJobData, SchedulerJobDTO schedulerData) {
+ final String expName = schedulerJobData.getExploratoryName();
+ final String user = schedulerJobData.getUser();
+ final UserInstanceDTO userInstanceDTO = exploratoryDAO.fetchExploratoryFields(user, expName, true);
+ final boolean canBeStopped = userInstanceDTO.getResources()
+ .stream()
+ .map(UserComputationalResource::getStatus)
+ .map(UserInstanceStatus::of)
+ .noneMatch(status -> status.in(TERMINATING, CONFIGURING, CREATING, CREATING));
+ return canBeStopped && inactivityCondition(schedulerData.getMaxInactivity(), userInstanceDTO.getStatus(),
+ userInstanceDTO.getLastActivity());
+ }
+
+ private boolean inactivityCondition(Long maxInactivity, String status, LocalDateTime lastActivity) {
+ return UserInstanceStatus.RUNNING.toString().equals(status) &&
+ Optional.ofNullable(lastActivity)
+ .map(la -> la.plusMinutes(maxInactivity).isBefore(LocalDateTime.now()))
+ .orElse(Boolean.FALSE);
+ }
+
+ private void populateDefaultSchedulerValues(SchedulerJobDTO dto) {
+ if (Objects.isNull(dto.getBeginDate()) || StringUtils.isBlank(dto.getBeginDate().toString())) {
+ dto.setBeginDate(LocalDate.now());
+ }
+ if (Objects.isNull(dto.getTimeZoneOffset()) || StringUtils.isBlank(dto.getTimeZoneOffset().toString())) {
+ dto.setTimeZoneOffset(OffsetDateTime.now(systemDefault()).getOffset());
+ }
+ }
+
+ private void validateExploratoryStatus(String user, String exploratoryName) {
+ final UserInstanceDTO userInstance = exploratoryDAO.fetchExploratoryFields(user, exploratoryName);
+ validateResourceStatus(userInstance.getStatus());
+ }
+
+ private void validateComputationalStatus(String user, String exploratoryName, String computationalName) {
+ final UserComputationalResource computationalResource =
+ computationalDAO.fetchComputationalFields(user, exploratoryName, computationalName);
+ final String computationalStatus = computationalResource.getStatus();
+ validateResourceStatus(computationalStatus);
+ }
+
+ private void validateResourceStatus(String resourceStatus) {
+ final UserInstanceStatus status = UserInstanceStatus.of(resourceStatus);
+ if (Objects.isNull(status) || status.in(UserInstanceStatus.TERMINATED, TERMINATING,
+ UserInstanceStatus.FAILED)) {
+ throw new ResourceInappropriateStateException(String.format("Can not create/update scheduler for user " +
+ "instance with status: %s", status));
+ }
+ }
+
+ private boolean shouldSchedulerBeExecuted(SchedulerJobDTO dto, OffsetDateTime dateTime, List<DayOfWeek> daysRepeat,
+ LocalTime time) {
+ LocalDateTime convertedDateTime = schedulerExecutionDate(dto, dateTime);
+
+ return isSchedulerActive(dto, convertedDateTime)
+ && daysRepeat.contains(convertedDateTime.toLocalDate().getDayOfWeek())
+ && convertedDateTime.toLocalTime().equals(time);
+ }
+
+ private boolean isSchedulerActive(SchedulerJobDTO dto, LocalDateTime convertedDateTime) {
+ return !convertedDateTime.toLocalDate().isBefore(dto.getBeginDate())
+ && finishDateAfterCurrentDate(dto, convertedDateTime);
+ }
+
+ private LocalDateTime schedulerExecutionDate(SchedulerJobDTO dto, OffsetDateTime dateTime) {
+ ZoneOffset zOffset = dto.getTimeZoneOffset();
+ OffsetDateTime roundedDateTime = OffsetDateTime.of(
+ dateTime.toLocalDate(),
+ LocalTime.of(dateTime.toLocalTime().getHour(), dateTime.toLocalTime().getMinute()),
+ dateTime.getOffset());
+
+ return ZonedDateTime.ofInstant(roundedDateTime.toInstant(),
+ ZoneId.ofOffset(TIMEZONE_PREFIX, zOffset)).toLocalDateTime();
+ }
+
+ private boolean finishDateAfterCurrentDate(SchedulerJobDTO dto, LocalDateTime currentDateTime) {
+ return Objects.isNull(dto.getFinishDate()) || !currentDateTime.toLocalDate().isAfter(dto.getFinishDate());
+ }
+
+ private SchedulerJobDTO getSchedulerJobWithoutStopData(SchedulerJobDTO dto) {
+ SchedulerJobDTO convertedDto = new SchedulerJobDTO();
+ convertedDto.setBeginDate(dto.getBeginDate());
+ convertedDto.setFinishDate(dto.getFinishDate());
+ convertedDto.setStartTime(dto.getStartTime());
+ convertedDto.setStartDaysRepeat(dto.getStartDaysRepeat());
+ convertedDto.setTerminateDateTime(dto.getTerminateDateTime());
+ convertedDto.setTimeZoneOffset(dto.getTimeZoneOffset());
+ convertedDto.setSyncStartRequired(dto.isSyncStartRequired());
+ return convertedDto;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@dlab.apache.org
For additional commands, e-mail: commits-help@dlab.apache.org