You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2023/05/12 09:35:28 UTC

[unomi] 01/01: UNOMI-430: Fix batch profile update by using scrolling

This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch batchUpdateProfileScroll
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit 2ce769abe632d175335c8124b102e7d49c7a3e80
Author: Kevan <ke...@jahia.com>
AuthorDate: Fri May 12 11:35:13 2023 +0200

    UNOMI-430: Fix batch profile update by using scrolling
---
 .../java/org/apache/unomi/api/BatchUpdate.java     | 42 +++++++++++++++++++++
 .../org/apache/unomi/itests/ProfileServiceIT.java  | 43 ++++++++++++++++++++++
 .../services/impl/profiles/ProfileServiceImpl.java | 19 ++++++++--
 3 files changed, 100 insertions(+), 4 deletions(-)

diff --git a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
index 45bf9e80a..8f55a5195 100644
--- a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
+++ b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
@@ -27,6 +27,8 @@ public class BatchUpdate {
     private Object propertyValue;
     private Condition condition;
     private String strategy;
+    private String scrollTimeValidity = "10m"; // default scroll context validity, see getScrollTimeValidity()
+    private int scrollBatchSize = 1000; // default number of documents loaded per scroll, see getScrollBatchSize()
 
     /**
      * Retrieves the property name which value needs to be updated. Note that the property name follows the
@@ -101,4 +103,44 @@ public class BatchUpdate {
     public void setStrategy(String strategy) {
         this.strategy = strategy;
     }
+
+    /**
+     * Batch updates use scroll queries to retrieve the documents to update; the scroll time validity specifies
+     * how long the scroll context should stay open so that the update can be completed.
+     *
+     * @return the scroll time validity (default: 10m)
+     */
+    public String getScrollTimeValidity() {
+        return scrollTimeValidity;
+    }
+
+    /**
+     * Batch updates use scroll queries to retrieve the documents to update; the scroll time validity specifies
+     * how long the scroll context should stay open so that the update can be completed.
+     *
+     * @param scrollTimeValidity the scroll time validity as a time value string such as {@code "10m"} (default: 10m)
+     */
+    public void setScrollTimeValidity(String scrollTimeValidity) {
+        this.scrollTimeValidity = scrollTimeValidity;
+    }
+
+    /**
+     * Batch updates use scroll queries to retrieve the documents to update; the scroll batch size specifies
+     * how many documents are loaded per scroll request.
+     *
+     * @return the scroll batch size (default: 1000)
+     */
+    public int getScrollBatchSize() {
+        return scrollBatchSize;
+    }
+
+    /**
+     * Batch updates use scroll queries to retrieve the documents to update; the scroll batch size specifies
+     * how many documents are loaded per scroll request.
+     *
+     * @param scrollBatchSize the scroll batch size (default: 1000)
+     */
+    public void setScrollBatchSize(int scrollBatchSize) {
+        this.scrollBatchSize = scrollBatchSize;
+    }
 }
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index fdd2088a2..306f9d8c8 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -17,6 +17,7 @@
 package org.apache.unomi.itests;
 
 import org.apache.unomi.api.*;
+import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.query.Query;
 import org.apache.unomi.persistence.spi.PersistenceService;
 import org.junit.After;
@@ -418,4 +419,46 @@ public class ProfileServiceIT extends BaseIT {
         keepTrying("Events number should be 150", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
                 (count) -> count == (150 + originalEventsCount), 1000, 100);
     }
+
+    @Test
+    public void testBatchProfileUpdate() throws Exception {
+        // Create 50 profiles that share the "test" marker property targeted by the batch update condition below
+        for (int i = 1; i <= 50; i++) {
+            Profile profile = new Profile();
+            profile.setItemId("batchProfileUpdateTest" + i);
+            profile.setProperty("name", "Boby");
+            profile.setProperty("test", "batchProfileUpdateTest");
+
+            profileService.save(profile);
+        }
+        // Select the test profiles by their marker property and wait until all 50 are returned by queryCount
+        Condition batchUpdateCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        batchUpdateCondition.setParameter("propertyName","properties.test");
+        batchUpdateCondition.setParameter("comparisonOperator","equals");
+        batchUpdateCondition.setParameter("propertyValue", "batchProfileUpdateTest");
+        keepTrying("We should wait for profiles to be saved", () -> persistenceService.queryCount(batchUpdateCondition, Profile.ITEM_TYPE),
+                (count) -> count == 50, 1000, 100);
+
+        BatchUpdate batchUpdate = new BatchUpdate();
+        batchUpdate.setCondition(batchUpdateCondition);
+        batchUpdate.setStrategy("alwaysSet");
+        batchUpdate.setPropertyName("properties.name");
+        batchUpdate.setPropertyValue("Billybob");
+        batchUpdate.setScrollBatchSize(10); // fewer than the 50 matching profiles, so the update has to process several scroll pages
+        profileService.batchProfilesUpdate(batchUpdate);
+        // Every test profile should now carry the new name...
+        Condition updatedProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        updatedProfilesCondition.setParameter("propertyName","properties.name");
+        updatedProfilesCondition.setParameter("comparisonOperator","equals");
+        updatedProfilesCondition.setParameter("propertyValue", "Billybob");
+        keepTrying("We should still retrieve the 50 updated profiles", () -> persistenceService.queryCount(updatedProfilesCondition, Profile.ITEM_TYPE),
+                (count) -> count == 50, 1000, 100);
+        // ...and none of them should still match the old name
+        Condition oldProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        oldProfilesCondition.setParameter("propertyName","properties.name");
+        oldProfilesCondition.setParameter("comparisonOperator","equals");
+        oldProfilesCondition.setParameter("propertyValue", "Boby");
+        keepTrying("We should not be able to retrieve previous profile based on previous value", () -> persistenceService.queryCount(oldProfilesCondition, Profile.ITEM_TYPE),
+                (count) -> count == 0, 1000, 100);
+    }
 }
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index a595ba892..56bf686d8 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -942,14 +942,25 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
     }
 
     public void batchProfilesUpdate(BatchUpdate update) {
+        logger.info("Starting batch profiles update");
+        long startTime = System.currentTimeMillis();
+        long updatedCount = 0;
+        // Matching profiles are loaded page by page through a scroll query instead of all at once.
         ParserHelper.resolveConditionType(definitionsService, update.getCondition(), "batch update on property " + update.getPropertyName());
-        List<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class);
+        PartialList<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class, 0, update.getScrollBatchSize(), update.getScrollTimeValidity());

-        for (Profile profile : profiles) {
-            if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) {
-                save(profile);
+        while (profiles != null && !profiles.getList().isEmpty()) { // one iteration per scroll page
+            for (Profile profile : profiles.getList()) {
+                if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) {
+                    save(profile);
+                    updatedCount++;
+                }
             }
+            profiles = profiles.getScrollIdentifier() == null ? null : persistenceService.continueScrollQuery(Profile.class, profiles.getScrollIdentifier(), profiles.getScrollTimeValidity()); // no scroll identifier: no further page to fetch
         }
+
+        long totalTime = System.currentTimeMillis() - startTime;
+        logger.info("Batch profiles updated: {} profiles in {}ms", updatedCount, totalTime);
     }
 
     public Persona loadPersona(String personaId) {