You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dlab.apache.org by bh...@apache.org on 2019/04/10 13:03:41 UTC

[incubator-dlab] branch feature-DLAB-453-1 created (now e16a99c)

This is an automated email from the ASF dual-hosted git repository.

bhliva pushed a change to branch feature-DLAB-453-1
in repository https://gitbox.apache.org/repos/asf/incubator-dlab.git.


      at e16a99c  DLAB-453 added possibility to terminate computational by scheduler

This branch includes the following new commits:

     new e16a99c  DLAB-453 added possibility to terminate computational by scheduler

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@dlab.apache.org
For additional commands, e-mail: commits-help@dlab.apache.org


[incubator-dlab] 01/01: DLAB-453 added possibility to terminate computational by scheduler

Posted by bh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bhliva pushed a commit to branch feature-DLAB-453-1
in repository https://gitbox.apache.org/repos/asf/incubator-dlab.git

commit e16a99c736ced5f0019645ddcd2eca87d89b653e
Author: bhliva <bo...@epam.com>
AuthorDate: Wed Apr 10 12:41:25 2019 +0300

    DLAB-453 added possibility to terminate computational by scheduler
---
 services/self-service/self-service.yml             |  3 ++
 .../epam/dlab/backendapi/dao/SchedulerJobDAO.java  | 42 ++++++++++++----------
 .../computational/StopComputationalJob.java        |  2 --
 ...onalJob.java => TerminateComputationalJob.java} | 19 +++++-----
 .../service/impl/SchedulerJobServiceImpl.java      | 29 +++++++--------
 .../validation/SchedulerJobDTOValidator.java       |  2 +-
 .../service/impl/SchedulerJobServiceImplTest.java  | 17 ++++-----
 7 files changed, 54 insertions(+), 60 deletions(-)

diff --git a/services/self-service/self-service.yml b/services/self-service/self-service.yml
index c001403..e60207a 100644
--- a/services/self-service/self-service.yml
+++ b/services/self-service/self-service.yml
@@ -150,6 +150,9 @@ schedulers:
   stopExploratoryScheduler:
     enabled: true
     cron: "*/20 * * ? * * *"
+  terminateComputationalScheduler:
+    enabled: true
+    cron: "*/20 * * ? * * *"
   checkQuoteScheduler:
     enabled: true
     cron: "0 0 * ? * * *"
diff --git a/services/self-service/src/main/java/com/epam/dlab/backendapi/dao/SchedulerJobDAO.java b/services/self-service/src/main/java/com/epam/dlab/backendapi/dao/SchedulerJobDAO.java
index 37c14b7..9664852 100644
--- a/services/self-service/src/main/java/com/epam/dlab/backendapi/dao/SchedulerJobDAO.java
+++ b/services/self-service/src/main/java/com/epam/dlab/backendapi/dao/SchedulerJobDAO.java
@@ -31,10 +31,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.bson.Document;
 import org.bson.conversions.Bson;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -43,6 +40,7 @@ import static com.epam.dlab.backendapi.dao.ComputationalDAO.COMPUTATIONAL_NAME;
 import static com.epam.dlab.backendapi.dao.ComputationalDAO.IMAGE;
 import static com.epam.dlab.backendapi.dao.ExploratoryDAO.*;
 import static com.epam.dlab.backendapi.dao.MongoCollections.USER_INSTANCES;
+import static com.epam.dlab.dto.base.DataEngineType.fromDockerImageName;
 import static com.mongodb.client.model.Filters.*;
 import static com.mongodb.client.model.Projections.*;
 import static java.util.stream.Collectors.toList;
