You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dlab.apache.org by bh...@apache.org on 2019/03/04 15:40:11 UTC

[incubator-dlab] branch inactivity_integration updated: DLAB-406 fixed bug with inactivity scheduler

This is an automated email from the ASF dual-hosted git repository.

bhliva pushed a commit to branch inactivity_integration
in repository https://gitbox.apache.org/repos/asf/incubator-dlab.git


The following commit(s) were added to refs/heads/inactivity_integration by this push:
     new f8460f3  DLAB-406 fixed bug with inactivity scheduler
f8460f3 is described below

commit f8460f38b062caff27accd3e7e05e8e012fe0872
Author: bhliva <bo...@epam.com>
AuthorDate: Mon Mar 4 17:39:57 2019 +0200

    DLAB-406 fixed bug with inactivity scheduler
---
 .../service/impl/SchedulerJobServiceImpl.java      | 780 ++++++++++-----------
 1 file changed, 390 insertions(+), 390 deletions(-)

diff --git a/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java b/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java
index 63e99e4..7be29e5 100644
--- a/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java
+++ b/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java
@@ -57,396 +57,396 @@ import static java.util.Date.from;
 @Singleton
 public class SchedulerJobServiceImpl implements SchedulerJobService {
 
-	private static final String SCHEDULER_NOT_FOUND_MSG =
-			"Scheduler job data not found for user %s with exploratory %s";
-	private static final long ALLOWED_INACTIVITY_MINUTES = 1L;
-
-	@Inject
-	private SchedulerJobDAO schedulerJobDAO;
-
-	@Inject
-	private ExploratoryDAO exploratoryDAO;
-
-	@Inject
-	private ComputationalDAO computationalDAO;
-
-	@Inject
-	private ExploratoryService exploratoryService;
-
-	@Inject
-	private ComputationalService computationalService;
-
-	@Inject
-	private SystemUserInfoService systemUserService;
-
-	@Override
-	public SchedulerJobDTO fetchSchedulerJobForUserAndExploratory(String user, String exploratoryName) {
-		return schedulerJobDAO.fetchSingleSchedulerJobByUserAndExploratory(user, exploratoryName)
-				.orElseThrow(() -> new ResourceNotFoundException(String.format(SCHEDULER_NOT_FOUND_MSG, user,
-						exploratoryName)));
-	}
-
-	@Override
-	public SchedulerJobDTO fetchSchedulerJobForComputationalResource(String user, String exploratoryName,
-																	 String computationalName) {
-		return schedulerJobDAO.fetchSingleSchedulerJobForCluster(user, exploratoryName, computationalName)
-				.orElseThrow(() -> new ResourceNotFoundException(String.format(SCHEDULER_NOT_FOUND_MSG, user,
-						exploratoryName) + " with computational resource " + computationalName));
-	}
-
-	@Override
-	public void updateExploratorySchedulerData(String user, String exploratoryName, SchedulerJobDTO dto) {
-		validateExploratoryStatus(user, exploratoryName);
-		populateDefaultSchedulerValues(dto);
-		log.debug("Updating exploratory {} for user {} with new scheduler job data: {}...", exploratoryName, user,
-				dto);
-		exploratoryDAO.updateSchedulerDataForUserAndExploratory(user, exploratoryName, dto);
-		if (dto.isSyncStartRequired()) {
-			shareSchedulerJobDataToSparkClusters(user, exploratoryName, dto);
-		} else {
-			computationalDAO.updateSchedulerSyncFlag(user, exploratoryName, dto.isSyncStartRequired());
-		}
-	}
-
-	@Override
-	public void updateComputationalSchedulerData(String user, String exploratoryName, String computationalName,
-												 SchedulerJobDTO dto) {
-		validateExploratoryStatus(user, exploratoryName);
-		validateComputationalStatus(user, exploratoryName, computationalName);
-		populateDefaultSchedulerValues(dto);
-		log.debug("Updating computational resource {} affiliated with exploratory {} for user {} with new scheduler " +
-				"job data {}...", computationalName, exploratoryName, user, dto);
-		computationalDAO.updateSchedulerDataForComputationalResource(user, exploratoryName, computationalName, dto);
-	}
-
-	@Override
-	public void stopComputationalByScheduler() {
-		getComputationalSchedulersForStopping(OffsetDateTime.now(), true)
-				.forEach(this::stopComputational);
-	}
-
-	@Override
-	public void stopExploratoryByScheduler() {
-		getExploratorySchedulersForStopping(OffsetDateTime.now(), true)
-				.forEach(this::stopExploratory);
-	}
-
-	@Override
-	public void startExploratoryByScheduler() {
-		getExploratorySchedulersForStarting(OffsetDateTime.now())
-				.forEach(this::startExploratory);
-	}
-
-	@Override
-	public void startComputationalByScheduler() {
-		getComputationalSchedulersForStarting(OffsetDateTime.now())
-				.forEach(job -> startSpark(job.getUser(), job.getExploratoryName(), job.getComputationalName()));
-	}
-
-	@Override
-	public void terminateExploratoryByScheduler() {
-		getExploratorySchedulersForTerminating(OffsetDateTime.now())
-				.forEach(this::terminateExploratory);
-
-	}
-
-	@Override
-	public void terminateComputationalByScheduler() {
-		getComputationalSchedulersForTerminating(OffsetDateTime.now()).forEach(this::terminateComputational);
-
-	}
-
-	@Override
-	public void removeScheduler(String user, String exploratoryName) {
-		schedulerJobDAO.removeScheduler(user, exploratoryName);
-	}
-
-	@Override
-	public void removeScheduler(String user, String exploratoryName, String computationalName) {
-		schedulerJobDAO.removeScheduler(user, exploratoryName, computationalName);
-	}
-
-	@Override
-	public List<SchedulerJobData> getActiveSchedulers(String user, long minutesOffset) {
-		final OffsetDateTime desiredDateTime = OffsetDateTime.now().plusMinutes(minutesOffset);
-		final Predicate<SchedulerJobData> userPredicate = s -> user.equals(s.getUser());
-		final Stream<SchedulerJobData> computationalSchedulersStream =
-				getComputationalSchedulersForStopping(desiredDateTime, false)
-						.stream()
-						.filter(userPredicate);
-		final Stream<SchedulerJobData> exploratorySchedulersStream =
-				getExploratorySchedulersForStopping(desiredDateTime, false)
-						.stream()
-						.filter(userPredicate);
-		return Stream.concat(computationalSchedulersStream, exploratorySchedulersStream)
-				.collect(Collectors.toList());
-	}
-
-	private void stopComputational(SchedulerJobData job) {
-		final String expName = job.getExploratoryName();
-		final String compName = job.getComputationalName();
-		final String user = job.getUser();
-		log.debug("Stopping exploratory {} computational {} for user {} by scheduler", expName, compName, user);
-		computationalService.stopSparkCluster(systemUserService.create(user), expName, compName);
-	}
-
-	private void terminateComputational(SchedulerJobData job) {
-		final String user = job.getUser();
-		final String expName = job.getExploratoryName();
-		final String compName = job.getComputationalName();
-		final UserInfo userInfo = systemUserService.create(user);
-		log.debug("Terminating exploratory {} computational {} for user {} by scheduler", expName, compName, user);
-		computationalService.terminateComputational(userInfo, expName, compName);
-	}
-
-	private void stopExploratory(SchedulerJobData job) {
-		final String expName = job.getExploratoryName();
-		final String user = job.getUser();
-		log.debug("Stopping exploratory {} for user {} by scheduler", expName, user);
-		exploratoryService.stop(systemUserService.create(user), expName);
-	}
-
-	private List<SchedulerJobData> getExploratorySchedulersForTerminating(OffsetDateTime now) {
-		return schedulerJobDAO.getExploratorySchedulerDataWithOneOfStatus(RUNNING, STOPPED)
-				.stream()
-				.filter(canSchedulerForTerminatingBeApplied(now))
-				.collect(Collectors.toList());
-	}
-
-	private List<SchedulerJobData> getComputationalSchedulersForTerminating(OffsetDateTime now) {
-		return schedulerJobDAO.getComputationalSchedulerDataWithOneOfStatus(RUNNING,
-				DataEngineType.SPARK_STANDALONE, STOPPED, RUNNING)
-				.stream()
-				.filter(canSchedulerForTerminatingBeApplied(now))
-				.collect(Collectors.toList());
-	}
-
-	private void startExploratory(SchedulerJobData schedulerJobData) {
-		final String user = schedulerJobData.getUser();
-		final String exploratoryName = schedulerJobData.getExploratoryName();
-		log.debug("Starting exploratory {} for user {} by scheduler", exploratoryName, user);
-		exploratoryService.start(systemUserService.create(user), exploratoryName);
-		if (schedulerJobData.getJobDTO().isSyncStartRequired()) {
-			log.trace("Starting computational for exploratory {} for user {} by scheduler", exploratoryName, user);
-			final DataEngineType sparkCluster = DataEngineType.SPARK_STANDALONE;
-			final List<UserComputationalResource> compToBeStarted =
-					computationalDAO.findComputationalResourcesWithStatus(user, exploratoryName, STOPPED);
-
-			compToBeStarted
-					.stream()
-					.filter(compResource -> shouldClusterBeStarted(sparkCluster, compResource))
-					.forEach(comp -> startSpark(user, exploratoryName, comp.getComputationalName()));
-		}
-	}
-
-	private void terminateExploratory(SchedulerJobData job) {
-		final String user = job.getUser();
-		final String expName = job.getExploratoryName();
-		log.debug("Terminating exploratory {} for user {} by scheduler", expName, user);
-		exploratoryService.terminate(systemUserService.create(user), expName);
-	}
-
-	private void startSpark(String user, String expName, String compName) {
-		log.debug("Starting exploratory {} computational {} for user {} by scheduler", expName, compName, user);
-		computationalService.startSparkCluster(systemUserService.create(user), expName, compName);
-	}
-
-	private boolean shouldClusterBeStarted(DataEngineType sparkCluster, UserComputationalResource compResource) {
-		return Objects.nonNull(compResource.getSchedulerData()) && compResource.getSchedulerData().isSyncStartRequired()
-				&& compResource.getImageName().equals(getDockerImageName(sparkCluster));
-	}
-
-	/**
-	 * Performs bulk updating operation with scheduler data for corresponding to exploratory Spark clusters.
-	 * All these resources will obtain data which is equal to exploratory's except 'stopping' operation (it will be
-	 * performed automatically with notebook stopping since Spark resources have such feature).
-	 *
-	 * @param user            user's name
-	 * @param exploratoryName name of exploratory resource
-	 * @param dto             scheduler job data.
-	 */
-	private void shareSchedulerJobDataToSparkClusters(String user, String exploratoryName, SchedulerJobDTO dto) {
-		List<String> correspondingSparkClusters = computationalDAO.getComputationalResourcesWhereStatusIn(user,
-				singletonList(DataEngineType.SPARK_STANDALONE), exploratoryName,
-				STARTING, RUNNING, STOPPING, STOPPED);
-		SchedulerJobDTO dtoWithoutStopData = getSchedulerJobWithoutStopData(dto);
-		for (String sparkName : correspondingSparkClusters) {
-			log.debug("Updating computational resource {} affiliated with exploratory {} for user {} with new " +
-					"scheduler job data {}...", sparkName, exploratoryName, user, dtoWithoutStopData);
-			computationalDAO.updateSchedulerDataForComputationalResource(user, exploratoryName, sparkName,
-					dtoWithoutStopData);
-		}
-	}
-
-	private List<SchedulerJobData> getExploratorySchedulersForStopping(OffsetDateTime currentDateTime,
-																	   boolean checkInactivity) {
-
-		final Date clusterMaxInactivityAllowedDate =
-				from(LocalDateTime.now().minusMinutes(ALLOWED_INACTIVITY_MINUTES).atZone(systemDefault()).toInstant());
-		return schedulerJobDAO.getExploratorySchedulerWithStatusAndClusterLastActivityLessThan(RUNNING,
-				clusterMaxInactivityAllowedDate)
-				.stream()
-				.filter(schedulerJobData -> shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(),
-						currentDateTime, schedulerJobData.getJobDTO().getStopDaysRepeat(),
-						schedulerJobData.getJobDTO().getEndTime()) ||
-						(checkInactivity && exploratoryInactivityCondition(schedulerJobData)))
-				.collect(Collectors.toList());
-	}
-
-	private List<SchedulerJobData> getExploratorySchedulersForStarting(OffsetDateTime currentDateTime) {
-		return schedulerJobDAO.getExploratorySchedulerDataWithStatus(STOPPED)
-				.stream()
-				.filter(canSchedulerForStartingBeApplied(currentDateTime))
-				.collect(Collectors.toList());
-	}
-
-	private List<SchedulerJobData> getComputationalSchedulersForStarting(OffsetDateTime currentDateTime) {
-		return schedulerJobDAO
-				.getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE, STOPPED)
-				.stream()
-				.filter(canSchedulerForStartingBeApplied(currentDateTime))
-				.collect(Collectors.toList());
-	}
-
-
-	private Predicate<SchedulerJobData> canSchedulerForStartingBeApplied(OffsetDateTime currentDateTime) {
-		return schedulerJobData -> shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(),
-				currentDateTime, schedulerJobData.getJobDTO().getStartDaysRepeat(),
-				schedulerJobData.getJobDTO().getStartTime());
-	}
-
-	private Predicate<SchedulerJobData> canSchedulerForTerminatingBeApplied(OffsetDateTime currentDateTime) {
-		return schedulerJobData -> shouldBeTerminated(currentDateTime, schedulerJobData);
-	}
-
-	private boolean shouldBeTerminated(OffsetDateTime currentDateTime, SchedulerJobData schedulerJobData) {
-		final SchedulerJobDTO jobDTO = schedulerJobData.getJobDTO();
-		final LocalDateTime convertedCurrentTime = schedulerExecutionDate(jobDTO, currentDateTime);
-		return isSchedulerActive(schedulerJobData.getJobDTO(), convertedCurrentTime) && Objects.nonNull(jobDTO.getTerminateDateTime()) &&
-				convertedCurrentTime.equals(jobDTO.getTerminateDateTime());
-	}
-
-	private List<SchedulerJobData> getComputationalSchedulersForStopping(OffsetDateTime currentDateTime,
-																		 boolean checkInactivity) {
-		return schedulerJobDAO
-				.getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE, RUNNING)
-				.stream()
-				.filter(schedulerJobData -> shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(),
-						currentDateTime, schedulerJobData.getJobDTO().getStopDaysRepeat(),
-						schedulerJobData.getJobDTO().getEndTime()) ||
-						(checkInactivity && computationalInactivityCondition(schedulerJobData)))
-				.collect(Collectors.toList());
-	}
-
-	private boolean computationalInactivityCondition(SchedulerJobData jobData) {
-		final SchedulerJobDTO schedulerData = jobData.getJobDTO();
-		return schedulerData.isCheckInactivityRequired() && computationalInactivityExceed(jobData, schedulerData);
-	}
-
-	private boolean computationalInactivityExceed(SchedulerJobData schedulerJobData, SchedulerJobDTO schedulerData) {
-		final String explName = schedulerJobData.getExploratoryName();
-		final String compName = schedulerJobData.getComputationalName();
-		final String user = schedulerJobData.getUser();
-		final UserComputationalResource c = computationalDAO.fetchComputationalFields(user, explName, compName);
-		final Long maxInactivity = schedulerData.getMaxInactivity();
-		return inactivityCondition(maxInactivity, c.getStatus(), c.getLastActivity());
-	}
-
-	private boolean exploratoryInactivityCondition(SchedulerJobData jobData) {
-		final SchedulerJobDTO schedulerData = jobData.getJobDTO();
-		return schedulerData.isCheckInactivityRequired() && exploratoryInactivityExceed(jobData, schedulerData);
-	}
-
-	private boolean exploratoryInactivityExceed(SchedulerJobData schedulerJobData, SchedulerJobDTO schedulerData) {
-		final String expName = schedulerJobData.getExploratoryName();
-		final String user = schedulerJobData.getUser();
-		final UserInstanceDTO userInstanceDTO = exploratoryDAO.fetchExploratoryFields(user, expName, true);
-		final boolean canBeStopped = userInstanceDTO.getResources()
-				.stream()
-				.map(UserComputationalResource::getStatus)
-				.map(UserInstanceStatus::of)
-				.noneMatch(status -> status.in(TERMINATING, CONFIGURING, CREATING));
-		return canBeStopped && inactivityCondition(schedulerData.getMaxInactivity(), userInstanceDTO.getStatus(),
-				userInstanceDTO.getLastActivity());
-	}
-
-	private boolean inactivityCondition(Long maxInactivity, String status, LocalDateTime lastActivity) {
-		return UserInstanceStatus.RUNNING.toString().equals(status) &&
-				Optional.ofNullable(lastActivity)
-						.map(la -> la.plusMinutes(maxInactivity).isBefore(LocalDateTime.now()))
-						.orElse(Boolean.FALSE);
-	}
-
-	private void populateDefaultSchedulerValues(SchedulerJobDTO dto) {
-		if (Objects.isNull(dto.getBeginDate()) || StringUtils.isBlank(dto.getBeginDate().toString())) {
-			dto.setBeginDate(LocalDate.now());
-		}
-		if (Objects.isNull(dto.getTimeZoneOffset()) || StringUtils.isBlank(dto.getTimeZoneOffset().toString())) {
-			dto.setTimeZoneOffset(OffsetDateTime.now(systemDefault()).getOffset());
-		}
-	}
-
-	private void validateExploratoryStatus(String user, String exploratoryName) {
-		final UserInstanceDTO userInstance = exploratoryDAO.fetchExploratoryFields(user, exploratoryName);
-		validateResourceStatus(userInstance.getStatus());
-	}
-
-	private void validateComputationalStatus(String user, String exploratoryName, String computationalName) {
-		final UserComputationalResource computationalResource =
-				computationalDAO.fetchComputationalFields(user, exploratoryName, computationalName);
-		final String computationalStatus = computationalResource.getStatus();
-		validateResourceStatus(computationalStatus);
-	}
-
-	private void validateResourceStatus(String resourceStatus) {
-		final UserInstanceStatus status = UserInstanceStatus.of(resourceStatus);
-		if (Objects.isNull(status) || status.in(UserInstanceStatus.TERMINATED, TERMINATING,
-				UserInstanceStatus.FAILED)) {
-			throw new ResourceInappropriateStateException(String.format("Can not create/update scheduler for user " +
-					"instance with status: %s", status));
-		}
-	}
-
-	private boolean shouldSchedulerBeExecuted(SchedulerJobDTO dto, OffsetDateTime dateTime, List<DayOfWeek> daysRepeat,
-											  LocalTime time) {
-		LocalDateTime convertedDateTime = schedulerExecutionDate(dto, dateTime);
-
-		return isSchedulerActive(dto, convertedDateTime)
-				&& daysRepeat.contains(convertedDateTime.toLocalDate().getDayOfWeek())
-				&& convertedDateTime.toLocalTime().equals(time);
-	}
-
-	private boolean isSchedulerActive(SchedulerJobDTO dto, LocalDateTime convertedDateTime) {
-		return !convertedDateTime.toLocalDate().isBefore(dto.getBeginDate())
-				&& finishDateAfterCurrentDate(dto, convertedDateTime);
-	}
-
-	private LocalDateTime schedulerExecutionDate(SchedulerJobDTO dto, OffsetDateTime dateTime) {
-		ZoneOffset zOffset = dto.getTimeZoneOffset();
-		OffsetDateTime roundedDateTime = OffsetDateTime.of(
-				dateTime.toLocalDate(),
-				LocalTime.of(dateTime.toLocalTime().getHour(), dateTime.toLocalTime().getMinute()),
-				dateTime.getOffset());
-
-		return ZonedDateTime.ofInstant(roundedDateTime.toInstant(),
-				ZoneId.ofOffset(TIMEZONE_PREFIX, zOffset)).toLocalDateTime();
-	}
-
-	private boolean finishDateAfterCurrentDate(SchedulerJobDTO dto, LocalDateTime currentDateTime) {
-		return Objects.isNull(dto.getFinishDate()) || !currentDateTime.toLocalDate().isAfter(dto.getFinishDate());
-	}
-
-	private SchedulerJobDTO getSchedulerJobWithoutStopData(SchedulerJobDTO dto) {
-		SchedulerJobDTO convertedDto = new SchedulerJobDTO();
-		convertedDto.setBeginDate(dto.getBeginDate());
-		convertedDto.setFinishDate(dto.getFinishDate());
-		convertedDto.setStartTime(dto.getStartTime());
-		convertedDto.setStartDaysRepeat(dto.getStartDaysRepeat());
-		convertedDto.setTerminateDateTime(dto.getTerminateDateTime());
-		convertedDto.setTimeZoneOffset(dto.getTimeZoneOffset());
-		convertedDto.setSyncStartRequired(dto.isSyncStartRequired());
-		return convertedDto;
-	}
+    private static final String SCHEDULER_NOT_FOUND_MSG =
+            "Scheduler job data not found for user %s with exploratory %s";
+    private static final long ALLOWED_INACTIVITY_MINUTES = 1L;
+
+    @Inject
+    private SchedulerJobDAO schedulerJobDAO;
+
+    @Inject
+    private ExploratoryDAO exploratoryDAO;
+
+    @Inject
+    private ComputationalDAO computationalDAO;
+
+    @Inject
+    private ExploratoryService exploratoryService;
+
+    @Inject
+    private ComputationalService computationalService;
+
+    @Inject
+    private SystemUserInfoService systemUserService;
+
+    @Override
+    public SchedulerJobDTO fetchSchedulerJobForUserAndExploratory(String user, String exploratoryName) {
+        return schedulerJobDAO.fetchSingleSchedulerJobByUserAndExploratory(user, exploratoryName)
+                .orElseThrow(() -> new ResourceNotFoundException(String.format(SCHEDULER_NOT_FOUND_MSG, user,
+                        exploratoryName)));
+    }
+
+    @Override
+    public SchedulerJobDTO fetchSchedulerJobForComputationalResource(String user, String exploratoryName,
+                                                                     String computationalName) {
+        return schedulerJobDAO.fetchSingleSchedulerJobForCluster(user, exploratoryName, computationalName)
+                .orElseThrow(() -> new ResourceNotFoundException(String.format(SCHEDULER_NOT_FOUND_MSG, user,
+                        exploratoryName) + " with computational resource " + computationalName));
+    }
+
+    @Override
+    public void updateExploratorySchedulerData(String user, String exploratoryName, SchedulerJobDTO dto) {
+        validateExploratoryStatus(user, exploratoryName);
+        populateDefaultSchedulerValues(dto);
+        log.debug("Updating exploratory {} for user {} with new scheduler job data: {}...", exploratoryName, user,
+                dto);
+        exploratoryDAO.updateSchedulerDataForUserAndExploratory(user, exploratoryName, dto);
+        if (dto.isSyncStartRequired()) {
+            shareSchedulerJobDataToSparkClusters(user, exploratoryName, dto);
+        } else {
+            computationalDAO.updateSchedulerSyncFlag(user, exploratoryName, dto.isSyncStartRequired());
+        }
+    }
+
+    @Override
+    public void updateComputationalSchedulerData(String user, String exploratoryName, String computationalName,
+                                                 SchedulerJobDTO dto) {
+        validateExploratoryStatus(user, exploratoryName);
+        validateComputationalStatus(user, exploratoryName, computationalName);
+        populateDefaultSchedulerValues(dto);
+        log.debug("Updating computational resource {} affiliated with exploratory {} for user {} with new scheduler " +
+                "job data {}...", computationalName, exploratoryName, user, dto);
+        computationalDAO.updateSchedulerDataForComputationalResource(user, exploratoryName, computationalName, dto);
+    }
+
+    @Override
+    public void stopComputationalByScheduler() {
+        getComputationalSchedulersForStopping(OffsetDateTime.now(), true)
+                .forEach(this::stopComputational);
+    }
+
+    @Override
+    public void stopExploratoryByScheduler() {
+        getExploratorySchedulersForStopping(OffsetDateTime.now(), true)
+                .forEach(this::stopExploratory);
+    }
+
+    @Override
+    public void startExploratoryByScheduler() {
+        getExploratorySchedulersForStarting(OffsetDateTime.now())
+                .forEach(this::startExploratory);
+    }
+
+    @Override
+    public void startComputationalByScheduler() {
+        getComputationalSchedulersForStarting(OffsetDateTime.now())
+                .forEach(job -> startSpark(job.getUser(), job.getExploratoryName(), job.getComputationalName()));
+    }
+
+    @Override
+    public void terminateExploratoryByScheduler() {
+        getExploratorySchedulersForTerminating(OffsetDateTime.now())
+                .forEach(this::terminateExploratory);
+
+    }
+
+    @Override
+    public void terminateComputationalByScheduler() {
+        getComputationalSchedulersForTerminating(OffsetDateTime.now()).forEach(this::terminateComputational);
+
+    }
+
+    @Override
+    public void removeScheduler(String user, String exploratoryName) {
+        schedulerJobDAO.removeScheduler(user, exploratoryName);
+    }
+
+    @Override
+    public void removeScheduler(String user, String exploratoryName, String computationalName) {
+        schedulerJobDAO.removeScheduler(user, exploratoryName, computationalName);
+    }
+
+    @Override
+    public List<SchedulerJobData> getActiveSchedulers(String user, long minutesOffset) {
+        final OffsetDateTime desiredDateTime = OffsetDateTime.now().plusMinutes(minutesOffset);
+        final Predicate<SchedulerJobData> userPredicate = s -> user.equals(s.getUser());
+        final Stream<SchedulerJobData> computationalSchedulersStream =
+                getComputationalSchedulersForStopping(desiredDateTime, false)
+                        .stream()
+                        .filter(userPredicate);
+        final Stream<SchedulerJobData> exploratorySchedulersStream =
+                getExploratorySchedulersForStopping(desiredDateTime, false)
+                        .stream()
+                        .filter(userPredicate);
+        return Stream.concat(computationalSchedulersStream, exploratorySchedulersStream)
+                .collect(Collectors.toList());
+    }
+
+    private void stopComputational(SchedulerJobData job) {
+        final String expName = job.getExploratoryName();
+        final String compName = job.getComputationalName();
+        final String user = job.getUser();
+        log.debug("Stopping exploratory {} computational {} for user {} by scheduler", expName, compName, user);
+        computationalService.stopSparkCluster(systemUserService.create(user), expName, compName);
+    }
+
+    private void terminateComputational(SchedulerJobData job) {
+        final String user = job.getUser();
+        final String expName = job.getExploratoryName();
+        final String compName = job.getComputationalName();
+        final UserInfo userInfo = systemUserService.create(user);
+        log.debug("Terminating exploratory {} computational {} for user {} by scheduler", expName, compName, user);
+        computationalService.terminateComputational(userInfo, expName, compName);
+    }
+
+    private void stopExploratory(SchedulerJobData job) {
+        final String expName = job.getExploratoryName();
+        final String user = job.getUser();
+        log.debug("Stopping exploratory {} for user {} by scheduler", expName, user);
+        exploratoryService.stop(systemUserService.create(user), expName);
+    }
+
+    private List<SchedulerJobData> getExploratorySchedulersForTerminating(OffsetDateTime now) {
+        return schedulerJobDAO.getExploratorySchedulerDataWithOneOfStatus(RUNNING, STOPPED)
+                .stream()
+                .filter(canSchedulerForTerminatingBeApplied(now))
+                .collect(Collectors.toList());
+    }
+
+    private List<SchedulerJobData> getComputationalSchedulersForTerminating(OffsetDateTime now) {
+        return schedulerJobDAO.getComputationalSchedulerDataWithOneOfStatus(RUNNING,
+                DataEngineType.SPARK_STANDALONE, STOPPED, RUNNING)
+                .stream()
+                .filter(canSchedulerForTerminatingBeApplied(now))
+                .collect(Collectors.toList());
+    }
+
+    private void startExploratory(SchedulerJobData schedulerJobData) {
+        final String user = schedulerJobData.getUser();
+        final String exploratoryName = schedulerJobData.getExploratoryName();
+        log.debug("Starting exploratory {} for user {} by scheduler", exploratoryName, user);
+        exploratoryService.start(systemUserService.create(user), exploratoryName);
+        if (schedulerJobData.getJobDTO().isSyncStartRequired()) {
+            log.trace("Starting computational for exploratory {} for user {} by scheduler", exploratoryName, user);
+            final DataEngineType sparkCluster = DataEngineType.SPARK_STANDALONE;
+            final List<UserComputationalResource> compToBeStarted =
+                    computationalDAO.findComputationalResourcesWithStatus(user, exploratoryName, STOPPED);
+
+            compToBeStarted
+                    .stream()
+                    .filter(compResource -> shouldClusterBeStarted(sparkCluster, compResource))
+                    .forEach(comp -> startSpark(user, exploratoryName, comp.getComputationalName()));
+        }
+    }
+
+    private void terminateExploratory(SchedulerJobData job) {
+        final String user = job.getUser();
+        final String expName = job.getExploratoryName();
+        log.debug("Terminating exploratory {} for user {} by scheduler", expName, user);
+        exploratoryService.terminate(systemUserService.create(user), expName);
+    }
+
+    private void startSpark(String user, String expName, String compName) {
+        log.debug("Starting exploratory {} computational {} for user {} by scheduler", expName, compName, user);
+        computationalService.startSparkCluster(systemUserService.create(user), expName, compName);
+    }
+
+    private boolean shouldClusterBeStarted(DataEngineType sparkCluster, UserComputationalResource compResource) {
+        return Objects.nonNull(compResource.getSchedulerData()) && compResource.getSchedulerData().isSyncStartRequired()
+                && compResource.getImageName().equals(getDockerImageName(sparkCluster));
+    }
+
+    /**
+     * Performs bulk updating operation with scheduler data for corresponding to exploratory Spark clusters.
+     * All these resources will obtain data which is equal to exploratory's except 'stopping' operation (it will be
+     * performed automatically with notebook stopping since Spark resources have such feature).
+     *
+     * @param user            user's name
+     * @param exploratoryName name of exploratory resource
+     * @param dto             scheduler job data.
+     */
+    private void shareSchedulerJobDataToSparkClusters(String user, String exploratoryName, SchedulerJobDTO dto) {
+        List<String> correspondingSparkClusters = computationalDAO.getComputationalResourcesWhereStatusIn(user,
+                singletonList(DataEngineType.SPARK_STANDALONE), exploratoryName,
+                STARTING, RUNNING, STOPPING, STOPPED);
+        SchedulerJobDTO dtoWithoutStopData = getSchedulerJobWithoutStopData(dto);
+        for (String sparkName : correspondingSparkClusters) {
+            log.debug("Updating computational resource {} affiliated with exploratory {} for user {} with new " +
+                    "scheduler job data {}...", sparkName, exploratoryName, user, dtoWithoutStopData);
+            computationalDAO.updateSchedulerDataForComputationalResource(user, exploratoryName, sparkName,
+                    dtoWithoutStopData);
+        }
+    }
+
+    private List<SchedulerJobData> getExploratorySchedulersForStopping(OffsetDateTime currentDateTime,
+                                                                       boolean checkInactivity) {
+
+        final Date clusterMaxInactivityAllowedDate =
+                from(LocalDateTime.now().minusMinutes(ALLOWED_INACTIVITY_MINUTES).atZone(systemDefault()).toInstant());
+        return schedulerJobDAO.getExploratorySchedulerWithStatusAndClusterLastActivityLessThan(RUNNING,
+                clusterMaxInactivityAllowedDate)
+                .stream()
+                .filter(schedulerJobData -> shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(),
+                        currentDateTime, schedulerJobData.getJobDTO().getStopDaysRepeat(),
+                        schedulerJobData.getJobDTO().getEndTime()) ||
+                        (checkInactivity && exploratoryInactivityCondition(schedulerJobData)))
+                .collect(Collectors.toList());
+    }
+
+    private List<SchedulerJobData> getExploratorySchedulersForStarting(OffsetDateTime currentDateTime) {
+        return schedulerJobDAO.getExploratorySchedulerDataWithStatus(STOPPED)
+                .stream()
+                .filter(canSchedulerForStartingBeApplied(currentDateTime))
+                .collect(Collectors.toList());
+    }
+
+    private List<SchedulerJobData> getComputationalSchedulersForStarting(OffsetDateTime currentDateTime) {
+        return schedulerJobDAO
+                .getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE, STOPPED)
+                .stream()
+                .filter(canSchedulerForStartingBeApplied(currentDateTime))
+                .collect(Collectors.toList());
+    }
+
+
+    private Predicate<SchedulerJobData> canSchedulerForStartingBeApplied(OffsetDateTime currentDateTime) {
+        return schedulerJobData -> shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(),
+                currentDateTime, schedulerJobData.getJobDTO().getStartDaysRepeat(),
+                schedulerJobData.getJobDTO().getStartTime());
+    }
+
+    /**
+     * Builds a predicate that accepts a scheduler job when it has to be terminated at the given moment.
+     *
+     * @param currentDateTime moment the termination time is checked against
+     * @return predicate delegating to {@link #shouldBeTerminated(OffsetDateTime, SchedulerJobData)}
+     */
+    private Predicate<SchedulerJobData> canSchedulerForTerminatingBeApplied(OffsetDateTime currentDateTime) {
+        return jobData -> shouldBeTerminated(currentDateTime, jobData);
+    }
+
+    /**
+     * Checks whether the resource behind the scheduler job must be terminated at the given moment.
+     *
+     * @param currentDateTime  moment to check
+     * @param schedulerJobData scheduler job together with its owner data
+     * @return {@code true} if the scheduler is active and its terminate date-time is set and equals the
+     * given moment converted by {@code schedulerExecutionDate}
+     */
+    private boolean shouldBeTerminated(OffsetDateTime currentDateTime, SchedulerJobData schedulerJobData) {
+        final SchedulerJobDTO job = schedulerJobData.getJobDTO();
+        final LocalDateTime now = schedulerExecutionDate(job, currentDateTime);
+        if (!isSchedulerActive(job, now)) {
+            return false;
+        }
+        return Objects.nonNull(job.getTerminateDateTime()) && now.equals(job.getTerminateDateTime());
+    }
+
+    private List<SchedulerJobData> getComputationalSchedulersForStopping(OffsetDateTime currentDateTime,
+                                                                         boolean checkInactivity) {
+        return schedulerJobDAO
+                .getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE, RUNNING)
+                .stream()
+                .filter(schedulerJobData -> shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(),
+                        currentDateTime, schedulerJobData.getJobDTO().getStopDaysRepeat(),
+                        schedulerJobData.getJobDTO().getEndTime()) ||
+                        (checkInactivity && computationalInactivityCondition(schedulerJobData)))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Checks whether a computational resource qualifies for stopping because of inactivity.
+     *
+     * @param jobData scheduler job together with its owner data
+     * @return {@code true} if the job requires an inactivity check and
+     * {@code computationalInactivityExceed} reports that the limit is exceeded
+     */
+    private boolean computationalInactivityCondition(SchedulerJobData jobData) {
+        final SchedulerJobDTO job = jobData.getJobDTO();
+        if (!job.isCheckInactivityRequired()) {
+            return false;
+        }
+        return computationalInactivityExceed(jobData, job);
+    }
+
+    /**
+     * Loads the computational resource referenced by the scheduler job and checks its idle time.
+     *
+     * @param schedulerJobData supplies user, exploratory name and computational name of the resource
+     * @param schedulerData    supplies the allowed inactivity period
+     * @return result of {@code inactivityCondition} for the fetched resource's status and last activity
+     */
+    private boolean computationalInactivityExceed(SchedulerJobData schedulerJobData, SchedulerJobDTO schedulerData) {
+        final String exploratoryName = schedulerJobData.getExploratoryName();
+        final String computationalName = schedulerJobData.getComputationalName();
+        final String userName = schedulerJobData.getUser();
+        final UserComputationalResource resource =
+                computationalDAO.fetchComputationalFields(userName, exploratoryName, computationalName);
+        final Long allowedInactivity = schedulerData.getMaxInactivity();
+        return inactivityCondition(allowedInactivity, resource.getStatus(), resource.getLastActivity());
+    }
+
+    /**
+     * Checks whether an exploratory qualifies for stopping because of inactivity.
+     *
+     * @param jobData scheduler job together with its owner data
+     * @return {@code true} if the job requires an inactivity check and
+     * {@code exploratoryInactivityExceed} reports that the limit is exceeded
+     */
+    private boolean exploratoryInactivityCondition(SchedulerJobData jobData) {
+        final SchedulerJobDTO job = jobData.getJobDTO();
+        if (!job.isCheckInactivityRequired()) {
+            return false;
+        }
+        return exploratoryInactivityExceed(jobData, job);
+    }
+
+    /**
+     * Loads the exploratory referenced by the scheduler job and checks whether it may be stopped for
+     * inactivity. The exploratory is not stopped while any of its computational resources is in
+     * TERMINATING, CONFIGURING or CREATING status.
+     *
+     * @param schedulerJobData supplies user and exploratory name
+     * @param schedulerData    supplies the allowed inactivity period
+     * @return {@code true} if no resource blocks the stop and {@code inactivityCondition} holds for the
+     * exploratory's status and last activity
+     */
+    private boolean exploratoryInactivityExceed(SchedulerJobData schedulerJobData, SchedulerJobDTO schedulerData) {
+        final String expName = schedulerJobData.getExploratoryName();
+        final String user = schedulerJobData.getUser();
+        // NOTE(review): the third argument presumably asks the DAO to include computational resources - confirm.
+        final UserInstanceDTO userInstanceDTO = exploratoryDAO.fetchExploratoryFields(user, expName, true);
+        // NOTE(review): the original listed CREATING twice; the duplicate is dropped here. If another status
+        // was meant in its place, add it to this list.
+        final boolean canBeStopped = userInstanceDTO.getResources()
+                .stream()
+                .map(UserComputationalResource::getStatus)
+                .map(UserInstanceStatus::of)
+                .noneMatch(status -> status.in(TERMINATING, CONFIGURING, CREATING));
+        return canBeStopped && inactivityCondition(schedulerData.getMaxInactivity(), userInstanceDTO.getStatus(),
+                userInstanceDTO.getLastActivity());
+    }
+
+    /**
+     * Tells whether a resource has been idle for longer than the allowed period.
+     *
+     * @param maxInactivity allowed idle period in minutes; {@code null} is treated as "no limit configured"
+     * @param status        current status of the resource as a string
+     * @param lastActivity  time of the last registered activity, may be {@code null}
+     * @return {@code true} only if the status is RUNNING, both {@code lastActivity} and
+     * {@code maxInactivity} are present and {@code lastActivity + maxInactivity} lies before the current time
+     */
+    private boolean inactivityCondition(Long maxInactivity, String status, LocalDateTime lastActivity) {
+        // Nothing to compare without a limit or a recorded activity. A null limit used to be auto-unboxed
+        // inside plusMinutes and failed with a NullPointerException.
+        if (Objects.isNull(maxInactivity) || Objects.isNull(lastActivity)) {
+            return false;
+        }
+        return UserInstanceStatus.RUNNING.toString().equals(status)
+                && lastActivity.plusMinutes(maxInactivity).isBefore(LocalDateTime.now());
+    }
+
+    /**
+     * Fills in scheduler fields the caller left unset: the begin date defaults to today and the time zone
+     * offset defaults to the current offset of the system default zone.
+     *
+     * @param dto scheduler job data, modified in place
+     */
+    private void populateDefaultSchedulerValues(SchedulerJobDTO dto) {
+        // The former isBlank(toString()) checks were dead code: neither a LocalDate nor a ZoneOffset
+        // renders as a blank string, so a null check is sufficient.
+        if (Objects.isNull(dto.getBeginDate())) {
+            dto.setBeginDate(LocalDate.now());
+        }
+        if (Objects.isNull(dto.getTimeZoneOffset())) {
+            dto.setTimeZoneOffset(OffsetDateTime.now(systemDefault()).getOffset());
+        }
+    }
+
+    /**
+     * Fetches the exploratory and validates its status with {@code validateResourceStatus}.
+     *
+     * @param user            owner of the exploratory
+     * @param exploratoryName name of the exploratory
+     */
+    private void validateExploratoryStatus(String user, String exploratoryName) {
+        final String exploratoryStatus = exploratoryDAO.fetchExploratoryFields(user, exploratoryName).getStatus();
+        validateResourceStatus(exploratoryStatus);
+    }
+
+    /**
+     * Fetches the computational resource and validates its status with {@code validateResourceStatus}.
+     *
+     * @param user              owner of the resource
+     * @param exploratoryName   name of the exploratory the resource belongs to
+     * @param computationalName name of the computational resource
+     */
+    private void validateComputationalStatus(String user, String exploratoryName, String computationalName) {
+        final UserComputationalResource resource =
+                computationalDAO.fetchComputationalFields(user, exploratoryName, computationalName);
+        validateResourceStatus(resource.getStatus());
+    }
+
+    /**
+     * Rejects statuses for which a scheduler must not be created or updated.
+     *
+     * @param resourceStatus status of the resource as a string
+     * @throws ResourceInappropriateStateException if the status is not recognised by
+     *                                             {@code UserInstanceStatus.of} or is TERMINATED,
+     *                                             TERMINATING or FAILED
+     */
+    private void validateResourceStatus(String resourceStatus) {
+        final UserInstanceStatus status = UserInstanceStatus.of(resourceStatus);
+        if (Objects.isNull(status) || status.in(UserInstanceStatus.TERMINATED, TERMINATING,
+                UserInstanceStatus.FAILED)) {
+            // For an unrecognised status report the raw value; printing the parsed status would show "null".
+            final Object reportedStatus = Objects.isNull(status) ? resourceStatus : status;
+            throw new ResourceInappropriateStateException(String.format("Can not create/update scheduler for user " +
+                    "instance with status: %s", reportedStatus));
+        }
+    }
+
+    private boolean shouldSchedulerBeExecuted(SchedulerJobDTO dto, OffsetDateTime dateTime, List<DayOfWeek> daysRepeat,
+                                              LocalTime time) {
+        LocalDateTime convertedDateTime = schedulerExecutionDate(dto, dateTime);
+
+        return isSchedulerActive(dto, convertedDateTime)
+                && daysRepeat.contains(convertedDateTime.toLocalDate().getDayOfWeek())
+                && convertedDateTime.toLocalTime().equals(time);
+    }
+
+    private boolean isSchedulerActive(SchedulerJobDTO dto, LocalDateTime convertedDateTime) {
+        return !convertedDateTime.toLocalDate().isBefore(dto.getBeginDate())
+                && finishDateAfterCurrentDate(dto, convertedDateTime);
+    }
+
+    private LocalDateTime schedulerExecutionDate(SchedulerJobDTO dto, OffsetDateTime dateTime) {
+        ZoneOffset zOffset = dto.getTimeZoneOffset();
+        OffsetDateTime roundedDateTime = OffsetDateTime.of(
+                dateTime.toLocalDate(),
+                LocalTime.of(dateTime.toLocalTime().getHour(), dateTime.toLocalTime().getMinute()),
+                dateTime.getOffset());
+
+        return ZonedDateTime.ofInstant(roundedDateTime.toInstant(),
+                ZoneId.ofOffset(TIMEZONE_PREFIX, zOffset)).toLocalDateTime();
+    }
+
+    /**
+     * Checks that the scheduler has not finished yet.
+     *
+     * @param dto             scheduler job data providing the optional finish date
+     * @param currentDateTime moment to check
+     * @return {@code true} if no finish date is set or the date of {@code currentDateTime} is not after it
+     */
+    private boolean finishDateAfterCurrentDate(SchedulerJobDTO dto, LocalDateTime currentDateTime) {
+        if (Objects.isNull(dto.getFinishDate())) {
+            return true;
+        }
+        return !currentDateTime.toLocalDate().isAfter(dto.getFinishDate());
+    }
+
+    private SchedulerJobDTO getSchedulerJobWithoutStopData(SchedulerJobDTO dto) {
+        SchedulerJobDTO convertedDto = new SchedulerJobDTO();
+        convertedDto.setBeginDate(dto.getBeginDate());
+        convertedDto.setFinishDate(dto.getFinishDate());
+        convertedDto.setStartTime(dto.getStartTime());
+        convertedDto.setStartDaysRepeat(dto.getStartDaysRepeat());
+        convertedDto.setTerminateDateTime(dto.getTerminateDateTime());
+        convertedDto.setTimeZoneOffset(dto.getTimeZoneOffset());
+        convertedDto.setSyncStartRequired(dto.isSyncStartRequired());
+        return convertedDto;
+    }
 
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@dlab.apache.org
For additional commands, e-mail: commits-help@dlab.apache.org