You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2022/10/27 16:32:02 UTC

[unomi] branch purge_system created (now 0cde2fa41)

This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a change to branch purge_system
in repository https://gitbox.apache.org/repos/asf/unomi.git


      at 0cde2fa41 UNOMI-670: improve purge system

This branch includes the following new commits:

     new 0cde2fa41 UNOMI-670: improve purge system

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[unomi] 01/01: UNOMI-670: improve purge system

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch purge_system
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit 0cde2fa41042715100e7a5a12dcde3e89d950845
Author: Kevan <ke...@jahia.com>
AuthorDate: Thu Oct 27 18:31:44 2022 +0200

    UNOMI-670: improve purge system
---
 .../apache/unomi/api/services/ProfileService.java  |  19 +++
 .../org/apache/unomi/itests/ProfileServiceIT.java  | 141 ++++++++++++++++++-
 .../ElasticSearchPersistenceServiceImpl.java       | 155 +++++++--------------
 .../resources/OSGI-INF/blueprint/blueprint.xml     |   4 -
 .../services/impl/profiles/ProfileServiceImpl.java | 123 +++++++++-------
 5 files changed, 277 insertions(+), 165 deletions(-)

diff --git a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
index e51091961..86fac9f61 100644
--- a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
@@ -400,4 +400,23 @@ public interface ProfileService {
      * in specific scenarios such as integration tests.
      */
     void refresh();
+
+    /**
+     * Purge (delete) profiles.
+     * Example: purge only the profiles that have been inactive for at least 10 days:
+     * purgeProfiles(10, 0);
+     *
+     * Example: purge only the profiles that were created at least 30 days ago:
+     * purgeProfiles(0, 30);
+     *
+     * @param inactiveNumberOfDays purge profiles with no visit for at least this number of days (a value of 0 or less disables this criterion)
+     * @param existsNumberOfDays purge profiles created at least this number of days ago (a value of 0 or less disables this criterion)
+     */
+    void purgeProfiles(int inactiveNumberOfDays, int existsNumberOfDays);
+
+    /**
+     * Purge (delete) monthly indices by removing old indices
+     * @param existsNumberOfMonths used to remove monthly indices older than this number of months
+     */
+    void purgeMonthlyItems(int existsNumberOfMonths);
 }
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index c85a83a59..30bf8dcf1 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -36,10 +36,9 @@ import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.*;
 import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
@@ -65,7 +64,7 @@ public class ProfileServiceIT extends BaseIT {
 
     @After
     public void tearDown() throws InterruptedException {
-        removeItems(Profile.class, ProfileAlias.class);
+        removeItems(Profile.class, ProfileAlias.class, Event.class, Session.class);
     }
 
     @Test
@@ -266,4 +265,136 @@ public class ProfileServiceIT extends BaseIT {
             // do nothing, it's expected
         }
     }
