You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by js...@apache.org on 2023/05/12 16:04:08 UTC

[unomi] branch batchUpdateProfileScroll2 updated (79c46adb3 -> 4c7c7406f)

This is an automated email from the ASF dual-hosted git repository.

jsinovassinnaik pushed a change to branch batchUpdateProfileScroll2
in repository https://gitbox.apache.org/repos/asf/unomi.git


    omit 79c46adb3 UNOMI-430: Fix batch profile update by using scrolling
     add b764053b7 UNOMI-444 redeploy unomi objects in case they are modified (#264) (#624)
     add b44dc2e28 UNOMI-780 : Allow to use scroll query to get more than 10000 events (#621) (#622)
     new 4c7c7406f UNOMI-430: Fix batch profile update by using scrolling

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (79c46adb3)
            \
             N -- N -- N   refs/heads/batchUpdateProfileScroll2 (4c7c7406f)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/unomi/itests/SegmentIT.java    |  5 +++-
 .../impl/definitions/DefinitionsServiceImpl.java   | 18 +++---------
 .../services/impl/events/EventServiceImpl.java     |  5 +++-
 .../services/impl/goals/GoalsServiceImpl.java      | 19 ++++---------
 .../services/impl/profiles/ProfileServiceImpl.java | 32 ++++++++--------------
 .../services/impl/rules/RulesServiceImpl.java      |  9 ++----
 .../services/impl/segments/SegmentServiceImpl.java | 18 +++---------
 7 files changed, 34 insertions(+), 72 deletions(-)


[unomi] 01/01: UNOMI-430: Fix batch profile update by using scrolling

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jsinovassinnaik pushed a commit to branch batchUpdateProfileScroll2
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit 4c7c7406fa0015b068e70f3c2af756aa5904b832
Author: Kevan <ke...@jahia.com>
AuthorDate: Fri May 12 11:35:13 2023 +0200

    UNOMI-430: Fix batch profile update by using scrolling
---
 .../java/org/apache/unomi/api/BatchUpdate.java     | 42 +++++++++++++++++++
 .../org/apache/unomi/itests/ProfileServiceIT.java  | 47 ++++++++++++++++++++--
 .../services/impl/profiles/ProfileServiceImpl.java | 19 +++++++--
 3 files changed, 101 insertions(+), 7 deletions(-)

diff --git a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
index 45bf9e80a..8f55a5195 100644
--- a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
+++ b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
@@ -27,6 +27,8 @@ public class BatchUpdate {
     private Object propertyValue;
     private Condition condition;
     private String strategy;
+    private String scrollTimeValidity = "10m";
+    private int scrollBatchSize = 1000;
 
     /**
      * Retrieves the property name which value needs to be updated. Note that the property name follows the
@@ -101,4 +103,44 @@ public class BatchUpdate {
     public void setStrategy(String strategy) {
         this.strategy = strategy;
     }
+
+    /**
+     * Batch update performs scroll queries to fetch the documents to be updated; the scroll time validity specifies
+     * how long the scroll context should stay open in memory so that the update can complete.
+     *
+     * @return the scroll time validity (default: 10m)
+     */
+    public String getScrollTimeValidity() {
+        return scrollTimeValidity;
+    }
+
+    /**
+     * Batch update performs scroll queries to fetch the documents to be updated; the scroll time validity specifies
+     * how long the scroll context should stay open in memory so that the update can complete.
+     *
+     * @param scrollTimeValidity the scroll time validity, expressed as a duration with a time unit (default: 10m)
+     */
+    public void setScrollTimeValidity(String scrollTimeValidity) {
+        this.scrollTimeValidity = scrollTimeValidity;
+    }
+
+    /**
+     * Batch update performs scroll queries to fetch the documents to be updated; the scroll batch size specifies
+     * how many documents are loaded per scroll.
+     *
+     * @return the scroll batch size (default: 1000)
+     */
+    public int getScrollBatchSize() {
+        return scrollBatchSize;
+    }
+
+    /**
+     * Batch update performs scroll queries to fetch the documents to be updated; the scroll batch size specifies
+     * how many documents are loaded per scroll.
+     *
+     * @param scrollBatchSize the scroll batch size (default: 1000)
+     */
+    public void setScrollBatchSize(int scrollBatchSize) {
+        this.scrollBatchSize = scrollBatchSize;
+    }
 }
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index 80573dfee..fe82fa749 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -16,9 +16,8 @@
  */
 package org.apache.unomi.itests;
 
-import org.apache.unomi.api.Event;
-import org.apache.unomi.api.Profile;
-import org.apache.unomi.api.Session;
+import org.apache.unomi.api.*;
+import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.query.Query;
 import org.apache.unomi.api.services.ProfileService;
 import org.apache.unomi.persistence.spi.PersistenceService;
@@ -288,4 +287,46 @@ public class ProfileServiceIT extends BaseIT {
         keepTrying("Events number should be 150", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
                 (count) -> count == (150 + originalEventsCount), 1000, 100);
     }
+
+    @Test
+    public void testBatchProfileUpdate() throws Exception {
+        // Create 50 profiles
+        for (int i = 1; i <= 50; i++) {
+            Profile profile = new Profile();
+            profile.setItemId("batchProfileUpdateTest" + i);
+            profile.setProperty("name", "Boby");
+            profile.setProperty("test", "batchProfileUpdateTest");
+
+            profileService.save(profile);
+        }
+
+        Condition batchUpdateCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        batchUpdateCondition.setParameter("propertyName","properties.test");
+        batchUpdateCondition.setParameter("comparisonOperator","equals");
+        batchUpdateCondition.setParameter("propertyValue", "batchProfileUpdateTest");
+        keepTrying("We should wait for profiles to be saved", () -> persistenceService.queryCount(batchUpdateCondition, Profile.ITEM_TYPE),
+                (count) -> count == 50, 1000, 100);
+
+        BatchUpdate batchUpdate = new BatchUpdate();
+        batchUpdate.setCondition(batchUpdateCondition);
+        batchUpdate.setStrategy("alwaysSet");
+        batchUpdate.setPropertyName("properties.name");
+        batchUpdate.setPropertyValue("Billybob");
+        batchUpdate.setScrollBatchSize(10);
+        profileService.batchProfilesUpdate(batchUpdate);
+
+        Condition updatedProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        updatedProfilesCondition.setParameter("propertyName","properties.name");
+        updatedProfilesCondition.setParameter("comparisonOperator","equals");
+        updatedProfilesCondition.setParameter("propertyValue", "Billybob");
+        keepTrying("We should still retrieve the 50 updated profiles", () -> persistenceService.queryCount(updatedProfilesCondition, Profile.ITEM_TYPE),
+                (count) -> count == 50, 1000, 100);
+
+        Condition oldProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        oldProfilesCondition.setParameter("propertyName","properties.name");
+        oldProfilesCondition.setParameter("comparisonOperator","equals");
+        oldProfilesCondition.setParameter("propertyValue", "Boby");
+        keepTrying("We should not be able to retrieve previous profile based on previous value", () -> persistenceService.queryCount(oldProfilesCondition, Profile.ITEM_TYPE),
+                (count) -> count == 0, 1000, 100);
+    }
 }
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index 09c355edf..0e88f8706 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -814,14 +814,25 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
     }
 
     public void batchProfilesUpdate(BatchUpdate update) {
+        logger.info("Starting batch profiles update");
+        long startTime = System.currentTimeMillis();
+        long updatedCount = 0;
+
         ParserHelper.resolveConditionType(definitionsService, update.getCondition(), "batch update on property " + update.getPropertyName());
-        List<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class);
+        PartialList<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class, 0,update.getScrollBatchSize(), update.getScrollTimeValidity());
 
-        for (Profile profile : profiles) {
-            if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) {
-                save(profile);
+        while (profiles != null && profiles.getList().size() > 0) {
+            for (Profile profile : profiles.getList()) {
+                if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) {
+                    save(profile);
+                    updatedCount += 1;
+                }
             }
+            profiles = persistenceService.continueScrollQuery(Profile.class, profiles.getScrollIdentifier(), profiles.getScrollTimeValidity());
         }
+
+        long totalTime = System.currentTimeMillis() - startTime;
+        logger.info("Batch profiles updated: {} profiles in {}ms", updatedCount, totalTime);
     }
 
     public Persona loadPersona(String personaId) {