You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2021/05/26 16:33:49 UTC
[unomi] branch unomi-1.5.x updated: daily evaluate segments with
date expression (#307)
This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch unomi-1.5.x
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/unomi-1.5.x by this push:
new 1e11541 daily evaluate segments with date expression (#307)
1e11541 is described below
commit 1e11541234fca3e07692d113dae8911abd23247a
Author: giladw <gw...@yotpo.com>
AuthorDate: Wed May 26 19:32:43 2021 +0300
daily evaluate segments with date expression (#307)
---
.../main/resources/etc/custom.system.properties | 2 ++
.../impl/scheduler/SchedulerServiceImpl.java | 12 ++++++++
.../services/impl/segments/SegmentServiceImpl.java | 34 ++++++++++++++++++++++
.../resources/OSGI-INF/blueprint/blueprint.xml | 2 ++
.../main/resources/org.apache.unomi.services.cfg | 3 ++
.../impl/scheduler/SchedulerServiceImplTest.java | 24 +++++++++++++++
6 files changed, 77 insertions(+)
diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties
index 49552fb..051ce2f 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -159,6 +159,8 @@ org.apache.unomi.segment.update.batchSize=${env:UNOMI_SEGMENT_UPDATE_BATCHSIZE:-
org.apache.unomi.segment.batch.update=${env:UNOMI_SEGMENT_BATCH_PROFILE_UPDATE:-false}
# Send Profile Updated Event for every profile segment update
org.apache.unomi.segment.send.profile.update.event=${env: UNOMI_SEGMENT_SEND_PROFILE_UPDATE_EVENT:-true}
+# Daily hour once a day to recalculate segment with dateExpr condition
+org.apache.unomi.segment.daily.dateexpr.evaluation.hourutc=${env: UNOMI_SEGMENT_DAILY_DATEEXPR_EVALUATION_HOUR_UTC:-5}
# When performing segment updates, can retry an update in case of an error to a single profile
org.apache.unomi.services.segment.max.retries.update.profile.segment=${env:UNOMI_SEGMENT_UPDATE_MAX_RETRIES:-0}
# When performing retry of segment update after a request was failed, delay of requests
diff --git a/services/src/main/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImpl.java
index 7e7f1cf..2311bee 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImpl.java
@@ -21,6 +21,8 @@ import org.apache.unomi.api.services.SchedulerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
+import java.time.ZonedDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -45,4 +47,14 @@ public class SchedulerServiceImpl implements SchedulerService {
public ScheduledExecutorService getScheduleExecutorService() {
return scheduler;
}
+
+ public static long getTimeDiffInSeconds(int hourInUtc, ZonedDateTime now) {
+ ZonedDateTime nextRun = now.withHour(hourInUtc).withMinute(0).withSecond(0);
+ if(now.compareTo(nextRun) > 0)
+ nextRun = nextRun.plusDays(1);
+
+ Duration duration = Duration.between(now, nextRun);
+ long initialDelay = duration.getSeconds();
+ return initialDelay;
+ }
}
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 2f159b0..886ad8f 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -38,6 +38,8 @@ import org.apache.unomi.api.services.SegmentService;
import org.apache.unomi.persistence.spi.CustomObjectMapper;
import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
import org.apache.unomi.services.impl.AbstractServiceImpl;
+import org.apache.unomi.services.impl.scheduler.SchedulerServiceImpl;
+import org.apache.unomi.services.impl.scheduler.SchedulerServiceImpl;
import org.apache.unomi.services.impl.ParserHelper;
import org.apache.unomi.api.exceptions.BadSegmentConditionException;
import org.osgi.framework.Bundle;
@@ -51,8 +53,11 @@ import java.io.IOException;
import java.net.URL;
import java.security.MessageDigest;
import java.time.Duration;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentService, SynchronousBundleListener {
@@ -76,6 +81,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
private boolean sendProfileUpdateEventForSegmentUpdate = true;
private int maximumIdsQueryCount = 5000;
private boolean pastEventsDisablePartitions = false;
+ private int dailyDateExprEvaluationHourUtc = 5;
public SegmentServiceImpl() {
logger.info("Initializing segment service...");
@@ -133,6 +139,10 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
this.sendProfileUpdateEventForSegmentUpdate = sendProfileUpdateEventForSegmentUpdate;
}
+ public void setDailyDateExprEvaluationHourUtc(int dailyDateExprEvaluationHourUtc) {
+ this.dailyDateExprEvaluationHourUtc = dailyDateExprEvaluationHourUtc;
+ }
+
public void postConstruct() {
logger.debug("postConstruct {" + bundleContext.getBundle() + "}");
loadPredefinedSegments(bundleContext);
@@ -1122,10 +1132,13 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
}
private void initializeTimer() {
+
TimerTask task = new TimerTask() {
@Override
public void run() {
try {
+ logger.info("running scheduled task to recalculate segments with pastEventCondition conditions");
+ long pastEventsTaskStartTime = System.currentTimeMillis();
for (Metadata metadata : rulesService.getRuleMetadatas()) {
Rule rule = rulesService.getRule(metadata.getId());
for (Action action : rule.getActions()) {
@@ -1137,6 +1150,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
}
}
}
+ logger.info("finished recalculate segments with pastEventCondition conditions in {}ms. ", System.currentTimeMillis() - pastEventsTaskStartTime);
} catch (Throwable t) {
logger.error("Error while updating profiles for past event conditions", t);
}
@@ -1156,6 +1170,26 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
}
};
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 0, segmentRefreshInterval, TimeUnit.MILLISECONDS);
+
+ task = new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ long dateExprTaskStartTime = System.currentTimeMillis();
+ List<Segment> dateExprSegments = allSegments.stream().filter(segment ->
+ segment.getCondition().toString().contains("propertyValueDateExpr")).collect(Collectors.toList());
+ logger.info("running scheduled task to recalculate segments with DateExpr condition, found {} segments", dateExprSegments.size());
+ dateExprSegments.forEach(segment -> updateExistingProfilesForSegment(segment));
+ logger.info("finished recalculate segments with DateExpr conditions in {}ms. ", System.currentTimeMillis() - dateExprTaskStartTime);
+ } catch (Throwable t) {
+ logger.error("Error while updating profiles for DateExpr conditions", t);
+ }
+ }
+ };
+
+ long initialDelay = SchedulerServiceImpl.getTimeDiffInSeconds(dailyDateExprEvaluationHourUtc, ZonedDateTime.now(ZoneOffset.UTC));
+ logger.info("daily DateExpr segments will run at fixed rate, initialDelay={}, taskExecutionPeriod={}, ", initialDelay, TimeUnit.DAYS.toSeconds(1));
+ schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, initialDelay, TimeUnit.DAYS.toSeconds(1), TimeUnit.SECONDS);
}
public void setTaskExecutionPeriod(long taskExecutionPeriod) {
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 3e7f10b..5a58f9b 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -39,6 +39,7 @@
<cm:property name="segment.recalculate.period" value="1"/>
<cm:property name="segment.batch.update" value="false"/>
<cm:property name="segment.send.profile.update.event" value="true"/>
+ <cm:property name="segment.daily.dateexpr.evaluation.hourutc" value="5"/>
<cm:property name="rules.refresh.interval" value="1000"/>
<cm:property name="rules.statistics.refresh.interval" value="10000"/>
</cm:default-properties>
@@ -182,6 +183,7 @@
<property name="secondsDelayForRetryUpdateProfileSegment" value="${services.segment.retry.update.segment.seconds.delay}" />
<property name="batchSegmentProfileUpdate" value="${services.segment.batch.update}" />
<property name="sendProfileUpdateEventForSegmentUpdate" value="${services.segment.send.profile.update.event}" />
+ <property name="dailyDateExprEvaluationHourUtc" value="${services.segment.daily.dateexpr.evaluation.hourutc}" />
</bean>
<service id="segmentService" ref="segmentServiceImpl">
diff --git a/services/src/main/resources/org.apache.unomi.services.cfg b/services/src/main/resources/org.apache.unomi.services.cfg
index 37c9881..b54ca81 100644
--- a/services/src/main/resources/org.apache.unomi.services.cfg
+++ b/services/src/main/resources/org.apache.unomi.services.cfg
@@ -46,6 +46,9 @@ segment.batch.update=${org.apache.unomi.segment.batch.update:-false}
# Send Profile Updated Event for every profile segment update
segment.send.profile.update.event=${org.apache.unomi.segment.send.profile.update.event:-true}
+# Daily hour once a day to recalculate segment with dateExpr condition
+segment.daily.dateexpr.evaluation.hourutc=${org.apache.unomi.segment.daily.dateexpr.evaluation.hourutc:-5}
+
# The interval in milliseconds to use to reload the definitions (condition types and action types)
definitions.refresh.interval=${org.apache.unomi.definitions.refresh.interval:-10000}
diff --git a/services/src/test/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImplTest.java b/services/src/test/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImplTest.java
new file mode 100644
index 0000000..b095b55
--- /dev/null
+++ b/services/src/test/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImplTest.java
@@ -0,0 +1,24 @@
+package org.apache.unomi.services.impl.scheduler;
+
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+
+import static org.junit.Assert.*;
+
+public class SchedulerServiceImplTest {
+
+ @Test
+ public void getTimeDiffInSeconds_whenGiveHourOfDay_shouldReturnDifferenceInSeconds(){
+ //Arrange
+ SchedulerServiceImpl service = new SchedulerServiceImpl();
+ int hourToRunInUtc = 11;
+ ZonedDateTime timeNowInUtc = ZonedDateTime.of(LocalDateTime.parse("2020-01-13T10:00:00"), ZoneOffset.UTC);
+ //Act
+ long seconds = service.getTimeDiffInSeconds(hourToRunInUtc, timeNowInUtc);
+ //Assert
+ assertEquals(3600, seconds);
+ }
+}
\ No newline at end of file