+
+    @Test
+    public void testProfilePurge() throws Exception {
+        Date currentDate = new Date();
+        LocalDateTime minus10Days = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusDays(10);
+        LocalDateTime minus30Days = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusDays(30);
+        Date currentDateMinus10Days = Date.from(minus10Days.atZone(ZoneId.systemDefault()).toInstant());
+        Date currentDateMinus30Days = Date.from(minus30Days.atZone(ZoneId.systemDefault()).toInstant());
+
+        long originalProfilesCount  = persistenceService.getAllItemsCount(Profile.ITEM_TYPE);
+
+        // create profiles that have been inactive for 10 days
+        for (int i = 0; i < 150; i++) {
+            Profile profile = new Profile("inactive-profile-to-be-purge-" + i);
+            profile.setProperty("lastVisit", currentDateMinus10Days);
+            profile.setProperty("firstVisit", currentDateMinus10Days);
+            persistenceService.save(profile);
+        }
+
+        // create active profiles created 30 days ago
+        for (int i = 0; i < 150; i++) {
+            Profile profile = new Profile("old-profile-to-be-purge-" + i);
+            profile.setProperty("lastVisit", currentDate);
+            profile.setProperty("firstVisit", currentDateMinus30Days);
+            persistenceService.save(profile);
+        }
+
+        // create active and recent profile
+        for (int i = 0; i < 150; i++) {
+            Profile profile = new Profile("active-profile" + i);
+            profile.setProperty("lastVisit", currentDate);
+            profile.setProperty("firstVisit", currentDate);
+            persistenceService.save(profile);
+        }
+
+        keepTrying("Failed waiting for all profiles to be available", () -> profileService.getAllProfilesCount(),
+                (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+        // Try purge with 0 params: should have no effects
+        profileService.purgeProfiles(0, 0);
+        keepTrying("We should still have 450 profiles", () -> profileService.getAllProfilesCount(),
+                (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+        // Try purging profiles inactive for 20 days: should have no effect, as there are no such profiles
+        profileService.purgeProfiles(20, 0);
+        keepTrying("We should still have 450 profiles", () -> profileService.getAllProfilesCount(),
+                (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+        // Try purging profiles inactive for 20 days and/or older than 40 days: should have no effect, as there are no such profiles
+        profileService.purgeProfiles(20, 40);
+        keepTrying("We should still have 450 profiles", () -> profileService.getAllProfilesCount(),
+                (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+        // Try purge inactive profiles since 5 days
+        profileService.purgeProfiles(5, 0);
+        keepTrying("Inactive profiles should be purge so we should have 300 profiles now", () -> profileService.getAllProfilesCount(),
+                (count) -> count == (300 + originalProfilesCount), 1000, 100);
+
+        // Try purge inactive profiles since 5 days and/or older than 25 days
+        profileService.purgeProfiles(5, 25);
+        keepTrying("Older profiles should be purge so we should have 150 profiles now", () -> profileService.getAllProfilesCount(),
+                (count) -> count == (150 + originalProfilesCount), 1000, 100);
+    }
+
+    @Test
+    public void testMonthlyIndicesPurge() throws Exception {
+        Date currentDate = new Date();
+        LocalDateTime minus10Months = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusMonths(10);
+        LocalDateTime minus30Months = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusMonths(30);
+        Date currentDateMinus10Months = Date.from(minus10Months.atZone(ZoneId.systemDefault()).toInstant());
+        Date currentDateMinus30Months = Date.from(minus30Months.atZone(ZoneId.systemDefault()).toInstant());
+
+        long originalSessionsCount  = persistenceService.getAllItemsCount(Session.ITEM_TYPE);
+        long originalEventsCount  = persistenceService.getAllItemsCount(Event.ITEM_TYPE);
+
+        Profile profile = new Profile("dummy-profile-monthly-purge-test");
+        persistenceService.save(profile);
+
+        // create 10 months old items
+        for (int i = 0; i < 150; i++) {
+            Session session = new Session("10months-old-session-" + i, profile, currentDateMinus10Months, "dummy-scope");
+            persistenceService.save(session);
+            persistenceService.save(new Event("10months-old-event-" + i, "view", session, profile, "dummy-scope", null, null, currentDateMinus10Months));
+        }
+
+        // create 30 months old items
+        for (int i = 0; i < 150; i++) {
+            Session session = new Session("30months-old-session-" + i, profile, currentDateMinus30Months, "dummy-scope");
+            persistenceService.save(session);
+            persistenceService.save(new Event("30months-old-event-" + i, "view", session, profile, "dummy-scope", null, null, currentDateMinus30Months));
+        }
+
+        // create recent items
+        for (int i = 0; i < 150; i++) {
+            Session session = new Session("recent-session-" + i, profile, currentDate, "dummy-scope");
+            persistenceService.save(session);
+            persistenceService.save(new Event("recent-event-" + i, "view", session, profile, "dummy-scope", null, null, currentDate));
+        }
+
+        keepTrying("Sessions number should be 450", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+                (count) -> count == (450 + originalSessionsCount), 1000, 100);
+        keepTrying("Events number should be 450", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+                (count) -> count == (450 + originalEventsCount), 1000, 100);
+
+        // Should have no effect
+        profileService.purgeMonthlyItems(0);
+        keepTrying("Sessions number should be 450", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+                (count) -> count == (450 + originalSessionsCount), 1000, 100);
+        keepTrying("Events number should be 450", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+                (count) -> count == (450 + originalEventsCount), 1000, 100);
+
+        // Should have no effect, as there are no monthly items older than 40 months
+        profileService.purgeMonthlyItems(40);
+        keepTrying("Sessions number should be 450", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+                (count) -> count == (450 + originalSessionsCount), 1000, 100);
+        keepTrying("Events number should be 450", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+                (count) -> count == (450 + originalEventsCount), 1000, 100);
+
+        // Should purge monthly items older than 25 months
+        profileService.purgeMonthlyItems(25);
+        keepTrying("Sessions number should be 300", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+                (count) -> count == (300 + originalSessionsCount), 1000, 100);
+        keepTrying("Events number should be 300", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+                (count) -> count == (300 + originalEventsCount), 1000, 100);
+
+        // Should purge monthly items older than 5 months
+        profileService.purgeMonthlyItems(5);
+        keepTrying("Sessions number should be 150", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+                (count) -> count == (150 + originalSessionsCount), 1000, 100);
+        keepTrying("Events number should be 150", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+                (count) -> count == (150 + originalEventsCount), 1000, 100);
+    }
 }
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 27bdd35b3..63150feec 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.unomi.persistence.elasticsearch;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.hazelcast.core.HazelcastInstance;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
@@ -49,6 +48,7 @@ import org.apache.unomi.persistence.spi.aggregate.IpRangeAggregate;
 import org.apache.unomi.persistence.spi.aggregate.NumericRangeAggregate;
 import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
 import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
@@ -103,8 +103,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.*;
-import org.elasticsearch.index.reindex.BulkByScrollResponse;
-import org.elasticsearch.index.reindex.UpdateByQueryRequest;
+import org.elasticsearch.index.reindex.*;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptException;
@@ -161,7 +160,6 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.Enumeration;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -215,6 +213,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     private Map<String, String> routingByType;
 
     private Integer defaultQueryLimit = 10;
+    private Integer removeByQueryTimeoutInMinutes = 10;
 
     private String itemsMonthlyIndexedOverride = "event,session";
     private String bulkProcessorConcurrentRequests = "1";
@@ -235,9 +234,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     private int aggregateQueryBucketSize = 5000;
 
     private MetricsService metricsService;
-    private HazelcastInstance hazelcastInstance;
-    private Set<String> itemClassesToCacheSet = new HashSet<>();
-    private String itemClassesToCache;
     private boolean useBatchingForSave = false;
     private boolean useBatchingForUpdate = true;
     private String logLevelRestClient = "ERROR";
@@ -382,23 +378,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         this.metricsService = metricsService;
     }
 
-    public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
-        this.hazelcastInstance = hazelcastInstance;
-    }
-
-    public void setItemClassesToCache(String itemClassesToCache) {
-        this.itemClassesToCache = itemClassesToCache;
-        if (StringUtils.isNotBlank(itemClassesToCache)) {
-            String[] itemClassesToCacheParts = itemClassesToCache.split(",");
-            if (itemClassesToCacheParts != null) {
-                itemClassesToCacheSet.clear();
-                for (String itemClassToCache : itemClassesToCacheParts) {
-                    itemClassesToCacheSet.add(itemClassToCache.trim());
-                }
-            }
-        }
-    }
-
     public void setUseBatchingForSave(boolean useBatchingForSave) {
         this.useBatchingForSave = useBatchingForSave;
     }
@@ -789,16 +768,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 try {
                     String itemType = Item.getItemType(clazz);
                     String className = clazz.getName();
-                    if (customItemType == null) {
-                        T itemFromCache = getFromCache(itemId, clazz.getName());
-                        if (itemFromCache != null) {
-                            return itemFromCache;
-                        }
-                    } else {
-                        T itemFromCache = getFromCache(itemId, CustomItem.class.getName() + "." + customItemType);
-                        if (itemFromCache != null) {
-                            return itemFromCache;
-                        }
+                    if (customItemType != null) {
                         className = CustomItem.class.getName() + "." + customItemType;
                         itemType = customItemType;
                     }
@@ -828,7 +798,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                             String sourceAsString = response.getSourceAsString();
                             final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
                             setMetadata(value, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
-                            putInCache(itemId, value, className);
                             return value;
                         } else {
                             return null;
@@ -889,7 +858,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         className = CustomItem.class.getName() + "." + itemType;
                     }
                     String itemId = item.getItemId();
-                    putInCache(itemId, item, className);
                     String index = getIndex(itemType, itemsMonthlyIndexed.contains(itemType) ? ((TimestampedItem) item).getTimeStamp() : null);
                     IndexRequest indexRequest = new IndexRequest(index);
                     indexRequest.id(itemId);
@@ -1215,50 +1183,59 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
             protected Boolean execute(Object... args) throws Exception {
                 try {
                     String itemType = Item.getItemType(clazz);
+                    final DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(getIndexNameForQuery(itemType))
+                            .setQuery(conditionESQueryBuilderDispatcher.getQueryBuilder(query))
+                            // Setting slices to auto will let Elasticsearch choose the number of slices to use.
+                            // This setting will use one slice per shard, up to a certain limit.
+                            // The delete request will be more efficient and faster than no slicing.
+                            .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
+                            // Elasticsearch takes a snapshot of the index when you hit delete by query request and uses the _version of the documents to process the request.
+                            // If a document gets updated in the meantime, it will result in a version conflict error and the delete operation will fail.
+                            // So we explicitly set the conflict strategy to proceed in case of version conflict.
+                            .setAbortOnVersionConflict(false)
+                            // Remove by query is mostly used for purging and cleaning up old data.
+                            // It mostly runs in jobs/timed tasks, so a long-running request is acceptable.
+                            // So we raise the timeout from the 1min default to removeByQueryTimeoutInMinutes (10min by default)
+                            .setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes));
+
+                    BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
+
+                    if (bulkByScrollResponse == null) {
+                        logger.error("Remove by query: no response returned for query: {}", query);
+                        return false;
+                    }
 
-                    BulkRequest deleteByScopeBulkRequest = new BulkRequest();
-
-                    final TimeValue keepAlive = TimeValue.timeValueHours(1);
-                    SearchRequest searchRequest = new SearchRequest(getAllIndexForQuery())
-                            .indices(getIndexNameForQuery(itemType))
-                            .scroll(keepAlive);
-                    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
-                            .query(conditionESQueryBuilderDispatcher.getQueryBuilder(query))
-                            .size(100);
-                    searchRequest.source(searchSourceBuilder);
-
-                    SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
+                    if (bulkByScrollResponse.isTimedOut()) {
+                        logger.warn("Remove by query: timed out because took more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, query);
+                    }
 
-                    // Scroll until no more hits are returned
-                    while (true) {
+                    if ((bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) ||
+                            bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) {
+                        logger.warn("Remove by query: we found some failure during the process of query: {}", query);
 
-                        for (SearchHit hit : response.getHits().getHits()) {
-                            // add hit to bulk delete
-                            deleteFromCache(hit.getId(), clazz.getName());
-                            deleteByScopeBulkRequest.add(Requests.deleteRequest(hit.getIndex()).type(hit.getType()).id(hit.getId()));
+                        if (bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) {
+                            for (ScrollableHitSource.SearchFailure searchFailure : bulkByScrollResponse.getSearchFailures()) {
+                                logger.warn("Remove by query, search failure: {}", searchFailure.toString());
+                            }
                         }
 
-                        SearchScrollRequest searchScrollRequest = new SearchScrollRequest(response.getScrollId());
-                        searchScrollRequest.scroll(keepAlive);
-                        response = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
-
-                        // If we have no more hits, exit
-                        if (response.getHits().getHits().length == 0) {
-                            break;
+                        if (bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) {
+                            for (BulkItemResponse.Failure bulkFailure : bulkByScrollResponse.getBulkFailures()) {
+                                logger.warn("Remove by query, bulk failure: {}", bulkFailure.toString());
+                            }
                         }
                     }
 
-                    ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
-                    clearScrollRequest.addScrollId(response.getScrollId());
-                    client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
-
-                    // we're done with the scrolling, delete now
-                    if (deleteByScopeBulkRequest.numberOfActions() > 0) {
-                        final BulkResponse deleteResponse = client.bulk(deleteByScopeBulkRequest, RequestOptions.DEFAULT);
-                        if (deleteResponse.hasFailures()) {
-                            // do something
-                            logger.warn("Couldn't remove by query " + query + ":\n{}", deleteResponse.buildFailureMessage());
-                        }
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Remove by query: took {}, deleted docs: {}, batches executed: {}, skipped docs: {}, version conflicts: {}, search retries: {}, bulk retries: {}, for query: {}",
+                                bulkByScrollResponse.getTook().toHumanReadableString(1),
+                                bulkByScrollResponse.getDeleted(),
+                                bulkByScrollResponse.getBatches(),
+                                bulkByScrollResponse.getNoops(),
+                                bulkByScrollResponse.getVersionConflicts(),
+                                bulkByScrollResponse.getSearchRetries(),
+                                bulkByScrollResponse.getBulkRetries(),
+                                query);
                     }
 
                     return true;
@@ -2503,40 +2480,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }
     }
 
-    private <T extends Item> boolean isCacheActiveForClass(String className) {
-        if (itemClassesToCacheSet.contains("*")) {
-            return true;
-        }
-        if (itemClassesToCacheSet.contains(className)) {
-            return true;
-        }
-        return false;
-    }
-
-    private <T extends Item> T getFromCache(String itemId, String className) {
-        if (!isCacheActiveForClass(className)) {
-            return null;
-        }
-        Map<String, T> itemCache = hazelcastInstance.getMap(className);
-        return itemCache.get(itemId);
-    }
-
-    private <T extends Item> T putInCache(String itemId, T item, String className) {
-        if (!isCacheActiveForClass(className)) {
-            return null;
-        }
-        Map<String, T> itemCache = hazelcastInstance.getMap(className);
-        return itemCache.put(itemId, item);
-    }
-
-    private <T extends Item> T deleteFromCache(String itemId, String className) {
-        if (!isCacheActiveForClass(className)) {
-            return null;
-        }
-        Map<String, T> itemCache = hazelcastInstance.getMap(className);
-        return itemCache.remove(itemId);
-    }
-
     private String getAllIndexForQuery() {
         return indexPrefix + "*";
     }
diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 44e0411e6..e0fdded38 100644
--- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -58,7 +58,6 @@
             <cm:property name="aggQueryMaxResponseSizeHttp" value="" />
             <cm:property name="aggQueryThrowOnMissingDocs" value="false" />
             <cm:property name="itemTypeToRefreshPolicy" value="" />
-            <cm:property name="itemClassesToCache" value="" />
             <cm:property name="useBatchingForSave" value="false" />
             <cm:property name="useBatchingForUpdate" value="true" />
 
@@ -74,7 +73,6 @@
     </cm:property-placeholder>
 
     <reference id="metricsService" interface="org.apache.unomi.metrics.MetricsService" />
-    <reference id="hazelcastInstance" interface="com.hazelcast.core.HazelcastInstance" />
     <reference id="scriptExecutor" interface="org.apache.unomi.scripting.ScriptExecutor" />
 
     <service id="elasticSearchPersistenceService" ref="elasticSearchPersistenceServiceImpl">
@@ -137,8 +135,6 @@
         <property name="clientSocketTimeout" value="${es.clientSocketTimeout}" />
 
         <property name="metricsService" ref="metricsService" />
-        <property name="hazelcastInstance" ref="hazelcastInstance" />
-        <property name="itemClassesToCache" value="${es.itemClassesToCache}" />
         <property name="useBatchingForSave" value="${es.useBatchingForSave}" />
         <property name="useBatchingForUpdate" value="${es.useBatchingForUpdate}" />
 
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index a11189ae9..a259c1491 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -198,14 +198,15 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
 
     private SegmentService segmentService;
 
-    private Condition purgeProfileQuery;
     private Integer purgeProfileExistTime = 0;
     private Integer purgeProfileInactiveTime = 0;
     private Integer purgeSessionsAndEventsTime = 0;
     private Integer purgeProfileInterval = 0;
+    private TimerTask purgeTask = null;
     private long propertiesRefreshInterval = 10000;
 
     private PropertyTypes propertyTypes;
+    private TimerTask propertyTypeLoadTask = null;
 
     private boolean forceRefreshOnSave = false;
 
@@ -258,6 +259,12 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
     }
 
     public void preDestroy() {
+        if (purgeTask != null) {
+            purgeTask.cancel();
+        }
+        if (propertyTypeLoadTask != null) {
+            propertyTypeLoadTask.cancel();
+        }
         bundleContext.removeBundleListener(this);
         logger.info("Profile service shutdown.");
     }
@@ -290,13 +297,13 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
     }
 
     private void schedulePropertyTypeLoad() {
-        TimerTask task = new TimerTask() {
+        propertyTypeLoadTask = new TimerTask() {
             @Override
             public void run() {
                 reloadPropertyTypes(false);
             }
         };
-        schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 10000, propertiesRefreshInterval, TimeUnit.MILLISECONDS);
+        schedulerService.getScheduleExecutorService().scheduleAtFixedRate(propertyTypeLoadTask, 10000, propertiesRefreshInterval, TimeUnit.MILLISECONDS);
         logger.info("Scheduled task for property type loading each 10s");
     }
 
@@ -319,74 +326,90 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
         }
     }
 
+    @Override
+    public void purgeProfiles(int inactiveNumberOfDays, int existsNumberOfDays) {
+        if (inactiveNumberOfDays > 0 || existsNumberOfDays > 0) {
+            ConditionType profilePropertyConditionType = definitionsService.getConditionType("profilePropertyCondition");
+            ConditionType booleanCondition = definitionsService.getConditionType("booleanCondition");
+            if (profilePropertyConditionType == null || booleanCondition == null) {
+                // definition service not yet fully instantiated
+                return;
+            }
+
+            Condition purgeProfileQuery = new Condition(booleanCondition);
+            purgeProfileQuery.setParameter("operator", "or");
+            List<Condition> subConditions = new ArrayList<>();
+
+            if (inactiveNumberOfDays > 0) {
+                logger.info("Purging: Profile with no visits since {} days", inactiveNumberOfDays);
+                Condition inactiveTimeCondition = new Condition(profilePropertyConditionType);
+                inactiveTimeCondition.setParameter("propertyName", "properties.lastVisit");
+                inactiveTimeCondition.setParameter("comparisonOperator", "lessThanOrEqualTo");
+                inactiveTimeCondition.setParameter("propertyValueDateExpr", "now-" + inactiveNumberOfDays + "d");
+                subConditions.add(inactiveTimeCondition);
+            }
+
+            if (existsNumberOfDays > 0) {
+                Condition existTimeCondition = new Condition(profilePropertyConditionType);
+                logger.info("Purging: Profile created since more than {} days", existsNumberOfDays);
+                existTimeCondition.setParameter("propertyName", "properties.firstVisit");
+                existTimeCondition.setParameter("comparisonOperator", "lessThanOrEqualTo");
+                existTimeCondition.setParameter("propertyValueDateExpr", "now-" + existsNumberOfDays + "d");
+                subConditions.add(existTimeCondition);
+            }
+
+            purgeProfileQuery.setParameter("subConditions", subConditions);
+            persistenceService.removeByQuery(purgeProfileQuery, Profile.class);
+        }
+    }
+
+    @Override
+    public void purgeMonthlyItems(int existsNumberOfMonths) {
+        if (existsNumberOfMonths > 0) {
+            logger.info("Purging: Monthly items (sessions/events) created before {} months", existsNumberOfMonths);
+            persistenceService.purge(getMonth(-existsNumberOfMonths).getTime());
+        }
+    }
+
     private void initializePurge() {
-        logger.info("Profile purge: Initializing");
+        logger.info("Purge: Initializing");
 
         if (purgeProfileInactiveTime > 0 || purgeProfileExistTime > 0 || purgeSessionsAndEventsTime > 0) {
             if (purgeProfileInactiveTime > 0) {
-                logger.info("Profile purge: Profile with no visits since {} days, will be purged", purgeProfileInactiveTime);
+                logger.info("Purge: Profile with no visits since more than {} days, will be purged", purgeProfileInactiveTime);
             }
             if (purgeProfileExistTime > 0) {
-                logger.info("Profile purge: Profile created since {} days, will be purged", purgeProfileExistTime);
+                logger.info("Purge: Profile created since more than {} days, will be purged", purgeProfileExistTime);
+            }
+            if (purgeSessionsAndEventsTime > 0) {
+                logger.info("Purge: Monthly items (sessions/events) created since more than {} months, will be purged", purgeSessionsAndEventsTime);
             }
 
-            TimerTask task = new TimerTask() {
+            purgeTask = new TimerTask() {
                 @Override
                 public void run() {
                     try {
                         long purgeStartTime = System.currentTimeMillis();
-                        logger.debug("Profile purge: Purge triggered");
-
-                        if (purgeProfileQuery == null) {
-                            ConditionType profilePropertyConditionType = definitionsService.getConditionType("profilePropertyCondition");
-                            ConditionType booleanCondition = definitionsService.getConditionType("booleanCondition");
-                            if (profilePropertyConditionType == null || booleanCondition == null) {
-                                // definition service not yet fully instantiate
-                                return;
-                            }
+                        logger.info("Purge: triggered");
 
-                            purgeProfileQuery = new Condition(booleanCondition);
-                            purgeProfileQuery.setParameter("operator", "or");
-                            List<Condition> subConditions = new ArrayList<>();
+                        // Profile purge
+                        purgeProfiles(purgeProfileInactiveTime, purgeProfileExistTime);
 
-                            if (purgeProfileInactiveTime > 0) {
-                                Condition inactiveTimeCondition = new Condition(profilePropertyConditionType);
-                                inactiveTimeCondition.setParameter("propertyName", "properties.lastVisit");
-                                inactiveTimeCondition.setParameter("comparisonOperator", "lessThanOrEqualTo");
-                                inactiveTimeCondition.setParameter("propertyValueDateExpr", "now-" + purgeProfileInactiveTime + "d");
-                                subConditions.add(inactiveTimeCondition);
-                            }
-
-                            if (purgeProfileExistTime > 0) {
-                                Condition existTimeCondition = new Condition(profilePropertyConditionType);
-                                existTimeCondition.setParameter("propertyName", "properties.firstVisit");
-                                existTimeCondition.setParameter("comparisonOperator", "lessThanOrEqualTo");
-                                existTimeCondition.setParameter("propertyValueDateExpr", "now-" + purgeProfileExistTime + "d");
-                                subConditions.add(existTimeCondition);
-                            }
+                        // Monthly items purge
+                        purgeMonthlyItems(purgeSessionsAndEventsTime);
 
-                            purgeProfileQuery.setParameter("subConditions", subConditions);
-                        }
-
-                        persistenceService.removeByQuery(purgeProfileQuery, Profile.class);
-
-                        if (purgeSessionsAndEventsTime > 0) {
-                            logger.info("Monthly indexes purge: Session and events created before {} months, will be purged",
-                                    purgeSessionsAndEventsTime);
-                            persistenceService.purge(getMonth(-purgeSessionsAndEventsTime).getTime());
-                        }
-
-                        logger.info("Profile purge: purge executed in {} ms", System.currentTimeMillis() - purgeStartTime);
+                        logger.info("Purge: executed in {} ms", System.currentTimeMillis() - purgeStartTime);
                     } catch (Throwable t) {
-                        logger.error("Error while purging profiles", t);
+                        logger.error("Error while purging", t);
                     }
                 }
             };
-            schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 1, purgeProfileInterval, TimeUnit.DAYS);
 
-            logger.info("Profile purge: purge scheduled with an interval of {} days", purgeProfileInterval);
+            schedulerService.getScheduleExecutorService().scheduleAtFixedRate(purgeTask, 1, purgeProfileInterval, TimeUnit.DAYS);
+
+            logger.info("Purge: purge scheduled with an interval of {} days", purgeProfileInterval);
         } else {
-            logger.info("Profile purge: No purge scheduled");
+            logger.info("Purge: No purge scheduled");
         }
     }