You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to dev@unomi.apache.org by jk...@apache.org on 2023/05/12 09:39:19 UTC

[unomi] branch batchUpdateProfileScroll2 created (now 79c46adb3)

This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a change to branch batchUpdateProfileScroll2
in repository https://gitbox.apache.org/repos/asf/unomi.git


      at 79c46adb3 UNOMI-430: Fix batch profile update by using scrolling

This branch includes the following new commits:

     new 79c46adb3 UNOMI-430: Fix batch profile update by using scrolling

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[unomi] 01/01: UNOMI-430: Fix batch profile update by using scrolling

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch batchUpdateProfileScroll2
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit 79c46adb3b7998252d5cad564851ff68b7a05943
Author: Kevan <ke...@jahia.com>
AuthorDate: Fri May 12 11:35:13 2023 +0200

    UNOMI-430: Fix batch profile update by using scrolling
---
 .../java/org/apache/unomi/api/BatchUpdate.java     | 42 +++++++++++++++++++
 .../org/apache/unomi/itests/ProfileServiceIT.java  | 47 ++++++++++++++++++++--
 .../services/impl/profiles/ProfileServiceImpl.java | 19 +++++++--
 3 files changed, 101 insertions(+), 7 deletions(-)

diff --git a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
index 45bf9e80a..8f55a5195 100644
--- a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
+++ b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
@@ -27,6 +27,8 @@ public class BatchUpdate {
     private Object propertyValue;
     private Condition condition;
     private String strategy;
+    private String scrollTimeValidity = "10m";
+    private int scrollBatchSize = 1000;
 
     /**
      * Retrieves the property name which value needs to be updated. Note that the property name follows the
@@ -101,4 +103,44 @@ public class BatchUpdate {
     public void setStrategy(String strategy) {
         this.strategy = strategy;
     }
+
+    /**
+     * Batch update performs scroll queries to fetch the documents to be updated. The scroll time validity specifies
+     * how long the scroll context should stay open in memory so that the update can be completed.
+     *
+     * @return the scroll time validity (default: 10m)
+     */
+    public String getScrollTimeValidity() {
+        return scrollTimeValidity;
+    }
+
+    /**
+     * Batch update performs scroll queries to fetch the documents to be updated. The scroll time validity specifies
+     * how long the scroll context should stay open in memory so that the update can be completed.
+     *
+     * @param scrollTimeValidity the scroll time validity, expressed as a time value such as 10m (default: 10m)
+     */
+    public void setScrollTimeValidity(String scrollTimeValidity) {
+        this.scrollTimeValidity = scrollTimeValidity;
+    }
+
+    /**
+     * Batch update performs scroll queries to fetch the documents to be updated. The scroll batch size specifies
+     * how many documents are loaded per scroll request.
+     *
+     * @return the scroll batch size (default: 1000)
+     */
+    public int getScrollBatchSize() {
+        return scrollBatchSize;
+    }
+
+    /**
+     * Batch update performs scroll queries to fetch the documents to be updated. The scroll batch size specifies
+     * how many documents are loaded per scroll request.
+     *
+     * @param scrollBatchSize the scroll batch size (default: 1000)
+     */
+    public void setScrollBatchSize(int scrollBatchSize) {
+        this.scrollBatchSize = scrollBatchSize;
+    }
 }
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index 80573dfee..fe82fa749 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -16,9 +16,8 @@
  */
 package org.apache.unomi.itests;
 
-import org.apache.unomi.api.Event;
-import org.apache.unomi.api.Profile;
-import org.apache.unomi.api.Session;
+import org.apache.unomi.api.*;
+import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.query.Query;
 import org.apache.unomi.api.services.ProfileService;
 import org.apache.unomi.persistence.spi.PersistenceService;
@@ -288,4 +287,46 @@ public class ProfileServiceIT extends BaseIT {
         keepTrying("Events number should be 150", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
                 (count) -> count == (150 + originalEventsCount), 1000, 100);
     }
+
+    @Test
+    public void testBatchProfileUpdate() throws Exception {
+        // Create 50 profiles
+        for (int i = 1; i <= 50; i++) {
+            Profile profile = new Profile();
+            profile.setItemId("batchProfileUpdateTest" + i);
+            profile.setProperty("name", "Boby");
+            profile.setProperty("test", "batchProfileUpdateTest");
+
+            profileService.save(profile);
+        }
+
+        Condition batchUpdateCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        batchUpdateCondition.setParameter("propertyName","properties.test");
+        batchUpdateCondition.setParameter("comparisonOperator","equals");
+        batchUpdateCondition.setParameter("propertyValue", "batchProfileUpdateTest");
+        keepTrying("We should wait for profiles to be saved", () -> persistenceService.queryCount(batchUpdateCondition, Profile.ITEM_TYPE),
+                (count) -> count == 50, 1000, 100);
+
+        BatchUpdate batchUpdate = new BatchUpdate();
+        batchUpdate.setCondition(batchUpdateCondition);
+        batchUpdate.setStrategy("alwaysSet");
+        batchUpdate.setPropertyName("properties.name");
+        batchUpdate.setPropertyValue("Billybob");
+        batchUpdate.setScrollBatchSize(10);
+        profileService.batchProfilesUpdate(batchUpdate);
+
+        Condition updatedProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        updatedProfilesCondition.setParameter("propertyName","properties.name");
+        updatedProfilesCondition.setParameter("comparisonOperator","equals");
+        updatedProfilesCondition.setParameter("propertyValue", "Billybob");
+        keepTrying("We should still retrieve the 50 updated profiles", () -> persistenceService.queryCount(updatedProfilesCondition, Profile.ITEM_TYPE),
+                (count) -> count == 50, 1000, 100);
+
+        Condition oldProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        oldProfilesCondition.setParameter("propertyName","properties.name");
+        oldProfilesCondition.setParameter("comparisonOperator","equals");
+        oldProfilesCondition.setParameter("propertyValue", "Boby");
+        keepTrying("We should not be able to retrieve previous profile based on previous value", () -> persistenceService.queryCount(oldProfilesCondition, Profile.ITEM_TYPE),
+                (count) -> count == 0, 1000, 100);
+    }
 }
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index 1b4aff4d5..75a9e93f5 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -814,14 +814,25 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
     }
 
     public void batchProfilesUpdate(BatchUpdate update) {
+        logger.info("Starting batch profiles update");
+        long startTime = System.currentTimeMillis();
+        long updatedCount = 0;
+
         ParserHelper.resolveConditionType(definitionsService, update.getCondition(), "batch update on property " + update.getPropertyName());
-        List<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class);
+        PartialList<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class, 0,update.getScrollBatchSize(), update.getScrollTimeValidity());
 
-        for (Profile profile : profiles) {
-            if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) {
-                save(profile);
+        while (profiles != null && profiles.getList().size() > 0) {
+            for (Profile profile : profiles.getList()) {
+                if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) {
+                    save(profile);
+                    updatedCount += 1;
+                }
             }
+            profiles = persistenceService.continueScrollQuery(Profile.class, profiles.getScrollIdentifier(), profiles.getScrollTimeValidity());
         }
+
+        long totalTime = System.currentTimeMillis() - startTime;
+        logger.info("Batch profiles updated: {} profiles in {}ms", updatedCount, totalTime);
     }
 
     public Persona loadPersona(String personaId) {