You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2023/05/05 07:28:18 UTC
[unomi] branch unomi-1.x updated: UNOMI-764: merge backport + no scroll queries anymore during merge (#615)
This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch unomi-1.x
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/unomi-1.x by this push:
new f687569b4 UNOMI-764: merge backport + no scroll queries anymore during merge (#615)
f687569b4 is described below
commit f687569b4a46f8091932c404c8164124fc8baa74
Author: kevan Jahanshahi <jk...@apache.org>
AuthorDate: Fri May 5 09:28:13 2023 +0200
UNOMI-764: merge backport + no scroll queries anymore during merge (#615)
---
.../org/apache/unomi/itests/ProfileMergeIT.java | 327 ++++++++++++++++++-
.../main/resources/etc/custom.system.properties | 2 +-
persistence-elasticsearch/core/pom.xml | 9 +-
.../ElasticSearchPersistenceServiceImpl.java | 32 +-
.../actions/MergeProfilesOnPropertyAction.java | 359 ++++++++++-----------
.../META-INF/cxs/painless/updateProfileId.painless | 32 ++
.../resources/OSGI-INF/blueprint/blueprint.xml | 8 +-
.../resources/org.apache.unomi.plugins.base.cfg | 2 +-
.../services/impl/segments/SegmentServiceImpl.java | 21 --
9 files changed, 560 insertions(+), 232 deletions(-)
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
index fb1cc40de..afe92c142 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
@@ -19,13 +19,11 @@ package org.apache.unomi.itests;
import org.apache.unomi.api.Event;
import org.apache.unomi.api.Metadata;
import org.apache.unomi.api.Profile;
+import org.apache.unomi.api.Session;
import org.apache.unomi.api.actions.Action;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.rules.Rule;
-import org.apache.unomi.api.services.DefinitionsService;
-import org.apache.unomi.api.services.EventService;
-import org.apache.unomi.api.services.ProfileService;
-import org.apache.unomi.api.services.RulesService;
+import org.apache.unomi.api.services.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -52,6 +50,8 @@ public class ProfileMergeIT extends BaseIT {
@Inject @Filter(timeout = 600000)
protected EventService eventService;
@Inject @Filter(timeout = 600000)
+ protected PrivacyService privacyService;
+ @Inject @Filter(timeout = 600000)
protected RulesService rulesService;
@Inject @Filter(timeout = 600000)
protected DefinitionsService definitionsService;
@@ -63,14 +63,15 @@ public class ProfileMergeIT extends BaseIT {
private final static String TEST_PROFILE_ID = "mergeOnPropertyTestProfileId";
@After
- public void after() {
+ public void after() throws InterruptedException {
// cleanup created data
rulesService.removeRule(TEST_RULE_ID);
+ removeItems(Profile.class, Event.class, Session.class);
}
@Test
public void testProfileMergeOnPropertyAction_dont_forceEventProfileAsMaster() throws InterruptedException {
- createAndWaitForRule(createMergeOnPropertyRule(false));
+ createAndWaitForRule(createMergeOnPropertyRule(false, "j:nodename"));
// A new profile should be created.
Assert.assertNotEquals(sendEvent().getProfile().getItemId(), TEST_PROFILE_ID);
@@ -78,12 +79,314 @@ public class ProfileMergeIT extends BaseIT {
@Test
public void testProfileMergeOnPropertyAction_forceEventProfileAsMaster() throws InterruptedException {
- createAndWaitForRule(createMergeOnPropertyRule(true));
+ createAndWaitForRule(createMergeOnPropertyRule(true, "j:nodename"));
// No new profile should be created, instead the profile of the event should be used.
Assert.assertEquals(sendEvent().getProfile().getItemId(), TEST_PROFILE_ID);
}
+ @Test
+ public void testProfileMergeOnPropertyAction_simpleMerge() throws InterruptedException {
+ // create rule
+ createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+ // create master profile
+ Profile masterProfile = new Profile();
+ masterProfile.setItemId("masterProfileID");
+ masterProfile.setProperty("email", "username@domain.com");
+ masterProfile.setSystemProperty("mergeIdentifier", "username@domain.com");
+ profileService.save(masterProfile);
+
+ // create event profile
+ Profile eventProfile = new Profile();
+ eventProfile.setItemId("eventProfileID");
+ eventProfile.setProperty("email", "username@domain.com");
+ profileService.save(eventProfile);
+
+ keepTrying("Profile with id masterProfileID not found in the required time", () -> profileService.load("masterProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ keepTrying("Profile with id eventProfileID not found in the required time", () -> profileService.load("eventProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ Event event = new Event(TEST_EVENT_TYPE, null, eventProfile, null, null, eventProfile, new Date());
+
+ eventService.send(event);
+
+ Assert.assertNotNull(event.getProfile());
+
+ keepTrying("Profile with id eventProfileID should still be accessible but marked as mergedWith",
+ () -> profileService.load("eventProfileID"), (profile -> profile != null && "masterProfileID".equals(profile.getMergedWith())),
+ DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ }
+
+
+ /**
+ * User switch case, this case can happen when a person (user A) is using the same browser session of a previous logged user (user B).
+ * user A will be using user B profile, but when user A is going to login by sending a merge event, then we will detect that the mergeIdentifier is not the same
+ * In this case we will just switch user A profile to:
+ * - a new one, if it's the first time we encounter his own mergeIdentifier (TESTED in this scenario)
+ * - a previous one, if we already have a profile in DB with the same mergeIdentifier.
+ */
+ @Test
+ public void testProfileMergeOnPropertyAction_sessionReassigned_newProfile() throws InterruptedException {
+ // create rule
+ createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+ // create master profile
+ Profile masterProfile = new Profile();
+ masterProfile.setItemId("masterProfileID");
+ masterProfile.setProperty("email", "master@domain.com");
+ masterProfile.setSystemProperty("mergeIdentifier", "master@domain.com");
+ profileService.save(masterProfile);
+
+ keepTrying("Profile with id masterProfileID not found in the required time", () -> profileService.load("masterProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+ // create event profile
+ Profile eventProfile = new Profile();
+ eventProfile.setItemId("eventProfileID");
+ eventProfile.setProperty("email", "event@domain.com");
+
+ Session simpleSession = new Session("simpleSession", eventProfile, new Date(), null);
+ Event event = new Event(TEST_EVENT_TYPE, simpleSession, masterProfile, null, null, eventProfile, new Date());
+ eventService.send(event);
+
+ // Session should have been reassigned and a new profile should have been created ! (We call this user switch case)
+ Assert.assertNotNull(event.getProfile());
+ Assert.assertNotEquals("eventProfileID", event.getProfile().getItemId());
+ Assert.assertNotEquals("eventProfileID", event.getProfileId());
+ Assert.assertNotEquals("eventProfileID", event.getSession().getProfile().getItemId());
+ Assert.assertNotEquals("eventProfileID", event.getSession().getProfileId());
+
+ Assert.assertNotEquals("masterProfileID", event.getProfile().getItemId());
+ Assert.assertNotEquals("masterProfileID", event.getProfileId());
+ Assert.assertNotEquals("masterProfileID", event.getSession().getProfile().getItemId());
+ Assert.assertNotEquals("masterProfileID", event.getSession().getProfileId());
+
+ Assert.assertEquals(event.getSession().getProfileId(), event.getProfileId());
+ Assert.assertEquals("event@domain.com", event.getProfile().getSystemProperties().get("mergeIdentifier"));
+ }
+
+ /**
+ * User switch case, this case can happen when a person (user A) is using the same browser session of a previous logged user (user B).
+ * user A will be using user B profile, but when user A is going to login by sending a merge event, then we will detect that the mergeIdentifier is not the same
+ * In this case we will just switch user A profile to:
+ * - a new one, if it's the first time we encounter his own mergeIdentifier
+ * - a previous one, if we already have a profile in DB with the same mergeIdentifier. (TESTED in this scenario)
+ */
+ @Test
+ public void testProfileMergeOnPropertyAction_sessionReassigned_existingProfile() throws InterruptedException {
+ // create rule
+ createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+ // create master profile
+ Profile masterProfile = new Profile();
+ masterProfile.setItemId("masterProfileID");
+ masterProfile.setProperty("email", "master@domain.com");
+ masterProfile.setSystemProperty("mergeIdentifier", "master@domain.com");
+ profileService.save(masterProfile);
+
+ // create a previous existing profile with same mergeIdentifier
+ Profile previousProfile = new Profile();
+ previousProfile.setItemId("previousProfileID");
+ previousProfile.setProperty("email", "event@domain.com");
+ previousProfile.setSystemProperty("mergeIdentifier", "event@domain.com");
+ profileService.save(previousProfile);
+
+ keepTrying("Profile with id masterProfileID not found in the required time", () -> profileService.load("masterProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ keepTrying("Profile with id previousProfileID not found in the required time", () -> profileService.load("previousProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+ // create event profile
+ Profile eventProfile = new Profile();
+ eventProfile.setItemId("eventProfileID");
+ eventProfile.setProperty("email", "event@domain.com");
+
+ Session simpleSession = new Session("simpleSession", eventProfile, new Date(), null);
+ Event event = new Event(TEST_EVENT_TYPE, simpleSession, masterProfile, null, null, eventProfile, new Date());
+ eventService.send(event);
+
+ // Session should have been reassigned and the previous existing profile for mergeIdentifier: event@domain.com should have been reused
+ // No new profile should have been created: the already existing profile is picked up instead (We call this user switch case)
+ Assert.assertNotNull(event.getProfile());
+ Assert.assertEquals("previousProfileID", event.getProfile().getItemId());
+ Assert.assertEquals("previousProfileID", event.getProfileId());
+ Assert.assertEquals("previousProfileID", event.getSession().getProfile().getItemId());
+ Assert.assertEquals("previousProfileID", event.getSession().getProfileId());
+
+ Assert.assertEquals(event.getSession().getProfileId(), event.getProfileId());
+ Assert.assertEquals("event@domain.com", event.getProfile().getSystemProperties().get("mergeIdentifier"));
+ }
+
+ /**
+ * In case of merge, existing sessions/events from previous profileId should be rewritten to use the new master profileId
+ */
+ @Test
+ public void testProfileMergeOnPropertyAction_rewriteExistingSessionsEvents() throws InterruptedException {
+ Condition matchAll = new Condition(definitionsService.getConditionType("matchAllCondition"));
+ // create rule
+ createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+ // create master profile
+ Profile masterProfile = new Profile();
+ masterProfile.setItemId("masterProfileID");
+ masterProfile.setProperty("email", "username@domain.com");
+ masterProfile.setSystemProperty("mergeIdentifier", "username@domain.com");
+ profileService.save(masterProfile);
+
+ Profile eventProfile = new Profile();
+ eventProfile.setItemId("eventProfileID");
+ eventProfile.setProperty("email", "username@domain.com");
+ profileService.save(eventProfile);
+
+ // create 5 past sessions and 5 past events.
+ List<Session> sessionsToBeRewritten = new ArrayList<>();
+ List<Event> eventsToBeRewritten = new ArrayList<>();
+ for (int i = 1; i <= 5; i++) {
+ Session sessionToBeRewritten = new Session("simpleSession_"+ i, eventProfile, new Date(), null);
+ sessionsToBeRewritten.add(sessionToBeRewritten);
+ Event eventToBeRewritten = new Event("view", sessionToBeRewritten, eventProfile, null, null, null, new Date());
+ eventsToBeRewritten.add(eventToBeRewritten);
+
+
+ persistenceService.save(sessionToBeRewritten);
+ persistenceService.save(eventToBeRewritten);
+ }
+ for (Session session : sessionsToBeRewritten) {
+ keepTrying("Wait for session: " + session.getItemId() + " to be indexed",
+ () -> persistenceService.query("itemId", session.getItemId(), null, Session.class),
+ (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ }
+ for (Event event : eventsToBeRewritten) {
+ keepTrying("Wait for event: " + event.getItemId() + " to be indexed",
+ () -> persistenceService.query("itemId", event.getItemId(), null, Event.class),
+ (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ }
+ keepTrying("Profile with id masterProfileID not found in the required time", () -> profileService.load("masterProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ keepTrying("Profile with id eventProfileID not found in the required time", () -> profileService.load("eventProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+ // Trigger the merge
+ Session simpleSession = new Session("simpleSession", eventProfile, new Date(), null);
+ Event mergeEvent = new Event(TEST_EVENT_TYPE, simpleSession, eventProfile, null, null, eventProfile, new Date());
+ eventService.send(mergeEvent);
+
+ // Check that master profile is now used:
+ Assert.assertNotNull(mergeEvent.getProfile());
+ Assert.assertEquals("masterProfileID", mergeEvent.getProfile().getItemId());
+ Assert.assertEquals("masterProfileID", mergeEvent.getProfileId());
+ Assert.assertEquals("masterProfileID", mergeEvent.getSession().getProfile().getItemId());
+ Assert.assertEquals("masterProfileID", mergeEvent.getSession().getProfileId());
+ Assert.assertEquals(mergeEvent.getSession().getProfileId(), mergeEvent.getProfileId());
+ Assert.assertEquals("username@domain.com", mergeEvent.getProfile().getSystemProperties().get("mergeIdentifier"));
+
+ // Check events are correctly rewritten
+ for (Event event : eventsToBeRewritten) {
+ keepTrying("Wait for event: " + event.getItemId() + " profileId to be rewritten for masterProfileID",
+ () -> persistenceService.load(event.getItemId(), Event.class),
+ (loadedEvent) -> loadedEvent.getProfileId().equals("masterProfileID"), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ }
+
+ // Check sessions are correctly rewritten
+ Condition sessionProfileIDRewrittenCondition = new Condition(definitionsService.getConditionType("sessionPropertyCondition"));
+ sessionProfileIDRewrittenCondition.setParameter("propertyName","profileId");
+ sessionProfileIDRewrittenCondition.setParameter("comparisonOperator","equals");
+ sessionProfileIDRewrittenCondition.setParameter("propertyValue","masterProfileID");
+ keepTrying("Wait for sessions profileId to be rewritten to masterProfileID",
+ () -> persistenceService.queryCount(sessionProfileIDRewrittenCondition, Session.ITEM_TYPE),
+ (count) -> count == 5, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+ for (Session session : sessionsToBeRewritten) {
+ keepTrying("Wait for session: " + session.getItemId() + " profileId to be rewritten for masterProfileID",
+ () -> persistenceService.load(session.getItemId(), Session.class),
+ (loadedSession) -> loadedSession.getProfileId().equals("masterProfileID"), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ }
+ }
+
+ /**
+ * If master profile is flagged as anonymous profile, then after the merge all past sessions/events should be anonymized
+ */
+ @Test
+ public void testProfileMergeOnPropertyAction_rewriteExistingSessionsEventsAnonymous() throws InterruptedException {
+ Condition matchAll = new Condition(definitionsService.getConditionType("matchAllCondition"));
+ // create rule
+ createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+ // create master profile
+ Profile masterProfile = new Profile();
+ masterProfile.setItemId("masterProfileID");
+ masterProfile.setProperty("email", "username@domain.com");
+ masterProfile.setSystemProperty("mergeIdentifier", "username@domain.com");
+ profileService.save(masterProfile);
+ privacyService.setRequireAnonymousBrowsing(masterProfile.getItemId(), true, null);
+
+ Profile eventProfile = new Profile();
+ eventProfile.setItemId("eventProfileID");
+ eventProfile.setProperty("email", "username@domain.com");
+ profileService.save(eventProfile);
+
+ // create 5 past sessions and 5 past events for the event profile.
+ List<Session> sessionsToBeRewritten = new ArrayList<>();
+ List<Event> eventsToBeRewritten = new ArrayList<>();
+ for (int i = 1; i <= 5; i++) {
+ Session sessionToBeRewritten = new Session("simpleSession_"+ i, eventProfile, new Date(), null);
+ sessionsToBeRewritten.add(sessionToBeRewritten);
+ Event eventToBeRewritten = new Event("view", sessionToBeRewritten, eventProfile, null, null, null, new Date());
+ eventsToBeRewritten.add(eventToBeRewritten);
+
+ persistenceService.save(sessionToBeRewritten);
+ persistenceService.save(eventToBeRewritten);
+ }
+ for (Session session : sessionsToBeRewritten) {
+ keepTrying("Wait for session: " + session.getItemId() + " to be indexed",
+ () -> persistenceService.query("itemId", session.getItemId(), null, Session.class),
+ (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ }
+ for (Event event : eventsToBeRewritten) {
+ keepTrying("Wait for event: " + event.getItemId() + " to be indexed",
+ () -> persistenceService.query("itemId", event.getItemId(), null, Event.class),
+ (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ }
+ keepTrying("Profile with id masterProfileID (should required anonymous browsing) not found in the required time",
+ () -> profileService.load("masterProfileID"),
+ profile -> profile != null && privacyService.isRequireAnonymousBrowsing(profile), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ keepTrying("Profile with id eventProfileID not found in the required time", () -> profileService.load("eventProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+ // Trigger the merge
+ Session simpleSession = new Session("simpleSession", eventProfile, new Date(), null);
+ Event mergeEvent = new Event(TEST_EVENT_TYPE, simpleSession, eventProfile, null, null, eventProfile, new Date());
+ eventService.send(mergeEvent);
+
+ // Check that master profile is now used, but anonymous browsing is respected:
+ Assert.assertNotNull(mergeEvent.getProfile());
+ Assert.assertEquals("masterProfileID", mergeEvent.getProfile().getItemId()); // We still have profile in the event
+ Assert.assertNull(mergeEvent.getProfileId()); // But profileId prop is null due to anonymous browsing
+ Assert.assertNull(mergeEvent.getSession().getProfile().getItemId()); // Same for the event session
+ Assert.assertNull(mergeEvent.getSession().getProfileId());
+ Assert.assertEquals(mergeEvent.getSession().getProfileId(), mergeEvent.getProfileId());
+ Assert.assertEquals("username@domain.com", mergeEvent.getProfile().getSystemProperties().get("mergeIdentifier"));
+
+ // Check events are correctly rewritten (Anonymous !)
+ for (Event event : eventsToBeRewritten) {
+ keepTrying("Wait for event: " + event.getItemId() + " profileId to be rewritten for NULL due to anonymous browsing",
+ () -> persistenceService.load(event.getItemId(), Event.class),
+ (loadedEvent) -> loadedEvent.getProfileId() == null, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ }
+
+ // Check sessions are correctly rewritten (Anonymous !)
+ for (Session session : sessionsToBeRewritten) {
+ keepTrying("Wait for session: " + session.getItemId() + " profileId to be rewritten for NULL due to anonymous browsing",
+ () -> persistenceService.load(session.getItemId(), Session.class),
+ (loadedSession) -> loadedSession.getProfileId() == null, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ }
+ }
+
+ /**
+ * Personalization strategies have a specific handling during the merge of two profiles
+ * This test is here to ensure this specific behavior is correctly working.
+ */
@Test
public void testPersonalizationStrategyStatusMerge() {
// create some statuses for the tests:
@@ -157,6 +460,7 @@ public class ProfileMergeIT extends BaseIT {
}
}
}
+
private Event sendEvent() {
Profile profile = new Profile();
profile.setProperties(new HashMap<>());
@@ -168,16 +472,17 @@ public class ProfileMergeIT extends BaseIT {
return testEvent;
}
- private Rule createMergeOnPropertyRule(boolean forceEventProfileAsMaster) throws InterruptedException {
+ private Rule createMergeOnPropertyRule(boolean forceEventProfileAsMaster, String eventProperty) throws InterruptedException {
Rule mergeOnPropertyTestRule = new Rule();
- mergeOnPropertyTestRule.setMetadata(new Metadata(null, TEST_RULE_ID, TEST_RULE_ID, "Test rule for testing MergeProfilesOnPropertyAction"));
+ mergeOnPropertyTestRule
+ .setMetadata(new Metadata(null, TEST_RULE_ID, TEST_RULE_ID, "Test rule for testing MergeProfilesOnPropertyAction"));
Condition condition = new Condition(definitionsService.getConditionType("eventTypeCondition"));
condition.setParameter("eventTypeId", TEST_EVENT_TYPE);
mergeOnPropertyTestRule.setCondition(condition);
- final Action mergeProfilesOnPropertyAction = new Action( definitionsService.getActionType( "mergeProfilesOnPropertyAction"));
- mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyValue", "eventProperty::target.properties(j:nodename)");
+ final Action mergeProfilesOnPropertyAction = new Action(definitionsService.getActionType("mergeProfilesOnPropertyAction"));
+ mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyValue", "eventProperty::target.properties(" + eventProperty + ")");
mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyName", "mergeIdentifier");
mergeProfilesOnPropertyAction.setParameter("forceEventProfileAsMaster", forceEventProfileAsMaster);
mergeOnPropertyTestRule.setActions(Collections.singletonList(mergeProfilesOnPropertyAction));
diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties
index aa9c94290..0628777f2 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -314,7 +314,7 @@ org.apache.unomi.mail.server.sslOnConnect=${env:UNOMI_MAIL_SSLONCONNECT:-true}
#######################################################################################################################
## baseplugin settings ##
#######################################################################################################################
-org.apache.unomi.plugins.base.maxProfilesInOneMerge=${env:UNOMI_MAX_PROFILES_IN_ONE_MERGE:--1}
+org.apache.unomi.plugins.base.maxProfilesInOneMerge=${env:UNOMI_MAX_PROFILES_IN_ONE_MERGE:-50}
#######################################################################################################################
## Security settings ##
diff --git a/persistence-elasticsearch/core/pom.xml b/persistence-elasticsearch/core/pom.xml
index 29a619cec..76759d181 100644
--- a/persistence-elasticsearch/core/pom.xml
+++ b/persistence-elasticsearch/core/pom.xml
@@ -189,6 +189,10 @@
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.unomi</groupId>
@@ -202,11 +206,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast-all</artifactId>
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 80f333cdf..6c0a0fa63 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -19,6 +19,8 @@ package org.apache.unomi.persistence.elasticsearch;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
@@ -118,8 +120,10 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
@@ -436,11 +440,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
loadPredefinedMappings(bundleContext, false);
+ loadPainlessScripts(bundleContext);
// load predefined mappings and condition dispatchers of any bundles that were started before this one.
for (Bundle existingBundle : bundleContext.getBundles()) {
if (existingBundle.getBundleContext() != null) {
loadPredefinedMappings(existingBundle.getBundleContext(), false);
+ loadPainlessScripts(existingBundle.getBundleContext());
}
}
@@ -656,6 +662,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
switch (event.getType()) {
case BundleEvent.STARTING:
loadPredefinedMappings(event.getBundle().getBundleContext(), true);
+ loadPainlessScripts(event.getBundle().getBundleContext());
break;
}
}
@@ -692,6 +699,29 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
+ private void loadPainlessScripts(BundleContext bundleContext) {
+ Enumeration<URL> scriptsURL = bundleContext.getBundle().findEntries("META-INF/cxs/painless", "*.painless", true);
+ if (scriptsURL == null) {
+ return;
+ }
+
+ Map<String, String> scriptsById = new HashMap<>();
+ while (scriptsURL.hasMoreElements()) {
+ URL scriptURL = scriptsURL.nextElement();
+ logger.info("Found painless script at " + scriptURL + ", loading... ");
+ try (InputStream in = scriptURL.openStream()) {
+ String script = IOUtils.toString(in, StandardCharsets.UTF_8);
+ String scriptId = FilenameUtils.getBaseName(scriptURL.getPath());
+ scriptsById.put(scriptId, script);
+ } catch (Exception e) {
+ logger.error("Error while loading painless script " + scriptURL, e);
+ }
+
+ }
+
+ storeScripts(scriptsById);
+ }
+
private String loadMappingFile(URL predefinedMappingURL) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(predefinedMappingURL.openStream()));
@@ -977,7 +1007,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
try {
String itemType = Item.getItemType(clazz);
- String index = getIndex(itemType, dateHint);
+ String index = getIndexNameForQuery(itemType);
for (int i = 0; i < scripts.length; i++) {
RefreshRequest refreshRequest = new RefreshRequest(index);
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index 7a3782079..3d90f6317 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -18,13 +18,9 @@
package org.apache.unomi.plugins.baseplugin.actions;
import org.apache.commons.lang3.StringUtils;
-import org.apache.unomi.api.Event;
-import org.apache.unomi.api.Persona;
-import org.apache.unomi.api.Profile;
-import org.apache.unomi.api.Session;
+import org.apache.unomi.api.*;
import org.apache.unomi.api.actions.Action;
import org.apache.unomi.api.actions.ActionExecutor;
-import org.apache.unomi.api.actions.ActionPostExecutor;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.services.*;
import org.apache.unomi.persistence.spi.PersistenceService;
@@ -35,6 +31,8 @@ import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
public class MergeProfilesOnPropertyAction implements ActionExecutor {
private static final Logger logger = LoggerFactory.getLogger(MergeProfilesOnPropertyAction.class.getName());
@@ -45,223 +43,203 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor {
private DefinitionsService definitionsService;
private PrivacyService privacyService;
private ConfigSharingService configSharingService;
- private int maxProfilesInOneMerge = -1;
+ private SchedulerService schedulerService;
+ // TODO we can remove this limit after dealing with: UNOMI-776 (50 is completely arbitrary and it's used to bypass the auto-scroll done by the persistence Service)
+ private int maxProfilesInOneMerge = 50;
public int execute(Action action, Event event) {
- String profileIdCookieName = (String) configSharingService.getProperty("profileIdCookieName");
- String profileIdCookieDomain = (String) configSharingService.getProperty("profileIdCookieDomain");
- Integer profileIdCookieMaxAgeInSeconds = (Integer) configSharingService.getProperty("profileIdCookieMaxAgeInSeconds");
- Boolean profileIdCookieHttpOnly = (Boolean) configSharingService.getProperty("profileIdCookieHttpOnly");
+ HttpServletResponse httpServletResponse = (HttpServletResponse) event.getAttributes().get(Event.HTTP_RESPONSE_ATTRIBUTE);
+ HttpServletRequest httpServletRequest = (HttpServletRequest) event.getAttributes().get(Event.HTTP_REQUEST_ATTRIBUTE);
- Profile profile = event.getProfile();
- if (profile instanceof Persona || profile.isAnonymousProfile()) {
- return EventService.NO_CHANGE;
- }
+ Profile eventProfile = event.getProfile();
+ final String mergePropName = (String) action.getParameterValues().get("mergeProfilePropertyName");
+ final String mergePropValue = (String) action.getParameterValues().get("mergeProfilePropertyValue");
+ boolean forceEventProfileAsMaster = action.getParameterValues().containsKey("forceEventProfileAsMaster") ? (boolean) action.getParameterValues().get("forceEventProfileAsMaster") : false;
+ final String currentProfileMergeValue = (String) eventProfile.getSystemProperties().get(mergePropName);
- final String mergeProfilePropertyName = (String) action.getParameterValues().get("mergeProfilePropertyName");
- if (StringUtils.isEmpty(mergeProfilePropertyName)) {
+ if (eventProfile instanceof Persona || eventProfile.isAnonymousProfile() || StringUtils.isEmpty(mergePropName) ||
+ StringUtils.isEmpty(mergePropValue)) {
return EventService.NO_CHANGE;
}
- final String mergeProfilePropertyValue = (String) action.getParameterValues().get("mergeProfilePropertyValue");
- if (StringUtils.isEmpty(mergeProfilePropertyValue)) {
- return EventService.NO_CHANGE;
- }
+ final List<Profile> profilesToBeMerge = getProfilesToBeMerge(mergePropName, mergePropValue);
- final String mergeProfilePreviousPropertyValue = profile.getSystemProperties().get(mergeProfilePropertyName) != null ? profile.getSystemProperties().get(mergeProfilePropertyName).toString() : "";
-
- final Session currentSession = event.getSession();
+ // Check if the user switched to another profile
+ if (StringUtils.isNotEmpty(currentProfileMergeValue) && !currentProfileMergeValue.equals(mergePropValue)) {
+ String reassignProfileId = reassignCurrentBrowsingData(event, profilesToBeMerge, forceEventProfileAsMaster, mergePropName, mergePropValue);
+ sendProfileCookie(reassignProfileId, httpServletResponse, httpServletRequest);
- boolean forceEventProfileAsMaster = action.getParameterValues().containsKey("forceEventProfileAsMaster") ?
- (boolean) action.getParameterValues().get("forceEventProfileAsMaster") : false;
+ return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED;
+ }
- // store the profile id in case the merge change it to a previous one
- String profileId = profile.getItemId();
+ // Store merge prop on current profile
+ boolean profileUpdated = false;
+ if (StringUtils.isEmpty(currentProfileMergeValue)) {
+ profileUpdated = true;
+ eventProfile.getSystemProperties().put(mergePropName, mergePropValue);
+ }
- Condition propertyCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
- propertyCondition.setParameter("comparisonOperator", "equals");
- propertyCondition.setParameter("propertyName", "systemProperties." + mergeProfilePropertyName);
- propertyCondition.setParameter("propertyValue", mergeProfilePropertyValue);
+ // If there are no profiles to merge, we are done here.
+ if (profilesToBeMerge.isEmpty()) {
+ return profileUpdated ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE;
+ }
- Condition excludeMergedProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
- excludeMergedProfilesCondition.setParameter("comparisonOperator", "missing");
- excludeMergedProfilesCondition.setParameter("propertyName", "mergedWith");
+ // add current Profile to profiles to be merged
+ if (profilesToBeMerge.stream().noneMatch(p -> StringUtils.equals(p.getItemId(), eventProfile.getItemId()))) {
+ profilesToBeMerge.add(eventProfile);
+ }
- Condition c = new Condition(definitionsService.getConditionType("booleanCondition"));
- c.setParameter("operator", "and");
- c.setParameter("subConditions", Arrays.asList(propertyCondition, excludeMergedProfilesCondition));
+ final String eventProfileId = eventProfile.getItemId();
+ final Profile masterProfile = profileService.mergeProfiles(forceEventProfileAsMaster ? eventProfile : profilesToBeMerge.get(0), profilesToBeMerge);
+ final String masterProfileId = masterProfile.getItemId();
- final List<Profile> profiles = persistenceService.query(c, "properties.firstVisit", Profile.class, 0, maxProfilesInOneMerge).getList();
+ // Profile is still using the same profileId after being merged, no need to rewrite existing data, merge is done
+ if (!forceEventProfileAsMaster && masterProfileId.equals(eventProfileId)) {
+ return profileUpdated ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE;
+ }
- // Check if the user switched to another profile
- if (StringUtils.isNotEmpty(mergeProfilePreviousPropertyValue) && !mergeProfilePreviousPropertyValue.equals(mergeProfilePropertyValue)) {
- if (profiles.size() > 0) {
- // Take existing profile
- profile = profiles.get(0);
- } else {
- // Create a new profile
- if (forceEventProfileAsMaster)
- profile = event.getProfile();
- else {
- profile = new Profile(UUID.randomUUID().toString());
- profile.setProperty("firstVisit", event.getTimeStamp());
- }
- profile.getSystemProperties().put(mergeProfilePropertyName, mergeProfilePropertyValue);
- }
+ // The profile ID changed, so we have a lot to do
+ // First check for privacy stuff (inherit from previous profile if necessary)
+ if (privacyService.isRequireAnonymousBrowsing(eventProfile)) {
+ privacyService.setRequireAnonymousBrowsing(masterProfileId, true, event.getScope());
+ }
+ final boolean anonymousBrowsing = privacyService.isRequireAnonymousBrowsing(masterProfileId);
- logger.info("Different users, switch to " + profile.getItemId());
+ // we still send back the current profile cookie. It will be changed on the next request to the ContextServlet.
+ // The current profile will be deleted only then because we cannot delete it right now (too soon)
+ sendProfileCookie(eventProfileId, httpServletResponse, httpServletRequest);
- HttpServletResponse httpServletResponse = (HttpServletResponse) event.getAttributes().get(Event.HTTP_RESPONSE_ATTRIBUTE);
- HttpServletRequest httpServletRequest = (HttpServletRequest) event.getAttributes().get(Event.HTTP_REQUEST_ATTRIBUTE);
- if (httpServletRequest != null) {
- sendProfileCookie(profile, httpServletResponse, profileIdCookieName, profileIdCookieDomain,
- profileIdCookieMaxAgeInSeconds, profileIdCookieHttpOnly, httpServletRequest.isSecure());
- }
+ // Modify current session:
+ if (event.getSession() != null) {
+ event.getSession().setProfile(anonymousBrowsing ? privacyService.getAnonymousProfile(masterProfile) : masterProfile);
+ }
- // At the end of the merge, we must set the merged profile as profile event to process other Actions
- event.setProfileId(profile.getItemId());
- event.setProfile(profile);
+ // Modify current event:
+ event.setProfileId(anonymousBrowsing ? null : masterProfileId);
+ event.setProfile(masterProfile);
- if (currentSession != null) {
- currentSession.setProfile(profile);
- eventService.send(new Event("sessionReassigned", currentSession, profile, event.getScope(), event, currentSession,
- null, event.getTimeStamp(), false));
- }
+ event.getActionPostExecutors().add(() -> {
+ try {
+ // This is the list of profile Ids to be updated in browsing data (events/sessions)
+ List<Profile> mergedProfiles = profilesToBeMerge.stream()
+ .filter(mergedProfile -> !StringUtils.equals(mergedProfile.getItemId(), masterProfileId))
+ .collect(Collectors.toList());
- return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED;
- } else {
- // Store the merge property identifier in the profile
- profile.getSystemProperties().put(mergeProfilePropertyName, mergeProfilePropertyValue);
+ // ASYNC: Update browsing data (events/sessions) for merged profiles
+ reassignPersistedBrowsingDatasAsync(anonymousBrowsing, mergedProfiles.stream().map(Item::getItemId).collect(Collectors.toList()), masterProfileId);
- // add current Profile to profiles to be merged
- boolean add = true;
- for (Profile p : profiles) {
- add = add && !StringUtils.equals(p.getItemId(), profile.getItemId());
- }
- if (add) {
- profiles.add(profile);
- }
+ // Save event, as we dynamically changed the profileId of the current event
+ if (event.isPersistent()) {
+ persistenceService.save(event);
+ }
- if (profiles.size() == 1) {
- return StringUtils.isEmpty(mergeProfilePreviousPropertyValue) ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE;
+ // we must mark all the profiles that we merged into the master as merged with the master, and they will
+ // be deleted upon next load
+ for (Profile mergedProfile : mergedProfiles) {
+ mergedProfile.setMergedWith(masterProfileId);
+ mergedProfile.setSystemProperty("lastUpdated", new Date());
+
+ boolean isExist = persistenceService.load(mergedProfile.getItemId(), Profile.class) != null;
+ if (!isExist) {
+ // save the original event profile if it has been changed
+ persistenceService.save(mergedProfile);
+ } else {
+ Map<String,Object> sourceMap = new HashMap<>();
+ sourceMap.put("mergedWith", masterProfileId);
+ sourceMap.put("systemProperties", mergedProfile.getSystemProperties());
+ persistenceService.update(mergedProfile, null, Profile.class, sourceMap,true);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("unable to execute callback action, profile and session will not be saved", e);
+ return false;
}
+ return true;
+ });
- Profile markedMasterProfile;
- if (forceEventProfileAsMaster)
- markedMasterProfile = event.getProfile();
- else
- markedMasterProfile = profiles.get(0);// Use oldest profile for master profile
-
- final Profile masterProfile = profileService.mergeProfiles(markedMasterProfile, profiles);
-
- // Profile has changed
- if (forceEventProfileAsMaster || !masterProfile.getItemId().equals(profileId)) {
- HttpServletResponse httpServletResponse = (HttpServletResponse) event.getAttributes().get(Event.HTTP_RESPONSE_ATTRIBUTE);
- HttpServletRequest httpServletRequest = (HttpServletRequest) event.getAttributes().get(Event.HTTP_REQUEST_ATTRIBUTE);
- // we still send back the current profile cookie. It will be changed on the next request to the ContextServlet.
- // The current profile will be deleted only then because we cannot delete it right now (too soon)
- if (httpServletRequest != null) {
- sendProfileCookie(profile, httpServletResponse, profileIdCookieName, profileIdCookieDomain,
- profileIdCookieMaxAgeInSeconds, profileIdCookieHttpOnly, httpServletRequest.isSecure());
- }
+ return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED;
+ }
- final String masterProfileId = masterProfile.getItemId();
- // At the end of the merge, we must set the merged profile as profile event to process other Actions
- event.setProfileId(masterProfileId);
- event.setProfile(masterProfile);
+ private void sendProfileCookie(String profileId, HttpServletResponse response, HttpServletRequest request) {
+ if (response != null && request != null) {
+ String profileIdCookieName = (String) configSharingService.getProperty("profileIdCookieName");
+ String profileIdCookieDomain = (String) configSharingService.getProperty("profileIdCookieDomain");
+ Integer profileIdCookieMaxAgeInSeconds = (Integer) configSharingService.getProperty("profileIdCookieMaxAgeInSeconds");
+ Boolean profileIdCookieHttpOnly = (Boolean) configSharingService.getProperty("profileIdCookieHttpOnly");
+
+ response.addHeader("Set-Cookie",
+ profileIdCookieName + "=" + profileId +
+ "; Path=/" +
+ "; Max-Age=" + profileIdCookieMaxAgeInSeconds +
+ (StringUtils.isNotBlank(profileIdCookieDomain) ? ("; Domain=" + profileIdCookieDomain) : "") +
+ "; SameSite=Lax" +
+ (request.isSecure() ? "; Secure" : "") +
+ (profileIdCookieHttpOnly ? "; HttpOnly" : ""));
+ }
+ }
- final Boolean anonymousBrowsing = privacyService.isRequireAnonymousBrowsing(masterProfileId);
+ private List<Profile> getProfilesToBeMerge(String mergeProfilePropertyName, String mergeProfilePropertyValue) {
+ Condition propertyCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+ propertyCondition.setParameter("comparisonOperator", "equals");
+ propertyCondition.setParameter("propertyName", "systemProperties." + mergeProfilePropertyName);
+ propertyCondition.setParameter("propertyValue", mergeProfilePropertyValue);
- if (currentSession != null){
- currentSession.setProfile(masterProfile);
- if (privacyService.isRequireAnonymousBrowsing(profile)) {
- privacyService.setRequireAnonymousBrowsing(masterProfileId, true, event.getScope());
- }
+ return persistenceService.query(propertyCondition, "properties.firstVisit", Profile.class, 0, maxProfilesInOneMerge).getList();
+ }
- if (anonymousBrowsing) {
- currentSession.setProfile(privacyService.getAnonymousProfile(masterProfile));
- event.setProfileId(null);
- persistenceService.save(event);
+ private void reassignPersistedBrowsingDatasAsync(boolean anonymousBrowsing, List<String> mergedProfileIds, String masterProfileId) {
+ schedulerService.getSharedScheduleExecutorService().schedule(new TimerTask() {
+ @Override
+ public void run() {
+ if (!anonymousBrowsing) {
+ Condition profileIdsCondition = new Condition(definitionsService.getConditionType("eventPropertyCondition"));
+ profileIdsCondition.setParameter("propertyName","profileId");
+ profileIdsCondition.setParameter("comparisonOperator","in");
+ profileIdsCondition.setParameter("propertyValues", mergedProfileIds);
+
+ String[] scripts = new String[]{"updateProfileId"};
+ Map<String, Object>[] scriptParams = new Map[]{Collections.singletonMap("profileId", masterProfileId)};
+ Condition[] conditions = new Condition[]{profileIdsCondition};
+
+ persistenceService.updateWithQueryAndStoredScript(null, Session.class, scripts, scriptParams, conditions);
+ persistenceService.updateWithQueryAndStoredScript(null, Event.class, scripts, scriptParams, conditions);
+ } else {
+ for (String mergedProfileId : mergedProfileIds) {
+ privacyService.anonymizeBrowsingData(mergedProfileId);
}
}
-
- event.getActionPostExecutors().add(new ActionPostExecutor() {
- @Override
- public boolean execute() {
- try {
- Event currentEvent = event;
- // Update current event explicitly, as it might not return from search query if there wasn't a refresh in ES
- if (!StringUtils.equals(profileId, masterProfileId)) {
- if (currentEvent.isPersistent()) {
- persistenceService.update(currentEvent, currentEvent.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? null : masterProfileId);
- } }
-
- for (Profile profile : profiles) {
- String profileId = profile.getItemId();
- if (!StringUtils.equals(profileId, masterProfileId)) {
- List<Session> sessions = persistenceService.query("profileId", profileId, null, Session.class);
- if (currentSession != null){
- if (masterProfileId.equals(profileId) && !sessions.contains(currentSession)) {
- sessions.add(currentSession);
- }
- }
-
- for (Session session : sessions) {
- persistenceService.update(session, session.getTimeStamp(), Session.class, "profileId", anonymousBrowsing ? null : masterProfileId);
- }
-
- List<Event> events = persistenceService.query("profileId", profileId, null, Event.class);
- for (Event event : events) {
- if (!event.getItemId().equals(currentEvent.getItemId())) {
- persistenceService.update(event, event.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? null : masterProfileId);
- }
- }
- // we must mark all the profiles that we merged into the master as merged with the master, and they will
- // be deleted upon next load
- profile.setMergedWith(masterProfileId);
- Map<String,Object> sourceMap = new HashMap<>();
- sourceMap.put("mergedWith", masterProfileId);
- profile.setSystemProperty("lastUpdated", new Date());
- sourceMap.put("systemProperties", profile.getSystemProperties());
-
- boolean isExist = persistenceService.load(profile.getItemId(), Profile.class) != null;
-
- if (isExist == false) //save the original event profile is it has been changed
- persistenceService.save(profile);
- else
- persistenceService.update(profile, null, Profile.class, sourceMap,true);
-
- }
- }
- } catch (Exception e) {
- logger.error("unable to execute callback action, profile and session will not be saved", e);
- return false;
- }
- return true;
- }
- });
- return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED;
- } else {
- return StringUtils.isEmpty(mergeProfilePreviousPropertyValue) ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE;
}
- }
+ }, 1000, TimeUnit.MILLISECONDS);
}
- private static void sendProfileCookie(Profile profile, ServletResponse response, String profileIdCookieName, String profileIdCookieDomain,
- int cookieAgeInSeconds, boolean httpOnly, boolean secure) {
- if (response != null && response instanceof HttpServletResponse) {
- HttpServletResponse httpServletResponse = (HttpServletResponse) response;
- if (!(profile instanceof Persona)) {
- httpServletResponse.addHeader("Set-Cookie",
- profileIdCookieName + "=" + profile.getItemId() +
- "; Path=/" +
- "; Max-Age=" + cookieAgeInSeconds +
- (StringUtils.isNotBlank(profileIdCookieDomain) ? ("; Domain=" + profileIdCookieDomain) : "") +
- "; SameSite=Lax" +
- (secure ? "; Secure" : "") +
- (httpOnly ? "; HttpOnly" : ""));
+ private String reassignCurrentBrowsingData(Event event, List<Profile> existingMergedProfiles, boolean forceEventProfileAsMaster, String mergePropName, String mergePropValue) {
+ Profile eventProfile = event.getProfile();
+
+ if (existingMergedProfiles.size() > 0) {
+ // Take existing profile
+ eventProfile = existingMergedProfiles.get(0);
+ } else {
+ if (!forceEventProfileAsMaster) {
+ // Create a new profile
+ eventProfile = new Profile(UUID.randomUUID().toString());
+ eventProfile.setProperty("firstVisit", event.getTimeStamp());
}
+ eventProfile.getSystemProperties().put(mergePropName, mergePropValue);
}
+
+ logger.info("Different users, switch to {}", eventProfile.getItemId());
+ // At the end of the merge, we must set the merged profile as profile event to process other Actions
+ event.setProfileId(eventProfile.getItemId());
+ event.setProfile(eventProfile);
+
+ if (event.getSession() != null) {
+ Session eventSession = event.getSession();
+ eventSession.setProfile(eventProfile);
+ eventService.send(new Event("sessionReassigned", eventSession, eventProfile, event.getScope(), event, eventSession,
+ null, event.getTimeStamp(), false));
+ }
+
+ return eventProfile.getItemId();
}
public void setProfileService(ProfileService profileService) {
@@ -288,8 +266,11 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor {
this.configSharingService = configSharingService;
}
+ public void setSchedulerService(SchedulerService schedulerService) {
+ this.schedulerService = schedulerService;
+ }
+
public void setMaxProfilesInOneMerge(String maxProfilesInOneMerge) {
this.maxProfilesInOneMerge = Integer.parseInt(maxProfilesInOneMerge);
}
-
-}
+}
\ No newline at end of file
diff --git a/plugins/baseplugin/src/main/resources/META-INF/cxs/painless/updateProfileId.painless b/plugins/baseplugin/src/main/resources/META-INF/cxs/painless/updateProfileId.painless
new file mode 100644
index 000000000..31f900c62
--- /dev/null
+++ b/plugins/baseplugin/src/main/resources/META-INF/cxs/painless/updateProfileId.painless
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ This script is used to update the profileId and profile.itemId in sessions and/or events after a profile merge
+ required params:
+ - params.profileId: the new profile ID to write
+*/
+
+// update profileId
+if (ctx._source.containsKey("profileId") && ctx._source.profileId != params.profileId) {
+ ctx._source.put("profileId", params.profileId)
+}
+
+// update inner profile.itemId if the inner profile exists (in sessions for example)
+if (ctx._source.containsKey("profile") && ctx._source.profile.containsKey("itemId") && ctx._source.profile.itemId != params.profileId) {
+ ctx._source.profile.put("itemId", params.profileId)
+}
\ No newline at end of file
diff --git a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 1a067ba0e..ff7bcb6c5 100644
--- a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -26,7 +26,7 @@
<cm:default-properties>
<cm:property name="useEventToUpdateProfile" value="false" />
<cm:property name="usePropertyConditionOptimizations" value="true" />
- <cm:property name="maxProfilesInOneMerge" value="-1"/>
+ <cm:property name="maxProfilesInOneMerge" value="50"/>
</cm:default-properties>
</cm:property-placeholder>
@@ -43,9 +43,10 @@
<reference id="persistenceService" interface="org.apache.unomi.persistence.spi.PersistenceService"/>
<reference id="profileService" interface="org.apache.unomi.api.services.ProfileService"/>
<reference id="privacyService" interface="org.apache.unomi.api.services.PrivacyService"/>
+ <reference id="schedulerService" interface="org.apache.unomi.api.services.SchedulerService"/>
<reference id="segmentService" interface="org.apache.unomi.api.services.SegmentService"/>
<reference id="eventService" interface="org.apache.unomi.api.services.EventService"/>
- <reference id="configSharingService" interface="org.apache.unomi.api.services.ConfigSharingService" />
+ <reference id="configSharingService" interface="org.apache.unomi.api.services.ConfigSharingService"/>
<reference id="scriptExecutor" interface="org.apache.unomi.scripting.ScriptExecutor"/>
<service
@@ -115,7 +116,7 @@
<property name="definitionsService" ref="definitionsService"/>
<property name="persistenceService" ref="persistenceService"/>
<property name="segmentService" ref="segmentService"/>
- <property name="scriptExecutor" ref="scriptExecutor" />
+ <property name="scriptExecutor" ref="scriptExecutor"/>
<property name="maximumIdsQueryCount" value="${es.maximumIdsQueryCount}"/>
<property name="pastEventsDisablePartitions" value="${es.pastEventsDisablePartitions}"/>
<property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}"/>
@@ -298,6 +299,7 @@
<property name="definitionsService" ref="definitionsService"/>
<property name="privacyService" ref="privacyService"/>
<property name="configSharingService" ref="configSharingService" />
+ <property name="schedulerService" ref="schedulerService"/>
<property name="maxProfilesInOneMerge" value="${base.maxProfilesInOneMerge}"/>
</bean>
</service>
diff --git a/plugins/baseplugin/src/main/resources/org.apache.unomi.plugins.base.cfg b/plugins/baseplugin/src/main/resources/org.apache.unomi.plugins.base.cfg
index 41f80a950..b0bad3b5c 100644
--- a/plugins/baseplugin/src/main/resources/org.apache.unomi.plugins.base.cfg
+++ b/plugins/baseplugin/src/main/resources/org.apache.unomi.plugins.base.cfg
@@ -15,4 +15,4 @@
# limitations under the License.
#
-maxProfilesInOneMerge=${org.apache.unomi.plugins.base.maxProfilesInOneMerge:--1}
+maxProfilesInOneMerge=${org.apache.unomi.plugins.base.maxProfilesInOneMerge:-50}
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 377631c1f..e4794bceb 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -22,7 +22,6 @@ import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.CharEncoding;
import org.apache.unomi.api.Event;
import org.apache.unomi.api.Item;
import org.apache.unomi.api.Metadata;
@@ -163,7 +162,6 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
}
bundleContext.addBundleListener(this);
initializeTimer();
- loadScripts();
logger.info("Segment service initialized.");
}
@@ -1251,25 +1249,6 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 0, segmentRefreshInterval, TimeUnit.MILLISECONDS);
}
- private void loadScripts() throws IOException {
- Enumeration<URL> scriptsURL = bundleContext.getBundle().findEntries("META-INF/cxs/painless", "*.painless", true);
- if (scriptsURL == null) {
- return;
- }
-
- Map<String, String> scriptsById = new HashMap<>();
- while (scriptsURL.hasMoreElements()) {
- URL scriptURL = scriptsURL.nextElement();
- logger.debug("Found painless script at " + scriptURL + ", loading... ");
- try (InputStream in = scriptURL.openStream()) {
- String script = IOUtils.toString(in, StandardCharsets.UTF_8);
- String scriptId = FilenameUtils.getBaseName(scriptURL.getPath());
- scriptsById.put(scriptId, script);
- }
- }
- persistenceService.storeScripts(scriptsById);
- }
-
public void setTaskExecutionPeriod(long taskExecutionPeriod) {
this.taskExecutionPeriod = taskExecutionPeriod;
}