Posted to dev@unomi.apache.org by GitBox <gi...@apache.org> on 2021/02/08 15:50:07 UTC

[GitHub] [unomi] jkevan commented on a change in pull request #237: UNOMI-421 - update segments with its own bulk request, and use retry for failures

jkevan commented on a change in pull request #237:
URL: https://github.com/apache/unomi/pull/237#discussion_r572113484



##########
File path: services/pom.xml
##########
@@ -74,6 +74,18 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>net.jodah</groupId>
+            <artifactId>failsafe</artifactId>
+            <version>2.4.0</version>
+        </dependency>

Review comment:
       Duplicated dependency.

##########
File path: persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
##########
@@ -175,6 +176,17 @@
      */
     boolean update(Item item, Date dateHint, Class<?> clazz, Map<?, ?> source, final boolean alwaysOverwrite);
 
+    /**
+     * Updates Map of items of the specified class and identified by the specified identifier with a new property value for the specified property name. Same as
+     * {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))}
+     *
+     * @param items         A map that consists of items (keys) and the properties to update (values)
+     * @param dateHint      a Date helping in identifying where the item is located
+     * @param clazz         the Item subclass of the item to update
+     * @return the list of failed item IDs; an empty list if all updates succeeded, or null if the whole operation failed
+     */
+    List<String> update(Map<Item, Map> items, Date dateHint, Class clazz);

Review comment:
       It's strange to have a Map<Item, ...>: it means the map could contain items of different types, which may not correspond to the Class clazz parameter.
   Could we add a check, before persisting each item, that verifies it is really of type clazz?
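A minimal sketch of such a check, assuming the bulk `update` keeps its `Map<Item, ...>` parameter: entries whose key is not an instance of `clazz` are skipped, and their IDs are collected so they can be reported with the other failed IDs. `Item`, `Profile` and `Session` below are simplified stand-ins, and `filterByClass` is a hypothetical helper, not part of the Unomi API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ItemTypeCheck {
    // Simplified stand-ins for Unomi's Item hierarchy (hypothetical, for illustration only).
    static class Item {
        private final String itemId;
        Item(String itemId) { this.itemId = itemId; }
        String getItemId() { return itemId; }
    }
    static class Profile extends Item { Profile(String itemId) { super(itemId); } }
    static class Session extends Item { Session(String itemId) { super(itemId); } }

    /**
     * Hypothetical helper: keeps only the entries whose key is an instance of clazz and
     * appends the IDs of all other items to rejectedIds so they can be reported as failed.
     */
    static <T extends Item> Map<T, Map<String, Object>> filterByClass(
            Map<? extends Item, Map<String, Object>> items, Class<T> clazz, List<String> rejectedIds) {
        Map<T, Map<String, Object>> accepted = new LinkedHashMap<>();
        for (Map.Entry<? extends Item, Map<String, Object>> entry : items.entrySet()) {
            Item item = entry.getKey();
            if (clazz.isInstance(item)) {
                accepted.put(clazz.cast(item), entry.getValue());
            } else {
                rejectedIds.add(item.getItemId());
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        Map<Item, Map<String, Object>> items = new LinkedHashMap<>();
        items.put(new Profile("profile-1"), Map.of("segments", List.of("segment-a")));
        items.put(new Session("session-1"), Map.of("segments", List.of("segment-a")));

        List<String> rejected = new ArrayList<>();
        Map<Profile, Map<String, Object>> profiles = filterByClass(items, Profile.class, rejected);
        System.out.println(profiles.size() + " accepted, rejected: " + rejected);
    }
}
```

Rejected IDs could then be merged into the list of failed IDs that the method already returns, so callers see a single failure report.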

##########
File path: services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
##########
@@ -919,74 +943,93 @@ private void updateExistingProfilesForSegment(Segment segment) {
             profilesToRemoveSubConditions.add(notNewSegmentCondition);
             profilesToRemoveCondition.setParameter("subConditions", profilesToRemoveSubConditions);
 
-            PartialList<Profile> profilesToRemove = persistenceService.query(profilesToRemoveCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
-            PartialList<Profile> profilesToAdd = persistenceService.query(profilesToAddCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
-
-            while (profilesToAdd.getList().size() > 0) {
-                long profilesToAddStartTime = System.currentTimeMillis();
-                for (Profile profileToAdd : profilesToAdd.getList()) {
-                    profileToAdd.getSegments().add(segment.getItemId());
-                    Map<String,Object> sourceMap = new HashMap<>();
-                    sourceMap.put("segments", profileToAdd.getSegments());
-                    profileToAdd.setSystemProperty("lastUpdated", new Date());
-                    sourceMap.put("systemProperties", profileToAdd.getSystemProperties());
-                    persistenceService.update(profileToAdd, null, Profile.class, sourceMap);
-                    Event profileUpdated = new Event("profileUpdated", null, profileToAdd, null, null, profileToAdd, new Date());
-                    profileUpdated.setPersistent(false);
-                    eventService.send(profileUpdated);
-                    updatedProfileCount++;
-                }
-                logger.info("{} profiles added to segment in {}ms", profilesToAdd.size(), System.currentTimeMillis() - profilesToAddStartTime);
-                profilesToAdd = persistenceService.continueScrollQuery(Profile.class, profilesToAdd.getScrollIdentifier(), profilesToAdd.getScrollTimeValidity());
-                if (profilesToAdd == null || profilesToAdd.getList().size() == 0) {
-                    break;
-                }
+            updatedProfileCount += updateProfilesSegment(profilesToAddCondition, segmentId, true);
+            updatedProfileCount += updateProfilesSegment(profilesToRemoveCondition, segmentId, false);
+        } else {
+            updatedProfileCount += updateProfilesSegment(segmentCondition, segmentId, false);
+        }
+        logger.info("{} profiles updated in {}ms", updatedProfileCount, System.currentTimeMillis() - updateProfilesForSegmentStartTime);
+    }
+
+    private long updateProfilesSegment(Condition profilesToUpdateCondition, String segmentId, boolean isAdd) {
+        long updatedProfileCount = 0;
+        PartialList<Profile> profiles = persistenceService.query(profilesToUpdateCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
+
+        while (profiles != null && profiles.getList().size() > 0) {
+            long startTime = System.currentTimeMillis();
+            if (batchSegmentProfileUpdate) {
+                batchUpdateProfilesSegment(segmentId, profiles.getList(), isAdd);
             }
-            while (profilesToRemove.getList().size() > 0) {
-                long profilesToRemoveStartTime = System.currentTimeMillis();
-                for (Profile profileToRemove : profilesToRemove.getList()) {
-                    profileToRemove.getSegments().remove(segment.getItemId());
-                    Map<String,Object> sourceMap = new HashMap<>();
-                    sourceMap.put("segments", profileToRemove.getSegments());
-                    profileToRemove.setSystemProperty("lastUpdated", new Date());
-                    sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
-                    persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
-                    Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
-                    profileUpdated.setPersistent(false);
-                    eventService.send(profileUpdated);
-                    updatedProfileCount++;
-                }
-                logger.info("{} profiles removed from segment in {}ms", profilesToRemove.size(), System.currentTimeMillis() - profilesToRemoveStartTime );
-                profilesToRemove = persistenceService.continueScrollQuery(Profile.class, profilesToRemove.getScrollIdentifier(), profilesToRemove.getScrollTimeValidity());
-                if (profilesToRemove == null || profilesToRemove.getList().size() == 0) {
-                    break;
+            else { // send the profile updates one by one
+                for (Profile profileToUpdate : profiles.getList()) {
+                    Map<String, Object> sourceMap = buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd);
+                    persistenceService.update(profileToUpdate, null, Profile.class, sourceMap);
                 }
             }
+            if (sendProfileUpdateEventForSegmentUpdate)
+                sendProfileUpdatedEvent(profiles.getList());
 
-        } else {
-            PartialList<Profile> profilesToRemove = persistenceService.query(segmentCondition, null, Profile.class, 0, 200, "10m");
-            while (profilesToRemove.getList().size() > 0) {
-                long profilesToRemoveStartTime = System.currentTimeMillis();
-                for (Profile profileToRemove : profilesToRemove.getList()) {
-                    profileToRemove.getSegments().remove(segment.getItemId());
-                    Map<String,Object> sourceMap = new HashMap<>();
-                    sourceMap.put("segments", profileToRemove.getSegments());
-                    profileToRemove.setSystemProperty("lastUpdated", new Date());
-                    sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
-                    persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
-                    Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
-                    profileUpdated.setPersistent(false);
-                    eventService.send(profileUpdated);
-                    updatedProfileCount++;
-                }
-                logger.info("{} profiles removed from segment in {}ms", profilesToRemove.size(), System.currentTimeMillis() - profilesToRemoveStartTime);
-                profilesToRemove = persistenceService.continueScrollQuery(Profile.class, profilesToRemove.getScrollIdentifier(), profilesToRemove.getScrollTimeValidity());
-                if (profilesToRemove == null || profilesToRemove.getList().size() == 0) {
-                    break;
-                }
-            }
+            updatedProfileCount += profiles.size();

Review comment:
       Same for the count: it will also include profiles that failed to update.
   We should only count the profiles that were updated successfully.
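A minimal sketch of counting only the successful updates, assuming the batch path follows the contract in the proposed Javadoc: the bulk update returns the IDs of the items that failed, or `null` when the whole operation failed. `countSuccessful` is a hypothetical helper.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SuccessfulUpdateCount {
    /**
     * Hypothetical helper: counts the IDs of a batch that are not listed as failed.
     * A null failedIds list is treated as "the whole bulk request failed".
     */
    static long countSuccessful(List<String> batchIds, List<String> failedIds) {
        if (failedIds == null) {
            return 0; // nothing in this batch was updated
        }
        Set<String> failed = new HashSet<>(failedIds);
        return batchIds.stream().filter(id -> !failed.contains(id)).count();
    }

    public static void main(String[] args) {
        List<String> batch = List.of("p1", "p2", "p3");
        System.out.println(countSuccessful(batch, List.of("p2")));
        System.out.println(countSuccessful(batch, Collections.emptyList()));
        System.out.println(countSuccessful(batch, null));
    }
}
```

The returned value could be added to `updatedProfileCount` in place of `profiles.size()`.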

##########
File path: persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
##########
@@ -235,12 +237,13 @@
     private Set<String> itemClassesToCacheSet = new HashSet<>();
     private String itemClassesToCache;
     private boolean useBatchingForSave = false;
+    private boolean useBatchingForUpdate = true;
+    private boolean alwaysOverwrite = true;
+    private boolean refreshBeforeQuery = false;

Review comment:
       This property (`refreshBeforeQuery`) seems to be unused.

##########
File path: services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
##########
@@ -919,74 +943,93 @@ private void updateExistingProfilesForSegment(Segment segment) {
             profilesToRemoveSubConditions.add(notNewSegmentCondition);
             profilesToRemoveCondition.setParameter("subConditions", profilesToRemoveSubConditions);
 
-            PartialList<Profile> profilesToRemove = persistenceService.query(profilesToRemoveCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
-            PartialList<Profile> profilesToAdd = persistenceService.query(profilesToAddCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
-
-            while (profilesToAdd.getList().size() > 0) {
-                long profilesToAddStartTime = System.currentTimeMillis();
-                for (Profile profileToAdd : profilesToAdd.getList()) {
-                    profileToAdd.getSegments().add(segment.getItemId());
-                    Map<String,Object> sourceMap = new HashMap<>();
-                    sourceMap.put("segments", profileToAdd.getSegments());
-                    profileToAdd.setSystemProperty("lastUpdated", new Date());
-                    sourceMap.put("systemProperties", profileToAdd.getSystemProperties());
-                    persistenceService.update(profileToAdd, null, Profile.class, sourceMap);
-                    Event profileUpdated = new Event("profileUpdated", null, profileToAdd, null, null, profileToAdd, new Date());
-                    profileUpdated.setPersistent(false);
-                    eventService.send(profileUpdated);
-                    updatedProfileCount++;
-                }
-                logger.info("{} profiles added to segment in {}ms", profilesToAdd.size(), System.currentTimeMillis() - profilesToAddStartTime);
-                profilesToAdd = persistenceService.continueScrollQuery(Profile.class, profilesToAdd.getScrollIdentifier(), profilesToAdd.getScrollTimeValidity());
-                if (profilesToAdd == null || profilesToAdd.getList().size() == 0) {
-                    break;
-                }
+            updatedProfileCount += updateProfilesSegment(profilesToAddCondition, segmentId, true);
+            updatedProfileCount += updateProfilesSegment(profilesToRemoveCondition, segmentId, false);
+        } else {
+            updatedProfileCount += updateProfilesSegment(segmentCondition, segmentId, false);
+        }
+        logger.info("{} profiles updated in {}ms", updatedProfileCount, System.currentTimeMillis() - updateProfilesForSegmentStartTime);
+    }
+
+    private long updateProfilesSegment(Condition profilesToUpdateCondition, String segmentId, boolean isAdd) {
+        long updatedProfileCount = 0;
+        PartialList<Profile> profiles = persistenceService.query(profilesToUpdateCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
+
+        while (profiles != null && profiles.getList().size() > 0) {
+            long startTime = System.currentTimeMillis();
+            if (batchSegmentProfileUpdate) {
+                batchUpdateProfilesSegment(segmentId, profiles.getList(), isAdd);
             }
-            while (profilesToRemove.getList().size() > 0) {
-                long profilesToRemoveStartTime = System.currentTimeMillis();
-                for (Profile profileToRemove : profilesToRemove.getList()) {
-                    profileToRemove.getSegments().remove(segment.getItemId());
-                    Map<String,Object> sourceMap = new HashMap<>();
-                    sourceMap.put("segments", profileToRemove.getSegments());
-                    profileToRemove.setSystemProperty("lastUpdated", new Date());
-                    sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
-                    persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
-                    Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
-                    profileUpdated.setPersistent(false);
-                    eventService.send(profileUpdated);
-                    updatedProfileCount++;
-                }
-                logger.info("{} profiles removed from segment in {}ms", profilesToRemove.size(), System.currentTimeMillis() - profilesToRemoveStartTime );
-                profilesToRemove = persistenceService.continueScrollQuery(Profile.class, profilesToRemove.getScrollIdentifier(), profilesToRemove.getScrollTimeValidity());
-                if (profilesToRemove == null || profilesToRemove.getList().size() == 0) {
-                    break;
+            else { // send the profile updates one by one
+                for (Profile profileToUpdate : profiles.getList()) {
+                    Map<String, Object> sourceMap = buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd);
+                    persistenceService.update(profileToUpdate, null, Profile.class, sourceMap);
                 }
             }
+            if (sendProfileUpdateEventForSegmentUpdate)
+                sendProfileUpdatedEvent(profiles.getList());

Review comment:
       With batch updates, if some updates fail, this will still send an event for the profiles that failed to update, which seems wrong.
   We should send the event only when we are sure the update succeeded.
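A minimal sketch of sending `profileUpdated` events only for profiles whose update succeeded, again assuming the bulk update returns the failed IDs (or `null` if everything failed). `Profile` is a simplified stand-in, and the `Consumer` stands in for whatever actually sends the event (for example a wrapper around `eventService.send`); both helpers are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

public class UpdatedEventFilter {
    // Simplified stand-in for Unomi's Profile (hypothetical, for illustration only).
    static final class Profile {
        private final String itemId;
        Profile(String itemId) { this.itemId = itemId; }
        String getItemId() { return itemId; }
    }

    /** Returns only the profiles whose update succeeded; a null failedIds means everything failed. */
    static List<Profile> successfullyUpdated(List<Profile> batch, List<String> failedIds) {
        List<Profile> updated = new ArrayList<>();
        if (failedIds == null) {
            return updated;
        }
        Set<String> failed = new HashSet<>(failedIds);
        for (Profile profile : batch) {
            if (!failed.contains(profile.getItemId())) {
                updated.add(profile);
            }
        }
        return updated;
    }

    /** Sends one event per successfully updated profile through the given sender. */
    static void sendProfileUpdatedEvents(List<Profile> batch, List<String> failedIds, Consumer<Profile> sender) {
        successfullyUpdated(batch, failedIds).forEach(sender);
    }

    public static void main(String[] args) {
        List<Profile> batch = List.of(new Profile("p1"), new Profile("p2"), new Profile("p3"));
        List<String> failedIds = List.of("p2"); // e.g. as returned by the bulk update
        sendProfileUpdatedEvents(batch, failedIds,
                profile -> System.out.println("profileUpdated event for " + profile.getItemId()));
    }
}
```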

##########
File path: services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
##########
@@ -34,7 +34,11 @@
             <cm:property name="definitions.refresh.interval" value="10000"/>
             <cm:property name="properties.refresh.interval" value="10000"/>
             <cm:property name="segment.refresh.interval" value="1000"/>
+            <cm:property name="segment.max.retries.update.profile.segment" value="5"/>
+            <cm:property name="segment.retry.update.segment.seconds.delay" value="1"/>

Review comment:
       This one is not listed among the configurable values; is that intentional?
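For context, a minimal plain-Java sketch of how the two new values could drive a fixed-delay retry around a segment update. `callWithRetry` is a hypothetical helper; its `maxRetries` and `delaySeconds` parameters are assumed to map to `segment.max.retries.update.profile.segment` and `segment.retry.update.segment.seconds.delay`, and this is not the code from the pull request.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class SegmentUpdateRetry {
    /**
     * Hypothetical helper: calls task once, then retries up to maxRetries more times,
     * sleeping delaySeconds between attempts. The last failure is rethrown.
     */
    static <T> T callWithRetry(Callable<T> task, int maxRetries, long delaySeconds) throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        Exception lastFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                lastFailure = e;
                if (attempt < maxRetries) {
                    Thread.sleep(delaySeconds * 1000L); // fixed delay before the next attempt
                }
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) throws Exception {
        // 5 retries and a 1 second delay, mirroring the blueprint defaults above.
        AtomicInteger attempts = new AtomicInteger();
        String result = callWithRetry(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("simulated bulk update failure");
            }
            return "updated";
        }, 5, 1);
        System.out.println(result + " after " + attempts.get() + " attempts");
    }
}
```

Since the PR already adds `net.jodah:failsafe`, the same behaviour could instead be expressed with a Failsafe `RetryPolicy` configured through `withMaxRetries` and `withDelay`; either way, both values would need to be exposed alongside the other configurable properties.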




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org