You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2023/03/13 15:04:14 UTC
[unomi] branch master updated: UNOMI-727: adapt merge system to rollover (and cleanup too) (#588)
This is an automated email from the ASF dual-hosted git repository.
shuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new 385699a5c UNOMI-727: adapt merge system to rollover (and cleanup too) (#588)
385699a5c is described below
commit 385699a5cdb771ac8b70d8ea493f43a849b78f86
Author: kevan Jahanshahi <jk...@apache.org>
AuthorDate: Mon Mar 13 16:04:07 2023 +0100
UNOMI-727: adapt merge system to rollover (and cleanup too) (#588)
* UNOMI-727: adapt merge system to rollover (and cleanup too)
* UNOMI-727: adapt merge system to rollover (and cleanup too)
* UNOMI-727: adapt merge system to rollover (and cleanup too)
---
.../org/apache/unomi/itests/ProfileMergeIT.java | 224 ++++++++++++++++--
.../actions/MergeProfilesOnPropertyAction.java | 261 ++++++++++-----------
.../services/impl/segments/SegmentServiceImpl.java | 4 +-
3 files changed, 322 insertions(+), 167 deletions(-)
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
index a5306d9f9..7815baccb 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
@@ -16,10 +16,7 @@
*/
package org.apache.unomi.itests;
-import org.apache.unomi.api.Event;
-import org.apache.unomi.api.Metadata;
-import org.apache.unomi.api.Profile;
-import org.apache.unomi.api.ProfileAlias;
+import org.apache.unomi.api.*;
import org.apache.unomi.api.actions.Action;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.rules.Rule;
@@ -49,14 +46,15 @@ public class ProfileMergeIT extends BaseIT {
private final static String TEST_PROFILE_ID = "mergeOnPropertyTestProfileId";
@After
- public void after() {
+ public void after() throws InterruptedException {
// cleanup created data
rulesService.removeRule(TEST_RULE_ID);
+ removeItems(Profile.class, ProfileAlias.class, Event.class, Session.class);
}
@Test
public void testProfileMergeOnPropertyAction_dont_forceEventProfileAsMaster() throws InterruptedException {
- createAndWaitForRule(createMergeOnPropertyRule(false));
+ createAndWaitForRule(createMergeOnPropertyRule(false, "j:nodename"));
// A new profile should be created.
Assert.assertNotEquals(sendEvent().getProfile().getItemId(), TEST_PROFILE_ID);
@@ -64,29 +62,16 @@ public class ProfileMergeIT extends BaseIT {
@Test
public void testProfileMergeOnPropertyAction_forceEventProfileAsMaster() throws InterruptedException {
- createAndWaitForRule(createMergeOnPropertyRule(true));
+ createAndWaitForRule(createMergeOnPropertyRule(true, "j:nodename"));
// No new profile should be created, instead the profile of the event should be used.
Assert.assertEquals(sendEvent().getProfile().getItemId(), TEST_PROFILE_ID);
}
@Test
- public void test() throws InterruptedException {
+ public void testProfileMergeOnPropertyAction_simpleMergeAndCheckAlias() throws InterruptedException {
// create rule
- Condition condition = new Condition(definitionsService.getConditionType("eventTypeCondition"));
- condition.setParameter("eventTypeId", TEST_EVENT_TYPE);
-
- final Action action = new Action(definitionsService.getActionType("mergeProfilesOnPropertyAction"));
- action.setParameter("mergeProfilePropertyValue", "eventProperty::target.properties(email)");
- action.setParameter("mergeProfilePropertyName", "mergeIdentifier");
- action.setParameter("forceEventProfileAsMaster", false);
-
- Rule rule = new Rule();
- rule.setMetadata(new Metadata(null, TEST_RULE_ID, TEST_RULE_ID, "Description"));
- rule.setCondition(condition);
- rule.setActions(Collections.singletonList(action));
-
- createAndWaitForRule(rule);
+ createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
// create master profile
Profile masterProfile = new Profile();
@@ -115,6 +100,14 @@ public class ProfileMergeIT extends BaseIT {
() -> persistenceService.getAllItems(ProfileAlias.class), (profileAliases) -> !profileAliases.isEmpty(),
DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ waitForNullValue("Profile with id eventProfileID not removed in the required time",
+ () -> persistenceService.load("eventProfileID", Profile.class),
+ DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+ keepTrying("Profile with id eventProfileID should still be accessible due to alias",
+ () -> profileService.load("eventProfileID"), Objects::nonNull,
+ DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
List<ProfileAlias> aliases = persistenceService.query("profileID", masterProfile.getItemId(), null, ProfileAlias.class);
Assert.assertFalse(aliases.isEmpty());
@@ -123,6 +116,189 @@ public class ProfileMergeIT extends BaseIT {
Assert.assertEquals("defaultClientId", aliases.get(0).getClientID());
}
+
+ /**
+ * User switch case, this case can happen when a person (user A) is using the same browser session of a previous logged user (user B).
+ * user A will be using user B profile, but when user A is going to login by send a merge event, then we will detect that the mergeIdentifier is not the same
+ * In this case we will just switch user A profile to:
+ * - a new one, if it's the first time we encounter his own mergeIdentifier (TESTED in this scenario)
+ * - a previous one, if we already have a profile in DB with the same mergeIdentifier.
+ */
+ @Test
+ public void testProfileMergeOnPropertyAction_sessionReassigned_newProfile() throws InterruptedException {
+ // create rule
+ createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+ // create master profile
+ Profile masterProfile = new Profile();
+ masterProfile.setItemId("masterProfileID");
+ masterProfile.setProperty("email", "master@domain.com");
+ masterProfile.setSystemProperty("mergeIdentifier", "master@domain.com");
+ profileService.save(masterProfile);
+
+ keepTrying("Profile with id masterProfileID not found in the required time", () -> profileService.load("masterProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+ // create event profile
+ Profile eventProfile = new Profile();
+ eventProfile.setItemId("eventProfileID");
+ eventProfile.setProperty("email", "event@domain.com");
+
+ Session simpleSession = new Session("simpleSession", eventProfile, new Date(), null);
+ Event event = new Event(TEST_EVENT_TYPE, simpleSession, masterProfile, null, null, eventProfile, new Date());
+ eventService.send(event);
+
+ // Session should have been reassign and a new profile should have been created ! (We call this user switch case)
+ Assert.assertNotNull(event.getProfile());
+ Assert.assertNotEquals("eventProfileID", event.getProfile().getItemId());
+ Assert.assertNotEquals("eventProfileID", event.getProfileId());
+ Assert.assertNotEquals("eventProfileID", event.getSession().getProfile().getItemId());
+ Assert.assertNotEquals("eventProfileID", event.getSession().getProfileId());
+
+ Assert.assertNotEquals("masterProfileID", event.getProfile().getItemId());
+ Assert.assertNotEquals("masterProfileID", event.getProfileId());
+ Assert.assertNotEquals("masterProfileID", event.getSession().getProfile().getItemId());
+ Assert.assertNotEquals("masterProfileID", event.getSession().getProfileId());
+
+ Assert.assertEquals(event.getSession().getProfileId(), event.getProfileId());
+ Assert.assertEquals("event@domain.com", event.getProfile().getSystemProperties().get("mergeIdentifier"));
+ }
+
+ /**
+ * User switch case, this case can happen when a person (user A) is using the same browser session of a previous logged user (user B).
+ * user A will be using user B profile, but when user A is going to login by send a merge event, then we will detect that the mergeIdentifier is not the same
+ * In this case we will just switch user A profile to:
+ * - a new one, if it's the first time we encounter his own mergeIdentifier
+ * - a previous one, if we already have a profile in DB with the same mergeIdentifier. (TESTED in this scenario)
+ */
+ @Test
+ public void testProfileMergeOnPropertyAction_sessionReassigned_existingProfile() throws InterruptedException {
+ // create rule
+ createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+ // create master profile
+ Profile masterProfile = new Profile();
+ masterProfile.setItemId("masterProfileID");
+ masterProfile.setProperty("email", "master@domain.com");
+ masterProfile.setSystemProperty("mergeIdentifier", "master@domain.com");
+ profileService.save(masterProfile);
+
+ // create a previous existing profile with same mergeIdentifier
+ Profile previousProfile = new Profile();
+ previousProfile.setItemId("previousProfileID");
+ previousProfile.setProperty("email", "event@domain.com");
+ previousProfile.setSystemProperty("mergeIdentifier", "event@domain.com");
+ profileService.save(previousProfile);
+
+ keepTrying("Profile with id masterProfileID not found in the required time", () -> profileService.load("masterProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ keepTrying("Profile with id previousProfileID not found in the required time", () -> profileService.load("previousProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+ // create event profile
+ Profile eventProfile = new Profile();
+ eventProfile.setItemId("eventProfileID");
+ eventProfile.setProperty("email", "event@domain.com");
+
+ Session simpleSession = new Session("simpleSession", eventProfile, new Date(), null);
+ Event event = new Event(TEST_EVENT_TYPE, simpleSession, masterProfile, null, null, eventProfile, new Date());
+ eventService.send(event);
+
+ // Session should have been reassign and the previous existing profile for mergeIdentifier: event@domain.com should have been reuse
+ // Session should have been reassign and a new profile should have been created ! (We call this user switch case)
+ Assert.assertNotNull(event.getProfile());
+ Assert.assertEquals("previousProfileID", event.getProfile().getItemId());
+ Assert.assertEquals("previousProfileID", event.getProfileId());
+ Assert.assertEquals("previousProfileID", event.getSession().getProfile().getItemId());
+ Assert.assertEquals("previousProfileID", event.getSession().getProfileId());
+
+ Assert.assertEquals(event.getSession().getProfileId(), event.getProfileId());
+ Assert.assertEquals("event@domain.com", event.getProfile().getSystemProperties().get("mergeIdentifier"));
+ }
+
+ /**
+ * In case of merge, existing sessions/events from previous profileId should be rewritten to use the new master profileId
+ */
+ @Test
+ public void testProfileMergeOnPropertyAction_simpleMergeRewriteExistingSessionsEvents() throws InterruptedException {
+ Condition matchAll = new Condition(definitionsService.getConditionType("matchAllCondition"));
+ // create rule
+ createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+ // create master profile
+ Profile masterProfile = new Profile();
+ masterProfile.setItemId("masterProfileID");
+ masterProfile.setProperty("email", "username@domain.com");
+ masterProfile.setSystemProperty("mergeIdentifier", "username@domain.com");
+ profileService.save(masterProfile);
+
+ Profile eventProfile = new Profile();
+ eventProfile.setItemId("eventProfileID");
+ eventProfile.setProperty("email", "username@domain.com");
+ profileService.save(eventProfile);
+
+ // create 5 sessions and 5 events for master profile.
+ List<Session> sessionsToBeRewritten = new ArrayList<>();
+ List<Event> eventsToBeRewritten = new ArrayList<>();
+ for (int i = 1; i <= 5; i++) {
+ Session sessionToBeRewritten = new Session("simpleSession_"+ i, eventProfile, new Date(), null);
+ sessionsToBeRewritten.add(sessionToBeRewritten);
+ Event eventToBeRewritten = new Event("view", sessionToBeRewritten, eventProfile, null, null, null, new Date());
+ eventsToBeRewritten.add(eventToBeRewritten);
+
+
+ persistenceService.save(sessionToBeRewritten);
+ persistenceService.save(eventToBeRewritten);
+ }
+ keepTrying("Wait for sessions and events to be persisted", () -> persistenceService.queryCount(matchAll, Session.ITEM_TYPE) + persistenceService.queryCount(matchAll, Event.ITEM_TYPE),
+ (count) -> count == 10, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ keepTrying("Profile with id masterProfileID not found in the required time", () -> profileService.load("masterProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ keepTrying("Profile with id eventProfileID not found in the required time", () -> profileService.load("eventProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+ // Trigger the merge
+ Session simpleSession = new Session("simpleSession", eventProfile, new Date(), null);
+ Event mergeEvent = new Event(TEST_EVENT_TYPE, simpleSession, eventProfile, null, null, eventProfile, new Date());
+ eventService.send(mergeEvent);
+
+ // Check that master profile is now used:
+ Assert.assertNotNull(mergeEvent.getProfile());
+ Assert.assertEquals("masterProfileID", mergeEvent.getProfile().getItemId());
+ Assert.assertEquals("masterProfileID", mergeEvent.getProfileId());
+ Assert.assertEquals("masterProfileID", mergeEvent.getSession().getProfile().getItemId());
+ Assert.assertEquals("masterProfileID", mergeEvent.getSession().getProfileId());
+ Assert.assertEquals(mergeEvent.getSession().getProfileId(), mergeEvent.getProfileId());
+ Assert.assertEquals("username@domain.com", mergeEvent.getProfile().getSystemProperties().get("mergeIdentifier"));
+
+ // Check events are correctly rewritten
+ for (Event event : eventsToBeRewritten) {
+ keepTrying("Wait for event: " + event.getItemId() + " profileId to be rewritten for masterProfileID",
+ () -> persistenceService.load(event.getItemId(), Event.class),
+ (loadedEvent) -> loadedEvent.getProfileId().equals("masterProfileID"), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ }
+
+ // Check sessions are correctly rewritten
+ Condition sessionProfileIDRewrittenCondition = new Condition(definitionsService.getConditionType("sessionPropertyCondition"));
+ sessionProfileIDRewrittenCondition.setParameter("propertyName","profileId");
+ sessionProfileIDRewrittenCondition.setParameter("comparisonOperator","equals");
+ sessionProfileIDRewrittenCondition.setParameter("propertyValue","masterProfileID");
+ keepTrying("Wait for sessions profileId to be rewritten to masterProfileID",
+ () -> persistenceService.queryCount(sessionProfileIDRewrittenCondition, Session.ITEM_TYPE),
+ (count) -> count == 5, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+ // TODO uncomment this when UNOMI-749 is fixed, currently session loaded are inconsistent
+ /* for (Session session : sessionsToBeRewritten) {
+ keepTrying("Wait for session: " + session.getItemId() + " profileId to be rewritten for masterProfileID",
+ () -> persistenceService.load(session.getItemId(), Session.class),
+ (loadedSession) -> loadedSession.getProfileId().equals("masterProfileID"), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ } */
+ }
+
+ /**
+ * Personalization strategy have a specific handling during the merge of two profiles
+ * This test is here to ensure this specific behavior is correctly working.
+ */
@Test
public void testPersonalizationStrategyStatusMerge() {
// create some statuses for the tests:
@@ -208,7 +384,7 @@ public class ProfileMergeIT extends BaseIT {
return testEvent;
}
- private Rule createMergeOnPropertyRule(boolean forceEventProfileAsMaster) throws InterruptedException {
+ private Rule createMergeOnPropertyRule(boolean forceEventProfileAsMaster, String eventProperty) throws InterruptedException {
Rule mergeOnPropertyTestRule = new Rule();
mergeOnPropertyTestRule
.setMetadata(new Metadata(null, TEST_RULE_ID, TEST_RULE_ID, "Test rule for testing MergeProfilesOnPropertyAction"));
@@ -218,7 +394,7 @@ public class ProfileMergeIT extends BaseIT {
mergeOnPropertyTestRule.setCondition(condition);
final Action mergeProfilesOnPropertyAction = new Action(definitionsService.getActionType("mergeProfilesOnPropertyAction"));
- mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyValue", "eventProperty::target.properties(j:nodename)");
+ mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyValue", "eventProperty::target.properties(" + eventProperty + ")");
mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyName", "mergeIdentifier");
mergeProfilesOnPropertyAction.setParameter("forceEventProfileAsMaster", forceEventProfileAsMaster);
mergeOnPropertyTestRule.setActions(Collections.singletonList(mergeProfilesOnPropertyAction));
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index 2261a2a4b..6ecd0e797 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -24,7 +24,6 @@ import org.apache.unomi.api.Profile;
import org.apache.unomi.api.Session;
import org.apache.unomi.api.actions.Action;
import org.apache.unomi.api.actions.ActionExecutor;
-import org.apache.unomi.api.actions.ActionPostExecutor;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.services.*;
import org.apache.unomi.persistence.spi.PersistenceService;
@@ -46,170 +45,150 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor {
public int execute(Action action, Event event) {
- Profile profile = event.getProfile();
- if (profile instanceof Persona || profile.isAnonymousProfile()) {
- return EventService.NO_CHANGE;
- }
+ Profile eventProfile = event.getProfile();
+ final String mergePropName = (String) action.getParameterValues().get("mergeProfilePropertyName");
+ final String mergePropValue = (String) action.getParameterValues().get("mergeProfilePropertyValue");
+ boolean forceEventProfileAsMaster = action.getParameterValues().containsKey("forceEventProfileAsMaster") ? (boolean) action.getParameterValues().get("forceEventProfileAsMaster") : false;
+ final String currentProfileMergeValue = (String) eventProfile.getSystemProperties().get(mergePropName);
- final String mergeProfilePropertyName = (String) action.getParameterValues().get("mergeProfilePropertyName");
- if (StringUtils.isEmpty(mergeProfilePropertyName)) {
+ if (eventProfile instanceof Persona || eventProfile.isAnonymousProfile() || StringUtils.isEmpty(mergePropName) ||
+ StringUtils.isEmpty(mergePropValue)) {
return EventService.NO_CHANGE;
}
- final String mergeProfilePropertyValue = (String) action.getParameterValues().get("mergeProfilePropertyValue");
- if (StringUtils.isEmpty(mergeProfilePropertyValue)) {
- return EventService.NO_CHANGE;
+ final List<Profile> profilesToBeMerge = getProfilesToBeMerge(mergePropName, mergePropValue);
+
+ // Check if the user switched to another profile
+ if (StringUtils.isNotEmpty(currentProfileMergeValue) && !currentProfileMergeValue.equals(mergePropValue)) {
+ reassignSession(event, profilesToBeMerge, forceEventProfileAsMaster, mergePropName, mergePropValue);
+ return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED;
}
- final String mergeProfilePreviousPropertyValue = profile.getSystemProperties().get(mergeProfilePropertyName) != null ? profile.getSystemProperties().get(mergeProfilePropertyName).toString() : "";
+ // Store merge prop on current profile
+ boolean profileUpdated = false;
+ if (StringUtils.isEmpty(currentProfileMergeValue)) {
+ profileUpdated = true;
+ eventProfile.getSystemProperties().put(mergePropName, mergePropValue);
+ }
- final Session currentSession = event.getSession();
+ // If not profiles to merge we are done here.
+ if (profilesToBeMerge.isEmpty()) {
+ return profileUpdated ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE;
+ }
- boolean forceEventProfileAsMaster = action.getParameterValues().containsKey("forceEventProfileAsMaster") ?
- (boolean) action.getParameterValues().get("forceEventProfileAsMaster") : false;
+ // add current Profile to profiles to be merged
+ if (profilesToBeMerge.stream().noneMatch(p -> StringUtils.equals(p.getItemId(), eventProfile.getItemId()))) {
+ profilesToBeMerge.add(eventProfile);
+ }
- // store the profile id in case the merge change it to a previous one
- String profileId = profile.getItemId();
+ final String eventProfileId = eventProfile.getItemId();
+ final Profile mergedProfile = profileService.mergeProfiles(forceEventProfileAsMaster ? eventProfile : profilesToBeMerge.get(0), profilesToBeMerge);
+ final String mergedProfileId = mergedProfile.getItemId();
- Condition propertyCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
- propertyCondition.setParameter("comparisonOperator", "equals");
- propertyCondition.setParameter("propertyName", "systemProperties." + mergeProfilePropertyName);
- propertyCondition.setParameter("propertyValue", mergeProfilePropertyValue);
+ // Profile is still using the same profileId after being merged, no need to rewrite exists data, merge is done
+ if (!forceEventProfileAsMaster && mergedProfileId.equals(eventProfileId)) {
+ return profileUpdated ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE;
+ }
- final List<Profile> profiles = persistenceService.query(propertyCondition, "properties.firstVisit", Profile.class, 0, maxProfilesInOneMerge).getList();
+ // ProfileID changed we have a lot to do
+ // First check for privacy stuff (inherit from previous profile if necessary)
+ if (privacyService.isRequireAnonymousBrowsing(eventProfile)) {
+ privacyService.setRequireAnonymousBrowsing(mergedProfileId, true, event.getScope());
+ }
+ final boolean anonymousBrowsing = privacyService.isRequireAnonymousBrowsing(mergedProfileId);
- // Check if the user switched to another profile
- if (StringUtils.isNotEmpty(mergeProfilePreviousPropertyValue) && !mergeProfilePreviousPropertyValue.equals(mergeProfilePropertyValue)) {
- if (profiles.size() > 0) {
- // Take existing profile
- profile = profiles.get(0);
- } else {
- if (forceEventProfileAsMaster) {
- profile = event.getProfile();
- } else {
- // Create a new profile
- profile = new Profile(UUID.randomUUID().toString());
- profile.setProperty("firstVisit", event.getTimeStamp());
- }
- profile.getSystemProperties().put(mergeProfilePropertyName, mergeProfilePropertyValue);
- }
+ // Modify current session:
+ if (event.getSession() != null) {
+ event.getSession().setProfile(anonymousBrowsing ? privacyService.getAnonymousProfile(mergedProfile) : mergedProfile);
+ }
- logger.info("Different users, switch to " + profile.getItemId());
- // At the end of the merge, we must set the merged profile as profile event to process other Actions
- event.setProfileId(profile.getItemId());
- event.setProfile(profile);
+ // Modify current event:
+ event.setProfileId(anonymousBrowsing ? null : mergedProfileId);
+ event.setProfile(mergedProfile);
- if (currentSession != null) {
- currentSession.setProfile(profile);
- eventService.send(new Event("sessionReassigned", currentSession, profile, event.getScope(), event, currentSession,
- null, event.getTimeStamp(), false));
- }
+ event.getActionPostExecutors().add(() -> {
+ try {
+ // Save event, as we changed the profileId of the current event
+ if (event.isPersistent()) {
+ persistenceService.save(event);
+ }
- return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED;
- } else {
- // Store the merge property identifier in the profile
- profile.getSystemProperties().put(mergeProfilePropertyName, mergeProfilePropertyValue);
+ for (Profile profileToBeMerge : profilesToBeMerge) {
+ String profileToBeMergeId = profileToBeMerge.getItemId();
+ if (!StringUtils.equals(profileToBeMergeId, mergedProfileId)) {
+
+ // TODO (UNOMI-748): the following updates are asynchron due to usage of bulk processor in ElasticSearch persistence service update function.
+ // We could consider replacing those updates(one item at a time) by updateByQueryAndScript to avoid loading all the sessions/events in memory,
+ // but we would loose the asynchronous nature (By doing that request may take longer than before,
+ // and could potentially break client side timeouts on requests)
+ List<Event> oldEvents = persistenceService.query("profileId", profileToBeMergeId, null, Event.class);
+ for (Event oldEvent : oldEvents) {
+ if (!oldEvent.getItemId().equals(event.getItemId())) {
+ persistenceService.update(oldEvent, Event.class, "profileId", anonymousBrowsing ? null : mergedProfileId);
+ }
+ }
- // add current Profile to profiles to be merged
- boolean add = true;
- for (Profile p : profiles) {
- add = add && !StringUtils.equals(p.getItemId(), profile.getItemId());
- }
- if (add) {
- profiles.add(profile);
- }
+ // TODO (UNOMI-749): this is creating inconsistent sessions, they still contains old profile.
+ // And due to deserialization of sessions the profileId property will always be the one from profile stored in the session
+ List<Session> oldSessions = persistenceService.query("profileId", profileToBeMergeId, null, Session.class);
+ for (Session oldSession : oldSessions) {
+ if (!oldSession.getItemId().equals(event.getSession().getItemId())) {
+ persistenceService.update(oldSession, Session.class, "profileId", anonymousBrowsing ? null : mergedProfileId);
+ }
+ }
- if (profiles.size() == 1) {
- return StringUtils.isEmpty(mergeProfilePreviousPropertyValue) ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE;
+ final String clientIdFromEvent = (String) event.getAttributes().get(Event.CLIENT_ID_ATTRIBUTE);
+ String clientId = clientIdFromEvent != null ? clientIdFromEvent : "defaultClientId";
+ profileService.addAliasToProfile(mergedProfileId, profileToBeMergeId, clientId);
+ if (profileService.load(profileToBeMergeId) != null) {
+ profileService.delete(profileToBeMergeId, false);
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("unable to execute callback action, profile and session will not be saved", e);
+ return false;
}
+ return true;
+ });
- Profile markedMasterProfile;
- if (forceEventProfileAsMaster)
- markedMasterProfile = event.getProfile();
- else
- markedMasterProfile = profiles.get(0);// Use oldest profile for master profile
-
- final Profile masterProfile = profileService.mergeProfiles(markedMasterProfile, profiles);
-
- // Profile has changed
- if (forceEventProfileAsMaster || !masterProfile.getItemId().equals(profileId)) {
+ return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED;
+ }
- final String masterProfileId = masterProfile.getItemId();
- // At the end of the merge, we must set the merged profile as profile event to process other Actions
- event.setProfileId(masterProfileId);
- event.setProfile(masterProfile);
+ private List<Profile> getProfilesToBeMerge(String mergeProfilePropertyName, String mergeProfilePropertyValue) {
+ Condition propertyCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+ propertyCondition.setParameter("comparisonOperator", "equals");
+ propertyCondition.setParameter("propertyName", "systemProperties." + mergeProfilePropertyName);
+ propertyCondition.setParameter("propertyValue", mergeProfilePropertyValue);
- final Boolean anonymousBrowsing = privacyService.isRequireAnonymousBrowsing(masterProfileId);
+ return persistenceService.query(propertyCondition, "properties.firstVisit", Profile.class, 0, maxProfilesInOneMerge).getList();
+ }
- if (currentSession != null) {
- currentSession.setProfile(masterProfile);
- if (privacyService.isRequireAnonymousBrowsing(profile)) {
- privacyService.setRequireAnonymousBrowsing(masterProfileId, true, event.getScope());
- }
+ private void reassignSession(Event event, List<Profile> existingMergedProfiles, boolean forceEventProfileAsMaster, String mergePropName, String mergePropValue) {
+ Profile eventProfile = event.getProfile();
- if (anonymousBrowsing) {
- currentSession.setProfile(privacyService.getAnonymousProfile(masterProfile));
- event.setProfileId(null);
- persistenceService.save(event);
- }
- }
+ if (existingMergedProfiles.size() > 0) {
+ // Take existing profile
+ eventProfile = existingMergedProfiles.get(0);
+ } else {
+ if (!forceEventProfileAsMaster) {
+ // Create a new profile
+ eventProfile = new Profile(UUID.randomUUID().toString());
+ eventProfile.setProperty("firstVisit", event.getTimeStamp());
+ }
+ eventProfile.getSystemProperties().put(mergePropName, mergePropValue);
+ }
- event.getActionPostExecutors().add(new ActionPostExecutor() {
- @Override
- public boolean execute() {
- try {
- Event currentEvent = event;
- // Update current event explicitly, as it might not return from search query if there wasn't a refresh in ES
- if (!StringUtils.equals(profileId, masterProfileId)) {
- if (currentEvent.isPersistent()) {
- persistenceService.update(currentEvent, Event.class, "profileId", anonymousBrowsing ? null : masterProfileId);
- }
- }
+ logger.info("Different users, switch to " + eventProfile.getItemId());
+ // At the end of the merge, we must set the merged profile as profile event to process other Actions
+ event.setProfileId(eventProfile.getItemId());
+ event.setProfile(eventProfile);
- for (Profile profile : profiles) {
- String profileId = profile.getItemId();
- if (!StringUtils.equals(profileId, masterProfileId)) {
- // TODO consider udpate by query and/or script
- List<Session> sessions = persistenceService.query("profileId", profileId, null, Session.class);
- if (currentSession != null) {
- if (masterProfileId.equals(profileId) && !sessions.contains(currentSession)) {
- sessions.add(currentSession);
- }
- }
-
- for (Session session : sessions) {
- persistenceService.update(session, Session.class, "profileId", anonymousBrowsing ? null : masterProfileId);
- }
-
- // TODO consider udpate by query and/or script
- List<Event> events = persistenceService.query("profileId", profileId, null, Event.class);
- for (Event event : events) {
- if (!event.getItemId().equals(currentEvent.getItemId())) {
- persistenceService.update(event, Event.class, "profileId", anonymousBrowsing ? null : masterProfileId);
- }
- }
-
- final String clientIdFromEvent = (String) event.getAttributes().get(Event.CLIENT_ID_ATTRIBUTE);
- String clientId = clientIdFromEvent != null ? clientIdFromEvent : "defaultClientId";
- profileService.addAliasToProfile(masterProfileId, profile.getItemId(), clientId);
-
- boolean isExist = profileService.load(profile.getItemId()) != null;
- if (isExist) {
- profileService.delete(profileId, false);
- }
- }
- }
- } catch (Exception e) {
- logger.error("unable to execute callback action, profile and session will not be saved", e);
- return false;
- }
- return true;
- }
- });
- return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED;
- } else {
- return StringUtils.isEmpty(mergeProfilePreviousPropertyValue) ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE;
- }
+ if (event.getSession() != null) {
+ Session eventSession = event.getSession();
+ eventSession.setProfile(eventProfile);
+ eventService.send(new Event("sessionReassigned", eventSession, eventProfile, event.getScope(), event, eventSession,
+ null, event.getTimeStamp(), false));
}
}
@@ -236,4 +215,4 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor {
public void setMaxProfilesInOneMerge(String maxProfilesInOneMerge) {
this.maxProfilesInOneMerge = Integer.parseInt(maxProfilesInOneMerge);
}
-}
+}
\ No newline at end of file
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index cb34515c3..216261c5a 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -1016,7 +1016,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
if (batch.size() == segmentUpdateBatchSize || (!entryIterator.hasNext() && batch.size() > 0)) {
try {
- persistenceService.update(batch, null, Profile.class);
+ persistenceService.update(batch, Profile.class);
} catch (Exception e) {
logger.error("Error updating {} profiles for past event system properties", batch.size(), e);
} finally {
@@ -1115,7 +1115,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
Map<String, Object> propertiesToUpdate = buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd);
profileToPropertiesMap.put(profileToUpdate, propertiesToUpdate);
}
- List<String> failedItemsIds = persistenceService.update(profileToPropertiesMap, null, Profile.class);
+ List<String> failedItemsIds = persistenceService.update(profileToPropertiesMap, Profile.class);
if (failedItemsIds != null)
failedItemsIds.forEach(s -> retryFailedSegmentUpdate(s, segmentId, isAdd));
}