You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by js...@apache.org on 2023/02/21 18:00:14 UTC

[unomi] 02/02: UNOMI-739 : update purge mechanism

This is an automated email from the ASF dual-hosted git repository.

jsinovassinnaik pushed a commit to branch UNOMI-739-update-purge-system
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit 935400171d132604a45d1fd4803e3a23428a4b2b
Author: jsinovassin <js...@jahia.com>
AuthorDate: Tue Feb 21 18:59:30 2023 +0100

    UNOMI-739 : update purge mechanism
---
 .../apache/unomi/api/services/ProfileService.java  |  13 ++-
 .../ElasticSearchPersistenceServiceImpl.java       |  42 +++++--
 .../unomi/persistence/spi/PersistenceService.java  |  19 ++++
 .../services/impl/profiles/ProfileServiceImpl.java | 125 ++++++++++++++-------
 4 files changed, 145 insertions(+), 54 deletions(-)

diff --git a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
index 527e77fec..a7a3e2a3f 100644
--- a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
@@ -417,7 +417,6 @@ public interface ProfileService {
      * Purge (delete) profiles
      * example: Purge profile inactive since 10 days only:
      * purgeProfiles(10, 0);
-     *
      * example: Purge profile created since 30 days only:
      * purgeProfiles(0, 30);
      *
@@ -427,8 +426,14 @@ public interface ProfileService {
     void purgeProfiles(int inactiveNumberOfDays, int existsNumberOfDays);
 
     /**
-     * Purge (delete) monthly indices by removing old indices
-     * @param existsNumberOfMonths used to remove monthly indices older than this number of months
+     * Purge (delete) sessions created more than the given number of days ago
+     * @param existsNumberOfDays sessions created more than this number of days ago are removed
+     */
+    void purgeSessionItems(int existsNumberOfDays);
+
+    /**
+     * Purge (delete) events created more than the given number of days ago
+     * @param existsNumberOfDays events created more than this number of days ago are removed
      */
-    void purgeMonthlyItems(int existsNumberOfMonths);
+    void purgeEventItems(int existsNumberOfDays);
 }
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index ecc703511..fbd0aa44c 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -170,6 +170,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 
@@ -1379,7 +1380,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 );
                 phases.put("hot", new Phase("hot", TimeValue.ZERO, hotActions));
 
-                // TODO - Handle this with the purge https://issues.apache.org/jira/browse/UNOMI-726
+                // TODO - Handle this with the purge https://issues.apache.org/jira/browse/UNOMI-739
                 Map<String, LifecycleAction> deleteActions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction());
                 phases.put("delete", new Phase("delete", new TimeValue(90, TimeUnit.DAYS), deleteActions));
 
@@ -1424,8 +1425,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }
     }
 
-    public boolean removeIndex(final String itemType) {
-        String index = getIndex(itemType);
+    @Override
+    public boolean removeIndex(final String itemType, boolean addPrefix) {
+        String index = addPrefix ? getIndex(itemType) : itemType;
 
         Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
             protected Boolean execute(Object... args) throws IOException {
@@ -1445,6 +1447,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
             return result;
         }
     }
+
+    @Override
+    public boolean removeIndex(final String itemType) {
+        return removeIndex(itemType, true);
+    }
 
     private void internalCreateRolloverTemplate(String itemName) throws IOException {
         String rolloverAlias = indexPrefix + "-" + itemName;
@@ -1895,6 +1902,29 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }
     }
 
