Posted to dev@unomi.apache.org by js...@apache.org on 2023/02/22 14:33:11 UTC

[unomi] branch UNOMI-739-update-purge-system updated (b34a533f8 -> 246399ed4)

This is an automated email from the ASF dual-hosted git repository.

jsinovassinnaik pushed a change to branch UNOMI-739-update-purge-system
in repository https://gitbox.apache.org/repos/asf/unomi.git


    omit b34a533f8 UNOMI-739 : remove deletion in rollover policy
    omit 935400171 UNOMI-739 : update purge mechanism
     new 30700cb7a UNOMI-739 : update purge mechanism
     new 246399ed4 UNOMI-739 : remove deletion in rollover policy

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (b34a533f8)
            \
             N -- N -- N   refs/heads/UNOMI-739-update-purge-system (246399ed4)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[unomi] 02/02: UNOMI-739 : remove deletion in rollover policy


jsinovassinnaik pushed a commit to branch UNOMI-739-update-purge-system
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit 246399ed44dc64a78d1c3ac0e045625923da6d96
Author: jsinovassin <js...@jahia.com>
AuthorDate: Wed Feb 22 08:02:02 2023 +0100

    UNOMI-739 : remove deletion in rollover policy
---
 .../elasticsearch/ElasticSearchPersistenceServiceImpl.java          | 4 ----
 .../resources/requestBody/2.2.0/create_rollover_policy_query.json   | 6 ------
 2 files changed, 10 deletions(-)

diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index fbd0aa44c..87416e22e 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -1375,10 +1375,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 );
                 phases.put("hot", new Phase("hot", TimeValue.ZERO, hotActions));
 
-                // TODO - Handle this with the purge https://issues.apache.org/jira/browse/UNOMI-739
-                Map<String, LifecycleAction> deleteActions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction());
-                phases.put("delete", new Phase("delete", new TimeValue(90, TimeUnit.DAYS), deleteActions));
-
                 LifecyclePolicy policy = new LifecyclePolicy(indexPrefix + "-" + ROLLOVER_LIFECYCLE_NAME, phases);
                 PutLifecyclePolicyRequest request = new PutLifecyclePolicyRequest(policy);
                 org.elasticsearch.client.core.AcknowledgedResponse putLifecyclePolicy = client.indexLifecycle().putLifecyclePolicy(request, RequestOptions.DEFAULT);
diff --git a/tools/shell-commands/src/main/resources/requestBody/2.2.0/create_rollover_policy_query.json b/tools/shell-commands/src/main/resources/requestBody/2.2.0/create_rollover_policy_query.json
index c9bc94a29..2084db3e7 100644
--- a/tools/shell-commands/src/main/resources/requestBody/2.2.0/create_rollover_policy_query.json
+++ b/tools/shell-commands/src/main/resources/requestBody/2.2.0/create_rollover_policy_query.json
@@ -7,12 +7,6 @@
             #rolloverHotActions
           }
         }
-      },
-      "delete": {
-        "min_age": "90d",
-        "actions": {
-          "delete": {}
-        }
       }
     }
   }


[unomi] 01/02: UNOMI-739 : update purge mechanism


jsinovassinnaik pushed a commit to branch UNOMI-739-update-purge-system
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit 30700cb7af9b231ccc62297dadd9be22a8100d76
Author: jsinovassin <js...@jahia.com>
AuthorDate: Tue Feb 21 18:59:30 2023 +0100

    UNOMI-739 : update purge mechanism
---
 .../apache/unomi/api/services/ProfileService.java  |  13 ++-
 .../ElasticSearchPersistenceServiceImpl.java       |  42 +++++--
 .../unomi/persistence/spi/PersistenceService.java  |  19 ++++
 .../services/impl/profiles/ProfileServiceImpl.java | 125 ++++++++++++++-------
 4 files changed, 145 insertions(+), 54 deletions(-)
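The commit replaces the month-based purgeSessionsAndEventsTime setting with two day-based settings, purgeSessionExistTime and purgeEventExistTime. It keeps the old setting working through initializeDefaultPurgeValuesIfNecessary(), which converts months to days (months * 30) only when a new value is unset. The following is a minimal standalone sketch of that fallback rule. PurgeDefaults and effectiveDays are hypothetical names used only for illustration and are not part of Unomi.

```java
// Hypothetical standalone version of the fallback performed by
// initializeDefaultPurgeValuesIfNecessary(); not part of the Unomi API.
final class PurgeDefaults {

    private PurgeDefaults() {
    }

    /**
     * @param legacyMonths   deprecated purgeSessionsAndEventsTime value, in months
     * @param configuredDays purgeSessionExistTime or purgeEventExistTime value, in days
     * @return the number of days the purge should use (0 or less means disabled)
     */
    static int effectiveDays(int legacyMonths, int configuredDays) {
        // The legacy value is only used when no positive day-based value is configured.
        if (legacyMonths > 0 && configuredDays <= 0) {
            // Same approximation as the commit: one month is counted as 30 days.
            return legacyMonths * 30;
        }
        return configuredDays;
    }
}
```

For example, a legacy value of 3 months with no day-based setting yields 90 days. An explicit value of 45 days is kept as is.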

diff --git a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
index 527e77fec..a7a3e2a3f 100644
--- a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
@@ -417,7 +417,6 @@ public interface ProfileService {
      * Purge (delete) profiles
      * example: Purge profile inactive since 10 days only:
      * purgeProfiles(10, 0);
-     *
      * example: Purge profile created since 30 days only:
      * purgeProfiles(0, 30);
      *
@@ -427,8 +426,14 @@ public interface ProfileService {
     void purgeProfiles(int inactiveNumberOfDays, int existsNumberOfDays);
 
     /**
-     * Purge (delete) monthly indices by removing old indices
-     * @param existsNumberOfMonths used to remove monthly indices older than this number of months
+     * Purge (delete) session items
+     * @param existsNumberOfDays used to remove sessions older than this number of days
+     */
+    void purgeSessionItems(int existsNumberOfDays);
+
+    /**
+     * Purge (delete) event items
+     * @param existsNumberOfDays used to remove events older than this number of days
      */
-    void purgeMonthlyItems(int existsNumberOfMonths);
+    void purgeEventItems(int existsNumberOfDays);
 }
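In ProfileServiceImpl, both purgeSessionItems and purgeEventItems build a property condition on timeStamp. This condition uses the comparison operator lessThanOrEqualTo and the date expression "now-" + existsNumberOfDays + "d", so an item is removed when its timestamp is at or before the cutoff. A value of 0 or less disables the purge because of the existsNumberOfDays > 0 guard. The sketch below approximates that date math with java.time, assuming UTC instants and no date rounding. PurgeCutoff is a hypothetical helper, and the real condition is evaluated by Elasticsearch.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper approximating the "now-<N>d" / lessThanOrEqualTo condition
// used by purgeSessionItems and purgeEventItems; Elasticsearch evaluates the real one.
final class PurgeCutoff {

    private PurgeCutoff() {
    }

    // Equivalent of the date-math expression "now-" + existsNumberOfDays + "d" (no rounding).
    static Instant cutoff(Instant now, int existsNumberOfDays) {
        return now.minus(Duration.ofDays(existsNumberOfDays));
    }

    // An item is purged when its timeStamp is less than or equal to the cutoff.
    // A non-positive number of days disables the purge, like the "existsNumberOfDays > 0" guard.
    static boolean isPurged(Instant timeStamp, Instant now, int existsNumberOfDays) {
        if (existsNumberOfDays <= 0) {
            return false;
        }
        return !timeStamp.isAfter(cutoff(now, existsNumberOfDays));
    }
}
```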
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index ecc703511..fbd0aa44c 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -73,13 +73,7 @@ import org.elasticsearch.action.search.SearchScrollRequest;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Node;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.*;
 import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.client.core.CountRequest;
 import org.elasticsearch.client.core.CountResponse;
@@ -170,6 +164,8 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 
@@ -1379,7 +1375,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 );
                 phases.put("hot", new Phase("hot", TimeValue.ZERO, hotActions));
 
-                // TODO - Handle this with the purge https://issues.apache.org/jira/browse/UNOMI-726
+                // TODO - Handle this with the purge https://issues.apache.org/jira/browse/UNOMI-739
                 Map<String, LifecycleAction> deleteActions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction());
                 phases.put("delete", new Phase("delete", new TimeValue(90, TimeUnit.DAYS), deleteActions));
 
@@ -1424,8 +1420,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }
     }
 
-    public boolean removeIndex(final String itemType) {
-        String index = getIndex(itemType);
+    public boolean removeIndex(final String itemType, boolean addPrefix) {
+        String index = addPrefix ? getIndex(itemType) : itemType;
 
         Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
             protected Boolean execute(Object... args) throws IOException {
@@ -1445,6 +1441,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
             return result;
         }
     }
+    public boolean removeIndex(final String itemType) {
+        return removeIndex(itemType, true);
+    }
 
     private void internalCreateRolloverTemplate(String itemName) throws IOException {
         String rolloverAlias = indexPrefix + "-" + itemName;
@@ -1895,6 +1894,29 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }
     }
 
+    @Override
+    public Map<String, Long> docCountPerIndex(String... indexes) {
+        return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".docCountPerIndex", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+            @Override
+            protected Map<String, Long> execute(Object... args) throws IOException {
+                List<String> indexesForQuery = Stream.of(indexes).map(index -> getIndexNameForQuery(index)).collect(Collectors.toList());
+                String[] itemsArray = new String[indexesForQuery.size()];
+                itemsArray = indexesForQuery.toArray(itemsArray);
+                GetIndexRequest request = new GetIndexRequest(itemsArray);
+                GetIndexResponse getIndexResponse = client.indices().get(request, RequestOptions.DEFAULT);
+
+                Map<String, Long> countPerIndex = new HashMap<>();
+
+                for (String index : getIndexResponse.getIndices()) {
+                    CountRequest countRequest = new CountRequest(index);
+                    CountResponse response = client.count(countRequest, RequestOptions.DEFAULT);
+                    countPerIndex.put(index, response.getCount());
+                }
+                return countPerIndex;
+            }
+        }.catchingExecuteInClassLoader(true);
+    }
+
     private long queryCount(final QueryBuilder filter, final String itemType) {
         return new InClassLoaderExecute<Long>(metricsService, this.getClass().getName() + ".queryCount", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
 
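docCountPerIndex resolves the requested names through getIndexNameForQuery and a GetIndexRequest. It then issues one CountRequest per concrete index returned, so a rollover item type such as "event" expands into one entry per generation. The sketch below models that contract in memory: a map of index names to document counts stands in for the cluster. The "context" prefix and six-digit suffixes in the example are illustrative assumptions, not values taken from the commit, and RolloverCounts is a hypothetical class.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical in-memory model of the docCountPerIndex contract; the map passed in
// stands in for the cluster (index name -> document count).
final class RolloverCounts {

    private RolloverCounts() {
    }

    static Map<String, Long> docCountPerIndex(Map<String, Long> clusterCounts, String prefix, String... itemTypes) {
        Map<String, Long> countPerIndex = new TreeMap<>();
        for (String itemType : itemTypes) {
            // Rollover generations look like "<prefix>-<itemType>-000001", "-000002", ...
            Pattern generation = Pattern.compile(Pattern.quote(prefix + "-" + itemType) + "-\\d+");
            for (Map.Entry<String, Long> entry : clusterCounts.entrySet()) {
                if (generation.matcher(entry.getKey()).matches()) {
                    // Each generation keeps its own count, as described in the javadoc.
                    countPerIndex.put(entry.getKey(), entry.getValue());
                }
            }
        }
        return countPerIndex;
    }
}
```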
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index d0911131d..7ed85e767 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -682,6 +682,16 @@ public interface PersistenceService {
      */
     void purge(Date date);
 
+    /**
+     * Retrieves the number of documents per index.
+     * For a rollover index, each underlying rollover index is returned with its own document count.
+     * For example, with "event" as parameter, the indexes named ...-event-000001, ...-event-000002 and so on are returned.
+     *
+     * @param indexes names of the indexes whose documents should be counted
+     * @return a map where the key is the index name and the value is the number of documents in that index
+     */
+    Map<String, Long> docCountPerIndex(String... indexes);
+
     /**
      * Retrieves all items of the specified Item subclass which specified ranged property is within the specified bounds, ordered according to the specified {@code sortBy} String
      * and and paged: only {@code size} of them are retrieved, starting with the {@code offset}-th one.
@@ -733,6 +743,15 @@ public interface PersistenceService {
      */
     boolean removeIndex(final String itemType);
 
+    /**
+     * Removes the index for the specified item type.
+     *
+     * @param itemType the item type
+     * @param addPrefix whether to prepend the index prefix to itemType; use {@code false} when itemType is already a full index name
+     * @return {@code true} if the operation was successful, {@code false} otherwise
+     */
+    boolean removeIndex(final String itemType, boolean addPrefix);
+
     /**
      * Removes all data associated with the provided scope.
      *
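The new removeIndex(itemType, addPrefix) overload is needed because deleteEmptyRolloverIndex receives full index names from docCountPerIndex, so passing addPrefix = false prevents the prefix from being prepended a second time. Below is a minimal sketch of the name resolution. It assumes that getIndex(itemType) amounts to the prefix, a dash and the lower-cased item type. That is an assumption, since the real getIndex is not shown in this diff, and IndexNames is a hypothetical class.

```java
import java.util.Locale;

// Hypothetical model of the index-name resolution in removeIndex(itemType, addPrefix).
final class IndexNames {

    private IndexNames() {
    }

    // Assumption: getIndex(itemType) is the prefix, a dash and the lower-cased item type.
    static String getIndex(String indexPrefix, String itemType) {
        return indexPrefix + "-" + itemType.toLowerCase(Locale.ROOT);
    }

    // Mirrors: String index = addPrefix ? getIndex(itemType) : itemType;
    static String resolveIndexName(String indexPrefix, String itemType, boolean addPrefix) {
        return addPrefix ? getIndex(indexPrefix, itemType) : itemType;
    }
}
```

Resolving "context-session-000001" with addPrefix = true would produce "context-context-session-000001", an index that does not exist. This is why the purge passes false.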
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index 36f904aef..399c3e3b9 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -20,18 +20,7 @@ package org.apache.unomi.services.impl.profiles;
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.beanutils.PropertyUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.unomi.api.BatchUpdate;
-import org.apache.unomi.api.Item;
-import org.apache.unomi.api.PartialList;
-import org.apache.unomi.api.Persona;
-import org.apache.unomi.api.PersonaSession;
-import org.apache.unomi.api.PersonaWithSessions;
-import org.apache.unomi.api.Profile;
-import org.apache.unomi.api.ProfileAlias;
-import org.apache.unomi.api.PropertyMergeStrategyExecutor;
-import org.apache.unomi.api.PropertyMergeStrategyType;
-import org.apache.unomi.api.PropertyType;
-import org.apache.unomi.api.Session;
+import org.apache.unomi.api.*;
 import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.conditions.ConditionType;
 import org.apache.unomi.api.query.Query;
@@ -56,25 +45,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.TimerTask;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -203,6 +174,11 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
 
     private Integer purgeProfileExistTime = 0;
     private Integer purgeProfileInactiveTime = 0;
+
+    /**
+     * Use purgeSessionExistTime and purgeEventExistTime instead
+     */
+    @Deprecated
     private Integer purgeSessionsAndEventsTime = 0;
     private Integer purgeSessionExistTime = 0;
     private Integer purgeEventExistTime = 0;
@@ -258,6 +234,7 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
             }
         }
         bundleContext.addBundleListener(this);
+        initializeDefaultPurgeValuesIfNecessary();
         initializePurge();
         schedulePropertyTypeLoad();
         logger.info("Profile service initialized.");
@@ -285,6 +262,22 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
     private void processBundleStop(BundleContext bundleContext) {
     }
 
+    /**
+     * Fill purgeEventExistTime and purgeSessionExistTime from the old property purgeSessionsAndEventsTime (months, converted to days)
+     * when no value is set for these properties. This keeps configurations that still use the old property working.
+     * This method should be removed once the purgeSessionsAndEventsTime property is deleted.
+     */
+    private void initializeDefaultPurgeValuesIfNecessary() {
+        if (purgeSessionsAndEventsTime > 0) {
+            if (purgeEventExistTime <= 0) {
+                purgeEventExistTime = purgeSessionsAndEventsTime * 30;
+            }
+            if (purgeSessionExistTime <= 0) {
+                purgeSessionExistTime = purgeSessionsAndEventsTime * 30;
+            }
+        }
+    }
+
     public void setPurgeProfileExistTime(Integer purgeProfileExistTime) {
         this.purgeProfileExistTime = purgeProfileExistTime;
     }
@@ -293,6 +286,7 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
         this.purgeProfileInactiveTime = purgeProfileInactiveTime;
     }
 
+    @Deprecated
     public void setPurgeSessionsAndEventsTime(Integer purgeSessionsAndEventsTime) {
         this.purgeSessionsAndEventsTime = purgeSessionsAndEventsTime;
     }
@@ -377,25 +371,76 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
     }
 
     @Override
-    public void purgeMonthlyItems(int existsNumberOfMonths) {
-        if (existsNumberOfMonths > 0) {
-            logger.info("Purging: Monthly items (sessions/events) created before {} months", existsNumberOfMonths);
-            persistenceService.purge(getMonth(-existsNumberOfMonths).getTime());
+    public void purgeSessionItems(int existsNumberOfDays) {
+        if (existsNumberOfDays > 0) {
+            ConditionType propertyConditionType = definitionsService.getConditionType("sessionPropertyCondition");
+            if (propertyConditionType == null) {
+                // definitions service not yet fully initialized
+                return;
+            }
+
+            Condition condition = new Condition(propertyConditionType);
+
+            logger.info("Purging: Session created since more than {} days", existsNumberOfDays);
+            condition.setParameter("propertyName", "timeStamp");
+            condition.setParameter("comparisonOperator", "lessThanOrEqualTo");
+            condition.setParameter("propertyValueDateExpr", "now-" + existsNumberOfDays + "d");
+
+            persistenceService.removeByQuery(condition, Session.class);
+            deleteEmptyRolloverIndex(Session.ITEM_TYPE);
+        }
+    }
+
+    @Override
+    public void purgeEventItems(int existsNumberOfDays) {
+        if (existsNumberOfDays > 0) {
+            ConditionType propertyConditionType = definitionsService.getConditionType("eventPropertyCondition");
+            if (propertyConditionType == null) {
+                // definitions service not yet fully initialized
+                return;
+            }
+
+            Condition condition = new Condition(propertyConditionType);
+
+            logger.info("Purging: Events created since more than {} days", existsNumberOfDays);
+            condition.setParameter("propertyName", "timeStamp");
+            condition.setParameter("comparisonOperator", "lessThanOrEqualTo");
+            condition.setParameter("propertyValueDateExpr", "now-" + existsNumberOfDays + "d");
+
+            persistenceService.removeByQuery(condition, Event.class);
+            deleteEmptyRolloverIndex(Event.ITEM_TYPE);
+        }
+    }
+
+    public void deleteEmptyRolloverIndex(String indexName) {
+        TreeMap<String, Long> countsPerIndex = new TreeMap<>(persistenceService.docCountPerIndex(indexName));
+        if (!countsPerIndex.isEmpty()) {
+            // do not check the last index, because it's the one used to write documents
+            countsPerIndex.pollLastEntry();
+            countsPerIndex.forEach((index, count) -> {
+                if (count == 0) {
+                    persistenceService.removeIndex(index, false);
+                }
+            });
         }
     }
 
     private void initializePurge() {
         logger.info("Purge: Initializing");
 
-        if (purgeProfileInactiveTime > 0 || purgeProfileExistTime > 0 || purgeSessionsAndEventsTime > 0) {
+        if (purgeProfileInactiveTime > 0 || purgeProfileExistTime > 0 || purgeSessionsAndEventsTime > 0 || purgeSessionExistTime > 0 || purgeEventExistTime > 0) {
             if (purgeProfileInactiveTime > 0) {
                 logger.info("Purge: Profile with no visits since more than {} days, will be purged", purgeProfileInactiveTime);
             }
             if (purgeProfileExistTime > 0) {
                 logger.info("Purge: Profile created since more than {} days, will be purged", purgeProfileExistTime);
             }
-            if (purgeSessionsAndEventsTime > 0) {
-                logger.info("Purge: Monthly items (sessions/events) created since more than {} months, will be purged", purgeSessionsAndEventsTime);
+
+            if (purgeSessionExistTime > 0) {
+                logger.info("Purge: Session items created since more than {} days, will be purged", purgeSessionExistTime);
+            }
+            if (purgeEventExistTime > 0) {
+                logger.info("Purge: Event items created since more than {} days, will be purged", purgeEventExistTime);
             }
 
             purgeTask = new TimerTask() {
@@ -409,8 +454,8 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
                         purgeProfiles(purgeProfileInactiveTime, purgeProfileExistTime);
 
                         // Monthly items purge
-                        purgeMonthlyItems(purgeSessionsAndEventsTime);
-
+                        purgeSessionItems(purgeSessionExistTime);
+                        purgeEventItems(purgeEventExistTime);
                         logger.info("Purge: executed in {} ms", System.currentTimeMillis() - purgeStartTime);
                     } catch (Throwable t) {
                         logger.error("Error while purging", t);
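deleteEmptyRolloverIndex copies the per-index counts into a TreeMap and drops the last entry, which the commit treats as the current write index. It then removes every remaining index whose count is 0. The sketch below isolates that selection step as a pure function that returns the index names instead of calling removeIndex. EmptyRolloverIndices is a hypothetical class name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical pure version of the selection done by deleteEmptyRolloverIndex.
final class EmptyRolloverIndices {

    private EmptyRolloverIndices() {
    }

    static List<String> indicesToDelete(Map<String, Long> countsPerIndex) {
        // TreeMap sorts names lexically; with zero-padded suffixes (-000001, -000002, ...)
        // this is also the rollover generation order.
        TreeMap<String, Long> sorted = new TreeMap<>(countsPerIndex);
        // Skip the newest generation: it is the index currently used for writes.
        sorted.pollLastEntry();
        List<String> toDelete = new ArrayList<>();
        sorted.forEach((index, count) -> {
            if (count == 0L) {
                toDelete.add(index);
            }
        });
        return toDelete;
    }
}
```

Treating lexical order as rollover order depends on the zero-padded generation suffix. The explicit size check from the commit is not needed here, because TreeMap.pollLastEntry() returns null on an empty map.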