You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2021/09/16 08:30:15 UTC

[unomi] branch unomi-1.6.x updated: [UNOMI-462] Fix recalculation of Segments containing past event conditions, also … (#340)

This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch unomi-1.6.x
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/unomi-1.6.x by this push:
     new 7e24dd6   [UNOMI-462] Fix recalculation of Segments containing past event conditions, also … (#340)
7e24dd6 is described below

commit 7e24dd667fa427d3b332e9448d2d5bb17cd87a71
Author: kevan Jahanshahi <ke...@jahia.com>
AuthorDate: Thu Sep 16 10:27:50 2021 +0200

     [UNOMI-462] Fix recalculation of Segments containing past event conditions, also … (#340)
    
    * Fix recalculation of Segments containing past event conditions, also provide ITest
    
    * remove unnecessary retry class and use the already existing keepTrying function
    
    * Fix pastEventsDisablePartitions
---
 .../apache/unomi/api/services/SegmentService.java  |  11 ++
 itests/pom.xml                                     |   1 +
 .../java/org/apache/unomi/itests/SegmentIT.java    |  56 +++++++++
 .../PastEventConditionESQueryBuilder.java          |  25 ++--
 .../services/impl/segments/SegmentServiceImpl.java | 126 +++++++++++++++++----
 5 files changed, 180 insertions(+), 39 deletions(-)

diff --git a/api/src/main/java/org/apache/unomi/api/services/SegmentService.java b/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
index d1dc440..a54489b 100644
--- a/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
@@ -226,4 +226,15 @@ public interface SegmentService {
      * @return a String representing the condition and parent condition uniquelly
      */
     String getGeneratedPropertyKey(Condition condition, Condition parentCondition);
+
+    /**
+     * This will recalculate the past event conditions from existing rules
+     * This operation can be heavy and take time, it will:
+     * - browse existing rules to extract the past event condition,
+     * - query the matching events for those conditions,
+     * - update the corresponding profiles
+     * - reevaluate the segments linked to these rules to engage/disengage profiles after the occurrences have been updated
+     * So use it carefully or execute this method in a dedicated thread.
+     */
+    void recalculatePastEventConditions();
 }
diff --git a/itests/pom.xml b/itests/pom.xml
index ea4aef3..3783ea4 100644
--- a/itests/pom.xml
+++ b/itests/pom.xml
@@ -168,6 +168,7 @@
                             <instanceSettings>
                                 <properties>
                                     <cluster.routing.allocation.disk.threshold_enabled>false</cluster.routing.allocation.disk.threshold_enabled>
+                                    <http.cors.allow-origin>*</http.cors.allow-origin>
                                 </properties>
                             </instanceSettings>
                         </configuration>
diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
index 207bd93..8241dc3 100644
--- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
@@ -180,4 +180,60 @@ public class SegmentIT extends BaseIT {
         profile = profileService.load("test_profile_id");
         Assert.assertTrue("Profile should be engaged in the segment", profile.getSegments().contains("past-event-segment-test"));
     }
+
+    @Test
+    public void testSegmentPastEventRecalculation() throws Exception {
+        // create Profile
+        Profile profile = new Profile();
+        profile.setItemId("test_profile_id");
+        profileService.save(profile);
+        persistenceService.refreshIndex(Profile.class, null); // wait for profile to be full persisted and index
+
+        // create the segment
+        Metadata segmentMetadata = new Metadata("past-event-segment-test");
+        Segment segment = new Segment(segmentMetadata);
+        Condition segmentCondition = new Condition(definitionsService.getConditionType("pastEventCondition"));
+        segmentCondition.setParameter("numberOfDays", 10);
+        Condition pastEventEventCondition = new Condition(definitionsService.getConditionType("eventTypeCondition"));
+        pastEventEventCondition.setParameter("eventTypeId", "test-event-type");
+        segmentCondition.setParameter("eventCondition", pastEventEventCondition);
+        segment.setCondition(segmentCondition);
+        segmentService.setSegmentDefinition(segment);
+        Thread.sleep(5000);
+
+        // Persist the event (do not send it into the system so that it will not be processed by the rules)
+        ZoneId defaultZoneId = ZoneId.systemDefault();
+        LocalDate localDate = LocalDate.now().minusDays(3);
+        Event testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
+        testEvent.setPersistent(true);
+        persistenceService.save(testEvent, null, true);
+        persistenceService.refreshIndex(Event.class, new Date()); // wait for event to be fully persisted and indexed
+
+        // ensure the profile is not yet engaged since we directly saved the event in ES
+        profile = profileService.load("test_profile_id");
+        Assert.assertFalse("Profile should not be engaged in the segment", profile.getSegments().contains("past-event-segment-test"));
+
+        // now recalculate the past event conditions
+        segmentService.recalculatePastEventConditions();
+        persistenceService.refreshIndex(Profile.class, null);
+        keepTrying("Profile should be engaged in the segment",
+                () -> profileService.load("test_profile_id"),
+                updatedProfile -> updatedProfile.getSegments().contains("past-event-segment-test"),
+                1000, 20);
+
+        // update the event to a date out of the past event condition
+        removeItems(Event.class);
+        localDate = LocalDate.now().minusDays(15);
+        testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
+        persistenceService.save(testEvent);
+        persistenceService.refreshIndex(Event.class, new Date()); // wait for event to be fully persisted and indexed
+
+        // now recalculate the past event conditions
+        segmentService.recalculatePastEventConditions();
+        persistenceService.refreshIndex(Profile.class, null);
+        keepTrying("Profile should not be engaged in the segment anymore",
+                () -> profileService.load("test_profile_id"),
+                updatedProfile -> !updatedProfile.getSegments().contains("past-event-segment-test"),
+                1000, 20);
+    }
 }
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index 3c676d4..ca2ee7e 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -77,23 +77,14 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
     public QueryBuilder buildQuery(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
         Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 1 : (Integer) condition.getParameter("minimumEventCount");
         Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
-
-        if (condition.getParameter("generatedPropertyKey") != null && condition.getParameter("generatedPropertyKey").equals(segmentService.getGeneratedPropertyKey((Condition) condition.getParameter("eventCondition"), condition))) {
-            // A property is already set on profiles matching the past event condition, use it
-            if (minimumEventCount != 1 || maximumEventCount != Integer.MAX_VALUE) {
-                // Check the number of occurences
-                RangeQueryBuilder builder = QueryBuilders.rangeQuery("systemProperties.pastEvents." + condition.getParameter("generatedPropertyKey"));
-                if (minimumEventCount != 1) {
-                    builder.gte(minimumEventCount);
-                }
-                if (maximumEventCount != Integer.MAX_VALUE) {
-                    builder.lte(minimumEventCount);
-                }
-                return builder;
-            } else {
-                // Simply get profiles who have the property set
-                return QueryBuilders.existsQuery("systemProperties.pastEvents." + condition.getParameter("generatedPropertyKey"));
-            }
+        String generatedPropertyKey = (String) condition.getParameter("generatedPropertyKey");
+
+        if (generatedPropertyKey != null && generatedPropertyKey.equals(segmentService.getGeneratedPropertyKey((Condition) condition.getParameter("eventCondition"), condition))) {
+            // A property is already set on profiles matching the past event condition: match profiles whose stored occurrence count is within [minimumEventCount, maximumEventCount]
+            RangeQueryBuilder builder = QueryBuilders.rangeQuery("systemProperties.pastEvents." + generatedPropertyKey);
+            builder.gte(minimumEventCount);
+            builder.lte(maximumEventCount);
+            return builder;
         } else {
             // No property set - tries to build an idsQuery
             // Build past event condition
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 1dc599d..cd3fe23 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -793,7 +793,9 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                     rule.setLinkedItems(Arrays.asList(metadata.getId()));
                     rules.add(rule);
 
-                    updateExistingProfilesForPastEventCondition(condition, parentCondition, true);
+                    // it's a newly generated rule to keep track of the event count, so we should update all the profiles that match this past event
+                    // it will update the count of event occurrence on the profile directly
+                    recalculatePastEventOccurrencesOnProfiles(condition, parentCondition, true, false);
                 } else {
                     rule.getLinkedItems().add(metadata.getId());
                     rules.add(rule);
@@ -815,7 +817,18 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         }
     }
 
-    private void updateExistingProfilesForPastEventCondition(Condition eventCondition, Condition parentCondition, boolean forceRefresh) {
+    /**
+     * This will recalculate the event counts on the profiles that match the given past event condition
+     * @param eventCondition the real condition
+     * @param parentCondition the past event condition
+     * @param forceRefresh will refresh the Profile index in case it's true
+     * @param resetExistingProfilesNotMatching if true, will reset to 0 the count of existing profiles that do not have matching events anymore
+     *                                         ("false" is useful when you know that no profile has a count yet, for example because it's a new rule;
+     *                                         in that case setting this to "false" allows skipping the profile queries and speeds up this process.
+     *                                         Otherwise use "true" here to be sure the count is reset to 0 on profiles that need to be reset)
+     */
+    private void recalculatePastEventOccurrencesOnProfiles(Condition eventCondition, Condition parentCondition,
+                                                             boolean forceRefresh, boolean resetExistingProfilesNotMatching) {
         long t = System.currentTimeMillis();
         List<Condition> l = new ArrayList<Condition>();
         Condition andCondition = new Condition();
@@ -855,21 +868,33 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         }
 
         String propertyKey = (String) parentCondition.getParameter("generatedPropertyKey");
+        Set<String> existingProfilesWithCounts = resetExistingProfilesNotMatching ? getExistingProfilesWithPastEventOccurrenceCount(propertyKey) : Collections.emptySet();
 
         int updatedProfileCount = 0;
         if(pastEventsDisablePartitions) {
             Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount);
-            updatedProfileCount = updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+            Set<String> updatedProfiles =  updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey);
+            existingProfilesWithCounts.removeAll(updatedProfiles);
+            updatedProfileCount = updatedProfiles.size();
         } else {
             Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
             long card = m.get("_card").longValue();
             int numParts = (int) (card / aggregateQueryBucketSize) + 2;
             for (int i = 0; i < numParts; i++) {
                 Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
-                updatedProfileCount += updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+                Set<String> updatedProfiles =  updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey);
+                existingProfilesWithCounts.removeAll(updatedProfiles);
+                updatedProfileCount += updatedProfiles.size();
             }
         }
 
+        // the remaining existing profiles with counts should be reset to 0: since they have not been updated, it means
+        // that they no longer have matching events for the time-based condition
+        if (!existingProfilesWithCounts.isEmpty()) {
+            updatedProfileCount += updatePastEventOccurrencesOnProfiles(
+                    existingProfilesWithCounts.stream().collect(Collectors.toMap(key -> key, value -> 0L)), propertyKey).size();
+        }
+
         if (forceRefresh && updatedProfileCount > 0) {
             persistenceService.refreshIndex(Profile.class, null);
         }
@@ -877,6 +902,34 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         logger.info("{} profiles updated for past event condition in {}ms", updatedProfileCount, System.currentTimeMillis() - t);
     }
 
