You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2023/03/08 11:11:09 UTC

[unomi] branch master updated: UNOMI-737: indices reduction migration (ES document id suffixed to avoid conflicts) (#582)

This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/master by this push:
     new f5eed2ccb UNOMI-737: indices reduction migration (ES document id suffixed to avoid conflicts) (#582)
f5eed2ccb is described below

commit f5eed2ccbe3b970ca91e1762e5a3e4722096fb9a
Author: kevan Jahanshahi <jk...@apache.org>
AuthorDate: Wed Mar 8 12:11:03 2023 +0100

    UNOMI-737: indices reduction migration (ES document id suffixed to avoid conflicts) (#582)
    
    * UNOMI-737: indices reduction migration (ES document id suffixed to avoid conflicts)
    
    * UNOMI-737: add integration tests
    
    * UNOMI-737: add integration test
    
    * UNOMI-737: un-skip a test about purge
---
 .../services/impl/GroovyActionsServiceImpl.java    |  25 +-
 .../test/java/org/apache/unomi/itests/BaseIT.java  |   6 -
 .../unomi/itests/GroovyActionsServiceIT.java       |   2 +-
 .../org/apache/unomi/itests/ProfileServiceIT.java  |   1 -
 .../unomi/itests/migration/Migrate16xTo220IT.java  |  45 +++-
 .../ElasticSearchPersistenceServiceImpl.java       | 298 ++++++++++-----------
 .../META-INF/cxs/mappings/personaSession.json      |  41 +++
 .../unomi/persistence/spi/PersistenceService.java  |  25 +-
 .../services/impl/profiles/ProfileServiceImpl.java |  46 +---
 .../services/impl/rules/RulesServiceImpl.java      |  22 +-
 .../migrate-2.2.0-05-indicesReduction.groovy       |  58 ++--
 11 files changed, 264 insertions(+), 305 deletions(-)

diff --git a/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java b/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java
index 5761cc1c5..7b215c187 100644
--- a/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java
+++ b/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java
@@ -79,7 +79,6 @@ public class GroovyActionsServiceImpl implements GroovyActionsService {
     private static final Logger logger = LoggerFactory.getLogger(GroovyActionsServiceImpl.class.getName());
 
     private static final String BASE_SCRIPT_NAME = "BaseScript";
-    private static final String GROOVY_SOURCE_CODE_ID_SUFFIX = "-groovySourceCode";
 
     private DefinitionsService definitionsService;
     private PersistenceService persistenceService;
@@ -217,21 +216,20 @@ public class GroovyActionsServiceImpl implements GroovyActionsService {
 
     @Override
     public void remove(String id) {
-        String groovySourceCodeId = getGroovyCodeSourceIdForActionId(id);
-        if (groovyCodeSourceMap.containsKey(groovySourceCodeId)) {
+        if (groovyCodeSourceMap.containsKey(id)) {
             try {
                 definitionsService.removeActionType(
-                        groovyShell.parse(groovyCodeSourceMap.get(groovySourceCodeId)).getClass().getMethod("execute").getAnnotation(Action.class).id());
+                        groovyShell.parse(groovyCodeSourceMap.get(id)).getClass().getMethod("execute").getAnnotation(Action.class).id());
             } catch (NoSuchMethodException e) {
                 logger.error("Failed to delete the action type for the id {}", id, e);
             }
-            persistenceService.remove(groovySourceCodeId, GroovyAction.class);
+            persistenceService.remove(id, GroovyAction.class);
         }
     }
 
     @Override
     public GroovyCodeSource getGroovyCodeSource(String id) {
-        return groovyCodeSourceMap.get(getGroovyCodeSourceIdForActionId(id));
+        return groovyCodeSourceMap.get(id);
     }
 
     /**
@@ -245,21 +243,10 @@ public class GroovyActionsServiceImpl implements GroovyActionsService {
         return new GroovyCodeSource(groovyScript, actionName, "/groovy/script");
     }
 
-    /**
-     * We use a suffix for avoiding id conflict between the actionType and the groovyAction in ElasticSearch
-     * Since those items are now stored in the same ES index
-     * @param actionName name/id of the actionType
-     * @return id of the groovyAction source code for query/save/storage usage.
-     */
-    private String getGroovyCodeSourceIdForActionId(String actionName) {
-        return actionName + GROOVY_SOURCE_CODE_ID_SUFFIX;
-    }
-
     private void saveScript(String actionName, String script) {
-        String groovyName = getGroovyCodeSourceIdForActionId(actionName);
-        GroovyAction groovyScript = new GroovyAction(groovyName, script);
+        GroovyAction groovyScript = new GroovyAction(actionName, script);
         persistenceService.save(groovyScript);
-        logger.info("The script {} has been persisted.", groovyName);
+        logger.info("The script {} has been persisted.", actionName);
     }
 
     private void refreshGroovyActions() {
diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
index 1f5ebf1a4..7e4e20cfa 100644
--- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
@@ -214,12 +214,6 @@ public abstract class BaseIT extends KarafTestSupport {
         refreshPersistence();
     }
 
-    protected void recreateIndex(final String itemType) {
-        if (persistenceService.removeIndex(itemType)) {
-            persistenceService.createIndex(itemType);
-        }
-    }
-
     protected void refreshPersistence() throws InterruptedException {
         persistenceService.refresh();
         Thread.sleep(1000);
diff --git a/itests/src/test/java/org/apache/unomi/itests/GroovyActionsServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/GroovyActionsServiceIT.java
index 7fecea183..f86ff7f7e 100644
--- a/itests/src/test/java/org/apache/unomi/itests/GroovyActionsServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/GroovyActionsServiceIT.java
@@ -123,7 +123,7 @@ public class GroovyActionsServiceIT extends BaseIT {
         GroovyCodeSource groovyCodeSource = keepTrying("Failed waiting for the creation of the GroovyAction for the save test",
                 () -> groovyActionsService.getGroovyCodeSource(UPDATE_ADDRESS_ACTION), Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
 
-        Assert.assertEquals(UPDATE_ADDRESS_ACTION + "-groovySourceCode", groovyActionsService.getGroovyCodeSource(UPDATE_ADDRESS_ACTION).getName());
+        Assert.assertEquals(UPDATE_ADDRESS_ACTION, groovyActionsService.getGroovyCodeSource(UPDATE_ADDRESS_ACTION).getName());
 
         Assert.assertTrue(actionType.getMetadata().getId().contains(UPDATE_ADDRESS_GROOVY_ACTION));
         Assert.assertEquals(2, actionType.getMetadata().getSystemTags().size());
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index 455ab9073..67cc5c16e 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -331,7 +331,6 @@ public class ProfileServiceIT extends BaseIT {
     }
 
     @Test
-    @Ignore // TODO - fix test  https://issues.apache.org/jira/browse/UNOMI-726
     public void testMonthlyIndicesPurge() throws Exception {
         Date currentDate = new Date();
         LocalDateTime minus10Months = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusMonths(10);
diff --git a/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xTo220IT.java b/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xTo220IT.java
index 8ae93daf4..7dc321268 100644
--- a/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xTo220IT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xTo220IT.java
@@ -40,7 +40,10 @@ public class Migrate16xTo220IT extends BaseIT {
     private int sessionCount = 0;
 
     private static final int NUMBER_DUPLICATE_SESSIONS = 3;
-    private static final int NUMBER_PERSONA_SESSIONS = 2;
+    private static final List<String> oldSystemItemsIndices = Arrays.asList("context-actiontype", "context-campaign", "context-campaignevent", "context-goal",
+            "context-userlist", "context-propertytype", "context-scope", "context-conditiontype", "context-rule", "context-scoring", "context-segment", "context-groovyaction", "context-topic",
+            "context-patch", "context-jsonschema", "context-importconfig", "context-exportconfig", "context-rulestats");
+
     @Override
     @Before
     public void waitForStartup() throws InterruptedException {
@@ -56,8 +59,9 @@ public class Migrate16xTo220IT extends BaseIT {
             }
             // Restore the snapshot
             HttpUtils.executePostRequest(httpClient, "http://localhost:9400/_snapshot/snapshots_repository/snapshot_1.6.x/_restore?wait_for_completion=true", "{}", null);
-            fillNumberEventAndSessionBeforeMigration(httpClient);
 
+            // Get initial counts of items to compare after migration
+            initCounts(httpClient);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -93,6 +97,7 @@ public class Migrate16xTo220IT extends BaseIT {
         checkEventTypesNotPersistedAnymore();
         checkForMappingUpdates();
         checkEventSessionRollover2_2_0();
+        checkIndexReductions2_2_0();
     }
 
     /**
@@ -107,17 +112,25 @@ public class Migrate16xTo220IT extends BaseIT {
 
         int newEventcount = 0;
         for (String eventIndex : MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-event-0")) {
-            JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + eventIndex + "/_count", resourceAsString("migration/match_all_body_request.json"), null));
-            newEventcount += jsonNode.get("count").asInt();
+            newEventcount += countItems(httpClient, eventIndex, null);
         }
 
         int newSessioncount = 0;
         for (String sessionIndex : MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-session-0")) {
-            JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + sessionIndex + "/_count", resourceAsString("migration/match_all_body_request.json"), null));
-            newSessioncount += jsonNode.get("count").asInt();
+            newSessioncount += countItems(httpClient, sessionIndex, null);
         }
         Assert.assertEquals(eventCount, newEventcount);
-        Assert.assertEquals(sessionCount - NUMBER_DUPLICATE_SESSIONS + NUMBER_PERSONA_SESSIONS, newSessioncount);
+        Assert.assertEquals(sessionCount - NUMBER_DUPLICATE_SESSIONS, newSessioncount);
+    }
+
+    private void checkIndexReductions2_2_0() throws IOException {
+        // new index for system items:
+        Assert.assertTrue(MigrationUtils.indexExists(httpClient, "http://localhost:9400", "context-systemitems"));
+
+        // old indices should be removed:
+        for (String oldSystemItemsIndex : oldSystemItemsIndices) {
+            Assert.assertFalse(MigrationUtils.indexExists(httpClient, "http://localhost:9400", oldSystemItemsIndex));
+        }
     }
 
     /**
@@ -298,21 +311,25 @@ public class Migrate16xTo220IT extends BaseIT {
         Assert.assertNull(persistenceService.load(masterProfile, ProfileAlias.class));
     }
 
-    private void fillNumberEventAndSessionBeforeMigration(CloseableHttpClient httpClient) {
+    private void initCounts(CloseableHttpClient httpClient) {
         try {
             for (String eventIndex : MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-event-date")) {
-                JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + eventIndex + "/_count", resourceAsString("migration/must_not_match_some_eventype_body.json"), null));
-                eventCount += jsonNode.get("count").asInt();
+                eventCount += countItems(httpClient, eventIndex, resourceAsString("migration/must_not_match_some_eventype_body.json"));
             }
 
-            for (String eventIndex : MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-session-date")) {
-                JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + eventIndex + "/_count", resourceAsString("migration/match_all_body_request.json"), null));
-                sessionCount += jsonNode.get("count").asInt();
+            for (String sessionIndex : MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-session-date")) {
+                sessionCount += countItems(httpClient, sessionIndex, null);
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
+    }
 
-
+    private int countItems(CloseableHttpClient httpClient, String index, String requestBody) throws IOException {
+        if (requestBody == null) {
+            requestBody = resourceAsString("migration/must_not_match_some_eventype_body.json");
+        }
+        JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + index + "/_count", requestBody, null));
+        return jsonNode.get("count").asInt();
     }
 }
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index d14d284d9..f0929887b 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -148,22 +148,9 @@ import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 
@@ -249,31 +236,16 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>();
 
     private static final Map<String, String> itemTypeIndexNameMap = new HashMap<>();
+    private static final Collection<String> systemItems = Arrays.asList("actionType", "campaign", "campaignevent", "goal",
+            "userList", "propertyType", "scope", "conditionType", "rule", "scoring", "segment", "groovyAction", "topic",
+            "patch", "jsonSchema", "importConfig", "exportConfig", "rulestats");
     static {
-        itemTypeIndexNameMap.put("actionType", "systemItems");
-        itemTypeIndexNameMap.put("campaign", "systemItems");
-        itemTypeIndexNameMap.put("campaignevent", "systemItems");
-        itemTypeIndexNameMap.put("goal", "systemItems");
-        itemTypeIndexNameMap.put("userList", "systemItems");
-        itemTypeIndexNameMap.put("propertyType", "systemItems");
-        itemTypeIndexNameMap.put("scope", "systemItems");
-        itemTypeIndexNameMap.put("conditionType", "systemItems");
-        itemTypeIndexNameMap.put("rule", "systemItems");
-        itemTypeIndexNameMap.put("scoring", "systemItems");
-        itemTypeIndexNameMap.put("segment", "systemItems");
-        itemTypeIndexNameMap.put("groovyAction", "systemItems");
-        itemTypeIndexNameMap.put("topic", "systemItems");
-        itemTypeIndexNameMap.put("patch", "systemItems");
-        itemTypeIndexNameMap.put("jsonSchema", "systemItems");
-        itemTypeIndexNameMap.put("importConfig", "systemItems");
-        itemTypeIndexNameMap.put("exportConfig", "systemItems");
-        itemTypeIndexNameMap.put("rulestats", "systemItems");
+        for (String systemItem : systemItems) {
+            itemTypeIndexNameMap.put(systemItem, "systemItems");
+        }
 
         itemTypeIndexNameMap.put("profile", "profile");
         itemTypeIndexNameMap.put("persona", "profile");
-
-        itemTypeIndexNameMap.put("session", "session");
-        itemTypeIndexNameMap.put("personaSession", "session");
     }
 
     public void setBundleContext(BundleContext bundleContext) {
@@ -840,19 +812,20 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     if (customItemType != null) {
                         itemType = customItemType;
                     }
+                    String documentId = getDocumentIDForItemType(itemId, itemType);
 
-                    String affinityIndex = "session".equals(itemType) && sessionAffinityCache.containsKey(itemId) ? sessionAffinityCache.get(itemId) : null;
+                    String affinityIndex = "session".equals(itemType) && sessionAffinityCache.containsKey(documentId) ? sessionAffinityCache.get(documentId) : null;
                     if (affinityIndex == null && isItemTypeRollingOver(itemType)) {
                         return new MetricAdapter<T>(metricsService, ".loadItemWithQuery") {
                             @Override
                             public T execute(Object... args) throws Exception {
                                 if (customItemType == null) {
-                                    PartialList<T> r = query(QueryBuilders.idsQuery().addIds(itemId), null, clazz, 0, 1, null, null);
+                                    PartialList<T> r = query(QueryBuilders.idsQuery().addIds(documentId), null, clazz, 0, 1, null, null);
                                     if (r.size() > 0) {
                                         return r.get(0);
                                     }
                                 } else {
-                                    PartialList<CustomItem> r = query(QueryBuilders.idsQuery().addIds(itemId), null, customItemType, 0, 1, null, null);
+                                    PartialList<CustomItem> r = query(QueryBuilders.idsQuery().addIds(documentId), null, customItemType, 0, 1, null, null);
                                     if (r.size() > 0) {
                                         return (T) r.get(0);
                                     }
@@ -861,12 +834,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                             }
                         }.execute();
                     } else {
-                        GetRequest getRequest = new GetRequest(affinityIndex != null ? affinityIndex : getIndex(itemType), itemId);
+                        GetRequest getRequest = new GetRequest(affinityIndex != null ? affinityIndex : getIndex(itemType), documentId);
                         GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
                         if (response.isExists()) {
                             String sourceAsString = response.getSourceAsString();
                             final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
-                            setMetadata(value, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
+                            setMetadata(value, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
                             return value;
                         } else {
                             return null;
@@ -889,15 +862,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
     }
 
-    private void setMetadata(Item item, String id, long version, long seqNo, long primaryTerm, String index) {
-        item.setItemId(id);
+    private void setMetadata(Item item, long version, long seqNo, long primaryTerm, String index) {
         item.setVersion(version);
         item.setSystemMetadata(SEQ_NO, seqNo);
         item.setSystemMetadata(PRIMARY_TERM, primaryTerm);
         item.setSystemMetadata("index", index);
-        if (item.getItemType().equals("session") && !sessionAffinityCache.containsKey(id)) {
-            sessionAffinityCache.put(id, index);
-        }
     }
 
     @Override
@@ -925,17 +894,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 try {
                     String source = ESCustomObjectMapper.getObjectMapper().writeValueAsString(item);
                     String itemType = item.getItemType();
-                    String className = item.getClass().getName();
                     if (item instanceof CustomItem) {
                         itemType = ((CustomItem) item).getCustomItemType();
-                        className = CustomItem.class.getName() + "." + itemType;
                     }
-                    String itemId = item.getItemId();
-                    String index = item.getSystemMetadata("index") != null ?
-                            (String) item.getSystemMetadata("index") :
-                            getIndex(itemType);
+                    String documentId = getDocumentIDForItemType(item.getItemId(), itemType);
+                    String index = item.getSystemMetadata("index") != null ? (String) item.getSystemMetadata("index") : getIndex(itemType);
+
                     IndexRequest indexRequest = new IndexRequest(index);
-                    indexRequest.id(itemId);
+                    indexRequest.id(documentId);
                     indexRequest.source(source, XContentType.JSON);
 
                     if (!alwaysOverwrite) {
@@ -958,13 +924,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         if (bulkProcessor == null || !useBatching) {
                             indexRequest.setRefreshPolicy(getRefreshPolicy(itemType));
                             IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
-                            setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
+                            setMetadata(item, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
                         } else {
                             bulkProcessor.add(indexRequest);
                         }
                     } catch (IndexNotFoundException e) {
-                        logger.error("Could not find index {}, could not register item type {} with id {} ",
-                                index, itemType, itemId, e);
+                        logger.error("Could not find index {}, could not register item type {} with id {} ", index, itemType, item.getItemId(), e);
                         return false;
                     }
                     return true;
@@ -1015,7 +980,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
                     if (bulkProcessor == null || !useBatchingForUpdate) {
                         UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
-                        setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
+                        setMetadata(item, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
                     } else {
                         bulkProcessor.add(updateRequest);
                     }
@@ -1034,7 +999,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
     private UpdateRequest createUpdateRequest(Class clazz, Item item, Map source, boolean alwaysOverwrite) {
         String itemType = Item.getItemType(clazz);
-        UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType), item.getItemId());
+        String documentId = getDocumentIDForItemType(item.getItemId(), itemType);
+        String index = getIndex(itemType);
+
+        UpdateRequest updateRequest = new UpdateRequest(index, documentId);
         updateRequest.doc(source);
 
         if (!alwaysOverwrite) {
@@ -1115,7 +1083,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
             protected Boolean execute(Object... args) throws Exception {
                 try {
                     String itemType = Item.getItemType(clazz);
-
                     String index = getIndex(itemType);
 
                     for (int i = 0; i < scripts.length; i++) {
@@ -1213,12 +1180,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
             protected Boolean execute(Object... args) throws Exception {
                 try {
                     String itemType = Item.getItemType(clazz);
-
                     String index = getIndex(itemType);
+                    String documentId = getDocumentIDForItemType(item.getItemId(), itemType);
 
                     Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams);
 
-                    UpdateRequest updateRequest = new UpdateRequest(index, item.getItemId());
+                    UpdateRequest updateRequest = new UpdateRequest(index, documentId);
 
                     Long seqNo = (Long) item.getSystemMetadata(SEQ_NO);
                     Long primaryTerm = (Long) item.getSystemMetadata(PRIMARY_TERM);
@@ -1230,7 +1197,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     updateRequest.script(actualScript);
                     if (bulkProcessor == null) {
                         UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
-                        setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
+                        setMetadata(item, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
                     } else {
                         bulkProcessor.add(updateRequest);
                     }
@@ -1266,8 +1233,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     if (customItemType != null) {
                         itemType = customItemType;
                     }
+                    String documentId = getDocumentIDForItemType(itemId, itemType);
+                    String index = getIndexNameForQuery(itemType);
 
-                    DeleteRequest deleteRequest = new DeleteRequest(getIndexNameForQuery(itemType), itemId);
+                    DeleteRequest deleteRequest = new DeleteRequest(index, documentId);
                     client.delete(deleteRequest, RequestOptions.DEFAULT);
                     return true;
                 } catch (Exception e) {
@@ -1285,78 +1254,81 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     public <T extends Item> boolean removeByQuery(final Condition query, final Class<T> clazz) {
         Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeByQuery", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
             protected Boolean execute(Object... args) throws Exception {
-                try {
-                    String itemType = Item.getItemType(clazz);
-                    QueryBuilder queryBuilder = conditionESQueryBuilderDispatcher.getQueryBuilder(query);
-                    final DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(getIndexNameForQuery(itemType))
-                            .setQuery(isItemTypeSharingIndex(itemType) ? wrapWithItemTypeQuery(itemType, queryBuilder) : queryBuilder)
-                            // Setting slices to auto will let Elasticsearch choose the number of slices to use.
-                            // This setting will use one slice per shard, up to a certain limit.
-                            // The delete request will be more efficient and faster than no slicing.
-                            .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
-                            // Elasticsearch takes a snapshot of the index when you hit delete by query request and uses the _version of the documents to process the request.
-                            // If a document gets updated in the meantime, it will result in a version conflict error and the delete operation will fail.
-                            // So we explicitly set the conflict strategy to proceed in case of version conflict.
-                            .setAbortOnVersionConflict(false)
-                            // Remove by Query is mostly used for purge and cleaning up old data
-                            // It's mostly used in jobs/timed tasks so we don't really care about long request
-                            // So we increase default timeout of 1min to 10min
-                            .setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes));
-
-                    BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
-
-                    if (bulkByScrollResponse == null) {
-                        logger.error("Remove by query: no response returned for query: {}", query);
-                        return false;
-                    }
+                QueryBuilder queryBuilder = conditionESQueryBuilderDispatcher.getQueryBuilder(query);
+                return removeByQuery(queryBuilder, clazz);
+            }
+        }.catchingExecuteInClassLoader(true);
+        if (result == null) {
+            return false;
+        } else {
+            return result;
+        }
+    }
 
-                    if (bulkByScrollResponse.isTimedOut()) {
-                        logger.warn("Remove by query: timed out because took more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, query);
-                    }
+    public <T extends Item> boolean removeByQuery(QueryBuilder queryBuilder, final Class<T> clazz) throws Exception {
+        try {
+            String itemType = Item.getItemType(clazz);
+            final DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(getIndexNameForQuery(itemType))
+                    .setQuery(isItemTypeSharingIndex(itemType) ? wrapWithItemTypeQuery(itemType, queryBuilder) : queryBuilder)
+                    // Setting slices to auto will let Elasticsearch choose the number of slices to use.
+                    // This setting will use one slice per shard, up to a certain limit.
+                    // The delete request will be more efficient and faster than no slicing.
+                    .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
+                    // Elasticsearch takes a snapshot of the index when you hit delete by query request and uses the _version of the documents to process the request.
+                    // If a document gets updated in the meantime, it will result in a version conflict error and the delete operation will fail.
+                    // So we explicitly set the conflict strategy to proceed in case of version conflict.
+                    .setAbortOnVersionConflict(false)
+                    // Remove by Query is mostly used for purge and cleaning up old data
+                    // It's mostly used in jobs/timed tasks so we don't really care about long request
+                    // So we increase default timeout of 1min to 10min
+                    .setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes));
+
+            BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
+
+            if (bulkByScrollResponse == null) {
+                logger.error("Remove by query: no response returned for query: {}", queryBuilder);
+                return false;
+            }
 
-                    if ((bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) ||
-                            bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) {
-                        logger.warn("Remove by query: we found some failure during the process of query: {}", query);
+            if (bulkByScrollResponse.isTimedOut()) {
+                logger.warn("Remove by query: timed out because took more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, queryBuilder);
+            }
 
-                        if (bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) {
-                            for (ScrollableHitSource.SearchFailure searchFailure : bulkByScrollResponse.getSearchFailures()) {
-                                logger.warn("Remove by query, search failure: {}", searchFailure.toString());
-                            }
-                        }
+            if ((bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) ||
+                    bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) {
+                logger.warn("Remove by query: we found some failure during the process of query: {}", queryBuilder);
 
-                        if (bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) {
-                            for (BulkItemResponse.Failure bulkFailure : bulkByScrollResponse.getBulkFailures()) {
-                                logger.warn("Remove by query, bulk failure: {}", bulkFailure.toString());
-                            }
-                        }
+                if (bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) {
+                    for (ScrollableHitSource.SearchFailure searchFailure : bulkByScrollResponse.getSearchFailures()) {
+                        logger.warn("Remove by query, search failure: {}", searchFailure.toString());
                     }
+                }
 
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Remove by query: took {}, deleted docs: {}, batches executed: {}, skipped docs: {}, version conflicts: {}, search retries: {}, bulk retries: {}, for query: {}",
-                                bulkByScrollResponse.getTook().toHumanReadableString(1),
-                                bulkByScrollResponse.getDeleted(),
-                                bulkByScrollResponse.getBatches(),
-                                bulkByScrollResponse.getNoops(),
-                                bulkByScrollResponse.getVersionConflicts(),
-                                bulkByScrollResponse.getSearchRetries(),
-                                bulkByScrollResponse.getBulkRetries(),
-                                query);
+                if (bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) {
+                    for (BulkItemResponse.Failure bulkFailure : bulkByScrollResponse.getBulkFailures()) {
+                        logger.warn("Remove by query, bulk failure: {}", bulkFailure.toString());
                     }
-
-                    return true;
-                } catch (Exception e) {
-                    throw new Exception("Cannot remove by query", e);
                 }
             }
-        }.catchingExecuteInClassLoader(true);
-        if (result == null) {
-            return false;
-        } else {
-            return result;
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Remove by query: took {}, deleted docs: {}, batches executed: {}, skipped docs: {}, version conflicts: {}, search retries: {}, bulk retries: {}, for query: {}",
+                        bulkByScrollResponse.getTook().toHumanReadableString(1),
+                        bulkByScrollResponse.getDeleted(),
+                        bulkByScrollResponse.getBatches(),
+                        bulkByScrollResponse.getNoops(),
+                        bulkByScrollResponse.getVersionConflicts(),
+                        bulkByScrollResponse.getSearchRetries(),
+                        bulkByScrollResponse.getBulkRetries(),
+                        queryBuilder);
+            }
+
+            return true;
+        } catch (Exception e) {
+            throw new Exception("Cannot remove by query", e);
         }
     }
 
-
     public boolean indexTemplateExists(final String templateName) {
         Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".indexTemplateExists", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
             protected Boolean execute(Object... args) throws IOException {
@@ -1444,8 +1416,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }
     }
 
-    public boolean removeIndex(final String itemType, boolean addPrefix){
-        String index = addPrefix ? getIndex(itemType) : itemType;
+    public boolean removeIndex(final String itemType) {
+        String index = getIndex(itemType);
 
         Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
             protected Boolean execute(Object... args) throws IOException {
@@ -1465,9 +1437,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
             return result;
         }
     }
-    public boolean removeIndex(final String itemType) {
-        return removeIndex(itemType, true);
-    }
 
     private void internalCreateRolloverTemplate(String itemName) throws IOException {
         String rolloverAlias = indexPrefix + "-" + itemName;
@@ -1830,9 +1799,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         try {
             final Class<? extends Item> clazz = item.getClass();
             String itemType = Item.getItemType(clazz);
+            String documentId = getDocumentIDForItemType(item.getItemId(), itemType);
 
             QueryBuilder builder = QueryBuilders.boolQuery()
-                    .must(QueryBuilders.idsQuery().addIds(item.getItemId()))
+                    .must(QueryBuilders.idsQuery().addIds(documentId))
                     .must(conditionESQueryBuilderDispatcher.buildFilter(query));
             return queryCount(builder, itemType) > 0;
         } finally {
@@ -1918,29 +1888,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }
     }
 
-    @Override
-    public Map<String, Long> docCountPerIndex(String... indexes) {
-        return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".docCountPerIndex", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
-            @Override
-            protected Map<String, Long> execute(Object... args) throws IOException {
-                List<String> indexesForQuery = Stream.of(indexes).map(index -> getIndexNameForQuery(index)).collect(Collectors.toList());
-                String[] itemsArray = new String[indexesForQuery.size()];
-                itemsArray = indexesForQuery.toArray(itemsArray);
-                GetIndexRequest request = new GetIndexRequest(itemsArray);
-                GetIndexResponse getIndexResponse = client.indices().get(request, RequestOptions.DEFAULT);
-
-                Map<String, Long> countPerIndex = new HashMap<>();
-
-                for (String index : getIndexResponse.getIndices()) {
-                    CountRequest countRequest = new CountRequest(index);
-                    CountResponse response = client.count(countRequest, RequestOptions.DEFAULT);
-                    countPerIndex.put(index, response.getCount());
-                }
-                return countPerIndex;
-            }
-        }.catchingExecuteInClassLoader(true);
-    }
-
     private long queryCount(final QueryBuilder filter, final String itemType) {
         return new InClassLoaderExecute<Long>(metricsService, this.getClass().getName() + ".queryCount", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
 
@@ -2041,7 +1988,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                                 // add hit to results
                                 String sourceAsString = searchHit.getSourceAsString();
                                 final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
-                                setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
+                                setMetadata(value, searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
                                 results.add(value);
                             }
 
@@ -2071,7 +2018,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         for (SearchHit searchHit : searchHits) {
                             String sourceAsString = searchHit.getSourceAsString();
                             final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
-                            setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
+                            setMetadata(value, searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
                             results.add(value);
                         }
                     }
@@ -2117,7 +2064,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                             // add hit to results
                             String sourceAsString = searchHit.getSourceAsString();
                             final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
-                            setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
+                            setMetadata(value, searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
                             results.add(value);
                         }
                     }
@@ -2158,7 +2105,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                             // add hit to results
                             String sourceAsString = searchHit.getSourceAsString();
                             final CustomItem value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, CustomItem.class);
-                            setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
+                            setMetadata(value, searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
                             results.add(value);
                         }
                     }
@@ -2420,6 +2367,42 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
     @Override
     public void purge(final Date date) {
+        // Intentionally a no-op: this method is deprecated since 2.2.0
+    }
+
+    @Override
+    public <T extends Item> void purgeTimeBasedItems(int existsNumberOfDays, Class<T> clazz) {
+        new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".purgeTimeBasedItems", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+            protected Boolean execute(Object... args) throws Exception {
+                String itemType = Item.getItemType(clazz);
+
+                if (existsNumberOfDays > 0 && isItemTypeRollingOver(itemType)) {
+                    // First we purge the documents
+                    removeByQuery(QueryBuilders.rangeQuery("timeStamp").lte("now-" + existsNumberOfDays + "d"), clazz);
+
+                    // Get the document count of each index matching this time-based item type
+                    TreeMap<String, Long> countsPerIndex = new TreeMap<>();
+                    GetIndexResponse getIndexResponse = client.indices().get(new GetIndexRequest(getIndexNameForQuery(itemType)), RequestOptions.DEFAULT);
+                    for (String index : getIndexResponse.getIndices()) {
+                        countsPerIndex.put(index, client.count(new CountRequest(index), RequestOptions.DEFAULT).getCount());
+                    }
+
+                    // Remove the indices that no longer contain any document
+                    if (countsPerIndex.size() >= 1) {
+                        // do not check the last index, because it's the one used to write documents
+                        countsPerIndex.pollLastEntry();
+
+                        for (Map.Entry<String, Long> indexCount : countsPerIndex.entrySet()) {
+                            if (indexCount.getValue() == 0) {
+                                client.indices().delete(new DeleteIndexRequest(indexCount.getKey()), RequestOptions.DEFAULT);
+                            }
+                        }
+                    }
+                }
+
+                return true;
+            }
+        }.catchingExecuteInClassLoader(true);
     }
 
     @Override
@@ -2632,6 +2615,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         return itemTypeIndexNameMap.getOrDefault(itemType, itemType);
     }
 
+    private String getDocumentIDForItemType(String itemId, String itemType) {
+        return systemItems.contains(itemType) ? (itemId + "_" + itemType.toLowerCase()) : itemId;
+    }
+
     private QueryBuilder wrapWithItemTypeQuery(String itemType, QueryBuilder originalQuery) {
         BoolQueryBuilder wrappedQuery = QueryBuilders.boolQuery();
         wrappedQuery.must(getItemTypeQueryBuilder(itemType));
@@ -2657,5 +2644,4 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }
         return WriteRequest.RefreshPolicy.NONE;
     }
-
 }
diff --git a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/personaSession.json b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/personaSession.json
new file mode 100644
index 000000000..c635e0285
--- /dev/null
+++ b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/personaSession.json
@@ -0,0 +1,41 @@
+{
+  "dynamic_templates": [
+    {
+      "all": {
+        "match": "*",
+        "match_mapping_type": "string",
+        "mapping": {
+          "type": "text",
+          "analyzer": "folding",
+          "fields": {
+            "keyword": {
+              "type": "keyword",
+              "ignore_above": 256
+            }
+          }
+        }
+      }
+    }
+  ],
+  "properties": {
+    "duration": {
+      "type": "long"
+    },
+    "timeStamp": {
+      "type": "date"
+    },
+    "lastEventDate": {
+      "type": "date"
+    },
+    "properties": {
+      "properties": {
+        "location": {
+          "type": "geo_point"
+        }
+      }
+    },
+    "size": {
+      "type": "long"
+    }
+  }
+}
\ No newline at end of file
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index 751a34c85..29c196a2b 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -292,6 +292,7 @@ public interface PersistenceService {
     /**
      * @deprecated use {@link #loadCustomItem(String, String)}
      */
+    @Deprecated
     CustomItem loadCustomItem(String itemId, Date dateHint, String customItemType);
 
     /**
@@ -676,22 +677,19 @@ public interface PersistenceService {
     <T extends Item> void refreshIndex(Class<T> clazz, Date dateHint);
 
     /**
-     * Purges all data in the context server up to the specified date, not included.
-     *
-     * @param date the date (not included) before which we want to erase all data
+     * @deprecated since 2.2.0, use {@link #purgeTimeBasedItems(int, Class)} instead
      */
     @Deprecated
     void purge(Date date);
 
     /**
-     * Retrieves the number of document per indexes.
-     * If the index is a rollover index, each rollover index will be return with its own number of document
-     * For example: with "event" as parameter, the indexes named ...-event-000001, ...-event-000002 and so one will be returned
+     * Purges time-based items of the given type that have existed for more than the specified number of days.
+     * (This only works for time-based data stored in rolling-over indices; it has no effect on other item types.)
      *
-     * @param indexes names of the indexes to count the documents
-     * @return Map where the key in the index name and the value is the number of document for this index
+     * @param existsNumberOfDays the minimum age, in days, of the items to purge
+     * @param clazz the item type to be purged
      */
-    Map<String, Long> docCountPerIndex(String... indexes);
+    <T extends Item> void purgeTimeBasedItems(int existsNumberOfDays, Class<T> clazz);
 
     /**
      * Retrieves all items of the specified Item subclass which specified ranged property is within the specified bounds, ordered according to the specified {@code sortBy} String
@@ -744,15 +742,6 @@ public interface PersistenceService {
      */
     boolean removeIndex(final String itemType);
 
-    /**
-     * Removes the index for the specified item type.
-     *
-     * @param itemType the item type
-     * @param addPrefix should add the index prefix to the itemType passed as parameter
-     * @return {@code true} if the operation was successful, {@code false} otherwise
-     */
-    boolean removeIndex(final String itemType, boolean addPrefix);
-
     /**
      * Removes all data associated with the provided scope.
      *
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index 7912cc6de..8221f19ff 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -370,40 +370,11 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
         }
     }
 
-    private <T extends Item> void purgeRolloverItems(int existsNumberOfDays, Class<T> clazz) {
-        if (existsNumberOfDays > 0) {
-            String conditionType = null;
-            String itemType = null;
-
-            if (clazz.getName().equals(Event.class.getName())) {
-                conditionType = "eventPropertyCondition";
-                itemType = Event.ITEM_TYPE;
-            } else if (clazz.getName().equals(Session.class.getName())) {
-                conditionType = "sessionPropertyCondition";
-                itemType = Session.ITEM_TYPE;
-            }
-
-            ConditionType propertyConditionType = definitionsService.getConditionType(conditionType);
-            if (propertyConditionType == null) {
-                // definition service not yet fully instantiate
-                return;
-            }
-
-            Condition condition = new Condition(propertyConditionType);
-
-            condition.setParameter("propertyName", "timeStamp");
-            condition.setParameter("comparisonOperator", "lessThanOrEqualTo");
-            condition.setParameter("propertyValueDateExpr", "now-" + existsNumberOfDays + "d");
-            persistenceService.removeByQuery(condition, clazz);
-            deleteEmptyRolloverIndex(itemType);
-        }
-    }
-
     @Override
     public void purgeSessionItems(int existsNumberOfDays) {
         if (existsNumberOfDays > 0) {
             logger.info("Purging: Sessions created since more than {} days", existsNumberOfDays);
-            purgeRolloverItems(existsNumberOfDays, Session.class);
+            persistenceService.purgeTimeBasedItems(existsNumberOfDays, Session.class);
         }
     }
 
@@ -411,7 +382,7 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
     public void purgeEventItems(int existsNumberOfDays) {
         if (existsNumberOfDays > 0) {
             logger.info("Purging: Events created since more than {} days", existsNumberOfDays);
-            purgeRolloverItems(existsNumberOfDays, Event.class);
+            persistenceService.purgeTimeBasedItems(existsNumberOfDays, Event.class);
         }
     }
 
@@ -421,19 +392,6 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
 
     }
 
-    public void deleteEmptyRolloverIndex(String indexName) {
-        TreeMap<String, Long> countsPerIndex = new TreeMap<>(persistenceService.docCountPerIndex(indexName));
-        if (countsPerIndex.size() >= 1) {
-            // do not check the last index, because it's the one used to write documents
-            countsPerIndex.pollLastEntry();
-            countsPerIndex.forEach((index, count) -> {
-                if (count == 0) {
-                    persistenceService.removeIndex(index, false);
-                }
-            });
-        }
-    }
-
     private void initializePurge() {
         logger.info("Purge: Initializing");
 
diff --git a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
index fafec7bcf..ea6109064 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
@@ -46,8 +46,6 @@ import java.util.stream.Collectors;
 
 public class RulesServiceImpl implements RulesService, EventListenerService, SynchronousBundleListener {
 
-    public static final String RULE_QUERY_PREFIX = "rule_";
-    private static final String RULE_STAT_ID_SUFFIX = "-stat";
     public static final String TRACKED_PARAMETER = "trackedConditionParameters";
     private static final Logger logger = LoggerFactory.getLogger(RulesServiceImpl.class.getName());
 
@@ -253,10 +251,9 @@ public class RulesServiceImpl implements RulesService, EventListenerService, Syn
     }
 
     private RuleStatistics getLocalRuleStatistics(Rule rule) {
-        String ruleStatisticsId = getRuleStatisticId(rule.getItemId());
-        RuleStatistics ruleStatistics = this.allRuleStatistics.get(ruleStatisticsId);
+        RuleStatistics ruleStatistics = this.allRuleStatistics.get(rule.getItemId());
         if (ruleStatistics == null) {
-            ruleStatistics = new RuleStatistics(ruleStatisticsId);
+            ruleStatistics = new RuleStatistics(rule.getItemId());
         }
         return ruleStatistics;
     }
@@ -267,10 +264,6 @@ public class RulesServiceImpl implements RulesService, EventListenerService, Syn
         allRuleStatistics.put(ruleStatistics.getItemId(), ruleStatistics);
     }
 
-    private String getRuleStatisticId(String ruleID) {
-        return ruleID + RULE_STAT_ID_SUFFIX;
-    }
-
     public void refreshRules() {
         try {
             // we use local variables to make sure we quickly switch the collections since the refresh is called often
@@ -344,17 +337,14 @@ public class RulesServiceImpl implements RulesService, EventListenerService, Syn
 
     @Override
     public RuleStatistics getRuleStatistics(String ruleId) {
-        String ruleStatisticsId = getRuleStatisticId(ruleId);
-        if (allRuleStatistics.containsKey(ruleStatisticsId)) {
-            return allRuleStatistics.get(ruleStatisticsId);
+        if (allRuleStatistics.containsKey(ruleId)) {
+            return allRuleStatistics.get(ruleId);
         }
-        return persistenceService.load(ruleStatisticsId, RuleStatistics.class);
+        return persistenceService.load(ruleId, RuleStatistics.class);
     }
 
     public Map<String, RuleStatistics> getAllRuleStatistics() {
-        return allRuleStatistics.keySet().stream()
-                .collect(Collectors.toMap(key -> key.endsWith(RULE_STAT_ID_SUFFIX) ?
-                        key.substring(0, key.length() - RULE_STAT_ID_SUFFIX.length()) : key, allRuleStatistics::get));
+        return allRuleStatistics;
     }
 
     @Override
diff --git a/tools/shell-commands/src/main/resources/META-INF/cxs/migration/migrate-2.2.0-05-indicesReduction.groovy b/tools/shell-commands/src/main/resources/META-INF/cxs/migration/migrate-2.2.0-05-indicesReduction.groovy
index 8a07a5278..631f0bced 100644
--- a/tools/shell-commands/src/main/resources/META-INF/cxs/migration/migrate-2.2.0-05-indicesReduction.groovy
+++ b/tools/shell-commands/src/main/resources/META-INF/cxs/migration/migrate-2.2.0-05-indicesReduction.groovy
@@ -23,6 +23,28 @@ MigrationContext context = migrationContext
 String esAddress = context.getConfigString("esAddress")
 String indexPrefix = context.getConfigString("indexPrefix")
 String baseSettings = MigrationUtils.resourceAsString(bundleContext, "requestBody/2.0.0/base_index_mapping.json")
+def indicesToReduce = [
+        actiontype: [reduceTo: "systemitems", renameId: true],
+        campaign: [reduceTo: "systemitems", renameId: true],
+        campaignevent: [reduceTo: "systemitems", renameId: true],
+        goal: [reduceTo: "systemitems", renameId: true],
+        userlist: [reduceTo: "systemitems", renameId: true],
+        propertytype: [reduceTo: "systemitems", renameId: true],
+        scope: [reduceTo: "systemitems", renameId: true],
+        conditiontype: [reduceTo: "systemitems", renameId: true],
+        rule: [reduceTo: "systemitems", renameId: true],
+        scoring: [reduceTo: "systemitems", renameId: true],
+        segment: [reduceTo: "systemitems", renameId: true],
+        topic: [reduceTo: "systemitems", renameId: true],
+        patch: [reduceTo: "systemitems", renameId: true],
+        jsonschema: [reduceTo: "systemitems", renameId: true],
+        importconfig: [reduceTo: "systemitems", renameId: true],
+        exportconfig: [reduceTo: "systemitems", renameId: true],
+        rulestats: [reduceTo: "systemitems", renameId: true],
+        groovyaction: [reduceTo: "systemitems", renameId: true],
+
+        persona: [reduceTo: "profile", renameId: false]
+]
 
 context.performMigrationStep("2.2.0-create-systemItems-index", () -> {
     if (!MigrationUtils.indexExists(context.getHttpClient(), esAddress, "${indexPrefix}-systemitems")) {
@@ -32,44 +54,20 @@ context.performMigrationStep("2.2.0-create-systemItems-index", () -> {
     }
 })
 
-def indicesToReduce = [
-        actiontype: "systemitems",
-        campaign: "systemitems",
-        campaignevent: "systemitems",
-        goal: "systemitems",
-        userlist: "systemitems",
-        propertytype: "systemitems",
-        scope: "systemitems",
-        conditiontype: "systemitems",
-        rule: "systemitems",
-        scoring: "systemitems",
-        segment: "systemitems",
-        topic: "systemitems",
-        patch: "systemitems",
-        jsonschema: "systemitems",
-        importconfig: "systemitems",
-        exportconfig: "systemitems",
-        rulestats: "systemitems",
-        groovyaction: "systemitems",
-        persona: "profile",
-        personasession: "session"
-]
-def indicesToSuffixIds = [
-        rulestats: "-stat",
-        groovyaction: "-groovySourceCode"
-]
 indicesToReduce.each { indexToReduce ->
     context.performMigrationStep("2.2.0-reduce-${indexToReduce.key}", () -> {
         if (MigrationUtils.indexExists(context.getHttpClient(), esAddress, "${indexPrefix}-${indexToReduce.key}")) {
             def painless = null
             // check if we need to update the ids of those items first
-            if (indicesToSuffixIds.containsKey(indexToReduce.key)) {
-                painless = MigrationUtils.getFileWithoutComments(bundleContext, "requestBody/2.2.0/suffix_ids.painless").replace("#ID_SUFFIX", indicesToSuffixIds.get(indexToReduce.key))
+            if (indexToReduce.value.renameId) {
+                painless = MigrationUtils.getFileWithoutComments(bundleContext, "requestBody/2.2.0/suffix_ids.painless").replace("#ID_SUFFIX", "_${indexToReduce.key}")
             }
             // move items
-            MigrationUtils.moveToIndex(context.getHttpClient(), bundleContext, esAddress, "${indexPrefix}-${indexToReduce.key}", "${indexPrefix}-${indexToReduce.value}", painless)
+            def reduceToIndex = "${indexPrefix}-${indexToReduce.value.reduceTo}"
+            MigrationUtils.moveToIndex(context.getHttpClient(), bundleContext, esAddress, "${indexPrefix}-${indexToReduce.key}", reduceToIndex, painless)
             MigrationUtils.deleteIndex(context.getHttpClient(), esAddress, "${indexPrefix}-${indexToReduce.key}")
-            HttpUtils.executePostRequest(context.getHttpClient(), esAddress + "/${indexPrefix}-${indexToReduce.value}/_refresh", null, null);
+
+            HttpUtils.executePostRequest(context.getHttpClient(), esAddress + "/${reduceToIndex}/_refresh", null, null);
             MigrationUtils.waitForYellowStatus(context.getHttpClient(), esAddress, context);
         }
     })