You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2022/10/27 16:32:03 UTC
[unomi] 01/01: UNOMI-670: improve purge system
This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch purge_system
in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 0cde2fa41042715100e7a5a12dcde3e89d950845
Author: Kevan <ke...@jahia.com>
AuthorDate: Thu Oct 27 18:31:44 2022 +0200
UNOMI-670: improve purge system
---
.../apache/unomi/api/services/ProfileService.java | 19 +++
.../org/apache/unomi/itests/ProfileServiceIT.java | 141 ++++++++++++++++++-
.../ElasticSearchPersistenceServiceImpl.java | 155 +++++++--------------
.../resources/OSGI-INF/blueprint/blueprint.xml | 4 -
.../services/impl/profiles/ProfileServiceImpl.java | 123 +++++++++-------
5 files changed, 277 insertions(+), 165 deletions(-)
diff --git a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
index e51091961..86fac9f61 100644
--- a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
@@ -400,4 +400,23 @@ public interface ProfileService {
* in specific scenarios such as integration tests.
*/
void refresh();
+
+ /**
+ * Purge (delete) profiles matching at least one of the enabled criteria (enabled criteria are combined with OR).
+ * example: Purge only profiles with no visit during the last 10 days:
+ * purgeProfiles(10, 0);
+ *
+ * example: Purge only profiles created (first visit) more than 30 days ago:
+ * purgeProfiles(0, 30);
+ *
+ * @param inactiveNumberOfDays will purge profiles whose last visit is at least this number of days old (0 or negative value disables this criterion)
+ * @param existsNumberOfDays will purge profiles whose first visit is at least this number of days old (0 or negative value disables this criterion)
+ */
+ void purgeProfiles(int inactiveNumberOfDays, int existsNumberOfDays);
+
+ /**
+ * Purge (delete) monthly indexed items (e.g. sessions and events) by removing the monthly indices older than the given number of months.
+ * @param existsNumberOfMonths monthly indices older than this number of months are removed (0 or negative value will have no effect)
+ */
+ void purgeMonthlyItems(int existsNumberOfMonths);
}
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index c85a83a59..30bf8dcf1 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -36,10 +36,9 @@ import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.*;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
@@ -65,7 +64,7 @@ public class ProfileServiceIT extends BaseIT {
@After
public void tearDown() throws InterruptedException {
- removeItems(Profile.class, ProfileAlias.class);
+ removeItems(Profile.class, ProfileAlias.class, Event.class, Session.class);
}
@Test
@@ -266,4 +265,136 @@ public class ProfileServiceIT extends BaseIT {
// do nothing, it's expected
}
}
+
+ @Test
+ public void testProfilePurge() throws Exception {
+ Date currentDate = new Date();
+ LocalDateTime minus10Days = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusDays(10);
+ LocalDateTime minus30Days = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusDays(30);
+ Date currentDateMinus10Days = Date.from(minus10Days.atZone(ZoneId.systemDefault()).toInstant());
+ Date currentDateMinus30Days = Date.from(minus30Days.atZone(ZoneId.systemDefault()).toInstant());
+
+ long originalProfilesCount = persistenceService.getAllItemsCount(Profile.ITEM_TYPE);
+
+ // create profiles inactive for 10 days (first and last visit 10 days ago)
+ for (int i = 0; i < 150; i++) {
+ Profile profile = new Profile("inactive-profile-to-be-purge-" + i);
+ profile.setProperty("lastVisit", currentDateMinus10Days);
+ profile.setProperty("firstVisit", currentDateMinus10Days);
+ persistenceService.save(profile);
+ }
+
+ // create active profiles (last visit now) created 30 days ago (first visit 30 days ago)
+ for (int i = 0; i < 150; i++) {
+ Profile profile = new Profile("old-profile-to-be-purge-" + i);
+ profile.setProperty("lastVisit", currentDate);
+ profile.setProperty("firstVisit", currentDateMinus30Days);
+ persistenceService.save(profile);
+ }
+
+ // create active and recent profiles (first and last visit now)
+ for (int i = 0; i < 150; i++) {
+ Profile profile = new Profile("active-profile" + i);
+ profile.setProperty("lastVisit", currentDate);
+ profile.setProperty("firstVisit", currentDate);
+ persistenceService.save(profile);
+ }
+
+ keepTrying("Failed waiting for all profiles to be available", () -> profileService.getAllProfilesCount(),
+ (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+ // Purge with both criteria disabled (0 params): should have no effect
+ profileService.purgeProfiles(0, 0);
+ keepTrying("We should still have 450 profiles", () -> profileService.getAllProfilesCount(),
+ (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+ // Purge profiles inactive for 20 days: should have no effect, there are no such profiles
+ profileService.purgeProfiles(20, 0);
+ keepTrying("We should still have 450 profiles", () -> profileService.getAllProfilesCount(),
+ (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+ // Purge profiles inactive for 20 days OR created more than 40 days ago: should have no effect, there are no such profiles
+ profileService.purgeProfiles(20, 40);
+ keepTrying("We should still have 450 profiles", () -> profileService.getAllProfilesCount(),
+ (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+ // Purge profiles inactive for 5 days: the 150 inactive profiles should be removed
+ profileService.purgeProfiles(5, 0);
+ keepTrying("Inactive profiles should be purge so we should have 300 profiles now", () -> profileService.getAllProfilesCount(),
+ (count) -> count == (300 + originalProfilesCount), 1000, 100);
+
+ // Purge profiles inactive for 5 days OR created more than 25 days ago: the 150 old profiles should be removed
+ profileService.purgeProfiles(5, 25);
+ keepTrying("Older profiles should be purge so we should have 150 profiles now", () -> profileService.getAllProfilesCount(),
+ (count) -> count == (150 + originalProfilesCount), 1000, 100);
+ }
+
+ @Test
+ public void testMonthlyIndicesPurge() throws Exception {
+ Date currentDate = new Date();
+ LocalDateTime minus10Months = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusMonths(10);
+ LocalDateTime minus30Months = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusMonths(30);
+ Date currentDateMinus10Months = Date.from(minus10Months.atZone(ZoneId.systemDefault()).toInstant());
+ Date currentDateMinus30Months = Date.from(minus30Months.atZone(ZoneId.systemDefault()).toInstant());
+
+ long originalSessionsCount = persistenceService.getAllItemsCount(Session.ITEM_TYPE);
+ long originalEventsCount = persistenceService.getAllItemsCount(Event.ITEM_TYPE);
+
+ Profile profile = new Profile("dummy-profile-monthly-purge-test");
+ persistenceService.save(profile);
+
+ // create 10 months old items
+ for (int i = 0; i < 150; i++) {
+ Session session = new Session("10months-old-session-" + i, profile, currentDateMinus10Months, "dummy-scope");
+ persistenceService.save(session);
+ persistenceService.save(new Event("10months-old-event-" + i, "view", session, profile, "dummy-scope", null, null, currentDateMinus10Months));
+ }
+
+ // create 30 months old items
+ for (int i = 0; i < 150; i++) {
+ Session session = new Session("30months-old-session-" + i, profile, currentDateMinus30Months, "dummy-scope");
+ persistenceService.save(session);
+ persistenceService.save(new Event("30months-old-event-" + i, "view", session, profile, "dummy-scope", null, null, currentDateMinus30Months));
+ }
+
+ // create recent items (timestamped with the current date)
+ for (int i = 0; i < 150; i++) {
+ Session session = new Session("recent-session-" + i, profile, currentDate, "dummy-scope");
+ persistenceService.save(session);
+ persistenceService.save(new Event("recent-event-" + i, "view", session, profile, "dummy-scope", null, null, currentDate));
+ }
+
+ keepTrying("Sessions number should be 450", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+ (count) -> count == (450 + originalSessionsCount), 1000, 100);
+ keepTrying("Events number should be 450", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+ (count) -> count == (450 + originalEventsCount), 1000, 100);
+
+ // 0 months disables the purge: should have no effect
+ profileService.purgeMonthlyItems(0);
+ keepTrying("Sessions number should be 450", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+ (count) -> count == (450 + originalSessionsCount), 1000, 100);
+ keepTrying("Events number should be 450", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+ (count) -> count == (450 + originalEventsCount), 1000, 100);
+
+ // Should have no effect: there are no monthly items older than 40 months
+ profileService.purgeMonthlyItems(40);
+ keepTrying("Sessions number should be 450", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+ (count) -> count == (450 + originalSessionsCount), 1000, 100);
+ keepTrying("Events number should be 450", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+ (count) -> count == (450 + originalEventsCount), 1000, 100);
+
+ // Should purge monthly items older than 25 months (the 30 months old items)
+ profileService.purgeMonthlyItems(25);
+ keepTrying("Sessions number should be 300", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+ (count) -> count == (300 + originalSessionsCount), 1000, 100);
+ keepTrying("Events number should be 300", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+ (count) -> count == (300 + originalEventsCount), 1000, 100);
+
+ // Should purge monthly items older than 5 months (the 10 months old items)
+ profileService.purgeMonthlyItems(5);
+ keepTrying("Sessions number should be 150", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+ (count) -> count == (150 + originalSessionsCount), 1000, 100);
+ keepTrying("Events number should be 150", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+ (count) -> count == (150 + originalEventsCount), 1000, 100);
+ }
}
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 27bdd35b3..63150feec 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.unomi.persistence.elasticsearch;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.hazelcast.core.HazelcastInstance;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
@@ -49,6 +48,7 @@ import org.apache.unomi.persistence.spi.aggregate.IpRangeAggregate;
import org.apache.unomi.persistence.spi.aggregate.NumericRangeAggregate;
import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
@@ -103,8 +103,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.*;
-import org.elasticsearch.index.reindex.BulkByScrollResponse;
-import org.elasticsearch.index.reindex.UpdateByQueryRequest;
+import org.elasticsearch.index.reindex.*;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptException;
@@ -161,7 +160,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -215,6 +213,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private Map<String, String> routingByType;
private Integer defaultQueryLimit = 10;
+ private Integer removeByQueryTimeoutInMinutes = 10;
private String itemsMonthlyIndexedOverride = "event,session";
private String bulkProcessorConcurrentRequests = "1";
@@ -235,9 +234,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private int aggregateQueryBucketSize = 5000;
private MetricsService metricsService;
- private HazelcastInstance hazelcastInstance;
- private Set<String> itemClassesToCacheSet = new HashSet<>();
- private String itemClassesToCache;
private boolean useBatchingForSave = false;
private boolean useBatchingForUpdate = true;
private String logLevelRestClient = "ERROR";
@@ -382,23 +378,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
this.metricsService = metricsService;
}
- public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
- this.hazelcastInstance = hazelcastInstance;
- }
-
- public void setItemClassesToCache(String itemClassesToCache) {
- this.itemClassesToCache = itemClassesToCache;
- if (StringUtils.isNotBlank(itemClassesToCache)) {
- String[] itemClassesToCacheParts = itemClassesToCache.split(",");
- if (itemClassesToCacheParts != null) {
- itemClassesToCacheSet.clear();
- for (String itemClassToCache : itemClassesToCacheParts) {
- itemClassesToCacheSet.add(itemClassToCache.trim());
- }
- }
- }
- }
-
public void setUseBatchingForSave(boolean useBatchingForSave) {
this.useBatchingForSave = useBatchingForSave;
}
@@ -789,16 +768,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
try {
String itemType = Item.getItemType(clazz);
String className = clazz.getName();
- if (customItemType == null) {
- T itemFromCache = getFromCache(itemId, clazz.getName());
- if (itemFromCache != null) {
- return itemFromCache;
- }
- } else {
- T itemFromCache = getFromCache(itemId, CustomItem.class.getName() + "." + customItemType);
- if (itemFromCache != null) {
- return itemFromCache;
- }
+ if (customItemType != null) {
className = CustomItem.class.getName() + "." + customItemType;
itemType = customItemType;
}
@@ -828,7 +798,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
String sourceAsString = response.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
setMetadata(value, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
- putInCache(itemId, value, className);
return value;
} else {
return null;
@@ -889,7 +858,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
className = CustomItem.class.getName() + "." + itemType;
}
String itemId = item.getItemId();
- putInCache(itemId, item, className);
String index = getIndex(itemType, itemsMonthlyIndexed.contains(itemType) ? ((TimestampedItem) item).getTimeStamp() : null);
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.id(itemId);
@@ -1215,50 +1183,59 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
+ final DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(getIndexNameForQuery(itemType))
+ .setQuery(conditionESQueryBuilderDispatcher.getQueryBuilder(query))
+ // Setting slices to auto will let Elasticsearch choose the number of slices to use.
+ // This setting will use one slice per shard, up to a certain limit.
+ // The delete request will be more efficient and faster than no slicing.
+ .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
+ // Elasticsearch takes a snapshot of the index when you hit delete by query request and uses the _version of the documents to process the request.
+ // If a document gets updated in the meantime, it will result in a version conflict error and the delete operation will fail.
+ // So we explicitly set the conflict strategy to proceed in case of version conflict.
+ .setAbortOnVersionConflict(false)
+ // Remove by Query is mostly used for purge and cleaning up old data
+ // It's mostly used in jobs/timed tasks so we don't really care about long request
+ // So we increase default timeout of 1min to 10min
+ .setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes));
+
+ BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
+
+ if (bulkByScrollResponse == null) {
+ logger.error("Remove by query: no response returned for query: {}", query);
+ return false;
+ }
- BulkRequest deleteByScopeBulkRequest = new BulkRequest();
-
- final TimeValue keepAlive = TimeValue.timeValueHours(1);
- SearchRequest searchRequest = new SearchRequest(getAllIndexForQuery())
- .indices(getIndexNameForQuery(itemType))
- .scroll(keepAlive);
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
- .query(conditionESQueryBuilderDispatcher.getQueryBuilder(query))
- .size(100);
- searchRequest.source(searchSourceBuilder);
-
- SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
+ if (bulkByScrollResponse.isTimedOut()) {
+ logger.warn("Remove by query: timed out because took more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, query);
+ }
- // Scroll until no more hits are returned
- while (true) {
+ if ((bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) ||
+ bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) {
+ logger.warn("Remove by query: we found some failure during the process of query: {}", query);
- for (SearchHit hit : response.getHits().getHits()) {
- // add hit to bulk delete
- deleteFromCache(hit.getId(), clazz.getName());
- deleteByScopeBulkRequest.add(Requests.deleteRequest(hit.getIndex()).type(hit.getType()).id(hit.getId()));
+ if (bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) {
+ for (ScrollableHitSource.SearchFailure searchFailure : bulkByScrollResponse.getSearchFailures()) {
+ logger.warn("Remove by query, search failure: {}", searchFailure.toString());
+ }
}
- SearchScrollRequest searchScrollRequest = new SearchScrollRequest(response.getScrollId());
- searchScrollRequest.scroll(keepAlive);
- response = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
-
- // If we have no more hits, exit
- if (response.getHits().getHits().length == 0) {
- break;
+ if (bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) {
+ for (BulkItemResponse.Failure bulkFailure : bulkByScrollResponse.getBulkFailures()) {
+ logger.warn("Remove by query, bulk failure: {}", bulkFailure.toString());
+ }
}
}
- ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
- clearScrollRequest.addScrollId(response.getScrollId());
- client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
-
- // we're done with the scrolling, delete now
- if (deleteByScopeBulkRequest.numberOfActions() > 0) {
- final BulkResponse deleteResponse = client.bulk(deleteByScopeBulkRequest, RequestOptions.DEFAULT);
- if (deleteResponse.hasFailures()) {
- // do something
- logger.warn("Couldn't remove by query " + query + ":\n{}", deleteResponse.buildFailureMessage());
- }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Remove by query: took {}, deleted docs: {}, batches executed: {}, skipped docs: {}, version conflicts: {}, search retries: {}, bulk retries: {}, for query: {}",
+ bulkByScrollResponse.getTook().toHumanReadableString(1),
+ bulkByScrollResponse.getDeleted(),
+ bulkByScrollResponse.getBatches(),
+ bulkByScrollResponse.getNoops(),
+ bulkByScrollResponse.getVersionConflicts(),
+ bulkByScrollResponse.getSearchRetries(),
+ bulkByScrollResponse.getBulkRetries(),
+ query);
}
return true;
@@ -2503,40 +2480,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
- private <T extends Item> boolean isCacheActiveForClass(String className) {
- if (itemClassesToCacheSet.contains("*")) {
- return true;
- }
- if (itemClassesToCacheSet.contains(className)) {
- return true;
- }
- return false;
- }
-
- private <T extends Item> T getFromCache(String itemId, String className) {
- if (!isCacheActiveForClass(className)) {
- return null;
- }
- Map<String, T> itemCache = hazelcastInstance.getMap(className);
- return itemCache.get(itemId);
- }
-
- private <T extends Item> T putInCache(String itemId, T item, String className) {
- if (!isCacheActiveForClass(className)) {
- return null;
- }
- Map<String, T> itemCache = hazelcastInstance.getMap(className);
- return itemCache.put(itemId, item);
- }
-
- private <T extends Item> T deleteFromCache(String itemId, String className) {
- if (!isCacheActiveForClass(className)) {
- return null;
- }
- Map<String, T> itemCache = hazelcastInstance.getMap(className);
- return itemCache.remove(itemId);
- }
-
private String getAllIndexForQuery() {
return indexPrefix + "*";
}
diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 44e0411e6..e0fdded38 100644
--- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -58,7 +58,6 @@
<cm:property name="aggQueryMaxResponseSizeHttp" value="" />
<cm:property name="aggQueryThrowOnMissingDocs" value="false" />
<cm:property name="itemTypeToRefreshPolicy" value="" />
- <cm:property name="itemClassesToCache" value="" />
<cm:property name="useBatchingForSave" value="false" />
<cm:property name="useBatchingForUpdate" value="true" />
@@ -74,7 +73,6 @@
</cm:property-placeholder>
<reference id="metricsService" interface="org.apache.unomi.metrics.MetricsService" />
- <reference id="hazelcastInstance" interface="com.hazelcast.core.HazelcastInstance" />
<reference id="scriptExecutor" interface="org.apache.unomi.scripting.ScriptExecutor" />
<service id="elasticSearchPersistenceService" ref="elasticSearchPersistenceServiceImpl">
@@ -137,8 +135,6 @@
<property name="clientSocketTimeout" value="${es.clientSocketTimeout}" />
<property name="metricsService" ref="metricsService" />
- <property name="hazelcastInstance" ref="hazelcastInstance" />
- <property name="itemClassesToCache" value="${es.itemClassesToCache}" />
<property name="useBatchingForSave" value="${es.useBatchingForSave}" />
<property name="useBatchingForUpdate" value="${es.useBatchingForUpdate}" />
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index a11189ae9..a259c1491 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -198,14 +198,15 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
private SegmentService segmentService;
- private Condition purgeProfileQuery;
private Integer purgeProfileExistTime = 0;
private Integer purgeProfileInactiveTime = 0;
private Integer purgeSessionsAndEventsTime = 0;
private Integer purgeProfileInterval = 0;
+ private TimerTask purgeTask = null;
private long propertiesRefreshInterval = 10000;
private PropertyTypes propertyTypes;
+ private TimerTask propertyTypeLoadTask = null;
private boolean forceRefreshOnSave = false;
@@ -258,6 +259,12 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
}
public void preDestroy() {
+ // NOTE(review): purgeTask and propertyTypeLoadTask are submitted via schedulerService.getScheduleExecutorService().scheduleAtFixedRate(...),
+ // not via a java.util.Timer. TimerTask.cancel() only affects Timer scheduling, so these calls likely do NOT stop the periodic
+ // executions; consider keeping the ScheduledFuture returned by scheduleAtFixedRate and calling cancel(false) on it instead.
+ if (purgeTask != null) {
+ purgeTask.cancel();
+ }
+ if (propertyTypeLoadTask != null) {
+ propertyTypeLoadTask.cancel();
+ }
bundleContext.removeBundleListener(this);
logger.info("Profile service shutdown.");
}
@@ -290,13 +297,13 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
}
private void schedulePropertyTypeLoad() {
- TimerTask task = new TimerTask() {
+ propertyTypeLoadTask = new TimerTask() {
@Override
public void run() {
reloadPropertyTypes(false);
}
};
- schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 10000, propertiesRefreshInterval, TimeUnit.MILLISECONDS);
+ schedulerService.getScheduleExecutorService().scheduleAtFixedRate(propertyTypeLoadTask, 10000, propertiesRefreshInterval, TimeUnit.MILLISECONDS);
logger.info("Scheduled task for property type loading each 10s");
}
@@ -319,74 +326,90 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
}
}
+ /**
+ * Removes profiles matching at least one enabled criterion (OR-combined):
+ * last visit at least {@code inactiveNumberOfDays} days old, or first visit at least {@code existsNumberOfDays} days old.
+ * A criterion with a value of 0 or less is disabled; when both are disabled nothing is done.
+ */
+ @Override
+ public void purgeProfiles(int inactiveNumberOfDays, int existsNumberOfDays) {
+ if (inactiveNumberOfDays <= 0 && existsNumberOfDays <= 0) {
+ // both criteria disabled: nothing to purge
+ return;
+ }
+
+ ConditionType profilePropertyConditionType = definitionsService.getConditionType("profilePropertyCondition");
+ ConditionType booleanCondition = definitionsService.getConditionType("booleanCondition");
+ if (profilePropertyConditionType == null || booleanCondition == null) {
+ // definition service not yet fully instantiated: skip this run, but make it visible instead of failing silently
+ logger.warn("Purging: profile purge skipped, condition types profilePropertyCondition/booleanCondition are not available yet");
+ return;
+ }
+
+ Condition purgeProfileQuery = new Condition(booleanCondition);
+ purgeProfileQuery.setParameter("operator", "or");
+ List<Condition> subConditions = new ArrayList<>();
+
+ if (inactiveNumberOfDays > 0) {
+ logger.info("Purging: Profile with no visits since {} days", inactiveNumberOfDays);
+ subConditions.add(buildDatePropertyOlderThanDaysCondition(profilePropertyConditionType, "properties.lastVisit", inactiveNumberOfDays));
+ }
+
+ if (existsNumberOfDays > 0) {
+ logger.info("Purging: Profile created since more than {} days", existsNumberOfDays);
+ subConditions.add(buildDatePropertyOlderThanDaysCondition(profilePropertyConditionType, "properties.firstVisit", existsNumberOfDays));
+ }
+
+ purgeProfileQuery.setParameter("subConditions", subConditions);
+ persistenceService.removeByQuery(purgeProfileQuery, Profile.class);
+ }
+
+ /**
+ * Builds a profile property condition matching profiles whose date property is less than or equal to "now - numberOfDays days".
+ */
+ private Condition buildDatePropertyOlderThanDaysCondition(ConditionType profilePropertyConditionType, String propertyName, int numberOfDays) {
+ Condition condition = new Condition(profilePropertyConditionType);
+ condition.setParameter("propertyName", propertyName);
+ condition.setParameter("comparisonOperator", "lessThanOrEqualTo");
+ condition.setParameter("propertyValueDateExpr", "now-" + numberOfDays + "d");
+ return condition;
+ }
+
+ /**
+ * Purges monthly indexed items older than the given number of months; a value of 0 or less does nothing.
+ */
+ @Override
+ public void purgeMonthlyItems(int existsNumberOfMonths) {
+ if (existsNumberOfMonths <= 0) {
+ // purge disabled
+ return;
+ }
+ logger.info("Purging: Monthly items (sessions/events) created before {} months", existsNumberOfMonths);
+ persistenceService.purge(getMonth(-existsNumberOfMonths).getTime());
+ }
+
private void initializePurge() {
- logger.info("Profile purge: Initializing");
+ logger.info("Purge: Initializing");
if (purgeProfileInactiveTime > 0 || purgeProfileExistTime > 0 || purgeSessionsAndEventsTime > 0) {
if (purgeProfileInactiveTime > 0) {
- logger.info("Profile purge: Profile with no visits since {} days, will be purged", purgeProfileInactiveTime);
+ logger.info("Purge: Profile with no visits since more than {} days, will be purged", purgeProfileInactiveTime);
}
if (purgeProfileExistTime > 0) {
- logger.info("Profile purge: Profile created since {} days, will be purged", purgeProfileExistTime);
+ logger.info("Purge: Profile created since more than {} days, will be purged", purgeProfileExistTime);
+ }
+ if (purgeSessionsAndEventsTime > 0) {
+ logger.info("Purge: Monthly items (sessions/events) created since more than {} months, will be purged", purgeSessionsAndEventsTime);
}
- TimerTask task = new TimerTask() {
+ purgeTask = new TimerTask() {
@Override
public void run() {
try {
long purgeStartTime = System.currentTimeMillis();
- logger.debug("Profile purge: Purge triggered");
-
- if (purgeProfileQuery == null) {
- ConditionType profilePropertyConditionType = definitionsService.getConditionType("profilePropertyCondition");
- ConditionType booleanCondition = definitionsService.getConditionType("booleanCondition");
- if (profilePropertyConditionType == null || booleanCondition == null) {
- // definition service not yet fully instantiate
- return;
- }
+ logger.info("Purge: triggered");
- purgeProfileQuery = new Condition(booleanCondition);
- purgeProfileQuery.setParameter("operator", "or");
- List<Condition> subConditions = new ArrayList<>();
+ // Profile purge
+ purgeProfiles(purgeProfileInactiveTime, purgeProfileExistTime);
- if (purgeProfileInactiveTime > 0) {
- Condition inactiveTimeCondition = new Condition(profilePropertyConditionType);
- inactiveTimeCondition.setParameter("propertyName", "properties.lastVisit");
- inactiveTimeCondition.setParameter("comparisonOperator", "lessThanOrEqualTo");
- inactiveTimeCondition.setParameter("propertyValueDateExpr", "now-" + purgeProfileInactiveTime + "d");
- subConditions.add(inactiveTimeCondition);
- }
-
- if (purgeProfileExistTime > 0) {
- Condition existTimeCondition = new Condition(profilePropertyConditionType);
- existTimeCondition.setParameter("propertyName", "properties.firstVisit");
- existTimeCondition.setParameter("comparisonOperator", "lessThanOrEqualTo");
- existTimeCondition.setParameter("propertyValueDateExpr", "now-" + purgeProfileExistTime + "d");
- subConditions.add(existTimeCondition);
- }
+ // Monthly items purge
+ purgeMonthlyItems(purgeSessionsAndEventsTime);
- purgeProfileQuery.setParameter("subConditions", subConditions);
- }
-
- persistenceService.removeByQuery(purgeProfileQuery, Profile.class);
-
- if (purgeSessionsAndEventsTime > 0) {
- logger.info("Monthly indexes purge: Session and events created before {} months, will be purged",
- purgeSessionsAndEventsTime);
- persistenceService.purge(getMonth(-purgeSessionsAndEventsTime).getTime());
- }
-
- logger.info("Profile purge: purge executed in {} ms", System.currentTimeMillis() - purgeStartTime);
+ logger.info("Purge: executed in {} ms", System.currentTimeMillis() - purgeStartTime);
} catch (Throwable t) {
- logger.error("Error while purging profiles", t);
+ logger.error("Error while purging", t);
}
}
};
- schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 1, purgeProfileInterval, TimeUnit.DAYS);
- logger.info("Profile purge: purge scheduled with an interval of {} days", purgeProfileInterval);
+ schedulerService.getScheduleExecutorService().scheduleAtFixedRate(purgeTask, 1, purgeProfileInterval, TimeUnit.DAYS);
+
+ logger.info("Purge: purge scheduled with an interval of {} days", purgeProfileInterval);
} else {
- logger.info("Profile purge: No purge scheduled");
+ logger.info("Purge: No purge scheduled");
}
}