+    @Override
+    public Map<String, Long> docCountPerIndex(String... indexes) {
+        if (indexes == null || indexes.length == 0) {
+            // no index requested: do not send a GetIndexRequest without any target index
+            return Collections.emptyMap();
+        }
+        return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".docCountPerIndex", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+            @Override
+            protected Map<String, Long> execute(Object... args) throws IOException {
+                // resolve each name with getIndexNameForQuery, then count every concrete index returned for it
+                String[] indexesForQuery = Stream.of(indexes).map(index -> getIndexNameForQuery(index)).toArray(String[]::new);
+                GetIndexResponse getIndexResponse = client.indices().get(new GetIndexRequest(indexesForQuery), RequestOptions.DEFAULT);
+
+                Map<String, Long> countPerIndex = new HashMap<>();
+                for (String index : getIndexResponse.getIndices()) {
+                    CountResponse response = client.count(new CountRequest(index), RequestOptions.DEFAULT);
+                    countPerIndex.put(index, response.getCount());
+                }
+                return countPerIndex;
+            }
+        }.catchingExecuteInClassLoader(true);
+    }
+
     private long queryCount(final QueryBuilder filter, final String itemType) {
         return new InClassLoaderExecute<Long>(metricsService, this.getClass().getName() + ".queryCount", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
 
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index d0911131d..7ed85e767 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -682,6 +682,16 @@ public interface PersistenceService {
      */
     void purge(Date date);
 
+    /**
+     * Retrieves the number of documents in each index matching the given names.
+     * For a rollover index, each physical rollover index is returned with its own number of documents.
+     * For example, with "event" as parameter, the indexes named ...-event-000001, ...-event-000002 and so on will be returned.
+     *
+     * @param indexes names of the indexes (e.g. an item type such as "event") whose documents should be counted
+     * @return map where the key is the index name and the value is the number of documents in that index
+     */
+    Map<String, Long> docCountPerIndex(String... indexes);
+
     /**
      * Retrieves all items of the specified Item subclass which specified ranged property is within the specified bounds, ordered according to the specified {@code sortBy} String
      * and and paged: only {@code size} of them are retrieved, starting with the {@code offset}-th one.
@@ -733,6 +743,15 @@ public interface PersistenceService {
      */
     boolean removeIndex(final String itemType);
 
+    /**
+     * Removes an index, identified either by an item type or by its full index name.
+     *
+     * @param itemType the item type, or the full index name when {@code addPrefix} is {@code false}
+     * @param addPrefix {@code true} to add the index prefix to {@code itemType}, {@code false} to use {@code itemType} as the index name as-is
+     * @return {@code true} if the operation was successful, {@code false} otherwise
+     */
+    boolean removeIndex(final String itemType, boolean addPrefix);
+
     /**
      * Removes all data associated with the provided scope.
      *
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index 36f904aef..c08eb65b4 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -20,7 +20,8 @@ package org.apache.unomi.services.impl.profiles;
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.beanutils.PropertyUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.unomi.api.BatchUpdate;
+import org.apache.unomi.api.Event;
 import org.apache.unomi.api.Item;
 import org.apache.unomi.api.PartialList;
 import org.apache.unomi.api.Persona;
@@ -73,6 +74,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.TimerTask;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -203,6 +205,12 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
 
     private Integer purgeProfileExistTime = 0;
     private Integer purgeProfileInactiveTime = 0;
+
+    /**
+     * Purge delay for sessions and events, expressed in months.
+     * @deprecated use purgeSessionExistTime and purgeEventExistTime (expressed in days) instead
+     */
+    @Deprecated
     private Integer purgeSessionsAndEventsTime = 0;
     private Integer purgeSessionExistTime = 0;
     private Integer purgeEventExistTime = 0;
@@ -258,6 +266,7 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
             }
         }
         bundleContext.addBundleListener(this);
+        initializeDefaultPurgeValuesIfNecessary();
         initializePurge();
         schedulePropertyTypeLoad();
         logger.info("Profile service initialized.");
@@ -285,6 +294,22 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
     private void processBundleStop(BundleContext bundleContext) {
     }
 
+    /**
+     * Fill purgeEventExistTime and purgeSessionExistTime from the deprecated purgeSessionsAndEventsTime (in months,
+     * converted to days at 30 days per month) when they are not set, so that configurations still using the old property
+     * keep working. This method should be removed once the purgeSessionsAndEventsTime property is deleted.
+     */
+    private void initializeDefaultPurgeValuesIfNecessary() {
+        if (purgeSessionsAndEventsTime > 0) {
+            if (purgeEventExistTime <= 0) {
+                purgeEventExistTime = purgeSessionsAndEventsTime * 30;
+            }
+            if (purgeSessionExistTime <= 0) {
+                purgeSessionExistTime = purgeSessionsAndEventsTime * 30;
+            }
+        }
+    }
+
     public void setPurgeProfileExistTime(Integer purgeProfileExistTime) {
         this.purgeProfileExistTime = purgeProfileExistTime;
     }
@@ -293,6 +318,10 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
         this.purgeProfileInactiveTime = purgeProfileInactiveTime;
     }
 
+    /**
+     * @deprecated use the purgeSessionExistTime and purgeEventExistTime properties (expressed in days) instead
+     */
+    @Deprecated
     public void setPurgeSessionsAndEventsTime(Integer purgeSessionsAndEventsTime) {
         this.purgeSessionsAndEventsTime = purgeSessionsAndEventsTime;
     }
@@ -377,25 +406,79 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
     }
 
     @Override
