You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2022/10/27 16:32:03 UTC

[unomi] 01/01: UNOMI-670: improve purge system

This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch purge_system
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit 0cde2fa41042715100e7a5a12dcde3e89d950845
Author: Kevan <ke...@jahia.com>
AuthorDate: Thu Oct 27 18:31:44 2022 +0200

    UNOMI-670: improve purge system
---
 .../apache/unomi/api/services/ProfileService.java  |  19 +++
 .../org/apache/unomi/itests/ProfileServiceIT.java  | 141 ++++++++++++++++++-
 .../ElasticSearchPersistenceServiceImpl.java       | 155 +++++++--------------
 .../resources/OSGI-INF/blueprint/blueprint.xml     |   4 -
 .../services/impl/profiles/ProfileServiceImpl.java | 123 +++++++++-------
 5 files changed, 277 insertions(+), 165 deletions(-)

diff --git a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
index e51091961..86fac9f61 100644
--- a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
@@ -400,4 +400,23 @@ public interface ProfileService {
      * in specific scenarios such as integration tests.
      */
     void refresh();
+
+    /**
+     * Purge (delete) profiles that are inactive and/or too old: a profile matching either enabled criterion is deleted.
+     * Example: purge only profiles with no visit during the last 10 days:
+     * {@code purgeProfiles(10, 0);}
+     *
+     * Example: purge only profiles created (first visit) more than 30 days ago:
+     * {@code purgeProfiles(0, 30);}
+     *
+     * @param inactiveNumberOfDays purge profiles whose last visit is at least this number of days ago (0 or a negative value disables this criterion)
+     * @param existsNumberOfDays purge profiles whose first visit is at least this number of days ago (0 or a negative value disables this criterion)
+     */
+    void purgeProfiles(int inactiveNumberOfDays, int existsNumberOfDays);
+
+    /**
+     * Purge (delete) monthly indices by removing old indices
+     * @param existsNumberOfMonths monthly indices older than this number of months are removed (a value of 0 has no effect)
+     */
+    void purgeMonthlyItems(int existsNumberOfMonths);
 }
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index c85a83a59..30bf8dcf1 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -36,10 +36,9 @@ import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.*;
 import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
@@ -65,7 +64,7 @@ public class ProfileServiceIT extends BaseIT {
 
     @After
     public void tearDown() throws InterruptedException {
-        removeItems(Profile.class, ProfileAlias.class);
+        removeItems(Profile.class, ProfileAlias.class, Event.class, Session.class);
     }
 
     @Test
@@ -266,4 +265,136 @@ public class ProfileServiceIT extends BaseIT {
             // do nothing, it's expected
         }
     }
