You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2021/05/26 16:33:49 UTC

[unomi] branch unomi-1.5.x updated: daily evaluate segments with date expression (#307)

This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch unomi-1.5.x
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/unomi-1.5.x by this push:
     new 1e11541  daily evaluate segments with date expression (#307)
1e11541 is described below

commit 1e11541234fca3e07692d113dae8911abd23247a
Author: giladw <gw...@yotpo.com>
AuthorDate: Wed May 26 19:32:43 2021 +0300

    daily evaluate segments with date expression (#307)
---
 .../main/resources/etc/custom.system.properties    |  2 ++
 .../impl/scheduler/SchedulerServiceImpl.java       | 12 ++++++++
 .../services/impl/segments/SegmentServiceImpl.java | 34 ++++++++++++++++++++++
 .../resources/OSGI-INF/blueprint/blueprint.xml     |  2 ++
 .../main/resources/org.apache.unomi.services.cfg   |  3 ++
 .../impl/scheduler/SchedulerServiceImplTest.java   | 24 +++++++++++++++
 6 files changed, 77 insertions(+)

diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties
index 49552fb..051ce2f 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -159,6 +159,8 @@ org.apache.unomi.segment.update.batchSize=${env:UNOMI_SEGMENT_UPDATE_BATCHSIZE:-
 org.apache.unomi.segment.batch.update=${env:UNOMI_SEGMENT_BATCH_PROFILE_UPDATE:-false}
 # Send Profile Updated Event for every profile segment update
 org.apache.unomi.segment.send.profile.update.event=${env: UNOMI_SEGMENT_SEND_PROFILE_UPDATE_EVENT:-true}
+# Daily hour once a day to recalculate segment with dateExpr condition
+org.apache.unomi.segment.daily.dateexpr.evaluation.hourutc=${env: UNOMI_SEGMENT_DAILY_DATEEXPR_EVALUATION_HOUR_UTC:-5}
 # When performing segment updates, can retry an update in case of an error to a single profile
 org.apache.unomi.services.segment.max.retries.update.profile.segment=${env:UNOMI_SEGMENT_UPDATE_MAX_RETRIES:-0}
 # When performing retry of segment update after a request was failed, delay of requests
diff --git a/services/src/main/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImpl.java
index 7e7f1cf..2311bee 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImpl.java
@@ -21,6 +21,8 @@ import org.apache.unomi.api.services.SchedulerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
+import java.time.ZonedDateTime;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -45,4 +47,14 @@ public class SchedulerServiceImpl implements SchedulerService {
     public ScheduledExecutorService getScheduleExecutorService() {
         return scheduler;
     }
+
+    public static long getTimeDiffInSeconds(int hourInUtc, ZonedDateTime now) {
+        ZonedDateTime nextRun = now.withHour(hourInUtc).withMinute(0).withSecond(0);
+        if(now.compareTo(nextRun) > 0)
+            nextRun = nextRun.plusDays(1);
+
+        Duration duration = Duration.between(now, nextRun);
+        long initialDelay = duration.getSeconds();
+        return initialDelay;
+    }
 }
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 2f159b0..886ad8f 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -38,6 +38,8 @@ import org.apache.unomi.api.services.SegmentService;
 import org.apache.unomi.persistence.spi.CustomObjectMapper;
 import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
 import org.apache.unomi.services.impl.AbstractServiceImpl;
+import org.apache.unomi.services.impl.scheduler.SchedulerServiceImpl;
+import org.apache.unomi.services.impl.scheduler.SchedulerServiceImpl;
 import org.apache.unomi.services.impl.ParserHelper;
 import org.apache.unomi.api.exceptions.BadSegmentConditionException;
 import org.osgi.framework.Bundle;
@@ -51,8 +53,11 @@ import java.io.IOException;
 import java.net.URL;
 import java.security.MessageDigest;
 import java.time.Duration;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentService, SynchronousBundleListener {
 
@@ -76,6 +81,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
     private boolean sendProfileUpdateEventForSegmentUpdate = true;
     private int maximumIdsQueryCount = 5000;
     private boolean pastEventsDisablePartitions = false;
+    private int dailyDateExprEvaluationHourUtc = 5;
 
     public SegmentServiceImpl() {
         logger.info("Initializing segment service...");
@@ -133,6 +139,10 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         this.sendProfileUpdateEventForSegmentUpdate = sendProfileUpdateEventForSegmentUpdate;
     }
 
+    public void setDailyDateExprEvaluationHourUtc(int dailyDateExprEvaluationHourUtc) {
+        this.dailyDateExprEvaluationHourUtc = dailyDateExprEvaluationHourUtc;
+    }
+
     public void postConstruct() {
         logger.debug("postConstruct {" + bundleContext.getBundle() + "}");
         loadPredefinedSegments(bundleContext);
@@ -1122,10 +1132,13 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
     }
 
     private void initializeTimer() {
+
         TimerTask task = new TimerTask() {
             @Override
             public void run() {
                 try {
+                    logger.info("running scheduled task to recalculate segments with pastEventCondition conditions");
+                    long pastEventsTaskStartTime = System.currentTimeMillis();
                     for (Metadata metadata : rulesService.getRuleMetadatas()) {
                         Rule rule = rulesService.getRule(metadata.getId());
                         for (Action action : rule.getActions()) {
@@ -1137,6 +1150,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                             }
                         }
                     }
+                    logger.info("finished recalculate segments with pastEventCondition conditions in {}ms. ", System.currentTimeMillis() - pastEventsTaskStartTime);
                 } catch (Throwable t) {
                     logger.error("Error while updating profiles for past event conditions", t);
                 }
@@ -1156,6 +1170,26 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
             }
         };
         schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 0, segmentRefreshInterval, TimeUnit.MILLISECONDS);
+
+        task = new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    long dateExprTaskStartTime = System.currentTimeMillis();
+                    List<Segment> dateExprSegments = allSegments.stream().filter(segment ->
+                            segment.getCondition().toString().contains("propertyValueDateExpr")).collect(Collectors.toList());
+                    logger.info("running scheduled task to recalculate segments with DateExpr condition, found {} segments", dateExprSegments.size());
+                    dateExprSegments.forEach(segment -> updateExistingProfilesForSegment(segment));
+                    logger.info("finished recalculate segments with DateExpr conditions in {}ms. ", System.currentTimeMillis() - dateExprTaskStartTime);
+                } catch (Throwable t) {
+                    logger.error("Error while updating profiles for DateExpr conditions", t);
+                }
+            }
+        };
+
+        long initialDelay = SchedulerServiceImpl.getTimeDiffInSeconds(dailyDateExprEvaluationHourUtc, ZonedDateTime.now(ZoneOffset.UTC));
+        logger.info("daily DateExpr segments will run at fixed rate, initialDelay={}, taskExecutionPeriod={}, ", initialDelay, TimeUnit.DAYS.toSeconds(1));
+        schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, initialDelay,  TimeUnit.DAYS.toSeconds(1), TimeUnit.SECONDS);
     }
 
     public void setTaskExecutionPeriod(long taskExecutionPeriod) {
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 3e7f10b..5a58f9b 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -39,6 +39,7 @@
             <cm:property name="segment.recalculate.period" value="1"/>
             <cm:property name="segment.batch.update" value="false"/>
             <cm:property name="segment.send.profile.update.event" value="true"/>
+            <cm:property name="segment.daily.dateexpr.evaluation.hourutc" value="5"/>
             <cm:property name="rules.refresh.interval" value="1000"/>
             <cm:property name="rules.statistics.refresh.interval" value="10000"/>
         </cm:default-properties>
@@ -182,6 +183,7 @@
         <property name="secondsDelayForRetryUpdateProfileSegment" value="${services.segment.retry.update.segment.seconds.delay}" />
         <property name="batchSegmentProfileUpdate" value="${services.segment.batch.update}" />
         <property name="sendProfileUpdateEventForSegmentUpdate" value="${services.segment.send.profile.update.event}" />
+        <property name="dailyDateExprEvaluationHourUtc" value="${services.segment.daily.dateexpr.evaluation.hourutc}" />
 
     </bean>
     <service id="segmentService" ref="segmentServiceImpl">
diff --git a/services/src/main/resources/org.apache.unomi.services.cfg b/services/src/main/resources/org.apache.unomi.services.cfg
index 37c9881..b54ca81 100644
--- a/services/src/main/resources/org.apache.unomi.services.cfg
+++ b/services/src/main/resources/org.apache.unomi.services.cfg
@@ -46,6 +46,9 @@ segment.batch.update=${org.apache.unomi.segment.batch.update:-false}
 # Send Profile Updated Event for every profile segment update
 segment.send.profile.update.event=${org.apache.unomi.segment.send.profile.update.event:-true}
 
+# Daily hour once a day to recalculate segment with dateExpr condition
+segment.daily.dateexpr.evaluation.hourutc=${org.apache.unomi.segment.daily.dateexpr.evaluation.hourutc:-5}
+
 # The interval in milliseconds to use to reload the definitions (condition types and action types)
 definitions.refresh.interval=${org.apache.unomi.definitions.refresh.interval:-10000}
 
diff --git a/services/src/test/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImplTest.java b/services/src/test/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImplTest.java
new file mode 100644
index 0000000..b095b55
--- /dev/null
+++ b/services/src/test/java/org/apache/unomi/services/impl/scheduler/SchedulerServiceImplTest.java
@@ -0,0 +1,24 @@
+package org.apache.unomi.services.impl.scheduler;
+
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+
+import static org.junit.Assert.*;
+
+public class SchedulerServiceImplTest {
+
+    @Test
+    public void getTimeDiffInSeconds_whenGiveHourOfDay_shouldReturnDifferenceInSeconds(){
+        //Arrange
+        SchedulerServiceImpl service = new SchedulerServiceImpl();
+        int hourToRunInUtc = 11;
+        ZonedDateTime timeNowInUtc = ZonedDateTime.of(LocalDateTime.parse("2020-01-13T10:00:00"), ZoneOffset.UTC);
+        //Act
+        long seconds = service.getTimeDiffInSeconds(hourToRunInUtc, timeNowInUtc);
+        //Assert
+        assertEquals(3600, seconds);
+    }
+}
\ No newline at end of file