You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2023/05/12 09:35:28 UTC
[unomi] 01/01: UNOMI-430: Fix batch profile update by using scrolling
This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch batchUpdateProfileScroll
in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 2ce769abe632d175335c8124b102e7d49c7a3e80
Author: Kevan <ke...@jahia.com>
AuthorDate: Fri May 12 11:35:13 2023 +0200
UNOMI-430: Fix batch profile update by using scrolling
---
.../java/org/apache/unomi/api/BatchUpdate.java | 42 +++++++++++++++++++++
.../org/apache/unomi/itests/ProfileServiceIT.java | 43 ++++++++++++++++++++++
.../services/impl/profiles/ProfileServiceImpl.java | 19 ++++++++--
3 files changed, 100 insertions(+), 4 deletions(-)
diff --git a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
index 45bf9e80a..8f55a5195 100644
--- a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
+++ b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
@@ -27,6 +27,8 @@ public class BatchUpdate {
private Object propertyValue;
private Condition condition;
private String strategy;
+ private String scrollTimeValidity = "10m";
+ private int scrollBatchSize = 1000;
/**
* Retrieves the property name which value needs to be updated. Note that the property name follows the
@@ -101,4 +103,44 @@ public class BatchUpdate {
public void setStrategy(String strategy) {
this.strategy = strategy;
}
+
+ /**
+ * Batch update performs scroll queries to fetch the documents to be updated; the scroll time validity specifies
+ * how long the scroll context stays open in memory so that the update can complete.
+ *
+ * @return the scroll time validity as a time value string (default: "10m")
+ */
+ public String getScrollTimeValidity() {
+ return scrollTimeValidity;
+ }
+
+ /**
+ * Batch update performs scroll queries to fetch the documents to be updated; the scroll time validity specifies
+ * how long the scroll context stays open in memory so that the update can complete.
+ *
+ * @param scrollTimeValidity the scroll time validity as a time value string, e.g. "10m" (the default)
+ */
+ public void setScrollTimeValidity(String scrollTimeValidity) {
+ this.scrollTimeValidity = scrollTimeValidity;
+ }
+
+ /**
+ * Batch update performs scroll queries to fetch the documents to be updated; the scroll batch size specifies
+ * how many documents are loaded per scroll page.
+ *
+ * @return the scroll batch size (default: 1000)
+ */
+ public int getScrollBatchSize() {
+ return scrollBatchSize;
+ }
+
+ /**
+ * Batch update performs scroll queries to fetch the documents to be updated; the scroll batch size specifies
+ * how many documents are loaded per scroll page.
+ *
+ * @param scrollBatchSize the number of documents to load per scroll page (default: 1000)
+ */
+ public void setScrollBatchSize(int scrollBatchSize) {
+ this.scrollBatchSize = scrollBatchSize;
+ }
}
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index fdd2088a2..306f9d8c8 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -17,6 +17,7 @@
package org.apache.unomi.itests;
import org.apache.unomi.api.*;
+import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.query.Query;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.junit.After;
@@ -418,4 +419,46 @@ public class ProfileServiceIT extends BaseIT {
keepTrying("Events number should be 150", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
(count) -> count == (150 + originalEventsCount), 1000, 100);
}
+
+ @Test
+ public void testBatchProfileUpdate() throws Exception {
+ // Create 50 profiles sharing the marker property "test" and the initial name "Boby"
+ for (int i = 1; i <= 50; i++) {
+ Profile profile = new Profile();
+ profile.setItemId("batchProfileUpdateTest" + i);
+ profile.setProperty("name", "Boby");
+ profile.setProperty("test", "batchProfileUpdateTest");
+
+ profileService.save(profile);
+ }
+
+ Condition batchUpdateCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition")); // matches the profiles created above via their marker property
+ batchUpdateCondition.setParameter("propertyName","properties.test");
+ batchUpdateCondition.setParameter("comparisonOperator","equals");
+ batchUpdateCondition.setParameter("propertyValue", "batchProfileUpdateTest");
+ keepTrying("We should wait for profiles to be saved", () -> persistenceService.queryCount(batchUpdateCondition, Profile.ITEM_TYPE),
+ (count) -> count == 50, 1000, 100); // all 50 profiles must be queryable before the batch update starts
+
+ BatchUpdate batchUpdate = new BatchUpdate();
+ batchUpdate.setCondition(batchUpdateCondition);
+ batchUpdate.setStrategy("alwaysSet");
+ batchUpdate.setPropertyName("properties.name");
+ batchUpdate.setPropertyValue("Billybob");
+ batchUpdate.setScrollBatchSize(10); // 50 matching profiles at 10 per page: the update has to go through several scroll pages
+ profileService.batchProfilesUpdate(batchUpdate);
+
+ Condition updatedProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+ updatedProfilesCondition.setParameter("propertyName","properties.name");
+ updatedProfilesCondition.setParameter("comparisonOperator","equals");
+ updatedProfilesCondition.setParameter("propertyValue", "Billybob");
+ keepTrying("We should still retrieve the 50 updated profiles", () -> persistenceService.queryCount(updatedProfilesCondition, Profile.ITEM_TYPE),
+ (count) -> count == 50, 1000, 100); // every profile, not just the first page, must carry the new name
+
+ Condition oldProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+ oldProfilesCondition.setParameter("propertyName","properties.name");
+ oldProfilesCondition.setParameter("comparisonOperator","equals");
+ oldProfilesCondition.setParameter("propertyValue", "Boby");
+ keepTrying("We should not be able to retrieve previous profile based on previous value", () -> persistenceService.queryCount(oldProfilesCondition, Profile.ITEM_TYPE),
+ (count) -> count == 0, 1000, 100); // no profile may keep the old name
+ }
}
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index a595ba892..56bf686d8 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -942,14 +942,25 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
}
public void batchProfilesUpdate(BatchUpdate update) {
+ logger.info("Starting batch profiles update");
+ long startTime = System.currentTimeMillis();
+ long updatedCount = 0; // number of profiles that were actually saved
+
ParserHelper.resolveConditionType(definitionsService, update.getCondition(), "batch update on property " + update.getPropertyName());
- List<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class);
+ PartialList<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class, 0,update.getScrollBatchSize(), update.getScrollTimeValidity()); // first scroll page; page size and scroll validity come from the BatchUpdate
- for (Profile profile : profiles) {
- if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) {
- save(profile);
+ while (profiles != null && profiles.getList().size() > 0) { // keep paging until a null or empty page is returned
+ for (Profile profile : profiles.getList()) {
+ if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) { // save only when setProperty returns true (presumably: the value changed)
+ save(profile);
+ updatedCount += 1;
+ }
}
+ profiles = persistenceService.continueScrollQuery(Profile.class, profiles.getScrollIdentifier(), profiles.getScrollTimeValidity()); // next page, using the current page's scroll identifier and validity
}
+
+ long totalTime = System.currentTimeMillis() - startTime;
+ logger.info("Batch profiles updated: {} profiles in {}ms", updatedCount, totalTime);
}
public Persona loadPersona(String personaId) {