+    /**
+     * Return the list of profile ids, for profiles that already have an event count matching the generated property key
+     * @param generatedPropertyKey the generated property key of the generated rule for the given past event condition.
+     * @return the list of profile ids.
+     */
+    private Set<String> getExistingProfilesWithPastEventOccurrenceCount(String generatedPropertyKey) {
+        Condition countExistsCondition = new Condition();
+        countExistsCondition.setConditionType(definitionsService.getConditionType("profilePropertyCondition"));
+        countExistsCondition.setParameter("propertyName", "systemProperties.pastEvents." + generatedPropertyKey);
+        countExistsCondition.setParameter("comparisonOperator", "greaterThan");
+        countExistsCondition.setParameter("propertyValueInteger", 0);
+
+        Set<String> profileIds = new HashSet<>();
+        if(pastEventsDisablePartitions) {
+            profileIds.addAll(persistenceService.aggregateWithOptimizedQuery(countExistsCondition, new TermsAggregate("itemId"),
+                    Profile.ITEM_TYPE, maximumIdsQueryCount).keySet());
+        } else {
+            Map<String, Double> m = persistenceService.getSingleValuesMetrics(countExistsCondition, new String[]{"card"}, "itemId.keyword", Profile.ITEM_TYPE);
+            long card = m.get("_card").longValue();
+            int numParts = (int) (card / aggregateQueryBucketSize) + 2;
+            for (int i = 0; i < numParts; i++) {
+                profileIds.addAll(persistenceService.aggregateWithOptimizedQuery(countExistsCondition, new TermsAggregate("itemId", i, numParts),
+                        Profile.ITEM_TYPE).keySet());
+            }
+        }
+        return profileIds;
+    }
+
     public String getGeneratedPropertyKey(Condition condition, Condition parentCondition) {
         try {
             Map<String, Object> m = new HashMap<>();
@@ -900,8 +953,50 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         }
     }
 
