You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2021/03/10 15:03:16 UTC

[unomi] branch UNOMI-442-segment-past-event created (now 92ae092)

This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a change to branch UNOMI-442-segment-past-event
in repository https://gitbox.apache.org/repos/asf/unomi.git.


      at 92ae092  UNOMI-442: fix creation of Segment that contains past event condition, correctly engage profiles during creation process

This branch includes the following new commits:

     new 92ae092  UNOMI-442: fix creation of Segment that contains past event condition, correctly engage profiles during creation process

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[unomi] 01/01: UNOMI-442: fix creation of Segment that contains past event condition, correctly engage profiles during creation process

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch UNOMI-442-segment-past-event
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit 92ae092d4793591a0d375537e0fb8f6b62151447
Author: Kevan <ke...@jahia.com>
AuthorDate: Wed Mar 10 16:03:02 2021 +0100

    UNOMI-442: fix creation of Segment that contains past event condition, correctly engage profiles during creation process
---
 .../java/org/apache/unomi/itests/SegmentIT.java    | 61 ++++++++++++++++++++-
 .../services/impl/segments/SegmentServiceImpl.java | 63 ++++++++++++++--------
 2 files changed, 99 insertions(+), 25 deletions(-)

diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
index fceb70c..2b5abf9 100644
--- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
@@ -17,11 +17,17 @@
 
 package org.apache.unomi.itests;
 
+import org.apache.unomi.api.Event;
 import org.apache.unomi.api.Metadata;
+import org.apache.unomi.api.Profile;
 import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.segments.Segment;
+import org.apache.unomi.api.services.EventService;
+import org.apache.unomi.api.services.ProfileService;
 import org.apache.unomi.api.services.SegmentService;
 import org.apache.unomi.api.exceptions.BadSegmentConditionException;
+import org.apache.unomi.persistence.spi.PersistenceService;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,6 +40,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.util.Date;
 import java.util.List;
 
 @RunWith(PaxExam.class)
@@ -42,15 +51,30 @@ public class SegmentIT extends BaseIT {
     private final static Logger LOGGER = LoggerFactory.getLogger(SegmentIT.class);
     private final static String SEGMENT_ID = "test-segment-id-2";
 
-    @Inject
-    @Filter(timeout = 600000)
+    @Inject @Filter(timeout = 600000)
     protected SegmentService segmentService;
 
+    @Inject @Filter(timeout = 600000)
+    protected ProfileService profileService;
+
+    @Inject @Filter(timeout = 600000)
+    protected EventService eventService;
+
+    @Inject @Filter(timeout = 600000)
+    protected PersistenceService persistenceService;
+
     @Before
     public void setUp() throws InterruptedException {
         removeItems(Segment.class);
     }
 
+    @After
+    public void tearDown() throws InterruptedException {
+        removeItems(Segment.class);
+        removeItems(Profile.class);
+        removeItems(Event.class);
+    }
+
     @Test
     public void testSegments() {
         Assert.assertNotNull("Segment service should be available", segmentService);
@@ -111,4 +135,37 @@ public class SegmentIT extends BaseIT {
 
         segmentService.removeSegmentDefinition(SEGMENT_ID, false);
     }
+
+    @Test
+    public void testSegmentWithPastEventCondition() throws InterruptedException {
+        // create Profile
+        Profile profile = new Profile();
+        profile.setItemId("test_profile_id");
+        profileService.save(profile);
+        persistenceService.refreshIndex(Profile.class, null); // wait for profile to be full persisted and index
+
+        // send event for profile from a previous date (today -3 days)
+        ZoneId defaultZoneId = ZoneId.systemDefault();
+        LocalDate localDate = LocalDate.now().minusDays(3);
+        Event testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
+        testEvent.setPersistent(true);
+        eventService.send(testEvent);
+        persistenceService.refreshIndex(Event.class, null); // wait for event to be fully persisted and indexed
+
+        // create the segment
+        Metadata segmentMetadata = new Metadata("past-event-segment-test");
+        Segment segment = new Segment(segmentMetadata);
+        Condition segmentCondition = new Condition(definitionsService.getConditionType("pastEventCondition"));
+        segmentCondition.setParameter("numberOfDays", 10);
+        Condition pastEventEventCondition = new Condition(definitionsService.getConditionType("eventTypeCondition"));
+        pastEventEventCondition.setParameter("eventTypeId", "test-event-type");
+        segmentCondition.setParameter("eventCondition", pastEventEventCondition);
+        segment.setCondition(segmentCondition);
+        segmentService.setSegmentDefinition(segment);
+
+        // insure the profile that did the past event condition is correctly engaged in the segment.
+        Thread.sleep(5000);
+        profile = profileService.load("test_profile_id");
+        Assert.assertTrue("Profile should be engaged in the segment", profile.getSegments().contains("past-event-segment-test"));
+    }
 }
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 7d8c2ce..2f159b0 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -776,7 +776,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                     rule.setLinkedItems(Arrays.asList(metadata.getId()));
                     rules.add(rule);
 
-                    updateExistingProfilesForPastEventCondition(condition, parentCondition);
+                    updateExistingProfilesForPastEventCondition(condition, parentCondition, true);
                 } else {
                     rule.getLinkedItems().add(metadata.getId());
                     rules.add(rule);
@@ -798,7 +798,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         }
     }
 
