You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2021/10/01 13:44:20 UTC
[unomi] branch unomi-1.6.x updated: UNOMI-513: fix scoring plan
recalculation (#348)
This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch unomi-1.6.x
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/unomi-1.6.x by this push:
new 05f2547 UNOMI-513: fix scoring plan recalculation (#348)
05f2547 is described below
commit 05f2547b91134bda41059db021740e9444f5fb73
Author: kevan Jahanshahi <ke...@jahia.com>
AuthorDate: Fri Oct 1 15:42:57 2021 +0200
UNOMI-513: fix scoring plan recalculation (#348)
* UNOMI-513: fix scoring plan recalculation
* UNOMI-513: stabilize iTests and move painless scripts in /cxs/painless
---
.../java/org/apache/unomi/itests/SegmentIT.java | 138 +++++++++++++++++++--
.../ElasticSearchPersistenceServiceImpl.java | 63 +++++++++-
.../unomi/persistence/spi/PersistenceService.java | 21 ++++
services/pom.xml | 4 +
.../services/impl/segments/SegmentServiceImpl.java | 116 +++++++++--------
.../painless/evaluateScoringPlanElement.painless | 52 ++++++++
.../cxs/painless/resetScoringPlan.painless | 31 +++++
7 files changed, 360 insertions(+), 65 deletions(-)
diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
index 8241dc3..bbb0354 100644
--- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
@@ -21,6 +21,8 @@ import org.apache.unomi.api.Event;
import org.apache.unomi.api.Metadata;
import org.apache.unomi.api.Profile;
import org.apache.unomi.api.conditions.Condition;
+import org.apache.unomi.api.segments.Scoring;
+import org.apache.unomi.api.segments.ScoringElement;
import org.apache.unomi.api.segments.Segment;
import org.apache.unomi.api.services.EventService;
import org.apache.unomi.api.services.ProfileService;
@@ -42,6 +44,7 @@ import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.time.LocalDate;
import java.time.ZoneId;
+import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -66,11 +69,13 @@ public class SegmentIT extends BaseIT {
@Before
public void setUp() throws InterruptedException {
removeItems(Segment.class);
+ removeItems(Scoring.class);
}
@After
public void tearDown() throws InterruptedException {
removeItems(Segment.class);
+ removeItems(Scoring.class);
removeItems(Profile.class);
removeItems(Event.class);
}
@@ -161,8 +166,12 @@ public class SegmentIT extends BaseIT {
LocalDate localDate = LocalDate.now().minusDays(3);
Event testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
testEvent.setPersistent(true);
- eventService.send(testEvent);
- persistenceService.refreshIndex(Event.class, new Date()); // wait for event to be fully persisted and indexed
+ int changes = eventService.send(testEvent);
+ if ((changes & EventService.PROFILE_UPDATED) == EventService.PROFILE_UPDATED) {
+ profileService.save(profile);
+ persistenceService.refreshIndex(Profile.class, null);
+ }
+ persistenceService.refreshIndex(Event.class, testEvent.getTimeStamp()); // wait for event to be fully persisted and indexed
// create the segment
Metadata segmentMetadata = new Metadata("past-event-segment-test");
@@ -176,9 +185,10 @@ public class SegmentIT extends BaseIT {
segmentService.setSegmentDefinition(segment);
// insure the profile that did the past event condition is correctly engaged in the segment.
- Thread.sleep(5000);
- profile = profileService.load("test_profile_id");
- Assert.assertTrue("Profile should be engaged in the segment", profile.getSegments().contains("past-event-segment-test"));
+ keepTrying("Profile should be engaged in the segment",
+ () -> profileService.load("test_profile_id"),
+ updatedProfile -> updatedProfile.getSegments().contains("past-event-segment-test"),
+ 1000, 20);
}
@Test
@@ -207,7 +217,7 @@ public class SegmentIT extends BaseIT {
Event testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
testEvent.setPersistent(true);
persistenceService.save(testEvent, null, true);
- persistenceService.refreshIndex(Event.class, new Date()); // wait for event to be fully persisted and indexed
+ persistenceService.refreshIndex(Event.class, testEvent.getTimeStamp()); // wait for event to be fully persisted and indexed
// insure the profile is not yet engaged since we directly saved the event in ES
profile = profileService.load("test_profile_id");
@@ -226,7 +236,7 @@ public class SegmentIT extends BaseIT {
localDate = LocalDate.now().minusDays(15);
testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
persistenceService.save(testEvent);
- persistenceService.refreshIndex(Event.class, new Date()); // wait for event to be fully persisted and indexed
+ persistenceService.refreshIndex(Event.class, testEvent.getTimeStamp()); // wait for event to be fully persisted and indexed
// now recalculate the past event conditions
segmentService.recalculatePastEventConditions();
@@ -236,4 +246,116 @@ public class SegmentIT extends BaseIT {
updatedProfile -> !updatedProfile.getSegments().contains("past-event-segment-test"),
1000, 20);
}
-}
+
+ @Test
+ public void testScoringWithPastEventCondition() throws InterruptedException {
+ // create Profile
+ Profile profile = new Profile();
+ profile.setItemId("test_profile_id");
+ profileService.save(profile);
+ persistenceService.refreshIndex(Profile.class, null); // wait for profile to be full persisted and index
+
+ // send event for profile from a previous date (today -3 days)
+ ZoneId defaultZoneId = ZoneId.systemDefault();
+ LocalDate localDate = LocalDate.now().minusDays(3);
+ Event testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
+ testEvent.setPersistent(true);
+ int changes = eventService.send(testEvent);
+ if ((changes & EventService.PROFILE_UPDATED) == EventService.PROFILE_UPDATED) {
+ profileService.save(profile);
+ persistenceService.refreshIndex(Profile.class, null);
+ }
+ persistenceService.refreshIndex(Event.class, testEvent.getTimeStamp()); // wait for event to be fully persisted and indexed
+
+ // create the past event condition
+ Condition pastEventCondition = new Condition(definitionsService.getConditionType("pastEventCondition"));
+ pastEventCondition.setParameter("numberOfDays", 10);
+ Condition pastEventEventCondition = new Condition(definitionsService.getConditionType("eventTypeCondition"));
+ pastEventEventCondition.setParameter("eventTypeId", "test-event-type");
+ pastEventCondition.setParameter("eventCondition", pastEventEventCondition);
+
+ // create the scoring plan
+ Metadata scoringMetadata = new Metadata("past-event-scoring-test");
+ Scoring scoring = new Scoring(scoringMetadata);
+ List<ScoringElement> scoringElements = new ArrayList<>();
+ ScoringElement scoringElement = new ScoringElement();
+ scoringElement.setCondition(pastEventCondition);
+ scoringElement.setValue(50);
+ scoringElements.add(scoringElement);
+ scoring.setElements(scoringElements);
+ segmentService.setScoringDefinition(scoring);
+
+ // insure the profile that did the past event condition is correctly engaged in the scoring plan.
+ keepTrying("Profile should be engaged in the scoring with a score of 50",
+ () -> profileService.load("test_profile_id"),
+ updatedProfile -> updatedProfile.getScores() != null &&
+ updatedProfile.getScores().containsKey("past-event-scoring-test") &&
+ updatedProfile.getScores().get("past-event-scoring-test") == 50,
+ 1000, 20);
+ }
+
+ @Test
+ public void testScoringPastEventRecalculation() throws Exception {
+ // create Profile
+ Profile profile = new Profile();
+ profile.setItemId("test_profile_id");
+ profileService.save(profile);
+ persistenceService.refreshIndex(Profile.class, null); // wait for profile to be full persisted and index
+
+ // create the past event condition
+ Condition pastEventCondition = new Condition(definitionsService.getConditionType("pastEventCondition"));
+ pastEventCondition.setParameter("numberOfDays", 10);
+ Condition pastEventEventCondition = new Condition(definitionsService.getConditionType("eventTypeCondition"));
+ pastEventEventCondition.setParameter("eventTypeId", "test-event-type");
+ pastEventCondition.setParameter("eventCondition", pastEventEventCondition);
+
+ // create the scoring
+ Metadata scoringMetadata = new Metadata("past-event-scoring-test");
+ Scoring scoring = new Scoring(scoringMetadata);
+ List<ScoringElement> scoringElements = new ArrayList<>();
+ ScoringElement scoringElement = new ScoringElement();
+ scoringElement.setCondition(pastEventCondition);
+ scoringElement.setValue(50);
+ scoringElements.add(scoringElement);
+ scoring.setElements(scoringElements);
+ segmentService.setScoringDefinition(scoring);
+ Thread.sleep(5000);
+
+ // Persist the event (do not send it into the system so that it will not be processed by the rules)
+ ZoneId defaultZoneId = ZoneId.systemDefault();
+ LocalDate localDate = LocalDate.now().minusDays(3);
+ Event testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
+ testEvent.setPersistent(true);
+ persistenceService.save(testEvent, null, true);
+ persistenceService.refreshIndex(Event.class, testEvent.getTimeStamp()); // wait for event to be fully persisted and indexed
+
+ // insure the profile is not yet engaged since we directly saved the event in ES
+ profile = profileService.load("test_profile_id");
+ Assert.assertTrue("Profile should not be engaged in the scoring", profile.getScores() == null || !profile.getScores().containsKey("past-event-scoring-test"));
+
+ // now recalculate the past event conditions
+ segmentService.recalculatePastEventConditions();
+ persistenceService.refreshIndex(Profile.class, null);
+ keepTrying("Profile should be engaged in the scoring with a score of 50",
+ () -> profileService.load("test_profile_id"),
+ updatedProfile -> updatedProfile.getScores() != null &&
+ updatedProfile.getScores().containsKey("past-event-scoring-test") &&
+ updatedProfile.getScores().get("past-event-scoring-test") == 50,
+ 1000, 20);
+
+ // update the event to a date out of the past event condition
+ removeItems(Event.class);
+ localDate = LocalDate.now().minusDays(15);
+ testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
+ persistenceService.save(testEvent);
+ persistenceService.refreshIndex(Event.class, testEvent.getTimeStamp()); // wait for event to be fully persisted and indexed
+
+ // now recalculate the past event conditions
+ segmentService.recalculatePastEventConditions();
+ persistenceService.refreshIndex(Profile.class, null);
+ keepTrying("Profile should not be engaged in the scoring anymore",
+ () -> profileService.load("test_profile_id"),
+ updatedProfile -> !updatedProfile.getScores().containsKey("past-event-scoring-test"),
+ 1000, 20);
+ }
+}
\ No newline at end of file
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index bf2250c..4ef4dcd 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.unomi.persistence.spi.aggregate.*;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
@@ -72,10 +73,13 @@ import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.core.MainResponse;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.*;
@@ -953,6 +957,23 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public boolean updateWithQueryAndScript(final Date dateHint, final Class<?> clazz, final String[] scripts, final Map<String, Object>[] scriptParams, final Condition[] conditions) {
+ Script[] builtScripts = new Script[scripts.length];
+ for (int i = 0; i < scripts.length; i++) {
+ builtScripts[i] = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]);
+ }
+ return updateWithQueryAndScript(dateHint, clazz, builtScripts, conditions);
+ }
+
+ @Override
+ public boolean updateWithQueryAndStoredScript(Date dateHint, Class<?> clazz, String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions) {
+ Script[] builtScripts = new Script[scripts.length];
+ for (int i = 0; i < scripts.length; i++) {
+ builtScripts[i] = new Script(ScriptType.STORED, null, scripts[i], scriptParams[i]);
+ }
+ return updateWithQueryAndScript(dateHint, clazz, builtScripts, conditions);
+ }
+
+ private boolean updateWithQueryAndScript(final Date dateHint, final Class<?> clazz, final Script[] scripts, final Condition[] conditions) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
protected Boolean execute(Object... args) throws Exception {
try {
@@ -961,8 +982,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
String index = getIndex(itemType, dateHint);
for (int i = 0; i < scripts.length; i++) {
- Script actualScript = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]);
-
RefreshRequest refreshRequest = new RefreshRequest(index);
client.indices().refresh(refreshRequest, RequestOptions.DEFAULT);
@@ -970,7 +989,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
updateByQueryRequest.setConflicts("proceed");
updateByQueryRequest.setMaxRetries(1000);
updateByQueryRequest.setSlices(2);
- updateByQueryRequest.setScript(actualScript);
+ updateByQueryRequest.setScript(scripts[i]);
updateByQueryRequest.setQuery(conditionESQueryBuilderDispatcher.buildFilter(conditions[i]));
BulkByScrollResponse response = client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
@@ -1009,6 +1028,44 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
@Override
+ public boolean storeScripts(Map<String, String> scripts) {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".storeScripts", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ protected Boolean execute(Object... args) throws Exception {
+ boolean executedSuccessfully = true;
+ for (Map.Entry<String, String> script : scripts.entrySet()) {
+ PutStoredScriptRequest putStoredScriptRequest = new PutStoredScriptRequest();
+ XContentBuilder builder = XContentFactory.jsonBuilder();
+ builder.startObject();
+ {
+ builder.startObject("script");
+ {
+ builder.field("lang", "painless");
+ builder.field("source", script.getValue());
+ }
+ builder.endObject();
+ }
+ builder.endObject();
+ putStoredScriptRequest.content(BytesReference.bytes(builder), XContentType.JSON);
+ putStoredScriptRequest.id(script.getKey());
+ AcknowledgedResponse response = client.putScript(putStoredScriptRequest, RequestOptions.DEFAULT);
+ executedSuccessfully &= response.isAcknowledged();
+ if (response.isAcknowledged()) {
+ logger.info("Successfully stored painless script: {}", script.getKey());
+ } else {
+ logger.error("Failed to store painless script: {}", script.getKey());
+ }
+ }
+ return executedSuccessfully;
+ }
+ }.catchingExecuteInClassLoader(true);
+ if (result == null) {
+ return false;
+ } else {
+ return result;
+ }
+ }
+
+ @Override
public boolean updateWithScript(final Item item, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithScript", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
protected Boolean execute(Object... args) throws Exception {
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index 636939a..c4e62f6 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -193,6 +193,27 @@ public interface PersistenceService {
boolean updateWithQueryAndScript(Date dateHint, Class<?> clazz, String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions);
/**
+ * Updates the items of the specified class by a query with a new property value for the specified property name
+ * based on provided stored scripts and script parameters
+ *
+ * @param dateHint a Date helping in identifying where the item is located
+ * @param clazz the Item subclass of the item to update
+ * @param scripts Stored scripts name
+ * @param scriptParams script params array
+ * @param conditions conditions array
+ * @return {@code true} if the update was successful, {@code false} otherwise
+ */
+ boolean updateWithQueryAndStoredScript(Date dateHint, Class<?> clazz, String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions);
+
+ /**
+ * Store script in the Database for later usage with updateWithQueryAndStoredScript function for example.
+ *
+ * @param scripts inline scripts map indexed by id
+ * @return {@code true} if the update was successful, {@code false} otherwise
+ */
+ boolean storeScripts(Map<String, String> scripts);
+
+ /**
* Retrieves the item identified with the specified identifier and with the specified Item subclass if it exists.
*
* @param <T> the type of the Item subclass we want to retrieve
diff --git a/services/pom.xml b/services/pom.xml
index 4d88926..6812a2c 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -117,6 +117,10 @@
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.karaf.cellar</groupId>
<artifactId>org.apache.karaf.cellar.core</artifactId>
<scope>provided</scope>
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index cd3fe23..6e3592a 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -20,6 +20,9 @@ package org.apache.unomi.services.impl.segments;
import com.fasterxml.jackson.core.JsonProcessingException;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.CharEncoding;
import org.apache.unomi.api.Event;
import org.apache.unomi.api.Item;
import org.apache.unomi.api.Metadata;
@@ -39,7 +42,6 @@ import org.apache.unomi.persistence.spi.CustomObjectMapper;
import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
import org.apache.unomi.services.impl.AbstractServiceImpl;
import org.apache.unomi.services.impl.scheduler.SchedulerServiceImpl;
-import org.apache.unomi.services.impl.scheduler.SchedulerServiceImpl;
import org.apache.unomi.services.impl.ParserHelper;
import org.apache.unomi.api.exceptions.BadSegmentConditionException;
import org.osgi.framework.Bundle;
@@ -50,7 +52,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.time.Duration;
import java.time.ZoneOffset;
@@ -64,6 +68,9 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
private static final Logger logger = LoggerFactory.getLogger(SegmentServiceImpl.class.getName());
private static final String VALIDATION_PROFILE_ID = "validation-profile-id";
+ private static final String RESET_SCORING_SCRIPT = "resetScoringPlan";
+ private static final String EVALUATE_SCORING_ELEMENT_SCRIPT = "evaluateScoringPlanElement";
+
private BundleContext bundleContext;
private EventService eventService;
@@ -144,7 +151,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
this.dailyDateExprEvaluationHourUtc = dailyDateExprEvaluationHourUtc;
}
- public void postConstruct() {
+ public void postConstruct() throws IOException {
logger.debug("postConstruct {" + bundleContext.getBundle() + "}");
loadPredefinedSegments(bundleContext);
loadPredefinedScorings(bundleContext);
@@ -156,6 +163,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
}
bundleContext.addBundleListener(this);
initializeTimer();
+ loadScripts();
logger.info("Segment service initialized.");
}
@@ -591,7 +599,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
" }\n" +
"}", scoring.getItemId()));
- updateExistingProfilesForScoring(scoring);
+ updateExistingProfilesForScoring(scoring.getItemId(), scoring.getElements(), scoring.getMetadata().isEnabled());
}
public void createScoringDefinition(String scope, String scoringId, String name, String description) {
@@ -702,7 +710,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
Set<Scoring> impactedScorings = getScoringDependentScorings(scoringId);
if (!validate || (impactedSegments.isEmpty() && impactedScorings.isEmpty())) {
// update profiles
- updateExistingProfilesForRemovedScoring(scoringId);
+ updateExistingProfilesForScoring(scoringId, Collections.emptyList(), false);
// update impacted segments
for (Segment segment : impactedSegments) {
@@ -957,7 +965,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
public void recalculatePastEventConditions() {
logger.info("running scheduled task to recalculate segments with pastEventCondition conditions");
long pastEventsTaskStartTime = System.currentTimeMillis();
- Set<String> linkedSegments = new HashSet<>();
+ Set<String> linkedItems = new HashSet<>();
for (Metadata metadata : rulesService.getRuleMetadatas()) {
// reevaluate auto generated rules used to store the event occurrence count on the profile
Rule rule = rulesService.getRule(metadata.getId());
@@ -965,23 +973,29 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
if (action.getActionTypeId().equals("setEventOccurenceCountAction")) {
Condition pastEventCondition = (Condition) action.getParameterValues().get("pastEventCondition");
if (pastEventCondition.containsParameter("numberOfDays")) {
- recalculatePastEventOccurrencesOnProfiles(rule.getCondition(), pastEventCondition, false, true);
+ recalculatePastEventOccurrencesOnProfiles(rule.getCondition(), pastEventCondition, true, true);
logger.info("Event occurrence count on profiles updated for rule: {}", rule.getItemId());
if (rule.getLinkedItems() != null && rule.getLinkedItems().size() > 0) {
- linkedSegments.addAll(rule.getLinkedItems());
+ linkedItems.addAll(rule.getLinkedItems());
}
}
}
}
}
- // reevaluate segments linked to this rule, since we have updated the event occurrences count on the profiles.
- if (linkedSegments.size() > 0) {
+ // reevaluate segments and scoring linked to this rule, since we have updated the event occurrences count on the profiles.
+ if (linkedItems.size() > 0) {
persistenceService.refreshIndex(Profile.class, null);
- for (String linkedItem : linkedSegments) {
+ for (String linkedItem : linkedItems) {
Segment linkedSegment = getSegmentDefinition(linkedItem);
if (linkedSegment != null) {
updateExistingProfilesForSegment(linkedSegment);
+ continue;
+ }
+
+ Scoring linkedScoring = getScoringDefinition(linkedItem);
+ if (linkedScoring != null) {
+ updateExistingProfilesForScoring(linkedScoring.getItemId(), linkedScoring.getElements(), linkedScoring.getMetadata().isEnabled());
}
}
}
@@ -1165,63 +1179,38 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
return sourceMap;
}
- private void updateExistingProfilesForScoring(Scoring scoring) {
+ private void updateExistingProfilesForScoring(String scoringId, List<ScoringElement> scoringElements, boolean isEnabled) {
long startTime = System.currentTimeMillis();
- Condition scoringCondition = new Condition();
- scoringCondition.setConditionType(definitionsService.getConditionType("profilePropertyCondition"));
- scoringCondition.setParameter("propertyName", "scores." + scoring.getItemId());
- scoringCondition.setParameter("comparisonOperator", "exists");
-
- String[] scripts = new String[scoring.getElements().size() + 1];
- HashMap<String, Object>[] scriptParams = new HashMap[scoring.getElements().size() + 1];
- Condition[] conditions = new Condition[scoring.getElements().size() + 1];
-
- String lastUpdatedScriptPart = " if (!ctx._source.containsKey(\"systemProperties\")) { ctx._source.put(\"systemProperties\", [:]) } ctx._source.systemProperties.put(\"lastUpdated\", ZonedDateTime.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()), ZoneId.of(\"Z\")))";
- scriptParams[0] = new HashMap<String, Object>();
- scriptParams[0].put("scoringId", scoring.getItemId());
- scripts[0] = "if (ctx._source.containsKey(\"systemProperties\") && ctx._source.systemProperties.containsKey(\"scoreModifiers\") && ctx._source.systemProperties.scoreModifiers.containsKey(params.scoringId) ) { ctx._source.scores.put(params.scoringId, ctx._source.systemProperties.scoreModifiers.get(params.scoringId)) } else { ctx._source.scores.remove(params.scoringId) } " +
- lastUpdatedScriptPart;
- conditions[0] = scoringCondition;
+ String[] scripts = new String[scoringElements.size() + 1];
+ Map<String, Object>[] scriptParams = new HashMap[scoringElements.size() + 1];
+ Condition[] conditions = new Condition[scoringElements.size() + 1];
- if (scoring.getMetadata().isEnabled()) {
- String scriptToAdd = "if (!ctx._source.containsKey(\"scores\")) { ctx._source.put(\"scores\", [:])} if (ctx._source.scores.containsKey(params.scoringId) ) { ctx._source.scores.put(params.scoringId, ctx._source.scores.get(params.scoringId)+params.scoringValue) } else { ctx._source.scores.put(params.scoringId, params.scoringValue) } " +
- lastUpdatedScriptPart;
+ // reset Score
+ scriptParams[0] = new HashMap<>();
+ scriptParams[0].put("scoringId", scoringId);
+ scripts[0] = RESET_SCORING_SCRIPT;
+ conditions[0] = new Condition();
+ conditions[0].setConditionType(definitionsService.getConditionType("profilePropertyCondition"));
+ conditions[0].setParameter("propertyName", "scores." + scoringId);
+ conditions[0].setParameter("comparisonOperator", "exists");
+
+ // evaluate each elements of the scoring
+ if (isEnabled) {
int idx = 1;
- for (ScoringElement element : scoring.getElements()) {
+ for (ScoringElement element : scoringElements) {
scriptParams[idx] = new HashMap<>();
- scriptParams[idx].put("scoringId", scoring.getItemId());
+ scriptParams[idx].put("scoringId", scoringId);
scriptParams[idx].put("scoringValue", element.getValue());
- scripts[idx] = scriptToAdd;
+ scripts[idx] = EVALUATE_SCORING_ELEMENT_SCRIPT;
conditions[idx] = element.getCondition();
idx++;
}
}
- persistenceService.updateWithQueryAndScript(null, Profile.class, scripts, scriptParams, conditions);
+ persistenceService.updateWithQueryAndStoredScript(null, Profile.class, scripts, scriptParams, conditions);
logger.info("Updated scoring for profiles in {}ms", System.currentTimeMillis() - startTime);
}
- private void updateExistingProfilesForRemovedScoring(String scoringId) {
- long startTime = System.currentTimeMillis();
- Condition scoringCondition = new Condition();
- scoringCondition.setConditionType(definitionsService.getConditionType("profilePropertyCondition"));
- scoringCondition.setParameter("propertyName", "scores." + scoringId);
- scoringCondition.setParameter("comparisonOperator", "exists");
- Condition[] conditions = new Condition[1];
- conditions[0] = scoringCondition;
-
- HashMap<String, Object>[] scriptParams = new HashMap[1];
- scriptParams[0] = new HashMap<String, Object>();
- scriptParams[0].put("scoringId", scoringId);
-
- String[] script = new String[1];
- script[0] = "ctx._source.scores.remove(params.scoringId); if (!ctx._source.containsKey(\"systemProperties\")) { ctx._source.put(\"systemProperties\", [:]) } ctx._source.systemProperties.put(\"lastUpdated\", ZonedDateTime.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()), ZoneId.of(\"Z\")))";
-
- persistenceService.updateWithQueryAndScript(null, Profile.class, script, scriptParams, conditions);
-
- logger.info("Removed scoring from profiles in {}ms", System.currentTimeMillis() - startTime);
- }
-
public void bundleChanged(BundleEvent event) {
switch (event.getType()) {
case BundleEvent.STARTED:
@@ -1281,6 +1270,25 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, initialDelay, TimeUnit.DAYS.toSeconds(1), TimeUnit.SECONDS);
}
+ private void loadScripts() throws IOException {
+ Enumeration<URL> scriptsURL = bundleContext.getBundle().findEntries("META-INF/cxs/painless", "*.painless", true);
+ if (scriptsURL == null) {
+ return;
+ }
+
+ Map<String, String> scriptsById = new HashMap<>();
+ while (scriptsURL.hasMoreElements()) {
+ URL scriptURL = scriptsURL.nextElement();
+ logger.debug("Found painless script at " + scriptURL + ", loading... ");
+ try (InputStream in = scriptURL.openStream()) {
+ String script = IOUtils.toString(in, StandardCharsets.UTF_8);
+ String scriptId = FilenameUtils.getBaseName(scriptURL.getPath());
+ scriptsById.put(scriptId, script);
+ }
+ }
+ persistenceService.storeScripts(scriptsById);
+ }
+
public void setTaskExecutionPeriod(long taskExecutionPeriod) {
this.taskExecutionPeriod = taskExecutionPeriod;
}
diff --git a/services/src/main/resources/META-INF/cxs/painless/evaluateScoringPlanElement.painless b/services/src/main/resources/META-INF/cxs/painless/evaluateScoringPlanElement.painless
new file mode 100644
index 0000000..62e9e7e
--- /dev/null
+++ b/services/src/main/resources/META-INF/cxs/painless/evaluateScoringPlanElement.painless
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ This script is used to increment the score on all the profiles that match the Scoring plan element condition
+ required params:
+ - params.scoringId: the ID of the Scoring plan
+ - params.scoringValue: the score of the Scoring plan element (used for incrementation)
+*/
+
+// init the scores map
+if (!ctx._source.containsKey("scores") || ctx._source.scores == null) {
+ ctx._source.put("scores", [:]);
+}
+
+// increment the score
+if (ctx._source.scores.containsKey(params.scoringId)) {
+
+ // Score already exists, just increment
+ ctx._source.scores.put(params.scoringId, ctx._source.scores.get(params.scoringId) + params.scoringValue);
+} else {
+
+ // Score doesn't exists yet, check if the current profile is using a scoreModifier
+ if (ctx._source.containsKey("systemProperties") &&
+ ctx._source.systemProperties.containsKey("scoreModifiers") &&
+ ctx._source.systemProperties.scoreModifiers.containsKey(params.scoringId)) {
+
+ ctx._source.scores.put(params.scoringId, params.scoringValue + ctx._source.systemProperties.scoreModifiers.get(params.scoringId));
+ } else {
+ ctx._source.scores.put(params.scoringId, params.scoringValue);
+ }
+}
+
+// Update lastUpdated date on profile
+if (!ctx._source.containsKey("systemProperties")) {
+ ctx._source.put("systemProperties", [:]);
+}
+ctx._source.systemProperties.put("lastUpdated", ZonedDateTime.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()), ZoneId.of("Z")));
\ No newline at end of file
diff --git a/services/src/main/resources/META-INF/cxs/painless/resetScoringPlan.painless b/services/src/main/resources/META-INF/cxs/painless/resetScoringPlan.painless
new file mode 100644
index 0000000..2324069
--- /dev/null
+++ b/services/src/main/resources/META-INF/cxs/painless/resetScoringPlan.painless
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ This script is used to reset all profiles that have a scoring plan matching the given params.scoringId
+ required params:
+ - params.scoringId: the ID of the Scoring plan
+*/
+
+// remove score for the given params.scoringId
+ctx._source.scores.remove(params.scoringId);
+
+// Update lastUpdated date on profile
+if (!ctx._source.containsKey("systemProperties")) {
+ ctx._source.put("systemProperties", [:]);
+}
+ctx._source.systemProperties.put("lastUpdated", ZonedDateTime.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()), ZoneId.of("Z")));
\ No newline at end of file