-    public void purgeMonthlyItems(int existsNumberOfMonths) {
-        if (existsNumberOfMonths > 0) {
-            logger.info("Purging: Monthly items (sessions/events) created before {} months", existsNumberOfMonths);
-            persistenceService.purge(getMonth(-existsNumberOfMonths).getTime());
+    public void purgeSessionItems(int existsNumberOfDays) {
+        purgeItemsOlderThan("sessionPropertyCondition", Session.class, Session.ITEM_TYPE, "Session", existsNumberOfDays);
+    }
+
+    @Override
+    public void purgeEventItems(int existsNumberOfDays) {
+        purgeItemsOlderThan("eventPropertyCondition", Event.class, Event.ITEM_TYPE, "Event", existsNumberOfDays);
+    }
+
+    /**
+     * Removes the items of {@code itemClass} whose timeStamp is older than {@code existsNumberOfDays} days, then deletes
+     * the rollover indexes of {@code itemType} left empty by that removal.
+     * Does nothing when {@code existsNumberOfDays} is 0 or less, or when the property condition type is not registered yet.
+     */
+    private void purgeItemsOlderThan(String propertyConditionTypeId, Class<? extends Item> itemClass, String itemType,
+                                     String itemLabel, int existsNumberOfDays) {
+        if (existsNumberOfDays <= 0) {
+            return;
+        }
+        ConditionType propertyConditionType = definitionsService.getConditionType(propertyConditionTypeId);
+        if (propertyConditionType == null) {
+            // definition service not yet fully instantiated
+            return;
+        }
+
+        Condition condition = new Condition(propertyConditionType);
+
+        logger.info("Purging: {} created since more than {} days", itemLabel, existsNumberOfDays);
+        condition.setParameter("propertyName", "timeStamp");
+        condition.setParameter("comparisonOperator", "lessThanOrEqualTo");
+        condition.setParameter("propertyValueDateExpr", "now-" + existsNumberOfDays + "d");
+
+        persistenceService.removeByQuery(condition, itemClass);
+        deleteEmptyRolloverIndex(itemType);
+    }
+
+    /**
+     * Deletes the rollover indexes of the given item type that no longer contain any document.
+     * The last index in name order is always kept: it is the one documents are currently written to.
+     *
+     * @param indexName item type whose rollover indexes are checked, e.g. {@code Event.ITEM_TYPE}
+     */
+    public void deleteEmptyRolloverIndex(String indexName) {
+        Map<String, Long> docCounts = persistenceService.docCountPerIndex(indexName);
+        if (docCounts != null && !docCounts.isEmpty()) {
+            // rollover index names end with a zero-padded counter (...-000001, ...-000002), so the natural
+            // String order of the TreeMap puts the current write index last: never remove it
+            TreeMap<String, Long> countsPerIndex = new TreeMap<>(docCounts);
+            countsPerIndex.pollLastEntry();
+            countsPerIndex.forEach((index, count) -> {
+                if (count == 0) {
+                    persistenceService.removeIndex(index, false);
+                }
+            });
         }
     }
 
     private void initializePurge() {
         logger.info("Purge: Initializing");
 
-        if (purgeProfileInactiveTime > 0 || purgeProfileExistTime > 0 || purgeSessionsAndEventsTime > 0) {
+        if (purgeProfileInactiveTime > 0 || purgeProfileExistTime > 0 || purgeSessionsAndEventsTime > 0 || purgeSessionExistTime > 0 || purgeEventExistTime > 0) {
             if (purgeProfileInactiveTime > 0) {
                 logger.info("Purge: Profile with no visits since more than {} days, will be purged", purgeProfileInactiveTime);
             }
             if (purgeProfileExistTime > 0) {
                 logger.info("Purge: Profile created since more than {} days, will be purged", purgeProfileExistTime);
             }
-            if (purgeSessionsAndEventsTime > 0) {
-                logger.info("Purge: Monthly items (sessions/events) created since more than {} months, will be purged", purgeSessionsAndEventsTime);
+
+            if (purgeSessionExistTime > 0) {
+                logger.info("Purge: Session items created since more than {} days, will be purged", purgeSessionExistTime);
+            }
+            if (purgeEventExistTime > 0) {
+                logger.info("Purge: Event items created since more than {} days, will be purged", purgeEventExistTime);
             }
 
             purgeTask = new TimerTask() {
@@ -409,8 +492,8 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
                         purgeProfiles(purgeProfileInactiveTime, purgeProfileExistTime);
 
-                        // Monthly items purge
-                        purgeMonthlyItems(purgeSessionsAndEventsTime);
-
+                        // Sessions and events purge
+                        purgeSessionItems(purgeSessionExistTime);
+                        purgeEventItems(purgeEventExistTime);
                         logger.info("Purge: executed in {} ms", System.currentTimeMillis() - purgeStartTime);
                     } catch (Throwable t) {
                         logger.error("Error while purging", t);