-    private void updateExistingProfilesForPastEventCondition(Condition eventCondition, Condition parentCondition) {
+    private void updateExistingProfilesForPastEventCondition(Condition eventCondition, Condition parentCondition, boolean forceRefresh) {
         long t = System.currentTimeMillis();
         List<Condition> l = new ArrayList<Condition>();
         Condition andCondition = new Condition();
@@ -839,20 +839,25 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
 
         String propertyKey = (String) parentCondition.getParameter("generatedPropertyKey");
 
+        int updatedProfileCount = 0;
         if(pastEventsDisablePartitions) {
             Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount);
-            updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+            updatedProfileCount = updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
         } else {
             Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
             long card = m.get("_card").longValue();
             int numParts = (int) (card / aggregateQueryBucketSize) + 2;
             for (int i = 0; i < numParts; i++) {
                 Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
-                updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+                updatedProfileCount += updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
             }
         }
 
-        logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis() - t);
+        if (forceRefresh && updatedProfileCount > 0) {
+            persistenceService.refreshIndex(Profile.class, null);
+        }
+
+        logger.info("{} profiles updated for past event condition in {}ms", updatedProfileCount, System.currentTimeMillis() - t);
     }
 
     public String getGeneratedPropertyKey(Condition condition, Condition parentCondition) {
@@ -878,25 +883,37 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         }
     }
 
-    private void updateProfilesWithPastEventProperty(Map<String, Long> eventCountByProfile, String propertyKey) {
-            for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
-                String profileId = entry.getKey();
-                if (!profileId.startsWith("_")) {
-                    Map<String, Long> pastEventCounts = new HashMap<>();
-                    pastEventCounts.put(propertyKey, entry.getValue());
-                    Map<String, Object> systemProperties = new HashMap<>();
-                    systemProperties.put("pastEvents", pastEventCounts);
-                    try {
-                        systemProperties.put("lastUpdated", new Date());
-                        Profile profile = new Profile();
-                        profile.setItemId(profileId);
-                        persistenceService.update(profile, null, Profile.class, "systemProperties", systemProperties);
-                    } catch (Exception e) {
-                        logger.error("Error updating profile {} past event system properties", profileId, e);
-                    }
-                }
+    private int updateProfilesWithPastEventProperty(Map<String, Long> eventCountByProfile, String propertyKey) {
+        int profileUpdatedCount = 0;
+        Map<Item, Map> batch = new HashMap<>();
+        Iterator<Map.Entry<String, Long>> entryIterator = eventCountByProfile.entrySet().iterator();
+        while (entryIterator.hasNext()){
+            Map.Entry<String, Long> entry = entryIterator.next();
+            String profileId = entry.getKey();
+            if (!profileId.startsWith("_")) {
+                Map<String, Long> pastEventCounts = new HashMap<>();
+                pastEventCounts.put(propertyKey, entry.getValue());
+                Map<String, Object> systemProperties = new HashMap<>();
+                systemProperties.put("pastEvents", pastEventCounts);
+                systemProperties.put("lastUpdated", new Date());
+
+                Profile profile = new Profile();
+                profile.setItemId(profileId);
+                batch.put(profile, Collections.singletonMap("systemProperties", systemProperties));
             }
 
+            if (batch.size() == segmentUpdateBatchSize || (!entryIterator.hasNext() && batch.size() > 0)) {
+                try {
+                    persistenceService.update(batch, null, Profile.class);
+                    profileUpdatedCount += batch.size();
+                } catch (Exception e) {
+                    logger.error("Error updating {} profiles for past event system properties", batch.size(), e);
+                } finally {
+                    batch.clear();
+                }
+            }
+        }
+        return profileUpdatedCount;
     }
 
     private String getMD5(String md5) {
@@ -1115,7 +1132,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                             if (action.getActionTypeId().equals("setEventOccurenceCountAction")) {
                                 Condition pastEventCondition = (Condition) action.getParameterValues().get("pastEventCondition");
                                 if (pastEventCondition.containsParameter("numberOfDays")) {
-                                    updateExistingProfilesForPastEventCondition(rule.getCondition(), pastEventCondition);
+                                    updateExistingProfilesForPastEventCondition(rule.getCondition(), pastEventCondition, false);
                                 }
                             }
                         }