+
+    @Test
+    public void testProfilePurge() throws Exception {
+        Date currentDate = new Date();
+        LocalDateTime minus10Days = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusDays(10);
+        LocalDateTime minus30Days = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusDays(30);
+        Date currentDateMinus10Days = Date.from(minus10Days.atZone(ZoneId.systemDefault()).toInstant());
+        Date currentDateMinus30Days = Date.from(minus30Days.atZone(ZoneId.systemDefault()).toInstant());
+
+        long originalProfilesCount  = persistenceService.getAllItemsCount(Profile.ITEM_TYPE);
+
+        // create profiles inactive for 10 days (first and last visit 10 days ago)
+        for (int i = 0; i < 150; i++) {
+            Profile profile = new Profile("inactive-profile-to-be-purge-" + i);
+            profile.setProperty("lastVisit", currentDateMinus10Days);
+            profile.setProperty("firstVisit", currentDateMinus10Days);
+            persistenceService.save(profile);
+        }
+
+        // create active profiles created 30 days ago
+        for (int i = 0; i < 150; i++) {
+            Profile profile = new Profile("old-profile-to-be-purge-" + i);
+            profile.setProperty("lastVisit", currentDate);
+            profile.setProperty("firstVisit", currentDateMinus30Days);
+            persistenceService.save(profile);
+        }
+
+        // create active and recent profile
+        for (int i = 0; i < 150; i++) {
+            Profile profile = new Profile("active-profile" + i);
+            profile.setProperty("lastVisit", currentDate);
+            profile.setProperty("firstVisit", currentDate);
+            persistenceService.save(profile);
+        }
+
+        keepTrying("Failed waiting for all profiles to be available", () -> profileService.getAllProfilesCount(),
+                (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+        // Purge with both parameters set to 0: should have no effect
+        profileService.purgeProfiles(0, 0);
+        keepTrying("We should still have 450 profiles", () -> profileService.getAllProfilesCount(),
+                (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+        // Purge profiles inactive for 20 days: should have no effect, there are no such profiles
+        profileService.purgeProfiles(20, 0);
+        keepTrying("We should still have 450 profiles", () -> profileService.getAllProfilesCount(),
+                (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+        // Purge profiles inactive for 20 days and/or created more than 40 days ago: should have no effect, there are no such profiles
+        profileService.purgeProfiles(20, 40);
+        keepTrying("We should still have 450 profiles", () -> profileService.getAllProfilesCount(),
+                (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+        // Purge profiles inactive for 5 days: the 150 inactive profiles should be deleted
+        profileService.purgeProfiles(5, 0);
+        keepTrying("Inactive profiles should be purge so we should have 300 profiles now", () -> profileService.getAllProfilesCount(),
+                (count) -> count == (300 + originalProfilesCount), 1000, 100);
+
+        // Purge profiles inactive for 5 days and/or created more than 25 days ago: the 150 old profiles should be deleted
+        profileService.purgeProfiles(5, 25);
+        keepTrying("Older profiles should be purge so we should have 150 profiles now", () -> profileService.getAllProfilesCount(),
+                (count) -> count == (150 + originalProfilesCount), 1000, 100);
+    }
+
+    @Test
+    public void testMonthlyIndicesPurge() throws Exception {
+        Date currentDate = new Date();
+        LocalDateTime minus10Months = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusMonths(10);
+        LocalDateTime minus30Months = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusMonths(30);
+        Date currentDateMinus10Months = Date.from(minus10Months.atZone(ZoneId.systemDefault()).toInstant());
+        Date currentDateMinus30Months = Date.from(minus30Months.atZone(ZoneId.systemDefault()).toInstant());
+
+        long originalSessionsCount  = persistenceService.getAllItemsCount(Session.ITEM_TYPE);
+        long originalEventsCount  = persistenceService.getAllItemsCount(Event.ITEM_TYPE);
+
+        Profile profile = new Profile("dummy-profile-monthly-purge-test");
+        persistenceService.save(profile);
+
+        // create 10 months old items
+        for (int i = 0; i < 150; i++) {
+            Session session = new Session("10months-old-session-" + i, profile, currentDateMinus10Months, "dummy-scope");
+            persistenceService.save(session);
+            persistenceService.save(new Event("10months-old-event-" + i, "view", session, profile, "dummy-scope", null, null, currentDateMinus10Months));
+        }
+
+        // create 30 months old items
+        for (int i = 0; i < 150; i++) {
+            Session session = new Session("30months-old-session-" + i, profile, currentDateMinus30Months, "dummy-scope");
+            persistenceService.save(session);
+            persistenceService.save(new Event("30months-old-event-" + i, "view", session, profile, "dummy-scope", null, null, currentDateMinus30Months));
+        }
+
+        // create recent items (current date)
+        for (int i = 0; i < 150; i++) {
+            Session session = new Session("recent-session-" + i, profile, currentDate, "dummy-scope");
+            persistenceService.save(session);
+            persistenceService.save(new Event("recent-event-" + i, "view", session, profile, "dummy-scope", null, null, currentDate));
+        }
+
+        keepTrying("Sessions number should be 450", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+                (count) -> count == (450 + originalSessionsCount), 1000, 100);
+        keepTrying("Events number should be 450", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+                (count) -> count == (450 + originalEventsCount), 1000, 100);
+
+        // Should have no effect
+        profileService.purgeMonthlyItems(0);
+        keepTrying("Sessions number should be 450", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+                (count) -> count == (450 + originalSessionsCount), 1000, 100);
+        keepTrying("Events number should be 450", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+                (count) -> count == (450 + originalEventsCount), 1000, 100);
+
+        // Should have no effect: there are no monthly items older than 40 months
+        profileService.purgeMonthlyItems(40);
+        keepTrying("Sessions number should be 450", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+                (count) -> count == (450 + originalSessionsCount), 1000, 100);
+        keepTrying("Events number should be 450", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+                (count) -> count == (450 + originalEventsCount), 1000, 100);
+
+        // Should purge monthly items older than 25 months (the 30 months old sessions and events)
+        profileService.purgeMonthlyItems(25);
+        keepTrying("Sessions number should be 300", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+                (count) -> count == (300 + originalSessionsCount), 1000, 100);
+        keepTrying("Events number should be 300", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+                (count) -> count == (300 + originalEventsCount), 1000, 100);
+
+        // Should purge monthly items older than 5 months (the 10 months old sessions and events)
+        profileService.purgeMonthlyItems(5);
+        keepTrying("Sessions number should be 150", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+                (count) -> count == (150 + originalSessionsCount), 1000, 100);
+        keepTrying("Events number should be 150", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+                (count) -> count == (150 + originalEventsCount), 1000, 100);
+    }
 }
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 27bdd35b3..63150feec 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.unomi.persistence.elasticsearch;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.hazelcast.core.HazelcastInstance;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
@@ -49,6 +48,7 @@ import org.apache.unomi.persistence.spi.aggregate.IpRangeAggregate;
 import org.apache.unomi.persistence.spi.aggregate.NumericRangeAggregate;
 import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
 import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
@@ -103,8 +103,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.*;
-import org.elasticsearch.index.reindex.BulkByScrollResponse;
-import org.elasticsearch.index.reindex.UpdateByQueryRequest;
+import org.elasticsearch.index.reindex.*;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptException;
@@ -161,7 +160,6 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.Enumeration;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -215,6 +213,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     private Map<String, String> routingByType;
 
     private Integer defaultQueryLimit = 10;
+    private Integer removeByQueryTimeoutInMinutes = 10;
 
     private String itemsMonthlyIndexedOverride = "event,session";
     private String bulkProcessorConcurrentRequests = "1";
@@ -235,9 +234,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     private int aggregateQueryBucketSize = 5000;
 
     private MetricsService metricsService;
-    private HazelcastInstance hazelcastInstance;
-    private Set<String> itemClassesToCacheSet = new HashSet<>();
-    private String itemClassesToCache;
     private boolean useBatchingForSave = false;
     private boolean useBatchingForUpdate = true;
     private String logLevelRestClient = "ERROR";
@@ -382,23 +378,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         this.metricsService = metricsService;
     }
 
-    public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
-        this.hazelcastInstance = hazelcastInstance;
-    }
-
-    public void setItemClassesToCache(String itemClassesToCache) {
-        this.itemClassesToCache = itemClassesToCache;
-        if (StringUtils.isNotBlank(itemClassesToCache)) {
-            String[] itemClassesToCacheParts = itemClassesToCache.split(",");
-            if (itemClassesToCacheParts != null) {
-                itemClassesToCacheSet.clear();
-                for (String itemClassToCache : itemClassesToCacheParts) {
-                    itemClassesToCacheSet.add(itemClassToCache.trim());
-                }
-            }
-        }
-    }
-
     public void setUseBatchingForSave(boolean useBatchingForSave) {
         this.useBatchingForSave = useBatchingForSave;
     }
@@ -789,16 +768,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 try {
                     String itemType = Item.getItemType(clazz);
                     String className = clazz.getName();
-                    if (customItemType == null) {
-                        T itemFromCache = getFromCache(itemId, clazz.getName());
-                        if (itemFromCache != null) {
-                            return itemFromCache;
-                        }
-                    } else {
-                        T itemFromCache = getFromCache(itemId, CustomItem.class.getName() + "." + customItemType);
-                        if (itemFromCache != null) {
-                            return itemFromCache;
-                        }
+                    if (customItemType != null) {
                         className = CustomItem.class.getName() + "." + customItemType;
                         itemType = customItemType;
                     }
@@ -828,7 +798,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                             String sourceAsString = response.getSourceAsString();
                             final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
                             setMetadata(value, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
-                            putInCache(itemId, value, className);
                             return value;
                         } else {
                             return null;
@@ -889,7 +858,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         className = CustomItem.class.getName() + "." + itemType;
                     }
                     String itemId = item.getItemId();
-                    putInCache(itemId, item, className);
                     String index = getIndex(itemType, itemsMonthlyIndexed.contains(itemType) ? ((TimestampedItem) item).getTimeStamp() : null);
                     IndexRequest indexRequest = new IndexRequest(index);
                     indexRequest.id(itemId);
@@ -1215,50 +1183,59 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
             protected Boolean execute(Object... args) throws Exception {
                 try {
                     String itemType = Item.getItemType(clazz);
+                    final DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(getIndexNameForQuery(itemType))
+                            .setQuery(conditionESQueryBuilderDispatcher.getQueryBuilder(query))
+                            // Setting slices to auto will let Elasticsearch choose the number of slices to use.
+                            // This setting will use one slice per shard, up to a certain limit.
+                            // The delete request will be more efficient and faster than no slicing.
+                            .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
+                            // Elasticsearch takes a snapshot of the index when the delete by query request starts and uses the documents' _version to process it.
+                            // If a document is updated in the meantime, a version conflict occurs and, by default, the delete operation is aborted.
+                            // So we explicitly let the operation proceed (and skip the conflicting documents) on version conflicts.
+                            .setAbortOnVersionConflict(false)
+                            // Remove by query is mostly used to purge and clean up old data,
+                            // typically from jobs/timed tasks where long-running requests are acceptable,
+                            // so the default timeout of 1 minute is raised to removeByQueryTimeoutInMinutes (10 minutes).
+                            .setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes));
+
+                    BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
+
+                    if (bulkByScrollResponse == null) {
+                        logger.error("Remove by query: no response returned for query: {}", query);
+                        return false;
+                    }
 
-                    BulkRequest deleteByScopeBulkRequest = new BulkRequest();
-
-                    final TimeValue keepAlive = TimeValue.timeValueHours(1);
-                    SearchRequest searchRequest = new SearchRequest(getAllIndexForQuery())
-                            .indices(getIndexNameForQuery(itemType))
-                            .scroll(keepAlive);
-                    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
-                            .query(conditionESQueryBuilderDispatcher.getQueryBuilder(query))
-                            .size(100);
-                    searchRequest.source(searchSourceBuilder);
-
-                    SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
+                    if (bulkByScrollResponse.isTimedOut()) {
+                        logger.warn("Remove by query: timed out because took more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, query);
+                    }
 
-                    // Scroll until no more hits are returned
-                    while (true) {
+                    if ((bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) ||
+                            bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) {
+                        logger.warn("Remove by query: we found some failure during the process of query: {}", query);
 
-                        for (SearchHit hit : response.getHits().getHits()) {
-                            // add hit to bulk delete
-                            deleteFromCache(hit.getId(), clazz.getName());
-                            deleteByScopeBulkRequest.add(Requests.deleteRequest(hit.getIndex()).type(hit.getType()).id(hit.getId()));
+                        if (bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) {
+                            for (ScrollableHitSource.SearchFailure searchFailure : bulkByScrollResponse.getSearchFailures()) {
+                                logger.warn("Remove by query, search failure: {}", searchFailure.toString());
+                            }
                         }
 
-                        SearchScrollRequest searchScrollRequest = new SearchScrollRequest(response.getScrollId());
-                        searchScrollRequest.scroll(keepAlive);
-                        response = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
-
-                        // If we have no more hits, exit
-                        if (response.getHits().getHits().length == 0) {
-                            break;
+                        if (bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) {
+                            for (BulkItemResponse.Failure bulkFailure : bulkByScrollResponse.getBulkFailures()) {
+                                logger.warn("Remove by query, bulk failure: {}", bulkFailure.toString());
+                            }
                         }
                     }
 
-                    ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
-                    clearScrollRequest.addScrollId(response.getScrollId());
-                    client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
-
-                    // we're done with the scrolling, delete now
-                    if (deleteByScopeBulkRequest.numberOfActions() > 0) {
-                        final BulkResponse deleteResponse = client.bulk(deleteByScopeBulkRequest, RequestOptions.DEFAULT);
-                        if (deleteResponse.hasFailures()) {
-                            // do something
-                            logger.warn("Couldn't remove by query " + query + ":\n{}", deleteResponse.buildFailureMessage());
-                        }
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Remove by query: took {}, deleted docs: {}, batches executed: {}, skipped docs: {}, version conflicts: {}, search retries: {}, bulk retries: {}, for query: {}",
+                                bulkByScrollResponse.getTook().toHumanReadableString(1),
+                                bulkByScrollResponse.getDeleted(),
+                                bulkByScrollResponse.getBatches(),
+                                bulkByScrollResponse.getNoops(),
+                                bulkByScrollResponse.getVersionConflicts(),
+                                bulkByScrollResponse.getSearchRetries(),
+                                bulkByScrollResponse.getBulkRetries(),
+                                query);
                     }
 
                     return true;
@@ -2503,40 +2480,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }
     }
 
-    private <T extends Item> boolean isCacheActiveForClass(String className) {
-        if (itemClassesToCacheSet.contains("*")) {
-            return true;
-        }
-        if (itemClassesToCacheSet.contains(className)) {
-            return true;
-        }
-        return false;
-    }
-
-    private <T extends Item> T getFromCache(String itemId, String className) {
-        if (!isCacheActiveForClass(className)) {
-            return null;
-        }
-        Map<String, T> itemCache = hazelcastInstance.getMap(className);
-        return itemCache.get(itemId);
-    }
-
-    private <T extends Item> T putInCache(String itemId, T item, String className) {
-        if (!isCacheActiveForClass(className)) {
-            return null;
-        }
-        Map<String, T> itemCache = hazelcastInstance.getMap(className);
-        return itemCache.put(itemId, item);
-    }
-
-    private <T extends Item> T deleteFromCache(String itemId, String className) {
-        if (!isCacheActiveForClass(className)) {
-            return null;
-        }
-        Map<String, T> itemCache = hazelcastInstance.getMap(className);
-        return itemCache.remove(itemId);
-    }
-
     private String getAllIndexForQuery() {
         return indexPrefix + "*";
     }
diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 44e0411e6..e0fdded38 100644
--- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -58,7 +58,6 @@
             <cm:property name="aggQueryMaxResponseSizeHttp" value="" />
             <cm:property name="aggQueryThrowOnMissingDocs" value="false" />
             <cm:property name="itemTypeToRefreshPolicy" value="" />
-            <cm:property name="itemClassesToCache" value="" />
             <cm:property name="useBatchingForSave" value="false" />
             <cm:property name="useBatchingForUpdate" value="true" />
 
@@ -74,7 +73,6 @@
     </cm:property-placeholder>
 
     <reference id="metricsService" interface="org.apache.unomi.metrics.MetricsService" />
-    <reference id="hazelcastInstance" interface="com.hazelcast.core.HazelcastInstance" />
     <reference id="scriptExecutor" interface="org.apache.unomi.scripting.ScriptExecutor" />
 
     <service id="elasticSearchPersistenceService" ref="elasticSearchPersistenceServiceImpl">
@@ -137,8 +135,6 @@
         <property name="clientSocketTimeout" value="${es.clientSocketTimeout}" />
 
         <property name="metricsService" ref="metricsService" />
-        <property name="hazelcastInstance" ref="hazelcastInstance" />
-        <property name="itemClassesToCache" value="${es.itemClassesToCache}" />
         <property name="useBatchingForSave" value="${es.useBatchingForSave}" />
         <property name="useBatchingForUpdate" value="${es.useBatchingForUpdate}" />
 
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index a11189ae9..a259c1491 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -198,14 +198,15 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList

     private SegmentService segmentService;

-    private Condition purgeProfileQuery;
     private Integer purgeProfileExistTime = 0;
     private Integer purgeProfileInactiveTime = 0;
     private Integer purgeSessionsAndEventsTime = 0;
     private Integer purgeProfileInterval = 0;
+    private java.util.concurrent.ScheduledFuture<?> scheduledPurge = null;
     private long propertiesRefreshInterval = 10000;

     private PropertyTypes propertyTypes;
+    private java.util.concurrent.ScheduledFuture<?> scheduledPropertyTypeLoad = null;

     private boolean forceRefreshOnSave = false;

@@ -258,6 +259,14 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
     }

     public void preDestroy() {
+        // Cancel the ScheduledFutures, not the Runnables: the executor keeps re-running a periodic task
+        // until its future is cancelled (TimerTask.cancel() is only honoured by java.util.Timer).
+        if (scheduledPurge != null) {
+            scheduledPurge.cancel(false);
+        }
+        if (scheduledPropertyTypeLoad != null) {
+            scheduledPropertyTypeLoad.cancel(false);
+        }
         bundleContext.removeBundleListener(this);
         logger.info("Profile service shutdown.");
     }
@@ -290,13 +299,13 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
     }

     private void schedulePropertyTypeLoad() {
         TimerTask task = new TimerTask() {
             @Override
             public void run() {
                 reloadPropertyTypes(false);
             }
         };
-        schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 10000, propertiesRefreshInterval, TimeUnit.MILLISECONDS);
+        scheduledPropertyTypeLoad = schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 10000, propertiesRefreshInterval, TimeUnit.MILLISECONDS);
         logger.info("Scheduled task for property type loading each 10s");
     }

@@ -319,74 +328,102 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
         }
     }

+    @Override
+    public void purgeProfiles(int inactiveNumberOfDays, int existsNumberOfDays) {
+        if (inactiveNumberOfDays > 0 || existsNumberOfDays > 0) {
+            ConditionType profilePropertyConditionType = definitionsService.getConditionType("profilePropertyCondition");
+            ConditionType booleanCondition = definitionsService.getConditionType("booleanCondition");
+            if (profilePropertyConditionType == null || booleanCondition == null) {
+                // Definitions service not fully initialized yet: the purge query cannot be built.
+                logger.warn("Purge: condition types not available yet, profile purge skipped");
+                return;
+            }
+
+            // A profile is purged as soon as it matches any of the enabled criteria.
+            Condition purgeProfileQuery = new Condition(booleanCondition);
+            purgeProfileQuery.setParameter("operator", "or");
+            List<Condition> subConditions = new ArrayList<>();
+
+            if (inactiveNumberOfDays > 0) {
+                logger.info("Purging: Profile with no visits since more than {} days", inactiveNumberOfDays);
+                subConditions.add(buildOlderThanDaysCondition(profilePropertyConditionType, "properties.lastVisit", inactiveNumberOfDays));
+            }
+
+            if (existsNumberOfDays > 0) {
+                logger.info("Purging: Profile created since more than {} days", existsNumberOfDays);
+                subConditions.add(buildOlderThanDaysCondition(profilePropertyConditionType, "properties.firstVisit", existsNumberOfDays));
+            }
+
+            purgeProfileQuery.setParameter("subConditions", subConditions);
+            persistenceService.removeByQuery(purgeProfileQuery, Profile.class);
+        }
+    }
+
+    /**
+     * Builds a profile property condition matching profiles whose date property is at least
+     * {@code numberOfDays} days old (value lower than or equal to "now-{numberOfDays}d").
+     */
+    private Condition buildOlderThanDaysCondition(ConditionType profilePropertyConditionType, String propertyName, int numberOfDays) {
+        Condition condition = new Condition(profilePropertyConditionType);
+        condition.setParameter("propertyName", propertyName);
+        condition.setParameter("comparisonOperator", "lessThanOrEqualTo");
+        condition.setParameter("propertyValueDateExpr", "now-" + numberOfDays + "d");
+        return condition;
+    }
+
+    @Override
+    public void purgeMonthlyItems(int existsNumberOfMonths) {
+        if (existsNumberOfMonths > 0) {
+            logger.info("Purging: Monthly items (sessions/events) created before {} months", existsNumberOfMonths);
+            persistenceService.purge(getMonth(-existsNumberOfMonths).getTime());
+        }
+    }
+
     private void initializePurge() {
-        logger.info("Profile purge: Initializing");
+        logger.info("Purge: Initializing");

         if (purgeProfileInactiveTime > 0 || purgeProfileExistTime > 0 || purgeSessionsAndEventsTime > 0) {
             if (purgeProfileInactiveTime > 0) {
-                logger.info("Profile purge: Profile with no visits since {} days, will be purged", purgeProfileInactiveTime);
+                logger.info("Purge: Profile with no visits since more than {} days, will be purged", purgeProfileInactiveTime);
             }
             if (purgeProfileExistTime > 0) {
-                logger.info("Profile purge: Profile created since {} days, will be purged", purgeProfileExistTime);
+                logger.info("Purge: Profile created since more than {} days, will be purged", purgeProfileExistTime);
+            }
+            if (purgeSessionsAndEventsTime > 0) {
+                logger.info("Purge: Monthly items (sessions/events) created since more than {} months, will be purged", purgeSessionsAndEventsTime);
+            }
+            if (purgeProfileInterval <= 0) {
+                // scheduleAtFixedRate() rejects a non-positive period with an IllegalArgumentException.
+                logger.warn("Purge: invalid purge interval of {} days, no purge scheduled", purgeProfileInterval);
+                return;
             }

             TimerTask task = new TimerTask() {
                 @Override
                 public void run() {
                     try {
                         long purgeStartTime = System.currentTimeMillis();
-                        logger.debug("Profile purge: Purge triggered");
-
-                        if (purgeProfileQuery == null) {
-                            ConditionType profilePropertyConditionType = definitionsService.getConditionType("profilePropertyCondition");
-                            ConditionType booleanCondition = definitionsService.getConditionType("booleanCondition");
-                            if (profilePropertyConditionType == null || booleanCondition == null) {
-                                // definition service not yet fully instantiate
-                                return;
-                            }
+                        logger.info("Purge: triggered");

-                            purgeProfileQuery = new Condition(booleanCondition);
-                            purgeProfileQuery.setParameter("operator", "or");
-                            List<Condition> subConditions = new ArrayList<>();
+                        // Profile purge
+                        purgeProfiles(purgeProfileInactiveTime, purgeProfileExistTime);

-                            if (purgeProfileInactiveTime > 0) {
-                                Condition inactiveTimeCondition = new Condition(profilePropertyConditionType);
-                                inactiveTimeCondition.setParameter("propertyName", "properties.lastVisit");
-                                inactiveTimeCondition.setParameter("comparisonOperator", "lessThanOrEqualTo");
-                                inactiveTimeCondition.setParameter("propertyValueDateExpr", "now-" + purgeProfileInactiveTime + "d");
-                                subConditions.add(inactiveTimeCondition);
-                            }
-
-                            if (purgeProfileExistTime > 0) {
-                                Condition existTimeCondition = new Condition(profilePropertyConditionType);
-                                existTimeCondition.setParameter("propertyName", "properties.firstVisit");
-                                existTimeCondition.setParameter("comparisonOperator", "lessThanOrEqualTo");
-                                existTimeCondition.setParameter("propertyValueDateExpr", "now-" + purgeProfileExistTime + "d");
-                                subConditions.add(existTimeCondition);
-                            }
+                        // Monthly items purge
+                        purgeMonthlyItems(purgeSessionsAndEventsTime);

-                            purgeProfileQuery.setParameter("subConditions", subConditions);
-                        }
-
-                        persistenceService.removeByQuery(purgeProfileQuery, Profile.class);
-
-                        if (purgeSessionsAndEventsTime > 0) {
-                            logger.info("Monthly indexes purge: Session and events created before {} months, will be purged",
-                                    purgeSessionsAndEventsTime);
-                            persistenceService.purge(getMonth(-purgeSessionsAndEventsTime).getTime());
-                        }
-
-                        logger.info("Profile purge: purge executed in {} ms", System.currentTimeMillis() - purgeStartTime);
+                        logger.info("Purge: executed in {} ms", System.currentTimeMillis() - purgeStartTime);
                     } catch (Throwable t) {
-                        logger.error("Error while purging profiles", t);
+                        logger.error("Error while purging", t);
                     }
                 }
             };
-            schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 1, purgeProfileInterval, TimeUnit.DAYS);

-            logger.info("Profile purge: purge scheduled with an interval of {} days", purgeProfileInterval);
+            // Keep the future: cancelling it (not the TimerTask) is what stops the executor from re-running the task.
+            scheduledPurge = schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 1, purgeProfileInterval, TimeUnit.DAYS);
+
+            logger.info("Purge: purge scheduled with an interval of {} days", purgeProfileInterval);
         } else {
-            logger.info("Profile purge: No purge scheduled");
+            logger.info("Purge: No purge scheduled");
         }
     }