You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2021/09/16 08:30:15 UTC
[unomi] branch unomi-1.6.x updated: [UNOMI-462] Fix recalculation of Segments containing past event conditions, also … (#340)
This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch unomi-1.6.x
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/unomi-1.6.x by this push:
new 7e24dd6 [UNOMI-462] Fix recalculation of Segments containing past event conditions, also … (#340)
7e24dd6 is described below
commit 7e24dd667fa427d3b332e9448d2d5bb17cd87a71
Author: kevan Jahanshahi <ke...@jahia.com>
AuthorDate: Thu Sep 16 10:27:50 2021 +0200
[UNOMI-462] Fix recalculation of Segments containing past event conditions, also … (#340)
* Fix recalculation of Segments containing past event conditions, also provide ITest
* remove unnecessary retry class and use the existing keepTrying function instead
* Fix pastEventsDisablePartitions
---
.../apache/unomi/api/services/SegmentService.java | 11 ++
itests/pom.xml | 1 +
.../java/org/apache/unomi/itests/SegmentIT.java | 56 +++++++++
.../PastEventConditionESQueryBuilder.java | 25 ++--
.../services/impl/segments/SegmentServiceImpl.java | 126 +++++++++++++++++----
5 files changed, 180 insertions(+), 39 deletions(-)
diff --git a/api/src/main/java/org/apache/unomi/api/services/SegmentService.java b/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
index d1dc440..a54489b 100644
--- a/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
@@ -226,4 +226,15 @@ public interface SegmentService {
* @return a String representing the condition and parent condition uniquelly
*/
String getGeneratedPropertyKey(Condition condition, Condition parentCondition);
+
+ /**
+ * Recalculates the past event conditions of the existing rules.
+ * This operation can be heavy and take time; it will:
+ * - browse the existing rules to extract their past event conditions,
+ * - query the matching events for those conditions,
+ * - update the event occurrence counts on the corresponding profiles,
+ * - reevaluate the segments linked to these rules to engage/disengage profiles once the occurrences have been updated.
+ * So use it carefully, or execute this method in a dedicated thread.
+ */
+ void recalculatePastEventConditions();
}
diff --git a/itests/pom.xml b/itests/pom.xml
index ea4aef3..3783ea4 100644
--- a/itests/pom.xml
+++ b/itests/pom.xml
@@ -168,6 +168,7 @@
<instanceSettings>
<properties>
<cluster.routing.allocation.disk.threshold_enabled>false</cluster.routing.allocation.disk.threshold_enabled>
+ <http.cors.allow-origin>*</http.cors.allow-origin>
</properties>
</instanceSettings>
</configuration>
diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
index 207bd93..8241dc3 100644
--- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
@@ -180,4 +180,60 @@ public class SegmentIT extends BaseIT {
profile = profileService.load("test_profile_id");
Assert.assertTrue("Profile should be engaged in the segment", profile.getSegments().contains("past-event-segment-test"));
}
+
+ @Test
+ public void testSegmentPastEventRecalculation() throws Exception {
+ // create the profile
+ Profile profile = new Profile();
+ profile.setItemId("test_profile_id");
+ profileService.save(profile);
+ persistenceService.refreshIndex(Profile.class, null); // wait for the profile to be fully persisted and indexed
+
+ // create a segment matching profiles having a "test-event-type" event in the last 10 days
+ Metadata segmentMetadata = new Metadata("past-event-segment-test");
+ Segment segment = new Segment(segmentMetadata);
+ Condition segmentCondition = new Condition(definitionsService.getConditionType("pastEventCondition"));
+ segmentCondition.setParameter("numberOfDays", 10);
+ Condition pastEventEventCondition = new Condition(definitionsService.getConditionType("eventTypeCondition"));
+ pastEventEventCondition.setParameter("eventTypeId", "test-event-type");
+ segmentCondition.setParameter("eventCondition", pastEventEventCondition);
+ segment.setCondition(segmentCondition);
+ segmentService.setSegmentDefinition(segment);
+ Thread.sleep(5000); // NOTE(review): presumably waits for the segment and its generated rule to be deployed - confirm
+
+ // persist the event directly (do not send it into the system, so that it is not processed by the rules)
+ ZoneId defaultZoneId = ZoneId.systemDefault();
+ LocalDate localDate = LocalDate.now().minusDays(3);
+ Event testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
+ testEvent.setPersistent(true);
+ persistenceService.save(testEvent, null, true);
+ persistenceService.refreshIndex(Event.class, new Date()); // wait for event to be fully persisted and indexed
+
+ // ensure the profile is not yet engaged, since the event was saved directly in ES
+ profile = profileService.load("test_profile_id");
+ Assert.assertFalse("Profile should not be engaged in the segment", profile.getSegments().contains("past-event-segment-test"));
+
+ // now recalculate the past event conditions: the 3-day-old event should engage the profile
+ segmentService.recalculatePastEventConditions();
+ persistenceService.refreshIndex(Profile.class, null);
+ keepTrying("Profile should be engaged in the segment",
+ () -> profileService.load("test_profile_id"),
+ updatedProfile -> updatedProfile.getSegments().contains("past-event-segment-test"),
+ 1000, 20);
+
+ // replace the event with one dated 15 days ago, outside the 10-day window of the past event condition
+ removeItems(Event.class);
+ localDate = LocalDate.now().minusDays(15);
+ testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
+ persistenceService.save(testEvent);
+ persistenceService.refreshIndex(Event.class, new Date()); // wait for event to be fully persisted and indexed
+
+ // recalculate again: no matching event remains in the window, so the profile should be disengaged
+ segmentService.recalculatePastEventConditions();
+ persistenceService.refreshIndex(Profile.class, null);
+ keepTrying("Profile should not be engaged in the segment anymore",
+ () -> profileService.load("test_profile_id"),
+ updatedProfile -> !updatedProfile.getSegments().contains("past-event-segment-test"),
+ 1000, 20);
+ }
}
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index 3c676d4..ca2ee7e 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -77,23 +77,14 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
public QueryBuilder buildQuery(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 1 : (Integer) condition.getParameter("minimumEventCount");
Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
-
- if (condition.getParameter("generatedPropertyKey") != null && condition.getParameter("generatedPropertyKey").equals(segmentService.getGeneratedPropertyKey((Condition) condition.getParameter("eventCondition"), condition))) {
- // A property is already set on profiles matching the past event condition, use it
- if (minimumEventCount != 1 || maximumEventCount != Integer.MAX_VALUE) {
- // Check the number of occurences
- RangeQueryBuilder builder = QueryBuilders.rangeQuery("systemProperties.pastEvents." + condition.getParameter("generatedPropertyKey"));
- if (minimumEventCount != 1) {
- builder.gte(minimumEventCount);
- }
- if (maximumEventCount != Integer.MAX_VALUE) {
- builder.lte(minimumEventCount);
- }
- return builder;
- } else {
- // Simply get profiles who have the property set
- return QueryBuilders.existsQuery("systemProperties.pastEvents." + condition.getParameter("generatedPropertyKey"));
- }
+ String generatedPropertyKey = (String) condition.getParameter("generatedPropertyKey");
+
+ if (generatedPropertyKey != null && generatedPropertyKey.equals(segmentService.getGeneratedPropertyKey((Condition) condition.getParameter("eventCondition"), condition))) {
+ // A property is already set on profiles matching the past event condition: range-check the number of occurrences (not just existence, since counts can be reset to 0)
+ RangeQueryBuilder builder = QueryBuilders.rangeQuery("systemProperties.pastEvents." + generatedPropertyKey);
+ builder.gte(minimumEventCount);
+ builder.lte(maximumEventCount);
+ return builder;
} else {
// No property set - tries to build an idsQuery
// Build past event condition
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 1dc599d..cd3fe23 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -793,7 +793,9 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
rule.setLinkedItems(Arrays.asList(metadata.getId()));
rules.add(rule);
- updateExistingProfilesForPastEventCondition(condition, parentCondition, true);
+ // this is a newly generated rule that keeps track of the event count, so update all the profiles that match this past event
+ // (the occurrence count is set directly on each profile; no reset pass is needed since no profile has a count for a new rule yet)
+ recalculatePastEventOccurrencesOnProfiles(condition, parentCondition, true, false);
} else {
rule.getLinkedItems().add(metadata.getId());
rules.add(rule);
@@ -815,7 +817,18 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
}
}
- private void updateExistingProfilesForPastEventCondition(Condition eventCondition, Condition parentCondition, boolean forceRefresh) {
+ /**
+ * Recalculates the event occurrence counts on the profiles that match the given past event condition.
+ * @param eventCondition the event condition used to select the matching events
+ * @param parentCondition the past event condition (its "generatedPropertyKey" parameter names the profile property to update)
+ * @param forceRefresh if true, refreshes the Profile index when at least one profile has been updated
+ * @param resetExistingProfilesNotMatching if true, resets to 0 the count of existing profiles that no longer have matching events
+ * ("false" can be used when you know that no profile has a count yet, for example because it's a new rule;
+ * in that case "false" allows skipping the profile queries and speeds up this process.
+ * Otherwise use "true" to be sure the count is reset to 0 on the profiles that need to be reset)
+ */
+ private void recalculatePastEventOccurrencesOnProfiles(Condition eventCondition, Condition parentCondition,
+ boolean forceRefresh, boolean resetExistingProfilesNotMatching) {
long t = System.currentTimeMillis();
List<Condition> l = new ArrayList<Condition>();
Condition andCondition = new Condition();
@@ -855,21 +868,33 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
}
String propertyKey = (String) parentCondition.getParameter("generatedPropertyKey");
+ Set<String> existingProfilesWithCounts = resetExistingProfilesNotMatching ? getExistingProfilesWithPastEventOccurrenceCount(propertyKey) : Collections.emptySet();
int updatedProfileCount = 0;
if(pastEventsDisablePartitions) {
Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount);
- updatedProfileCount = updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+ Set<String> updatedProfiles = updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey);
+ existingProfilesWithCounts.removeAll(updatedProfiles);
+ updatedProfileCount = updatedProfiles.size();
} else {
Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
long card = m.get("_card").longValue();
int numParts = (int) (card / aggregateQueryBucketSize) + 2;
for (int i = 0; i < numParts; i++) {
Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
- updatedProfileCount += updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+ Set<String> updatedProfiles = updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey);
+ existingProfilesWithCounts.removeAll(updatedProfiles);
+ updatedProfileCount += updatedProfiles.size();
}
}
+ // existing profiles that still have a count but were not updated above no longer have matching events
+ // within the time-based condition, so their occurrence count is reset to 0
+ if (!existingProfilesWithCounts.isEmpty()) {
+ updatedProfileCount += updatePastEventOccurrencesOnProfiles(
+ existingProfilesWithCounts.stream().collect(Collectors.toMap(key -> key, value -> 0L)), propertyKey).size();
+ }
+
if (forceRefresh && updatedProfileCount > 0) {
persistenceService.refreshIndex(Profile.class, null);
}
@@ -877,6 +902,34 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
logger.info("{} profiles updated for past event condition in {}ms", updatedProfileCount, System.currentTimeMillis() - t);
}
+ /**
+ * Returns the ids of the profiles that already have an event occurrence count greater than 0 for the given generated property key.
+ * @param generatedPropertyKey the key under "systemProperties.pastEvents" where the generated rule stores the occurrence count.
+ * @return a mutable set of profile ids (possibly empty).
+ */
+ private Set<String> getExistingProfilesWithPastEventOccurrenceCount(String generatedPropertyKey) {
+ Condition countExistsCondition = new Condition();
+ countExistsCondition.setConditionType(definitionsService.getConditionType("profilePropertyCondition"));
+ countExistsCondition.setParameter("propertyName", "systemProperties.pastEvents." + generatedPropertyKey);
+ countExistsCondition.setParameter("comparisonOperator", "greaterThan");
+ countExistsCondition.setParameter("propertyValueInteger", 0);
+
+ Set<String> profileIds = new HashSet<>();
+ if(pastEventsDisablePartitions) {
+ profileIds.addAll(persistenceService.aggregateWithOptimizedQuery(countExistsCondition, new TermsAggregate("itemId"),
+ Profile.ITEM_TYPE, maximumIdsQueryCount).keySet());
+ } else {
+ Map<String, Double> m = persistenceService.getSingleValuesMetrics(countExistsCondition, new String[]{"card"}, "itemId.keyword", Profile.ITEM_TYPE);
+ long card = m.get("_card").longValue();
+ int numParts = (int) (card / aggregateQueryBucketSize) + 2; // partition count derived from the profile cardinality and aggregateQueryBucketSize
+ for (int i = 0; i < numParts; i++) {
+ profileIds.addAll(persistenceService.aggregateWithOptimizedQuery(countExistsCondition, new TermsAggregate("itemId", i, numParts),
+ Profile.ITEM_TYPE).keySet());
+ }
+ }
+ return profileIds;
+ }
+
public String getGeneratedPropertyKey(Condition condition, Condition parentCondition) {
try {
Map<String, Object> m = new HashMap<>();
@@ -900,8 +953,50 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
}
}
- private int updateProfilesWithPastEventProperty(Map<String, Long> eventCountByProfile, String propertyKey) {
- int profileUpdatedCount = 0;
+ @Override
+ public void recalculatePastEventConditions() {
+ logger.info("running scheduled task to recalculate segments with pastEventCondition conditions");
+ long pastEventsTaskStartTime = System.currentTimeMillis();
+ Set<String> linkedSegments = new HashSet<>();
+ for (Metadata metadata : rulesService.getRuleMetadatas()) {
+ // recalculate the occurrence counts stored on the profiles by the auto-generated rules (setEventOccurenceCountAction)
+ Rule rule = rulesService.getRule(metadata.getId());
+ for (Action action : rule.getActions()) {
+ if (action.getActionTypeId().equals("setEventOccurenceCountAction")) {
+ Condition pastEventCondition = (Condition) action.getParameterValues().get("pastEventCondition");
+ if (pastEventCondition.containsParameter("numberOfDays")) { // only time-based past event conditions are recalculated
+ recalculatePastEventOccurrencesOnProfiles(rule.getCondition(), pastEventCondition, false, true);
+ logger.info("Event occurrence count on profiles updated for rule: {}", rule.getItemId());
+ if (rule.getLinkedItems() != null && rule.getLinkedItems().size() > 0) {
+ linkedSegments.addAll(rule.getLinkedItems());
+ }
+ }
+ }
+ }
+ }
+
+ // reevaluate the segments linked to these rules, since the event occurrence counts on the profiles have been updated.
+ if (linkedSegments.size() > 0) {
+ persistenceService.refreshIndex(Profile.class, null); // make the updated counts searchable before reevaluating the segments
+ for (String linkedItem : linkedSegments) {
+ Segment linkedSegment = getSegmentDefinition(linkedItem);
+ if (linkedSegment != null) {
+ updateExistingProfilesForSegment(linkedSegment);
+ }
+ }
+ }
+
+ logger.info("finished recalculate segments with pastEventCondition conditions in {}ms. ", System.currentTimeMillis() - pastEventsTaskStartTime);
+ }
+
+ /**
+ * Updates, in batches, every profile in the given map with its new event occurrence count for the given propertyKey.
+ * @param eventCountByProfile the event count, keyed by profileId
+ * @param propertyKey the generated property key for this past event condition, used to keep track of the count in the profile
+ * @return the ids of the profiles submitted for an occurrence count update (NOTE(review): ids are added before the batch update runs, so they are returned even if that update failed and was only logged).
+ */
+ private Set<String> updatePastEventOccurrencesOnProfiles(Map<String, Long> eventCountByProfile, String propertyKey) {
+ Set<String> profilesUpdated = new HashSet<>();
Map<Item, Map> batch = new HashMap<>();
Iterator<Map.Entry<String, Long>> entryIterator = eventCountByProfile.entrySet().iterator();
while (entryIterator.hasNext()){
@@ -917,12 +1012,12 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
Profile profile = new Profile();
profile.setItemId(profileId);
batch.put(profile, Collections.singletonMap("systemProperties", systemProperties));
+ profilesUpdated.add(profileId);
}
if (batch.size() == segmentUpdateBatchSize || (!entryIterator.hasNext() && batch.size() > 0)) {
try {
persistenceService.update(batch, null, Profile.class);
- profileUpdatedCount += batch.size();
} catch (Exception e) {
logger.error("Error updating {} profiles for past event system properties", batch.size(), e);
} finally {
@@ -930,7 +1025,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
}
}
}
- return profileUpdatedCount;
+ return profilesUpdated;
}
private String getMD5(String md5) {
@@ -1144,20 +1239,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
@Override
public void run() {
try {
- logger.info("running scheduled task to recalculate segments with pastEventCondition conditions");
- long pastEventsTaskStartTime = System.currentTimeMillis();
- for (Metadata metadata : rulesService.getRuleMetadatas()) {
- Rule rule = rulesService.getRule(metadata.getId());
- for (Action action : rule.getActions()) {
- if (action.getActionTypeId().equals("setEventOccurenceCountAction")) {
- Condition pastEventCondition = (Condition) action.getParameterValues().get("pastEventCondition");
- if (pastEventCondition.containsParameter("numberOfDays")) {
- updateExistingProfilesForPastEventCondition(rule.getCondition(), pastEventCondition, false);
- }
- }
- }
- }
- logger.info("finished recalculate segments with pastEventCondition conditions in {}ms. ", System.currentTimeMillis() - pastEventsTaskStartTime);
+ recalculatePastEventConditions();
} catch (Throwable t) {
logger.error("Error while updating profiles for past event conditions", t);
}