-    private int updateProfilesWithPastEventProperty(Map<String, Long> eventCountByProfile, String propertyKey) {
-        int profileUpdatedCount = 0;
+    @Override
+    public void recalculatePastEventConditions() {
+        logger.info("running scheduled task to recalculate segments with pastEventCondition conditions");
+        long pastEventsTaskStartTime = System.currentTimeMillis();
+        Set<String> linkedSegments = new HashSet<>();
+        for (Metadata metadata : rulesService.getRuleMetadatas()) {
+            // reevaluate auto generated rules used to store the event occurrence count on the profile
+            Rule rule = rulesService.getRule(metadata.getId());
+            for (Action action : rule.getActions()) {
+                if (action.getActionTypeId().equals("setEventOccurenceCountAction")) {
+                    Condition pastEventCondition = (Condition) action.getParameterValues().get("pastEventCondition");
+                    if (pastEventCondition.containsParameter("numberOfDays")) {
+                        recalculatePastEventOccurrencesOnProfiles(rule.getCondition(), pastEventCondition, false, true);
+                        logger.info("Event occurrence count on profiles updated for rule: {}", rule.getItemId());
+                        if (rule.getLinkedItems() != null && rule.getLinkedItems().size() > 0) {
+                            linkedSegments.addAll(rule.getLinkedItems());
+                        }
+                    }
+                }
+            }
+        }
+
+        // reevaluate segments linked to this rule, since we have updated the event occurrences count on the profiles.
+        if (linkedSegments.size() > 0) {
+            persistenceService.refreshIndex(Profile.class, null);
+            for (String linkedItem : linkedSegments) {
+                Segment linkedSegment = getSegmentDefinition(linkedItem);
+                if (linkedSegment != null) {
+                    updateExistingProfilesForSegment(linkedSegment);
+                }
+            }
+        }
+
+        logger.info("finished recalculate segments with pastEventCondition conditions in {}ms. ", System.currentTimeMillis() - pastEventsTaskStartTime);
+    }
+
+    /**
+     * This will update all the profiles in the given map with the corresponding new occurrence count for the given propertyKey
+     * @param eventCountByProfile the events count per profileId map
+     * @param propertyKey the generated property key for this past event condition, to keep track of the count in the profile
+     * @return the ids of the profiles for which the count of event occurrences has been updated.
+     */
+    private Set<String> updatePastEventOccurrencesOnProfiles(Map<String, Long> eventCountByProfile, String propertyKey) {
+        Set<String> profilesUpdated = new HashSet<>();
         Map<Item, Map> batch = new HashMap<>();
         Iterator<Map.Entry<String, Long>> entryIterator = eventCountByProfile.entrySet().iterator();
         while (entryIterator.hasNext()){
@@ -917,12 +1012,12 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                 Profile profile = new Profile();
                 profile.setItemId(profileId);
                 batch.put(profile, Collections.singletonMap("systemProperties", systemProperties));
+                profilesUpdated.add(profileId);
             }
 
             if (batch.size() == segmentUpdateBatchSize || (!entryIterator.hasNext() && batch.size() > 0)) {
                 try {
                     persistenceService.update(batch, null, Profile.class);
-                    profileUpdatedCount += batch.size();
                 } catch (Exception e) {
                     logger.error("Error updating {} profiles for past event system properties", batch.size(), e);
                 } finally {
@@ -930,7 +1025,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                 }
             }
         }
-        return profileUpdatedCount;
+        return profilesUpdated;
     }
 
     private String getMD5(String md5) {
@@ -1144,20 +1239,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
             @Override
             public void run() {
                 try {
-                    logger.info("running scheduled task to recalculate segments with pastEventCondition conditions");
-                    long pastEventsTaskStartTime = System.currentTimeMillis();
-                    for (Metadata metadata : rulesService.getRuleMetadatas()) {
-                        Rule rule = rulesService.getRule(metadata.getId());
-                        for (Action action : rule.getActions()) {
-                            if (action.getActionTypeId().equals("setEventOccurenceCountAction")) {
-                                Condition pastEventCondition = (Condition) action.getParameterValues().get("pastEventCondition");
-                                if (pastEventCondition.containsParameter("numberOfDays")) {
-                                    updateExistingProfilesForPastEventCondition(rule.getCondition(), pastEventCondition, false);
-                                }
-                            }
-                        }
-                    }
-                    logger.info("finished recalculate segments with pastEventCondition conditions in {}ms. ", System.currentTimeMillis() - pastEventsTaskStartTime);
+                    recalculatePastEventConditions();
                 } catch (Throwable t) {
                     logger.error("Error while updating profiles for past event conditions", t);
                 }