You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2023/05/12 09:35:27 UTC

[unomi] branch batchUpdateProfileScroll created (now 2ce769abe)

This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a change to branch batchUpdateProfileScroll
in repository https://gitbox.apache.org/repos/asf/unomi.git


      at 2ce769abe UNOMI-430: Fix batch profile update by using scrolling

This branch includes the following new commits:

     new 2ce769abe UNOMI-430: Fix batch profile update by using scrolling

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[unomi] 01/01: UNOMI-430: Fix batch profile update by using scrolling

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch batchUpdateProfileScroll
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit 2ce769abe632d175335c8124b102e7d49c7a3e80
Author: Kevan <ke...@jahia.com>
AuthorDate: Fri May 12 11:35:13 2023 +0200

    UNOMI-430: Fix batch profile update by using scrolling
---
 .../java/org/apache/unomi/api/BatchUpdate.java     | 42 +++++++++++++++++++++
 .../org/apache/unomi/itests/ProfileServiceIT.java  | 43 ++++++++++++++++++++++
 .../services/impl/profiles/ProfileServiceImpl.java | 19 ++++++++--
 3 files changed, 100 insertions(+), 4 deletions(-)

diff --git a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
index 45bf9e80a..8f55a5195 100644
--- a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
+++ b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
@@ -27,6 +27,8 @@ public class BatchUpdate {
     private Object propertyValue;
     private Condition condition;
     private String strategy;
+    private String scrollTimeValidity = "10m";
+    private int scrollBatchSize = 1000;
 
     /**
      * Retrieves the property name which value needs to be updated. Note that the property name follows the
@@ -101,4 +103,44 @@ public class BatchUpdate {
     public void setStrategy(String strategy) {
         this.strategy = strategy;
     }
+
+    /**
+     * Batch update will perform scroll queries to query document to be updated, the scroll time validity allow specifying
+     * how much time the scroll context should stay open in memory to be able to complete the update.
+     *
+     * @return the scroll time validity (default: 10m)
+     */
+    public String getScrollTimeValidity() {
+        return scrollTimeValidity;
+    }
+
+    /**
+     * Batch update will perform scroll queries to query document to be updated, the scroll time validity allow specifying
+     * how much time the scroll context should stay open in memory to be able to complete the update.
+     *
+     * @param scrollTimeValidity the scroll time validity in time unit
+     */
+    public void setScrollTimeValidity(String scrollTimeValidity) {
+        this.scrollTimeValidity = scrollTimeValidity;
+    }
+
+    /**
+     * Batch update will perform scroll queries to query document to be updated, the scroll batch size allow specifying
+     * how many document we want to load per scroll.
+     *
+     * @return the scroll batch size (default: 1000)
+     */
+    public int getScrollBatchSize() {
+        return scrollBatchSize;
+    }
+
+    /**
+     * Batch update will perform scroll queries to query document to be updated, the scroll batch size allow specifying
+     * how many document we want to load per scroll.
+     *
+     * @param scrollBatchSize the scroll batch size (default: 1000)
+     */
+    public void setScrollBatchSize(int scrollBatchSize) {
+        this.scrollBatchSize = scrollBatchSize;
+    }
 }
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index fdd2088a2..306f9d8c8 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -17,6 +17,7 @@
 package org.apache.unomi.itests;
 
 import org.apache.unomi.api.*;
+import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.query.Query;
 import org.apache.unomi.persistence.spi.PersistenceService;
 import org.junit.After;
@@ -418,4 +419,46 @@ public class ProfileServiceIT extends BaseIT {
         keepTrying("Events number should be 150", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
                 (count) -> count == (150 + originalEventsCount), 1000, 100);
     }
+
+    @Test
+    public void testBatchProfileUpdate() throws Exception {
+        // Create 50 profiles
+        for (int i = 1; i <= 50; i++) {
+            Profile profile = new Profile();
+            profile.setItemId("batchProfileUpdateTest" + i);
+            profile.setProperty("name", "Boby");
+            profile.setProperty("test", "batchProfileUpdateTest");
+
+            profileService.save(profile);
+        }
+
+        Condition batchUpdateCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        batchUpdateCondition.setParameter("propertyName","properties.test");
+        batchUpdateCondition.setParameter("comparisonOperator","equals");
+        batchUpdateCondition.setParameter("propertyValue", "batchProfileUpdateTest");
+        keepTrying("We should wait for profiles to be saved", () -> persistenceService.queryCount(batchUpdateCondition, Profile.ITEM_TYPE),
+                (count) -> count == 50, 1000, 100);
+
+        BatchUpdate batchUpdate = new BatchUpdate();
+        batchUpdate.setCondition(batchUpdateCondition);
+        batchUpdate.setStrategy("alwaysSet");
+        batchUpdate.setPropertyName("properties.name");
+        batchUpdate.setPropertyValue("Billybob");
+        batchUpdate.setScrollBatchSize(10);
+        profileService.batchProfilesUpdate(batchUpdate);
+
+        Condition updatedProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        updatedProfilesCondition.setParameter("propertyName","properties.name");
+        updatedProfilesCondition.setParameter("comparisonOperator","equals");
+        updatedProfilesCondition.setParameter("propertyValue", "Billybob");
+        keepTrying("We should still retrieve the 50 updated profiles", () -> persistenceService.queryCount(updatedProfilesCondition, Profile.ITEM_TYPE),
+                (count) -> count == 50, 1000, 100);
+
+        Condition oldProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        oldProfilesCondition.setParameter("propertyName","properties.name");
+        oldProfilesCondition.setParameter("comparisonOperator","equals");
+        oldProfilesCondition.setParameter("propertyValue", "Boby");
+        keepTrying("We should not be able to retrieve previous profile based on previous value", () -> persistenceService.queryCount(oldProfilesCondition, Profile.ITEM_TYPE),
+                (count) -> count == 0, 1000, 100);
+    }
 }
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index a595ba892..56bf686d8 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -942,14 +942,25 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
     }
 
     public void batchProfilesUpdate(BatchUpdate update) {
+        logger.info("Starting batch profiles update");
+        long startTime = System.currentTimeMillis();
+        long updatedCount = 0;
+
         ParserHelper.resolveConditionType(definitionsService, update.getCondition(), "batch update on property " + update.getPropertyName());
-        List<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class);
+        PartialList<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class, 0,update.getScrollBatchSize(), update.getScrollTimeValidity());
 
-        for (Profile profile : profiles) {
-            if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) {
-                save(profile);
+        while (profiles != null && profiles.getList().size() > 0) {
+            for (Profile profile : profiles.getList()) {
+                if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) {
+                    save(profile);
+                    updatedCount += 1;
+                }
             }
+            profiles = persistenceService.continueScrollQuery(Profile.class, profiles.getScrollIdentifier(), profiles.getScrollTimeValidity());
         }
+
+        long totalTime = System.currentTimeMillis() - startTime;
+        logger.info("Batch profiles updated: {} profiles in {}ms", updatedCount, totalTime);
     }
 
     public Persona loadPersona(String personaId) {