Posted to dev@unomi.apache.org by GitBox <gi...@apache.org> on 2021/03/10 15:47:29 UTC

[GitHub] [unomi] jkevan commented on a change in pull request #260: UNOMI-442: fix creation of Segment that contains past event condition…

jkevan commented on a change in pull request #260:
URL: https://github.com/apache/unomi/pull/260#discussion_r591637316



##########
File path: services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
##########
@@ -839,20 +839,25 @@ private void updateExistingProfilesForPastEventCondition(Condition eventConditio
 
         String propertyKey = (String) parentCondition.getParameter("generatedPropertyKey");
 
+        int updatedProfileCount = 0;
         if(pastEventsDisablePartitions) {
             Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount);
-            updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+            updatedProfileCount = updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
         } else {
             Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
             long card = m.get("_card").longValue();
             int numParts = (int) (card / aggregateQueryBucketSize) + 2;
             for (int i = 0; i < numParts; i++) {
                 Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
-                updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+                updatedProfileCount += updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
             }
         }
 
-        logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis() - t);
+        if (forceRefresh && updatedProfileCount > 0) {

Review comment:
       That's the point of the PR: I'm not using the bulk processor anymore.
   But to answer your question: you cannot be sure, even if you call bulkProcessor.flush().
   The bulkProcessor performs the update asynchronously in a separate thread.
   I debugged this part with logs and the bulkProcessor listener, and I could see that the refresh always came before the bulkProcessor performed the request.
   
   For the current PR, I'm now using this function to do the update (it does not use the bulk processor): https://github.com/apache/unomi/blob/master/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java#L967-L997
   I switched mainly because I cannot control the bulk processor's asynchronous behavior, and I need the past event properties to be saved before the segment save tries to update the profiles.
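
   To illustrate the ordering problem described above, here is a minimal, self-contained sketch. It does not use the Elasticsearch client or any Unomi code: ToyIndex, ToyAsyncBatcher and every method on them are hypothetical stand-ins. The latch forces the ordering reported from the debugging session (refresh first, write second); in the real system this is a race whose outcome is not guaranteed either way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RefreshOrderingSketch {

    /** Hypothetical toy index: writes go to 'stored', queries only see 'searchable' after refresh(). */
    static class ToyIndex {
        private final Map<String, Long> stored = new ConcurrentHashMap<>();
        private volatile Map<String, Long> searchable = Map.of();

        void write(String profileId, long count) { stored.put(profileId, count); }
        void refresh() { searchable = Map.copyOf(stored); }
        boolean isSearchable(String profileId) { return searchable.containsKey(profileId); }
    }

    /** Hypothetical async batcher: flush() hands the batch to a worker thread and returns at once. */
    static class ToyAsyncBatcher {
        private final ExecutorService worker = Executors.newSingleThreadExecutor();
        private final List<Runnable> pending = new ArrayList<>();
        // Assumption: stands in for request latency, so the write lands after the refresh.
        private final CountDownLatch requestDelay = new CountDownLatch(1);

        void add(Runnable op) { pending.add(op); }

        void flush() {
            List<Runnable> batch = new ArrayList<>(pending);
            pending.clear();
            worker.submit(() -> {
                try {
                    requestDelay.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                batch.forEach(Runnable::run);
            });
        }

        /** Lets the delayed batch run, then waits for the worker thread to finish. */
        void releaseAndClose() {
            requestDelay.countDown();
            worker.shutdown();
            try {
                worker.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Async path: flush() returns before the write has happened, so the refresh misses it. */
    static boolean searchableAfterAsyncFlush() {
        ToyIndex index = new ToyIndex();
        ToyAsyncBatcher batcher = new ToyAsyncBatcher();
        batcher.add(() -> index.write("profile-1", 3L));
        batcher.flush();
        index.refresh();
        boolean visible = index.isSearchable("profile-1");
        batcher.releaseAndClose();
        return visible;
    }

    /** Synchronous path: the write has completed by the time refresh() is called. */
    static boolean searchableAfterSynchronousUpdate() {
        ToyIndex index = new ToyIndex();
        index.write("profile-1", 3L);
        index.refresh();
        return index.isSearchable("profile-1");
    }

    public static void main(String[] args) {
        System.out.println("async flush, then refresh: " + searchableAfterAsyncFlush());
        System.out.println("sync update, then refresh: " + searchableAfterSynchronousUpdate());
    }
}
```

   In the async path the profile is not searchable after the refresh, because the worker thread has not executed the batch yet. In the synchronous path it is, which is the property the segment save relies on here.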




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org