You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2021/09/15 09:46:06 UTC
[unomi] 01/01: Fix recalculation of Segments containing past event
conditions, also provide ITest
This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch UNOMI-462-past-events
in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 077384a29e0f2f811a95e9b684d0d3bde33301e0
Author: Kevan <ke...@jahia.com>
AuthorDate: Wed Sep 15 11:45:50 2021 +0200
Fix recalculation of Segments containing past event conditions, also provide ITest
---
.../apache/unomi/api/services/SegmentService.java | 11 ++
itests/pom.xml | 1 +
.../java/org/apache/unomi/itests/SegmentIT.java | 64 +++++++++++
.../apache/unomi/itests/tools/RetriableHelper.java | 66 +++++++++++
.../PastEventConditionESQueryBuilder.java | 25 ++--
.../services/impl/segments/SegmentServiceImpl.java | 126 +++++++++++++++++----
6 files changed, 254 insertions(+), 39 deletions(-)
diff --git a/api/src/main/java/org/apache/unomi/api/services/SegmentService.java b/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
index d1dc440..a54489b 100644
--- a/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
@@ -226,4 +226,15 @@ public interface SegmentService {
* @return a String representing the condition and parent condition uniquelly
*/
String getGeneratedPropertyKey(Condition condition, Condition parentCondition);
+
+ /**
+ * Recalculates the time-based past event conditions (those having a numberOfDays parameter) of the existing rules.
+ * This operation can be heavy and take time; it will:
+ * - browse the existing rules to extract their past event conditions,
+ * - query the events matching those conditions,
+ * - update the event occurrence counts stored on the corresponding profiles,
+ * - reevaluate the segments linked to these rules, to engage/disengage profiles once the counts are updated.
+ * So use it carefully, or execute this method in a dedicated thread.
+ */
+ void recalculatePastEventConditions();
}
diff --git a/itests/pom.xml b/itests/pom.xml
index c022536..e3bec40 100644
--- a/itests/pom.xml
+++ b/itests/pom.xml
@@ -183,6 +183,7 @@
<instanceSettings>
<properties>
<cluster.routing.allocation.disk.threshold_enabled>false</cluster.routing.allocation.disk.threshold_enabled>
+ <http.cors.allow-origin>*</http.cors.allow-origin>
</properties>
</instanceSettings>
</configuration>
diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
index 207bd93..87ca3fd 100644
--- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
@@ -26,6 +26,7 @@ import org.apache.unomi.api.services.EventService;
import org.apache.unomi.api.services.ProfileService;
import org.apache.unomi.api.services.SegmentService;
import org.apache.unomi.api.exceptions.BadSegmentConditionException;
+import org.apache.unomi.itests.tools.RetriableHelper;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.junit.After;
import org.junit.Assert;
@@ -44,6 +45,7 @@ import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.Callable;
@RunWith(PaxExam.class)
@ExamReactorStrategy(PerSuite.class)
@@ -180,4 +182,66 @@ public class SegmentIT extends BaseIT {
profile = profileService.load("test_profile_id");
Assert.assertTrue("Profile should be engaged in the segment", profile.getSegments().contains("past-event-segment-test"));
}
+
+ @Test
+ public void testSegmentPastEventRecalculation() throws Exception {
+ // create Profile
+ Profile profile = new Profile();
+ profile.setItemId("test_profile_id");
+ profileService.save(profile);
+ persistenceService.refreshIndex(Profile.class, null); // wait for profile to be fully persisted and indexed
+
+ // create the segment
+ Metadata segmentMetadata = new Metadata("past-event-segment-test");
+ Segment segment = new Segment(segmentMetadata);
+ Condition segmentCondition = new Condition(definitionsService.getConditionType("pastEventCondition"));
+ segmentCondition.setParameter("numberOfDays", 10);
+ Condition pastEventEventCondition = new Condition(definitionsService.getConditionType("eventTypeCondition"));
+ pastEventEventCondition.setParameter("eventTypeId", "test-event-type");
+ segmentCondition.setParameter("eventCondition", pastEventEventCondition);
+ segment.setCondition(segmentCondition);
+ segmentService.setSegmentDefinition(segment);
+ Thread.sleep(5000); // NOTE(review): fixed delay, presumably to let the new segment and its generated rule be taken into account
+
+ // Persist the event (do not send it into the system so that it will not be processed by the rules)
+ ZoneId defaultZoneId = ZoneId.systemDefault();
+ LocalDate localDate = LocalDate.now().minusDays(3);
+ Event testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
+ testEvent.setPersistent(true);
+ persistenceService.save(testEvent, null, true);
+ persistenceService.refreshIndex(Event.class, new Date()); // wait for event to be fully persisted and indexed
+
+ // ensure the profile is not yet engaged since we directly saved the event in ES
+ profile = profileService.load("test_profile_id");
+ Assert.assertFalse("Profile should not be engaged in the segment", profile.getSegments().contains("past-event-segment-test"));
+
+ // now recalculate the past event conditions
+ segmentService.recalculatePastEventConditions();
+ persistenceService.refreshIndex(Profile.class, null);
+ new RetriableHelper<>("testSegmentPastEventRecalculation profile engaged", 20, 1000, () -> {
+ Profile updatedProfile = profileService.load("test_profile_id");
+ if (!updatedProfile.getSegments().contains("past-event-segment-test")) {
+ throw new RuntimeException("Profile should be engaged in the segment, will retry or fail if retry reach the limit");
+ }
+ return updatedProfile;
+ }).call();
+
+ // replace the event by one dated outside of the 10 days window of the past event condition
+ removeItems(Event.class);
+ testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(LocalDate.now().minusDays(15).atStartOfDay(defaultZoneId).toInstant()));
+ testEvent.setPersistent(true);
+ persistenceService.save(testEvent, null, true); // persisted exactly like the first event
+ persistenceService.refreshIndex(Event.class, new Date()); // wait for event to be fully persisted and indexed
+
+ // now recalculate the past event conditions
+ segmentService.recalculatePastEventConditions();
+ persistenceService.refreshIndex(Profile.class, null);
+ new RetriableHelper<>("testSegmentPastEventRecalculation profile not engaged anymore", 20, 1000, () -> {
+ Profile updatedProfile = profileService.load("test_profile_id");
+ if (updatedProfile.getSegments().contains("past-event-segment-test")) {
+ throw new RuntimeException("Profile should not be engaged in the segment anymore, will retry or fail if retry reach the limit");
+ }
+ return updatedProfile;
+ }).call();
+ }
}
diff --git a/itests/src/test/java/org/apache/unomi/itests/tools/RetriableHelper.java b/itests/src/test/java/org/apache/unomi/itests/tools/RetriableHelper.java
new file mode 100644
index 0000000..540a7db
--- /dev/null
+++ b/itests/src/test/java/org/apache/unomi/itests/tools/RetriableHelper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.unomi.itests.tools;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+
+/**
+ * Utility to retry an operation in ITests, e.g. while waiting for something to be indexed in ES.
+ * Each call() gets a fresh budget of numberOfRetries attempts (at least one); once exhausted, an AssertionError with the last failure as cause is thrown.
+ * @param <T> The type of object you are expecting to return from this retry instance
+ */
+public class RetriableHelper<T> implements Callable<T> {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(RetriableHelper.class);
+
+ private final Callable<T> task;
+ private final long timeToWait;
+ private final String key;
+ private final int numberOfRetries;
+
+ public RetriableHelper(String key, int numberOfRetries, long timeToWait, Callable<T> task) {
+ this.key = key;
+ this.numberOfRetries = numberOfRetries;
+ this.timeToWait = timeToWait;
+ this.task = task;
+ }
+
+ @Override
+ public T call() throws Exception {
+ int numberOfTriesLeft = numberOfRetries; // fresh budget on each call(), so the helper can safely be reused
+ while (true) {
+ try {
+ return task.call();
+ } catch (InterruptedException | CancellationException e) {
+ throw e; // never retry an interrupted/cancelled task
+ } catch (Exception e) {
+ numberOfTriesLeft--;
+ if (numberOfTriesLeft <= 0) { // "<=" so that a non-positive numberOfRetries cannot loop forever
+ throw new AssertionError("RETRY LIMIT REACH: " + e.getMessage(), e);
+ }
+ LOGGER.warn("RETRY: {} failed, number of tries left: {}, will wait {}ms before next try", key, numberOfTriesLeft, timeToWait);
+ Thread.sleep(timeToWait);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index 3c676d4..ca2ee7e 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -77,23 +77,14 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
public QueryBuilder buildQuery(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 1 : (Integer) condition.getParameter("minimumEventCount");
Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
-
- if (condition.getParameter("generatedPropertyKey") != null && condition.getParameter("generatedPropertyKey").equals(segmentService.getGeneratedPropertyKey((Condition) condition.getParameter("eventCondition"), condition))) {
- // A property is already set on profiles matching the past event condition, use it
- if (minimumEventCount != 1 || maximumEventCount != Integer.MAX_VALUE) {
- // Check the number of occurences
- RangeQueryBuilder builder = QueryBuilders.rangeQuery("systemProperties.pastEvents." + condition.getParameter("generatedPropertyKey"));
- if (minimumEventCount != 1) {
- builder.gte(minimumEventCount);
- }
- if (maximumEventCount != Integer.MAX_VALUE) {
- builder.lte(minimumEventCount);
- }
- return builder;
- } else {
- // Simply get profiles who have the property set
- return QueryBuilders.existsQuery("systemProperties.pastEvents." + condition.getParameter("generatedPropertyKey"));
- }
+ String generatedPropertyKey = (String) condition.getParameter("generatedPropertyKey");
+
+ if (generatedPropertyKey != null && generatedPropertyKey.equals(segmentService.getGeneratedPropertyKey((Condition) condition.getParameter("eventCondition"), condition))) {
+ // A property is already set on profiles matching the past event condition: check its occurrence count is within [minimumEventCount, maximumEventCount]
+ RangeQueryBuilder builder = QueryBuilders.rangeQuery("systemProperties.pastEvents." + generatedPropertyKey);
+ builder.gte(minimumEventCount); // a range (not exists) query: with the default minimum of 1, profiles whose count was reset to 0 do not match
+ builder.lte(maximumEventCount);
+ return builder;
} else {
// No property set - tries to build an idsQuery
// Build past event condition
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index d096668..d219776 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -783,7 +783,9 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
rule.setLinkedItems(Arrays.asList(metadata.getId()));
rules.add(rule);
- updateExistingProfilesForPastEventCondition(condition, parentCondition, true);
+ // This is a newly generated rule keeping track of the event count: store that count on every profile matching this past event.
+ // No profile can hold a count for a brand new rule yet, so resetting non-matching profiles is skipped (last argument "false").
+ recalculatePastEventOccurrencesOnProfiles(condition, parentCondition, true, false);
} else {
rule.getLinkedItems().add(metadata.getId());
rules.add(rule);
@@ -805,7 +807,18 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
}
}
- private void updateExistingProfilesForPastEventCondition(Condition eventCondition, Condition parentCondition, boolean forceRefresh) {
+ /**
+ * Recalculates the event occurrence counts stored on the profiles matching the given past event condition.
+ * @param eventCondition the condition the counted events must match
+ * @param parentCondition the past event condition; its "generatedPropertyKey" parameter names the profile property holding the count
+ * @param forceRefresh if true, the Profile index is refreshed at the end, provided at least one profile was updated
+ * @param resetExistingProfilesNotMatching if true, profiles already holding a count but no longer having matching events get
+ * their count reset to 0. "false" skips the extra profile queries and speeds up the process, but is only safe when no
+ * profile can hold a count yet (e.g. for a newly generated rule).
+ * Otherwise use "true" to be sure the count is reset to 0 on the profiles that need it.
+ */
+ private void recalculatePastEventOccurrencesOnProfiles(Condition eventCondition, Condition parentCondition,
+ boolean forceRefresh, boolean resetExistingProfilesNotMatching) {
long t = System.currentTimeMillis();
List<Condition> l = new ArrayList<Condition>();
Condition andCondition = new Condition();
@@ -845,21 +858,33 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
}
String propertyKey = (String) parentCondition.getParameter("generatedPropertyKey");
+ Set<String> existingProfilesWithCounts = resetExistingProfilesNotMatching ? getExistingProfilesWithPastEventOccurrenceCount(propertyKey) : new HashSet<>();
int updatedProfileCount = 0;
if(pastEventsDisablePartitions) {
- Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount);
+ Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount); // andCondition, as in the partitioned branch, not the bare eventCondition
- updatedProfileCount = updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+ Set<String> updatedProfiles = updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey);
+ existingProfilesWithCounts.removeAll(updatedProfiles);
+ updatedProfileCount = updatedProfiles.size();
} else {
Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
long card = m.get("_card").longValue();
int numParts = (int) (card / aggregateQueryBucketSize) + 2;
for (int i = 0; i < numParts; i++) {
Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
- updatedProfileCount += updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+ Set<String> updatedProfiles = updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey);
+ existingProfilesWithCounts.removeAll(updatedProfiles);
+ updatedProfileCount += updatedProfiles.size();
}
}
+ // remaining existing profiles with counts have not been updated: they no longer have matching events
+ // in the time based condition, so their count is reset to 0
+ if (!existingProfilesWithCounts.isEmpty()) {
+ updatedProfileCount += updatePastEventOccurrencesOnProfiles(
+ existingProfilesWithCounts.stream().collect(Collectors.toMap(profileId -> profileId, profileId -> 0L)), propertyKey).size();
+ }
+
if (forceRefresh && updatedProfileCount > 0) {
persistenceService.refreshIndex(Profile.class, null);
}
@@ -867,6 +892,34 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
logger.info("{} profiles updated for past event condition in {}ms", updatedProfileCount, System.currentTimeMillis() - t);
}
+ /**
+ * Return the ids of the profiles already holding a positive event occurrence count for the given generated property key.
+ * @param generatedPropertyKey the generated property key of the generated rule for the given past event condition.
+ * @return the set of profile ids.
+ */
+ private Set<String> getExistingProfilesWithPastEventOccurrenceCount(String generatedPropertyKey) {
+ Condition countExistsCondition = new Condition();
+ countExistsCondition.setConditionType(definitionsService.getConditionType("profilePropertyCondition"));
+ countExistsCondition.setParameter("propertyName", "systemProperties.pastEvents." + generatedPropertyKey);
+ countExistsCondition.setParameter("comparisonOperator", "greaterThan");
+ countExistsCondition.setParameter("propertyValueInteger", 0);
+
+ Set<String> profileIds = new HashSet<>();
+ if(pastEventsDisablePartitions) {
+ // profile condition: aggregate the profile ids ("itemId") on the profile index, like the partitioned branch below
+ profileIds.addAll(persistenceService.aggregateWithOptimizedQuery(countExistsCondition, new TermsAggregate("itemId"), Profile.ITEM_TYPE, maximumIdsQueryCount).keySet());
+ } else {
+ Map<String, Double> m = persistenceService.getSingleValuesMetrics(countExistsCondition, new String[]{"card"}, "itemId.keyword", Profile.ITEM_TYPE);
+ long card = m.get("_card").longValue();
+ int numParts = (int) (card / aggregateQueryBucketSize) + 2;
+ for (int i = 0; i < numParts; i++) {
+ profileIds.addAll(persistenceService.aggregateWithOptimizedQuery(countExistsCondition, new TermsAggregate("itemId", i, numParts),
+ Profile.ITEM_TYPE).keySet());
+ }
+ }
+ return profileIds;
+ }
+
public String getGeneratedPropertyKey(Condition condition, Condition parentCondition) {
try {
Map<String, Object> m = new HashMap<>();
@@ -890,8 +943,50 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
}
}
- private int updateProfilesWithPastEventProperty(Map<String, Long> eventCountByProfile, String propertyKey) {
- int profileUpdatedCount = 0;
+ @Override
+ public void recalculatePastEventConditions() {
+ logger.info("running scheduled task to recalculate segments with pastEventCondition conditions");
+ long pastEventsTaskStartTime = System.currentTimeMillis();
+ Set<String> linkedSegments = new HashSet<>();
+ for (Metadata metadata : rulesService.getRuleMetadatas()) {
+ Rule rule = rulesService.getRule(metadata.getId()); // auto generated rules store the event occurrence count on the profile
+ if (rule == null) {
+ continue; // the rule may have been removed since its metadata was listed
+ }
+ for (Action action : rule.getActions()) {
+ if ("setEventOccurenceCountAction".equals(action.getActionTypeId())) {
+ Condition pastEventCondition = (Condition) action.getParameterValues().get("pastEventCondition");
+ if (pastEventCondition != null && pastEventCondition.containsParameter("numberOfDays")) {
+ recalculatePastEventOccurrencesOnProfiles(rule.getCondition(), pastEventCondition, false, true);
+ logger.info("Event occurrence count on profiles updated for rule: {}", rule.getItemId());
+ if (rule.getLinkedItems() != null) {
+ linkedSegments.addAll(rule.getLinkedItems());
+ }
+ }
+ }
+ }
+ }
+ // reevaluate the segments linked to the updated rules, now that the occurrence counts on the profiles are up to date
+ if (!linkedSegments.isEmpty()) {
+ persistenceService.refreshIndex(Profile.class, null);
+ for (String linkedItem : linkedSegments) {
+ Segment linkedSegment = getSegmentDefinition(linkedItem);
+ if (linkedSegment != null) {
+ updateExistingProfilesForSegment(linkedSegment);
+ }
+ }
+ }
+ logger.info("finished recalculate segments with pastEventCondition conditions in {}ms. ", System.currentTimeMillis() - pastEventsTaskStartTime);
+ }
+
+ /**
+ * Writes the given event occurrence counts, in batches, on the corresponding profiles under the given propertyKey.
+ * @param eventCountByProfile the event count per profile id
+ * @param propertyKey the generated property key for this past event condition, used to keep track of the count on the profile
+ * @return the ids of the profiles put in an update batch. NOTE(review): ids of a batch whose update failed (error only logged) are included too.
+ */
+ private Set<String> updatePastEventOccurrencesOnProfiles(Map<String, Long> eventCountByProfile, String propertyKey) {
+ Set<String> profilesUpdated = new HashSet<>();
Map<Item, Map> batch = new HashMap<>();
Iterator<Map.Entry<String, Long>> entryIterator = eventCountByProfile.entrySet().iterator();
while (entryIterator.hasNext()){
@@ -907,12 +1002,12 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
Profile profile = new Profile();
profile.setItemId(profileId);
batch.put(profile, Collections.singletonMap("systemProperties", systemProperties));
+ profilesUpdated.add(profileId); // recorded before the batch update below is attempted
}
if (batch.size() == segmentUpdateBatchSize || (!entryIterator.hasNext() && batch.size() > 0)) {
try {
persistenceService.update(batch, null, Profile.class);
- profileUpdatedCount += batch.size();
} catch (Exception e) {
logger.error("Error updating {} profiles for past event system properties", batch.size(), e);
} finally {
@@ -920,7 +1015,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
}
}
}
- return profileUpdatedCount;
+ return profilesUpdated;
}
private String getMD5(String md5) {
@@ -1134,20 +1229,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
@Override
public void run() {
try {
- logger.info("running scheduled task to recalculate segments with pastEventCondition conditions");
- long pastEventsTaskStartTime = System.currentTimeMillis();
- for (Metadata metadata : rulesService.getRuleMetadatas()) {
- Rule rule = rulesService.getRule(metadata.getId());
- for (Action action : rule.getActions()) {
- if (action.getActionTypeId().equals("setEventOccurenceCountAction")) {
- Condition pastEventCondition = (Condition) action.getParameterValues().get("pastEventCondition");
- if (pastEventCondition.containsParameter("numberOfDays")) {
- updateExistingProfilesForPastEventCondition(rule.getCondition(), pastEventCondition, false);
- }
- }
- }
- }
- logger.info("finished recalculate segments with pastEventCondition conditions in {}ms. ", System.currentTimeMillis() - pastEventsTaskStartTime);
+ recalculatePastEventConditions(); // same code path as the on-demand SegmentService API
} catch (Throwable t) {
logger.error("Error while updating profiles for past event conditions", t);
}