@@ -135,18 +133,27 @@ public class SchedulerJobDAO extends BaseDAO {
 	public List<SchedulerJobData> getComputationalSchedulerDataWithOneOfStatus(UserInstanceStatus exploratoryStatus,
 																			   DataEngineType dataEngineType,
 																			   UserInstanceStatus... statuses) {
-		final Bson computationalSchedulerCondition = Filters.elemMatch(COMPUTATIONAL_RESOURCES,
-				and(schedulerNotNullCondition(), eq(CHECK_INACTIVITY_FLAG, false)));
-		FindIterable<Document> userInstances = find(USER_INSTANCES,
-				and(eq(STATUS, exploratoryStatus.toString()), computationalSchedulerCondition),
-				fields(excludeId(), include(USER, EXPLORATORY_NAME, COMPUTATIONAL_RESOURCES + ".$")));
-
-		return stream(userInstances)
+		return stream(computationalResourcesWithScheduler(exploratoryStatus))
 				.map(doc -> computationalSchedulerDataStream(doc, dataEngineType, statuses))
 				.flatMap(Function.identity())
 				.collect(toList());
 	}
 
+	public List<SchedulerJobData> getComputationalSchedulerDataWithOneOfStatus(UserInstanceStatus exploratoryStatus, UserInstanceStatus... statuses) {
+		return stream(computationalResourcesWithScheduler(exploratoryStatus))
+				.map(doc -> computationalSchedulerData(doc, statuses).map(compResource -> toSchedulerData(doc, compResource)))
+				.flatMap(Function.identity())
+				.collect(toList());
+	}
+
+	private FindIterable<Document> computationalResourcesWithScheduler(UserInstanceStatus exploratoryStatus) {
+		final Bson computationalSchedulerCondition = Filters.elemMatch(COMPUTATIONAL_RESOURCES,
+				and(schedulerNotNullCondition(), eq(CHECK_INACTIVITY_FLAG, false)));
+		return find(USER_INSTANCES,
+				and(eq(STATUS, exploratoryStatus.toString()), computationalSchedulerCondition),
+				fields(excludeId(), include(USER, EXPLORATORY_NAME, COMPUTATIONAL_RESOURCES)));
+	}
+
 	public void removeScheduler(String user, String exploratory) {
 		updateOne(USER_INSTANCES, and(eq(USER, user), eq(EXPLORATORY_NAME, exploratory)),
 				unset(SCHEDULER_DATA, StringUtils.EMPTY));
@@ -169,8 +176,8 @@ public class SchedulerJobDAO extends BaseDAO {
 
 	private Stream<SchedulerJobData> computationalSchedulerDataStream(Document doc, DataEngineType computationalType,
 																	  UserInstanceStatus... computationalStatuses) {
-		return computationalSchedulerData(doc, computationalType, computationalStatuses)
-				.stream()
+		return computationalSchedulerData(doc, computationalStatuses)
+				.filter(compResource -> fromDockerImageName(compResource.getString(IMAGE)) == computationalType)
 				.map(compResource -> toSchedulerData(doc, compResource));
 	}
 
@@ -184,17 +191,14 @@ public class SchedulerJobDAO extends BaseDAO {
 	}
 
 	@SuppressWarnings("unchecked")
-	private List<Document> computationalSchedulerData(Document doc, DataEngineType computationalType,
-													  UserInstanceStatus... computationalStatuses) {
+	private Stream<Document> computationalSchedulerData(Document doc, UserInstanceStatus... computationalStatuses) {
 		final Set<String> statusSet = Arrays.stream(computationalStatuses)
 				.map(UserInstanceStatus::toString)
 				.collect(Collectors.toSet());
 		return ((List<Document>) doc.get(COMPUTATIONAL_RESOURCES))
 				.stream()
-				.filter(compResource ->
-						DataEngineType.fromDockerImageName(compResource.getString(IMAGE)) ==
-								computationalType && statusSet.contains(compResource.getString(STATUS)))
-				.collect(toList());
+				.filter(compResource -> Objects.nonNull(compResource.get(SCHEDULER_DATA)) &&
+						statusSet.contains(compResource.getString(STATUS)));
 	}
 }
 
diff --git a/services/self-service/src/main/java/com/epam/dlab/backendapi/schedulers/computational/StopComputationalJob.java b/services/self-service/src/main/java/com/epam/dlab/backendapi/schedulers/computational/StopComputationalJob.java
index 95a7098..3825d08 100644
--- a/services/self-service/src/main/java/com/epam/dlab/backendapi/schedulers/computational/StopComputationalJob.java
+++ b/services/self-service/src/main/java/com/epam/dlab/backendapi/schedulers/computational/StopComputationalJob.java
@@ -22,7 +22,6 @@ package com.epam.dlab.backendapi.schedulers.computational;
 import com.epam.dlab.backendapi.schedulers.internal.Scheduled;
 import com.epam.dlab.backendapi.service.SchedulerJobService;
 import com.google.inject.Inject;
-import lombok.extern.slf4j.Slf4j;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 
@@ -30,7 +29,6 @@ import org.quartz.JobExecutionContext;
  * There realized integration with Quartz scheduler framework and defined stop computational resource scheduler job
  * which executes every time specified.
  */
-@Slf4j
 @Scheduled("stopComputationalScheduler")
 public class StopComputationalJob implements Job {
 
diff --git a/services/self-service/src/main/java/com/epam/dlab/backendapi/schedulers/computational/StopComputationalJob.java b/services/self-service/src/main/java/com/epam/dlab/backendapi/schedulers/computational/TerminateComputationalJob.java
similarity index 70%
copy from services/self-service/src/main/java/com/epam/dlab/backendapi/schedulers/computational/StopComputationalJob.java
copy to services/self-service/src/main/java/com/epam/dlab/backendapi/schedulers/computational/TerminateComputationalJob.java
index 95a7098..765a6e8 100644
--- a/services/self-service/src/main/java/com/epam/dlab/backendapi/schedulers/computational/StopComputationalJob.java
+++ b/services/self-service/src/main/java/com/epam/dlab/backendapi/schedulers/computational/TerminateComputationalJob.java
@@ -22,23 +22,20 @@ package com.epam.dlab.backendapi.schedulers.computational;
 import com.epam.dlab.backendapi.schedulers.internal.Scheduled;
 import com.epam.dlab.backendapi.service.SchedulerJobService;
 import com.google.inject.Inject;
-import lombok.extern.slf4j.Slf4j;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 
-/**
- * There realized integration with Quartz scheduler framework and defined stop computational resource scheduler job
- * which executes every time specified.
- */
-@Slf4j
-@Scheduled("stopComputationalScheduler")
-public class StopComputationalJob implements Job {
+@Scheduled("terminateComputationalScheduler")
+public class TerminateComputationalJob implements Job {
+	private final SchedulerJobService schedulerJobService;
 
 	@Inject
-	private SchedulerJobService schedulerJobService;
+	public TerminateComputationalJob(SchedulerJobService schedulerJobService) {
+		this.schedulerJobService = schedulerJobService;
+	}
 
 	@Override
-	public void execute(JobExecutionContext jobExecutionContext) {
-		schedulerJobService.stopComputationalByScheduler();
+	public void execute(JobExecutionContext context) {
+		schedulerJobService.terminateComputationalByScheduler();
 	}
 }
diff --git a/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java b/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java
index c22ff5a..6222093 100644
--- a/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java
+++ b/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java
@@ -47,13 +47,13 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 
 import java.time.*;
+import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static com.epam.dlab.backendapi.dao.SchedulerJobDAO.TIMEZONE_PREFIX;
 import static com.epam.dlab.constants.ServiceConsts.PROVISIONING_SERVICE_NAME;
 import static com.epam.dlab.dto.UserInstanceStatus.*;
 import static com.epam.dlab.dto.base.DataEngineType.getDockerImageName;
@@ -240,8 +240,7 @@ public class SchedulerJobServiceImpl implements SchedulerJobService {
 	}
 
 	private List<SchedulerJobData> getComputationalSchedulersForTerminating(OffsetDateTime now) {
-		return schedulerJobDAO.getComputationalSchedulerDataWithOneOfStatus(RUNNING,
-				DataEngineType.SPARK_STANDALONE, STOPPED, RUNNING)
+		return schedulerJobDAO.getComputationalSchedulerDataWithOneOfStatus(RUNNING, STOPPED, RUNNING)
 				.stream()
 				.filter(canSchedulerForTerminatingBeApplied(now))
 				.collect(Collectors.toList());
@@ -344,9 +343,11 @@ public class SchedulerJobServiceImpl implements SchedulerJobService {
 
 	private boolean shouldBeTerminated(OffsetDateTime currentDateTime, SchedulerJobData schedulerJobData) {
 		final SchedulerJobDTO jobDTO = schedulerJobData.getJobDTO();
-		final LocalDateTime convertedCurrentTime = schedulerExecutionDate(jobDTO, currentDateTime);
-		return isSchedulerActive(schedulerJobData.getJobDTO(), convertedCurrentTime) && Objects.nonNull(jobDTO.getTerminateDateTime()) &&
-				convertedCurrentTime.equals(jobDTO.getTerminateDateTime());
+		final ZoneOffset timeZoneOffset = jobDTO.getTimeZoneOffset();
+		final LocalDateTime convertedCurrentTime = localDateTimeAtZone(currentDateTime, timeZoneOffset);
+		final LocalDateTime terminateDateTime = jobDTO.getTerminateDateTime();
+		return Objects.nonNull(terminateDateTime) && isSchedulerActive(jobDTO, convertedCurrentTime) &&
+				convertedCurrentTime.equals(terminateDateTime.atOffset(timeZoneOffset).toLocalDateTime());
 	}
 
 	private List<SchedulerJobData> getComputationalSchedulersForStopping(OffsetDateTime currentDateTime) {
@@ -389,7 +390,7 @@ public class SchedulerJobServiceImpl implements SchedulerJobService {
 
 	private boolean shouldSchedulerBeExecuted(SchedulerJobDTO dto, OffsetDateTime dateTime, List<DayOfWeek> daysRepeat,
 											  LocalTime time) {
-		LocalDateTime convertedDateTime = schedulerExecutionDate(dto, dateTime);
+		LocalDateTime convertedDateTime = localDateTimeAtZone(dateTime, dto.getTimeZoneOffset());
 
 		return isSchedulerActive(dto, convertedDateTime)
 				&& daysRepeat.contains(convertedDateTime.toLocalDate().getDayOfWeek())
@@ -401,15 +402,11 @@ public class SchedulerJobServiceImpl implements SchedulerJobService {
 				&& finishDateAfterCurrentDate(dto, convertedDateTime);
 	}
 
-	private LocalDateTime schedulerExecutionDate(SchedulerJobDTO dto, OffsetDateTime dateTime) {
-		ZoneOffset zOffset = dto.getTimeZoneOffset();
-		OffsetDateTime roundedDateTime = OffsetDateTime.of(
-				dateTime.toLocalDate(),
-				LocalTime.of(dateTime.toLocalTime().getHour(), dateTime.toLocalTime().getMinute()),
-				dateTime.getOffset());
-
-		return ZonedDateTime.ofInstant(roundedDateTime.toInstant(),
-				ZoneId.ofOffset(TIMEZONE_PREFIX, zOffset)).toLocalDateTime();
+	private LocalDateTime localDateTimeAtZone(OffsetDateTime dateTime, ZoneOffset timeZoneOffset) {
+		return dateTime.atZoneSameInstant(ZoneOffset.UTC)
+				.truncatedTo(ChronoUnit.MINUTES)
+				.withZoneSameInstant(timeZoneOffset)
+				.toLocalDateTime();
 	}
 
 	private boolean finishDateAfterCurrentDate(SchedulerJobDTO dto, LocalDateTime currentDateTime) {
diff --git a/services/self-service/src/main/java/com/epam/dlab/backendapi/validation/SchedulerJobDTOValidator.java b/services/self-service/src/main/java/com/epam/dlab/backendapi/validation/SchedulerJobDTOValidator.java
index bfd78af..87dc55e 100644
--- a/services/self-service/src/main/java/com/epam/dlab/backendapi/validation/SchedulerJobDTOValidator.java
+++ b/services/self-service/src/main/java/com/epam/dlab/backendapi/validation/SchedulerJobDTOValidator.java
@@ -34,7 +34,7 @@ public class SchedulerJobDTOValidator implements ConstraintValidator<SchedulerJo
 
 	@Override
 	public boolean isValid(SchedulerJobDTO schedulerJobDTO, ConstraintValidatorContext constraintValidatorContext) {
-		if (!schedulerJobDTO.isCheckInactivityRequired()) {
+		if (!schedulerJobDTO.isCheckInactivityRequired() && Objects.isNull(schedulerJobDTO.getTerminateDateTime())) {
 			return !schedulerJobDTO.getStartDaysRepeat().isEmpty() || !schedulerJobDTO.getStopDaysRepeat().isEmpty();
 		} else if (schedulerJobDTO.isCheckInactivityRequired() && Objects.isNull(schedulerJobDTO.getMaxInactivity())) {
 			constraintValidatorContext.disableDefaultConstraintViolation();
diff --git a/services/self-service/src/test/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImplTest.java b/services/self-service/src/test/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImplTest.java
index 45ba862..161b72d 100644
--- a/services/self-service/src/test/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImplTest.java
+++ b/services/self-service/src/test/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImplTest.java
@@ -885,15 +885,14 @@ public class SchedulerJobServiceImplTest {
 				, terminateDateTime, false, USER, LocalTime.now().truncatedTo(ChronoUnit.MINUTES)
 		);
 		when(schedulerJobDAO.getComputationalSchedulerDataWithOneOfStatus(any(UserInstanceStatus.class),
-				any(DataEngineType.class), anyVararg())).thenReturn(singletonList(schedulerJobData));
+				anyVararg())).thenReturn(singletonList(schedulerJobData));
 		when(systemUserService.create(anyString())).thenReturn(getUserInfo());
 
 		schedulerJobService.terminateComputationalByScheduler();
 
 		verify(systemUserService).create(USER);
 		verify(schedulerJobDAO)
-				.getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE, STOPPED,
-						RUNNING);
+				.getComputationalSchedulerDataWithOneOfStatus(RUNNING, STOPPED, RUNNING);
 		verify(computationalService).terminateComputational(refEq(getUserInfo()), eq(EXPLORATORY_NAME),
 				eq(COMPUTATIONAL_NAME));
 		verifyNoMoreInteractions(systemUserService, schedulerJobDAO, computationalService);
@@ -906,8 +905,7 @@ public class SchedulerJobServiceImplTest {
 
 		schedulerJobService.terminateComputationalByScheduler();
 
-		verify(schedulerJobDAO).getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE,
-				STOPPED, RUNNING);
+		verify(schedulerJobDAO).getComputationalSchedulerDataWithOneOfStatus(RUNNING, STOPPED, RUNNING);
 		verifyNoMoreInteractions(schedulerJobDAO);
 		verifyZeroInteractions(systemUserService, computationalService);
 	}
@@ -926,8 +924,7 @@ public class SchedulerJobServiceImplTest {
 
 		schedulerJobService.terminateComputationalByScheduler();
 
-		verify(schedulerJobDAO).getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE,
-				STOPPED, RUNNING);
+		verify(schedulerJobDAO).getComputationalSchedulerDataWithOneOfStatus(RUNNING, STOPPED, RUNNING);
 		verifyNoMoreInteractions(systemUserService, schedulerJobDAO, computationalService);
 	}
 
@@ -947,8 +944,7 @@ public class SchedulerJobServiceImplTest {
 
 		schedulerJobService.terminateComputationalByScheduler();
 
-		verify(schedulerJobDAO).getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE,
-				STOPPED, RUNNING);
+		verify(schedulerJobDAO).getComputationalSchedulerDataWithOneOfStatus(RUNNING, STOPPED, RUNNING);
 		verifyNoMoreInteractions(systemUserService, schedulerJobDAO, computationalService);
 	}
 
@@ -969,8 +965,7 @@ public class SchedulerJobServiceImplTest {
 		schedulerJobService.terminateComputationalByScheduler();
 
 		verify(schedulerJobDAO)
-				.getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE, STOPPED,
-						RUNNING);
+				.getComputationalSchedulerDataWithOneOfStatus(RUNNING, STOPPED, RUNNING);
 		verifyNoMoreInteractions(systemUserService, schedulerJobDAO, computationalService);
 	}
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@dlab.apache.org
For additional commands, e-mail: commits-help@dlab.apache.org