You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2023/03/13 15:04:14 UTC

[unomi] branch master updated: UNOMI-727: adapt merge system to rollover (and cleanup too) (#588)

This is an automated email from the ASF dual-hosted git repository.

shuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/master by this push:
     new 385699a5c UNOMI-727: adapt merge system to rollover (and cleanup too) (#588)
385699a5c is described below

commit 385699a5cdb771ac8b70d8ea493f43a849b78f86
Author: kevan Jahanshahi <jk...@apache.org>
AuthorDate: Mon Mar 13 16:04:07 2023 +0100

    UNOMI-727: adapt merge system to rollover (and cleanup too) (#588)
    
    * UNOMI-727: adapt merge system to rollover (and cleanup too)
    
    * UNOMI-727: adapt merge system to rollover (and cleanup too)
    
    * UNOMI-727: adapt merge system to rollover (and cleanup too)
---
 .../org/apache/unomi/itests/ProfileMergeIT.java    | 224 ++++++++++++++++--
 .../actions/MergeProfilesOnPropertyAction.java     | 261 ++++++++++-----------
 .../services/impl/segments/SegmentServiceImpl.java |   4 +-
 3 files changed, 322 insertions(+), 167 deletions(-)

diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
index a5306d9f9..7815baccb 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
@@ -16,10 +16,7 @@
  */
 package org.apache.unomi.itests;
 
-import org.apache.unomi.api.Event;
-import org.apache.unomi.api.Metadata;
-import org.apache.unomi.api.Profile;
-import org.apache.unomi.api.ProfileAlias;
+import org.apache.unomi.api.*;
 import org.apache.unomi.api.actions.Action;
 import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.rules.Rule;
@@ -49,14 +46,15 @@ public class ProfileMergeIT extends BaseIT {
     private final static String TEST_PROFILE_ID = "mergeOnPropertyTestProfileId";
 
     @After
-    public void after() {
+    public void after() throws InterruptedException {
         // cleanup created data
         rulesService.removeRule(TEST_RULE_ID);
+        removeItems(Profile.class, ProfileAlias.class, Event.class, Session.class);
     }
 
     @Test
     public void testProfileMergeOnPropertyAction_dont_forceEventProfileAsMaster() throws InterruptedException {
-        createAndWaitForRule(createMergeOnPropertyRule(false));
+        createAndWaitForRule(createMergeOnPropertyRule(false, "j:nodename"));
 
         // A new profile should be created.
         Assert.assertNotEquals(sendEvent().getProfile().getItemId(), TEST_PROFILE_ID);
@@ -64,29 +62,16 @@ public class ProfileMergeIT extends BaseIT {
 
     @Test
     public void testProfileMergeOnPropertyAction_forceEventProfileAsMaster() throws InterruptedException {
-        createAndWaitForRule(createMergeOnPropertyRule(true));
+        createAndWaitForRule(createMergeOnPropertyRule(true, "j:nodename"));
 
         // No new profile should be created, instead the profile of the event should be used.
         Assert.assertEquals(sendEvent().getProfile().getItemId(), TEST_PROFILE_ID);
     }
 
     @Test
-    public void test() throws InterruptedException {
+    public void testProfileMergeOnPropertyAction_simpleMergeAndCheckAlias() throws InterruptedException {
         // create rule
-        Condition condition = new Condition(definitionsService.getConditionType("eventTypeCondition"));
-        condition.setParameter("eventTypeId", TEST_EVENT_TYPE);
-
-        final Action action = new Action(definitionsService.getActionType("mergeProfilesOnPropertyAction"));
-        action.setParameter("mergeProfilePropertyValue", "eventProperty::target.properties(email)");
-        action.setParameter("mergeProfilePropertyName", "mergeIdentifier");
-        action.setParameter("forceEventProfileAsMaster", false);
-
-        Rule rule = new Rule();
-        rule.setMetadata(new Metadata(null, TEST_RULE_ID, TEST_RULE_ID, "Description"));
-        rule.setCondition(condition);
-        rule.setActions(Collections.singletonList(action));
-
-        createAndWaitForRule(rule);
+        createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
 
         // create master profile
         Profile masterProfile = new Profile();
@@ -115,6 +100,14 @@ public class ProfileMergeIT extends BaseIT {
                 () -> persistenceService.getAllItems(ProfileAlias.class), (profileAliases) -> !profileAliases.isEmpty(),
                 DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
 
+        waitForNullValue("Profile with id eventProfileID not removed in the required time",
+                () -> persistenceService.load("eventProfileID", Profile.class),
+                DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+        keepTrying("Profile with id eventProfileID should still be accessible due to alias",
+                () -> profileService.load("eventProfileID"), Objects::nonNull,
+                DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
         List<ProfileAlias> aliases = persistenceService.query("profileID", masterProfile.getItemId(), null, ProfileAlias.class);
 
         Assert.assertFalse(aliases.isEmpty());
@@ -123,6 +116,189 @@ public class ProfileMergeIT extends BaseIT {
         Assert.assertEquals("defaultClientId", aliases.get(0).getClientID());
     }
 
+
+    /**
+     * User switch case, this case can happen when a person (user A) is using the same browser session of a previous logged user (user B).
+     * user A will be using user B profile, but when user A is going to login by send a merge event, then we will detect that the mergeIdentifier is not the same
+     * In this case we will just switch user A profile to:
+     * - a new one, if it's the first time we encounter his own mergeIdentifier (TESTED in this scenario)
+     * - a previous one, if we already have a profile in DB with the same mergeIdentifier.
+     */
+    @Test
+    public void testProfileMergeOnPropertyAction_sessionReassigned_newProfile() throws InterruptedException {
+        // create rule
+        createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+        // create master profile
+        Profile masterProfile = new Profile();
+        masterProfile.setItemId("masterProfileID");
+        masterProfile.setProperty("email", "master@domain.com");
+        masterProfile.setSystemProperty("mergeIdentifier", "master@domain.com");
+        profileService.save(masterProfile);
+
+        keepTrying("Profile with id masterProfileID not found in the required time", () -> profileService.load("masterProfileID"),
+                Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+        // create event profile
+        Profile eventProfile = new Profile();
+        eventProfile.setItemId("eventProfileID");
+        eventProfile.setProperty("email", "event@domain.com");
+
+        Session simpleSession = new Session("simpleSession", eventProfile, new Date(), null);
+        Event event = new Event(TEST_EVENT_TYPE, simpleSession, masterProfile, null, null, eventProfile, new Date());
+        eventService.send(event);
+
+        // Session should have been reassign and a new profile should have been created ! (We call this user switch case)
+        Assert.assertNotNull(event.getProfile());
+        Assert.assertNotEquals("eventProfileID", event.getProfile().getItemId());
+        Assert.assertNotEquals("eventProfileID", event.getProfileId());
+        Assert.assertNotEquals("eventProfileID", event.getSession().getProfile().getItemId());
+        Assert.assertNotEquals("eventProfileID", event.getSession().getProfileId());
+
+        Assert.assertNotEquals("masterProfileID", event.getProfile().getItemId());
+        Assert.assertNotEquals("masterProfileID", event.getProfileId());
+        Assert.assertNotEquals("masterProfileID", event.getSession().getProfile().getItemId());
+        Assert.assertNotEquals("masterProfileID", event.getSession().getProfileId());
+
+        Assert.assertEquals(event.getSession().getProfileId(), event.getProfileId());
+        Assert.assertEquals("event@domain.com", event.getProfile().getSystemProperties().get("mergeIdentifier"));
+    }
+
+    /**
+     * User switch case, this case can happen when a person (user A) is using the same browser session of a previous logged user (user B).
+     * user A will be using user B profile, but when user A is going to login by send a merge event, then we will detect that the mergeIdentifier is not the same
+     * In this case we will just switch user A profile to:
+     * - a new one, if it's the first time we encounter his own mergeIdentifier
+     * - a previous one, if we already have a profile in DB with the same mergeIdentifier. (TESTED in this scenario)
+     */
+    @Test
+    public void testProfileMergeOnPropertyAction_sessionReassigned_existingProfile() throws InterruptedException {
+        // create rule
+        createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+        // create master profile
+        Profile masterProfile = new Profile();
+        masterProfile.setItemId("masterProfileID");
+        masterProfile.setProperty("email", "master@domain.com");
+        masterProfile.setSystemProperty("mergeIdentifier", "master@domain.com");
+        profileService.save(masterProfile);
+
+        // create a previous existing profile with same mergeIdentifier
+        Profile previousProfile = new Profile();
+        previousProfile.setItemId("previousProfileID");
+        previousProfile.setProperty("email", "event@domain.com");
+        previousProfile.setSystemProperty("mergeIdentifier", "event@domain.com");
+        profileService.save(previousProfile);
+
+        keepTrying("Profile with id masterProfileID not found in the required time", () -> profileService.load("masterProfileID"),
+                Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+        keepTrying("Profile with id previousProfileID not found in the required time", () -> profileService.load("previousProfileID"),
+                Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+        // create event profile
+        Profile eventProfile = new Profile();
+        eventProfile.setItemId("eventProfileID");
+        eventProfile.setProperty("email", "event@domain.com");
+
+        Session simpleSession = new Session("simpleSession", eventProfile, new Date(), null);
+        Event event = new Event(TEST_EVENT_TYPE, simpleSession, masterProfile, null, null, eventProfile, new Date());
+        eventService.send(event);
+
+        // Session should have been reassign and the previous existing profile for mergeIdentifier: event@domain.com should have been reuse
+        // Session should have been reassign and a new profile should have been created ! (We call this user switch case)
+        Assert.assertNotNull(event.getProfile());
+        Assert.assertEquals("previousProfileID", event.getProfile().getItemId());
+        Assert.assertEquals("previousProfileID", event.getProfileId());
+        Assert.assertEquals("previousProfileID", event.getSession().getProfile().getItemId());
+        Assert.assertEquals("previousProfileID", event.getSession().getProfileId());
+
+        Assert.assertEquals(event.getSession().getProfileId(), event.getProfileId());
+        Assert.assertEquals("event@domain.com", event.getProfile().getSystemProperties().get("mergeIdentifier"));
+    }
+
+    /**
+     * In case of merge, existing sessions/events from previous profileId should be rewritten to use the new master profileId
+     */
+    @Test
+    public void testProfileMergeOnPropertyAction_simpleMergeRewriteExistingSessionsEvents() throws InterruptedException {
+        Condition matchAll = new Condition(definitionsService.getConditionType("matchAllCondition"));
+        // create rule
+        createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+        // create master profile
+        Profile masterProfile = new Profile();
+        masterProfile.setItemId("masterProfileID");
+        masterProfile.setProperty("email", "username@domain.com");
+        masterProfile.setSystemProperty("mergeIdentifier", "username@domain.com");
+        profileService.save(masterProfile);
+
+        Profile eventProfile = new Profile();
+        eventProfile.setItemId("eventProfileID");
+        eventProfile.setProperty("email", "username@domain.com");
+        profileService.save(eventProfile);
+
+        // create 5 sessions and 5 events for master profile.
+        List<Session> sessionsToBeRewritten = new ArrayList<>();
+        List<Event> eventsToBeRewritten = new ArrayList<>();
+        for (int i = 1; i <= 5; i++) {
+            Session sessionToBeRewritten = new Session("simpleSession_"+ i, eventProfile, new Date(), null);
+            sessionsToBeRewritten.add(sessionToBeRewritten);
+            Event eventToBeRewritten = new Event("view", sessionToBeRewritten, eventProfile, null, null, null, new Date());
+            eventsToBeRewritten.add(eventToBeRewritten);
+
+
+            persistenceService.save(sessionToBeRewritten);
+            persistenceService.save(eventToBeRewritten);
+        }
+        keepTrying("Wait for sessions and events to be persisted", () -> persistenceService.queryCount(matchAll, Session.ITEM_TYPE) + persistenceService.queryCount(matchAll, Event.ITEM_TYPE),
+                (count) -> count == 10, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+        keepTrying("Profile with id masterProfileID not found in the required time", () -> profileService.load("masterProfileID"),
+                Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+        keepTrying("Profile with id eventProfileID not found in the required time", () -> profileService.load("eventProfileID"),
+                Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+        // Trigger the merge
+        Session simpleSession = new Session("simpleSession", eventProfile, new Date(), null);
+        Event mergeEvent = new Event(TEST_EVENT_TYPE, simpleSession, eventProfile, null, null, eventProfile, new Date());
+        eventService.send(mergeEvent);
+
+        // Check that master profile is now used:
+        Assert.assertNotNull(mergeEvent.getProfile());
+        Assert.assertEquals("masterProfileID", mergeEvent.getProfile().getItemId());
+        Assert.assertEquals("masterProfileID", mergeEvent.getProfileId());
+        Assert.assertEquals("masterProfileID", mergeEvent.getSession().getProfile().getItemId());
+        Assert.assertEquals("masterProfileID", mergeEvent.getSession().getProfileId());
+        Assert.assertEquals(mergeEvent.getSession().getProfileId(), mergeEvent.getProfileId());
+        Assert.assertEquals("username@domain.com", mergeEvent.getProfile().getSystemProperties().get("mergeIdentifier"));
+
+        // Check events are correctly rewritten
+        for (Event event : eventsToBeRewritten) {
+            keepTrying("Wait for event: " + event.getItemId() + " profileId to be rewritten for masterProfileID",
+                    () -> persistenceService.load(event.getItemId(), Event.class),
+                    (loadedEvent) -> loadedEvent.getProfileId().equals("masterProfileID"), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+        }
+
+        // Check sessions are correctly rewritten
+        Condition sessionProfileIDRewrittenCondition = new Condition(definitionsService.getConditionType("sessionPropertyCondition"));
+        sessionProfileIDRewrittenCondition.setParameter("propertyName","profileId");
+        sessionProfileIDRewrittenCondition.setParameter("comparisonOperator","equals");
+        sessionProfileIDRewrittenCondition.setParameter("propertyValue","masterProfileID");
+        keepTrying("Wait for sessions profileId to be rewritten to masterProfileID",
+                () -> persistenceService.queryCount(sessionProfileIDRewrittenCondition, Session.ITEM_TYPE),
+                (count) -> count == 5, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+        // TODO uncomment this when UNOMI-749 is fixed, currently session loaded are inconsistent
+        /* for (Session session : sessionsToBeRewritten) {
+            keepTrying("Wait for session: " + session.getItemId() + " profileId to be rewritten for masterProfileID",
+                    () -> persistenceService.load(session.getItemId(), Session.class),
+                    (loadedSession) -> loadedSession.getProfileId().equals("masterProfileID"), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+        } */
+    }
+
+    /**
+     * Personalization strategy have a specific handling during the merge of two profiles
+     * This test is here to ensure this specific behavior is correctly working.
+     */
     @Test
     public void testPersonalizationStrategyStatusMerge() {
         // create some statuses for the tests:
@@ -208,7 +384,7 @@ public class ProfileMergeIT extends BaseIT {
         return testEvent;
     }
 
-    private Rule createMergeOnPropertyRule(boolean forceEventProfileAsMaster) throws InterruptedException {
+    private Rule createMergeOnPropertyRule(boolean forceEventProfileAsMaster, String eventProperty) throws InterruptedException {
         Rule mergeOnPropertyTestRule = new Rule();
         mergeOnPropertyTestRule
                 .setMetadata(new Metadata(null, TEST_RULE_ID, TEST_RULE_ID, "Test rule for testing MergeProfilesOnPropertyAction"));
@@ -218,7 +394,7 @@ public class ProfileMergeIT extends BaseIT {
         mergeOnPropertyTestRule.setCondition(condition);
 
         final Action mergeProfilesOnPropertyAction = new Action(definitionsService.getActionType("mergeProfilesOnPropertyAction"));
-        mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyValue", "eventProperty::target.properties(j:nodename)");
+        mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyValue", "eventProperty::target.properties(" + eventProperty + ")");
         mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyName", "mergeIdentifier");
         mergeProfilesOnPropertyAction.setParameter("forceEventProfileAsMaster", forceEventProfileAsMaster);
         mergeOnPropertyTestRule.setActions(Collections.singletonList(mergeProfilesOnPropertyAction));
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index 2261a2a4b..6ecd0e797 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -24,7 +24,6 @@ import org.apache.unomi.api.Profile;
 import org.apache.unomi.api.Session;
 import org.apache.unomi.api.actions.Action;
 import org.apache.unomi.api.actions.ActionExecutor;
-import org.apache.unomi.api.actions.ActionPostExecutor;
 import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.services.*;
 import org.apache.unomi.persistence.spi.PersistenceService;
@@ -46,170 +45,150 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor {
 
     public int execute(Action action, Event event) {
 
-        Profile profile = event.getProfile();
-        if (profile instanceof Persona || profile.isAnonymousProfile()) {
-            return EventService.NO_CHANGE;
-        }
+        Profile eventProfile = event.getProfile();
+        final String mergePropName = (String) action.getParameterValues().get("mergeProfilePropertyName");
+        final String mergePropValue = (String) action.getParameterValues().get("mergeProfilePropertyValue");
+        boolean forceEventProfileAsMaster = action.getParameterValues().containsKey("forceEventProfileAsMaster") ? (boolean) action.getParameterValues().get("forceEventProfileAsMaster") : false;
+        final String currentProfileMergeValue = (String) eventProfile.getSystemProperties().get(mergePropName);
 
-        final String mergeProfilePropertyName = (String) action.getParameterValues().get("mergeProfilePropertyName");
-        if (StringUtils.isEmpty(mergeProfilePropertyName)) {
+        if (eventProfile instanceof Persona || eventProfile.isAnonymousProfile() || StringUtils.isEmpty(mergePropName) ||
+                StringUtils.isEmpty(mergePropValue)) {
             return EventService.NO_CHANGE;
         }
 
-        final String mergeProfilePropertyValue = (String) action.getParameterValues().get("mergeProfilePropertyValue");
-        if (StringUtils.isEmpty(mergeProfilePropertyValue)) {
-            return EventService.NO_CHANGE;
+        final List<Profile> profilesToBeMerge = getProfilesToBeMerge(mergePropName, mergePropValue);
+
+        // Check if the user switched to another profile
+        if (StringUtils.isNotEmpty(currentProfileMergeValue) && !currentProfileMergeValue.equals(mergePropValue)) {
+            reassignSession(event, profilesToBeMerge, forceEventProfileAsMaster, mergePropName, mergePropValue);
+            return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED;
         }
 
-        final String mergeProfilePreviousPropertyValue = profile.getSystemProperties().get(mergeProfilePropertyName) != null ? profile.getSystemProperties().get(mergeProfilePropertyName).toString() : "";
+        // Store merge prop on current profile
+        boolean profileUpdated = false;
+        if (StringUtils.isEmpty(currentProfileMergeValue)) {
+            profileUpdated = true;
+            eventProfile.getSystemProperties().put(mergePropName, mergePropValue);
+        }
 
-        final Session currentSession = event.getSession();
+        // If not profiles to merge we are done here.
+        if (profilesToBeMerge.isEmpty()) {
+            return profileUpdated ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE;
+        }
 
-        boolean forceEventProfileAsMaster = action.getParameterValues().containsKey("forceEventProfileAsMaster") ?
-                (boolean) action.getParameterValues().get("forceEventProfileAsMaster") : false;
+        // add current Profile to profiles to be merged
+        if (profilesToBeMerge.stream().noneMatch(p -> StringUtils.equals(p.getItemId(), eventProfile.getItemId()))) {
+            profilesToBeMerge.add(eventProfile);
+        }
 
-        // store the profile id in case the merge change it to a previous one
-        String profileId = profile.getItemId();
+        final String eventProfileId = eventProfile.getItemId();
+        final Profile mergedProfile = profileService.mergeProfiles(forceEventProfileAsMaster ? eventProfile : profilesToBeMerge.get(0), profilesToBeMerge);
+        final String mergedProfileId = mergedProfile.getItemId();
 
-        Condition propertyCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
-        propertyCondition.setParameter("comparisonOperator", "equals");
-        propertyCondition.setParameter("propertyName", "systemProperties." + mergeProfilePropertyName);
-        propertyCondition.setParameter("propertyValue", mergeProfilePropertyValue);
+        // Profile is still using the same profileId after being merged, no need to rewrite exists data, merge is done
+        if (!forceEventProfileAsMaster && mergedProfileId.equals(eventProfileId)) {
+            return profileUpdated ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE;
+        }
 
-        final List<Profile> profiles = persistenceService.query(propertyCondition, "properties.firstVisit", Profile.class, 0, maxProfilesInOneMerge).getList();
+        // ProfileID changed we have a lot to do
+        // First check for privacy stuff (inherit from previous profile if necessary)
+        if (privacyService.isRequireAnonymousBrowsing(eventProfile)) {
+            privacyService.setRequireAnonymousBrowsing(mergedProfileId, true, event.getScope());
+        }
+        final boolean anonymousBrowsing = privacyService.isRequireAnonymousBrowsing(mergedProfileId);
 
-        // Check if the user switched to another profile
-        if (StringUtils.isNotEmpty(mergeProfilePreviousPropertyValue) && !mergeProfilePreviousPropertyValue.equals(mergeProfilePropertyValue)) {
-            if (profiles.size() > 0) {
-                // Take existing profile
-                profile = profiles.get(0);
-            } else {
-                if (forceEventProfileAsMaster) {
-                    profile = event.getProfile();
-                } else {
-                    // Create a new profile
-                    profile = new Profile(UUID.randomUUID().toString());
-                    profile.setProperty("firstVisit", event.getTimeStamp());
-                }
-                profile.getSystemProperties().put(mergeProfilePropertyName, mergeProfilePropertyValue);
-            }
+        // Modify current session:
+        if (event.getSession() != null) {
+            event.getSession().setProfile(anonymousBrowsing ? privacyService.getAnonymousProfile(mergedProfile) : mergedProfile);
+        }
 
-            logger.info("Different users, switch to " + profile.getItemId());
-            // At the end of the merge, we must set the merged profile as profile event to process other Actions
-            event.setProfileId(profile.getItemId());
-            event.setProfile(profile);
+        // Modify current event:
+        event.setProfileId(anonymousBrowsing ? null : mergedProfileId);
+        event.setProfile(mergedProfile);
 
-            if (currentSession != null) {
-                currentSession.setProfile(profile);
-                eventService.send(new Event("sessionReassigned", currentSession, profile, event.getScope(), event, currentSession,
-                        null, event.getTimeStamp(), false));
-            }
+        event.getActionPostExecutors().add(() -> {
+            try {
+                // Save event, as we changed the profileId of the current event
+                if (event.isPersistent()) {
+                    persistenceService.save(event);
+                }
 
-            return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED;
-        } else {
-            // Store the merge property identifier in the profile
-            profile.getSystemProperties().put(mergeProfilePropertyName, mergeProfilePropertyValue);
+                for (Profile profileToBeMerge : profilesToBeMerge) {
+                    String profileToBeMergeId = profileToBeMerge.getItemId();
+                    if (!StringUtils.equals(profileToBeMergeId, mergedProfileId)) {
+
+                        // TODO (UNOMI-748): the following updates are asynchron due to usage of bulk processor in ElasticSearch persistence service update function.
+                        //  We could consider replacing those updates(one item at a time) by updateByQueryAndScript to avoid loading all the sessions/events in memory,
+                        //  but we would loose the asynchronous nature (By doing that request may take longer than before,
+                        //  and could potentially break client side timeouts on requests)
+                        List<Event> oldEvents = persistenceService.query("profileId", profileToBeMergeId, null, Event.class);
+                        for (Event oldEvent : oldEvents) {
+                            if (!oldEvent.getItemId().equals(event.getItemId())) {
+                                persistenceService.update(oldEvent, Event.class, "profileId", anonymousBrowsing ? null : mergedProfileId);
+                            }
+                        }
 
-            // add current Profile to profiles to be merged
-            boolean add = true;
-            for (Profile p : profiles) {
-                add = add && !StringUtils.equals(p.getItemId(), profile.getItemId());
-            }
-            if (add) {
-                profiles.add(profile);
-            }
+                        // TODO (UNOMI-749): this is creating inconsistent sessions, they still contains old profile.
+                        //  And due to deserialization of sessions the profileId property will always be the one from profile stored in the session
+                        List<Session> oldSessions = persistenceService.query("profileId", profileToBeMergeId, null, Session.class);
+                        for (Session oldSession : oldSessions) {
+                            if (!oldSession.getItemId().equals(event.getSession().getItemId())) {
+                                persistenceService.update(oldSession, Session.class, "profileId", anonymousBrowsing ? null : mergedProfileId);
+                            }
+                        }
 
-            if (profiles.size() == 1) {
-                return StringUtils.isEmpty(mergeProfilePreviousPropertyValue) ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE;
+                        final String clientIdFromEvent = (String) event.getAttributes().get(Event.CLIENT_ID_ATTRIBUTE);
+                        String clientId = clientIdFromEvent != null ? clientIdFromEvent : "defaultClientId";
+                        profileService.addAliasToProfile(mergedProfileId, profileToBeMergeId, clientId);
+                        if (profileService.load(profileToBeMergeId) != null) {
+                            profileService.delete(profileToBeMergeId, false);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("unable to execute callback action, profile and session will not be saved", e);
+                return false;
             }
+            return true;
+        });
 
-            Profile markedMasterProfile;
-            if (forceEventProfileAsMaster)
-                markedMasterProfile = event.getProfile();
-            else
-                markedMasterProfile = profiles.get(0);// Use oldest profile for master profile
-
-            final Profile masterProfile = profileService.mergeProfiles(markedMasterProfile, profiles);
-
-            // Profile has changed
-            if (forceEventProfileAsMaster || !masterProfile.getItemId().equals(profileId)) {
+        return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED;
+    }
 
-                final String masterProfileId = masterProfile.getItemId();
-                // At the end of the merge, we must set the merged profile as profile event to process other Actions
-                event.setProfileId(masterProfileId);
-                event.setProfile(masterProfile);
+    private List<Profile> getProfilesToBeMerge(String mergeProfilePropertyName, String mergeProfilePropertyValue) {
+        Condition propertyCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        propertyCondition.setParameter("comparisonOperator", "equals");
+        propertyCondition.setParameter("propertyName", "systemProperties." + mergeProfilePropertyName);
+        propertyCondition.setParameter("propertyValue", mergeProfilePropertyValue);
 
-                final Boolean anonymousBrowsing = privacyService.isRequireAnonymousBrowsing(masterProfileId);
+        return persistenceService.query(propertyCondition, "properties.firstVisit", Profile.class, 0, maxProfilesInOneMerge).getList();
+    }
 
-                if (currentSession != null) {
-                    currentSession.setProfile(masterProfile);
-                    if (privacyService.isRequireAnonymousBrowsing(profile)) {
-                        privacyService.setRequireAnonymousBrowsing(masterProfileId, true, event.getScope());
-                    }
+    private void reassignSession(Event event, List<Profile> existingMergedProfiles, boolean forceEventProfileAsMaster, String mergePropName, String mergePropValue) {
+        Profile eventProfile = event.getProfile();
 
-                    if (anonymousBrowsing) {
-                        currentSession.setProfile(privacyService.getAnonymousProfile(masterProfile));
-                        event.setProfileId(null);
-                        persistenceService.save(event);
-                    }
-                }
+        if (existingMergedProfiles.size() > 0) {
+            // Take existing profile
+            eventProfile = existingMergedProfiles.get(0);
+        } else {
+            if (!forceEventProfileAsMaster) {
+                // Create a new profile
+                eventProfile = new Profile(UUID.randomUUID().toString());
+                eventProfile.setProperty("firstVisit", event.getTimeStamp());
+            }
+            eventProfile.getSystemProperties().put(mergePropName, mergePropValue);
+        }
 
-                event.getActionPostExecutors().add(new ActionPostExecutor() {
-                    @Override
-                    public boolean execute() {
-                        try {
-                            Event currentEvent = event;
-                            // Update current event explicitly, as it might not return from search query if there wasn't a refresh in ES
-                            if (!StringUtils.equals(profileId, masterProfileId)) {
-                                if (currentEvent.isPersistent()) {
-                                    persistenceService.update(currentEvent, Event.class, "profileId", anonymousBrowsing ? null : masterProfileId);
-                                }
-                            }
+        logger.info("Different users, switch to " + eventProfile.getItemId());
+        // At the end of the merge, we must set the merged profile as profile event to process other Actions
+        event.setProfileId(eventProfile.getItemId());
+        event.setProfile(eventProfile);
 
-                            for (Profile profile : profiles) {
-                                String profileId = profile.getItemId();
-                                if (!StringUtils.equals(profileId, masterProfileId)) {
-                                    // TODO consider udpate by query and/or script
-                                    List<Session> sessions = persistenceService.query("profileId", profileId, null, Session.class);
-                                    if (currentSession != null) {
-                                        if (masterProfileId.equals(profileId) && !sessions.contains(currentSession)) {
-                                            sessions.add(currentSession);
-                                        }
-                                    }
-
-                                    for (Session session : sessions) {
-                                        persistenceService.update(session, Session.class, "profileId", anonymousBrowsing ? null : masterProfileId);
-                                    }
-
-                                    // TODO consider udpate by query and/or script
-                                    List<Event> events = persistenceService.query("profileId", profileId, null, Event.class);
-                                    for (Event event : events) {
-                                        if (!event.getItemId().equals(currentEvent.getItemId())) {
-                                            persistenceService.update(event, Event.class, "profileId", anonymousBrowsing ? null : masterProfileId);
-                                        }
-                                    }
-
-                                    final String clientIdFromEvent = (String) event.getAttributes().get(Event.CLIENT_ID_ATTRIBUTE);
-                                    String clientId = clientIdFromEvent != null ? clientIdFromEvent : "defaultClientId";
-                                    profileService.addAliasToProfile(masterProfileId, profile.getItemId(), clientId);
-
-                                    boolean isExist = profileService.load(profile.getItemId()) != null;
-                                    if (isExist) {
-                                        profileService.delete(profileId, false);
-                                    }
-                                }
-                            }
-                        } catch (Exception e) {
-                            logger.error("unable to execute callback action, profile and session will not be saved", e);
-                            return false;
-                        }
-                        return true;
-                    }
-                });
-                return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED;
-            } else {
-                return StringUtils.isEmpty(mergeProfilePreviousPropertyValue) ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE;
-            }
+        if (event.getSession() != null) {
+            Session eventSession = event.getSession();
+            eventSession.setProfile(eventProfile);
+            eventService.send(new Event("sessionReassigned", eventSession, eventProfile, event.getScope(), event, eventSession,
+                    null, event.getTimeStamp(), false));
         }
     }
 
@@ -236,4 +215,4 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor {
     public void setMaxProfilesInOneMerge(String maxProfilesInOneMerge) {
         this.maxProfilesInOneMerge = Integer.parseInt(maxProfilesInOneMerge);
     }
-}
+}
\ No newline at end of file
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index cb34515c3..216261c5a 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -1016,7 +1016,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
 
             if (batch.size() == segmentUpdateBatchSize || (!entryIterator.hasNext() && batch.size() > 0)) {
                 try {
-                    persistenceService.update(batch, null, Profile.class);
+                    persistenceService.update(batch, Profile.class);
                 } catch (Exception e) {
                     logger.error("Error updating {} profiles for past event system properties", batch.size(), e);
                 } finally {
@@ -1115,7 +1115,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
             Map<String, Object> propertiesToUpdate = buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd);
             profileToPropertiesMap.put(profileToUpdate, propertiesToUpdate);
         }
-        List<String> failedItemsIds = persistenceService.update(profileToPropertiesMap, null, Profile.class);
+        List<String> failedItemsIds = persistenceService.update(profileToPropertiesMap, Profile.class);
         if (failedItemsIds != null)
             failedItemsIds.forEach(s -> retryFailedSegmentUpdate(s, segmentId, isAdd));
     }