You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this plain-text conversion.
Posted to dev@unomi.apache.org by dr...@apache.org on 2018/10/18 12:24:27 UTC

[1/5] incubator-unomi git commit: UNOMI-204 : Add count method in QueryBuilder, adds cardinality/count aggregates, first optimization for PastEventCondition

Repository: incubator-unomi
Updated Branches:
  refs/heads/master 55bd603ca -> aa82e46a0


UNOMI-204 : Add count method in QueryBuilder, adds cardinality/count aggregates, first optimization for PastEventCondition


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/bc549076
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/bc549076
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/bc549076

Branch: refs/heads/master
Commit: bc549076dcebd76511ad0943071c895a9c05d06b
Parents: 95d0030
Author: tdraier <dr...@apache.org>
Authored: Tue Oct 9 18:15:28 2018 +0200
Committer: tdraier <dr...@apache.org>
Committed: Thu Oct 11 15:07:22 2018 +0200

----------------------------------------------------------------------
 .../ElasticSearchPersistenceServiceImpl.java    | 25 ++++++-
 .../conditions/ConditionESQueryBuilder.java     |  3 +
 .../ConditionESQueryBuilderDispatcher.java      | 32 ++++++++
 .../spi/aggregate/TermsAggregate.java           | 18 +++++
 .../PastEventConditionESQueryBuilder.java       | 77 +++++++++++++++-----
 5 files changed, 137 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bc549076/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 46e7cbb..23cedfb 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -61,6 +61,7 @@ import org.elasticsearch.common.unit.DistanceUnit;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.RangeQueryBuilder;
@@ -83,6 +84,8 @@ import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBu
 import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
 import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
 import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
@@ -1306,7 +1309,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
     @Override
     public long queryCount(Condition query, String itemType) {
-        return queryCount(conditionESQueryBuilderDispatcher.buildFilter(query), itemType);
+        try {
+            return conditionESQueryBuilderDispatcher.count(query);
+        } catch (UnsupportedOperationException e) {
+            QueryBuilder filter = conditionESQueryBuilderDispatcher.buildFilter(query);
+            if (filter instanceof IdsQueryBuilder) {
+                return ((IdsQueryBuilder) filter).ids().size();
+            }
+            return queryCount(filter, itemType);
+        }
     }
 
     private long queryCount(final QueryBuilder filter, final String itemType) {
@@ -1564,6 +1575,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         //default
                         if (fieldName != null) {
                             bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(Integer.parseInt(aggregateQueryBucketSize));
+                            if (aggregate instanceof TermsAggregate) {
+                                TermsAggregate termsAggregate = (TermsAggregate) aggregate;
+                                if (termsAggregate.getPartition() > -1 && termsAggregate.getNumPartitions() > -1) {
+                                    ((TermsAggregationBuilder) bucketsAggregation).includeExclude(new IncludeExclude(termsAggregate.getPartition(), termsAggregate.getNumPartitions()));
+                                }
+                            }
                         } else {
                             // field name could be null if no existing data exists
                         }
@@ -1781,6 +1798,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                             case "max":
                                 filterAggregation.subAggregation(AggregationBuilders.max("max").field(field));
                                 break;
+                            case "card":
+                                filterAggregation.subAggregation(AggregationBuilders.cardinality("card").field(field));
+                                break;
+                            case "count":
+                                filterAggregation.subAggregation(AggregationBuilders.count("count").field(field));
+                                break;
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bc549076/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
index 78a5cba..7a100d0 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
@@ -26,4 +26,7 @@ public interface ConditionESQueryBuilder {
 
     QueryBuilder buildQuery(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher);
 
+    default long count(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bc549076/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
index b29a631..d08e283 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
@@ -86,5 +86,37 @@ public class ConditionESQueryBuilderDispatcher {
         return QueryBuilders.matchAllQuery();
     }
 
+    public long count(Condition condition) {
+        return count(condition, new HashMap<>());
+    }
+
+    public long count(Condition condition, Map<String, Object> context) {
+        if(condition == null || condition.getConditionType() == null) {
+            throw new IllegalArgumentException("Condition is null or doesn't have type, impossible to build filter");
+        }
+
+        String queryBuilderKey = condition.getConditionType().getQueryBuilder();
+        if (queryBuilderKey == null && condition.getConditionType().getParentCondition() != null) {
+            context.putAll(condition.getParameterValues());
+            return count(condition.getConditionType().getParentCondition(), context);
+        }
+
+        if (queryBuilderKey == null) {
+            throw new UnsupportedOperationException("No query builder defined for : " + condition.getConditionTypeId());
+        }
 
+        if (queryBuilders.containsKey(queryBuilderKey)) {
+            ConditionESQueryBuilder queryBuilder = queryBuilders.get(queryBuilderKey);
+            Condition contextualCondition = ConditionContextHelper.getContextualCondition(condition, context);
+            if (contextualCondition != null) {
+                return queryBuilder.count(contextualCondition, context, this);
+            }
+        }
+
+        // if no matching
+        if (logger.isDebugEnabled()) {
+            logger.debug("No matching query builder for condition {} and context {}", condition, context);
+        }
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bc549076/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
----------------------------------------------------------------------
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
index a0591f5..3b9d741 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
@@ -18,7 +18,25 @@
 package org.apache.unomi.persistence.spi.aggregate;
 
 public class TermsAggregate extends BaseAggregate{
+    private int partition = -1;
+    private int numPartitions = -1;
+
+
     public TermsAggregate(String field) {
         super(field);
     }
+
+    public TermsAggregate(String field, int partition, int numPartitions) {
+        super(field);
+        this.partition = partition;
+        this.numPartitions = numPartitions;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public int getNumPartitions() {
+        return numPartitions;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bc549076/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index e51aaa8..b3c169c 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -44,6 +44,64 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
     }
 
     public QueryBuilder buildQuery(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
+        Condition eventCondition = getEventCondition(condition, context);
+        //todo : Check behaviour with important number of profiles
+        Set<String> ids = new HashSet<String>();
+        Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount");
+        Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
+
+        Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card", "count"}, "profileId.keyword", Event.ITEM_TYPE);
+        long card = m.get("_card").longValue();
+
+        int numParts = (int) (card / 1000);
+        for (int i = 0; i < numParts; i++) {
+            Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+            if (eventCountByProfile != null) {
+                for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
+                    if (!entry.getKey().startsWith("_")) {
+                        if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) {
+                            ids.add(entry.getKey());
+                        }
+                    }
+                }
+            }
+        }
+
+        return QueryBuilders.idsQuery(Profile.ITEM_TYPE).addIds(ids.toArray(new String[ids.size()]));
+    }
+
+    public long count(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
+        Condition eventCondition = getEventCondition(condition, context);
+
+        Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount");
+        Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
+
+        Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card", "count"}, "profileId.keyword", Event.ITEM_TYPE);
+        long card = m.get("_card").longValue();
+        long count = m.get("_count").longValue();
+
+        if (minimumEventCount != 0 || maximumEventCount != Integer.MAX_VALUE) {
+            int result = 0;
+            int numParts = (int) (card / 1000);
+            for (int i = 0; i < numParts; i++) {
+                Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+                if (eventCountByProfile != null) {
+                    for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
+                        if (!entry.getKey().startsWith("_")) {
+                            if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) {
+                                result ++;
+                            }
+                        }
+                    }
+                }
+            }
+            return result;
+        }
+
+        return card;
+    }
+
+    private Condition getEventCondition(Condition condition, Map<String, Object> context) {
         Condition eventCondition;
         try {
             eventCondition = (Condition) condition.getParameter("eventCondition");
@@ -70,22 +128,7 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
             numberOfDaysCondition.setParameter("propertyValueDateExpr", "now-" + numberOfDays + "d");
             l.add(numberOfDaysCondition);
         }
-        //todo : Check behaviour with important number of profiles
-        Set<String> ids = new HashSet<String>();
-        Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount");
-        Integer maximumEventCount = condition.getParameter("maximumEventCount") == null  ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
-
-        Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE);
-        if (eventCountByProfile != null) {
-            for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
-                if (!entry.getKey().startsWith("_")) {
-                    if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) {
-                        ids.add(entry.getKey());
-                    }
-                }
-            }
-        }
-
-        return QueryBuilders.idsQuery(Profile.ITEM_TYPE).addIds(ids.toArray(new String[ids.size()]));
+        return andCondition;
     }
+
 }


[2/5] incubator-unomi git commit: UNOMI-204 : More optimizations in past event count and segment calculation

Posted by dr...@apache.org.
UNOMI-204 : More optimizations in past event count and segment calculation


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/ae31c533
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/ae31c533
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/ae31c533

Branch: refs/heads/master
Commit: ae31c533c2f63820bd275c32cf0af0f5c2e6817c
Parents: bc54907
Author: tdraier <dr...@apache.org>
Authored: Fri Oct 12 14:59:54 2018 +0200
Committer: tdraier <dr...@apache.org>
Committed: Fri Oct 12 14:59:54 2018 +0200

----------------------------------------------------------------------
 .../unomi/api/services/SegmentService.java      |  12 +-
 .../ElasticSearchPersistenceServiceImpl.java    |  12 +-
 .../PastEventConditionESQueryBuilder.java       | 114 ++++++++++++++-----
 .../resources/OSGI-INF/blueprint/blueprint.xml  |   1 +
 .../services/services/SegmentServiceImpl.java   |  93 +++++++++------
 5 files changed, 164 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/ae31c533/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/unomi/api/services/SegmentService.java b/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
index 4b03e78..1ed152d 100644
--- a/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
@@ -21,6 +21,7 @@ import org.apache.unomi.api.Item;
 import org.apache.unomi.api.Metadata;
 import org.apache.unomi.api.PartialList;
 import org.apache.unomi.api.Profile;
+import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.query.Query;
 import org.apache.unomi.api.segments.DependentMetadata;
 import org.apache.unomi.api.segments.Scoring;
@@ -50,7 +51,7 @@ public interface SegmentService {
     /**
      * Retrieves segment metadatas for segments in the specified scope, ordered according to the specified {@code sortBy} String and and paged: only {@code size} of them are
      * retrieved, starting with the {@code offset}-th one.
-     *
+     * <p>
      * TODO: remove?
      *
      * @param scope  the scope for which we want to retrieve segment metadata
@@ -158,7 +159,7 @@ public interface SegmentService {
      * Retrieves the set of all scoring metadata.
      *
      * @param offset the offset
-     * @param size the size
+     * @param size   the size
      * @param sortBy sort by
      * @return the set of all scoring metadata
      */
@@ -219,4 +220,11 @@ public interface SegmentService {
      */
     DependentMetadata getScoringDependentMetadata(String scoringId);
 
+    /**
+     * Get generated property key for past event condition
+     * @param condition The event condition
+     * @param parentCondition The past event condition
+     * @return
+     */
+    String getGeneratedPropertyKey(Condition condition, Condition parentCondition);
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/ae31c533/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 23cedfb..87a46eb 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -1312,11 +1312,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         try {
             return conditionESQueryBuilderDispatcher.count(query);
         } catch (UnsupportedOperationException e) {
-            QueryBuilder filter = conditionESQueryBuilderDispatcher.buildFilter(query);
-            if (filter instanceof IdsQueryBuilder) {
-                return ((IdsQueryBuilder) filter).ids().size();
+            try {
+                QueryBuilder filter = conditionESQueryBuilderDispatcher.buildFilter(query);
+                if (filter instanceof IdsQueryBuilder) {
+                    return ((IdsQueryBuilder) filter).ids().size();
+                }
+                return queryCount(filter, itemType);
+            } catch (UnsupportedOperationException e1) {
+                return -1;
             }
-            return queryCount(filter, itemType);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/ae31c533/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index b3c169c..c891abd 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -21,6 +21,7 @@ import org.apache.unomi.api.Event;
 import org.apache.unomi.api.Profile;
 import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.services.DefinitionsService;
+import org.apache.unomi.api.services.SegmentService;
 import org.apache.unomi.persistence.elasticsearch.conditions.ConditionContextHelper;
 import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilder;
 import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilderDispatcher;
@@ -28,12 +29,20 @@ import org.apache.unomi.persistence.spi.PersistenceService;
 import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
 public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder {
+
     private DefinitionsService definitionsService;
     private PersistenceService persistenceService;
+    private SegmentService segmentService;
+
+    private int maximumIdsQueryCount = 1000;
+    private int termsAggregatePartitionSize = 1000;
 
     public void setDefinitionsService(DefinitionsService definitionsService) {
         this.definitionsService = definitionsService;
@@ -43,62 +52,115 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
         this.persistenceService = persistenceService;
     }
 
+    public void setMaximumIdsQueryCount(int maximumIdsQueryCount) {
+        this.maximumIdsQueryCount = maximumIdsQueryCount;
+    }
+
+    public void setTermsAggregatePartitionSize(int termsAggregatePartitionSize) {
+        this.termsAggregatePartitionSize = termsAggregatePartitionSize;
+    }
+
+    public void setSegmentService(SegmentService segmentService) {
+        this.segmentService = segmentService;
+    }
+
     public QueryBuilder buildQuery(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
-        Condition eventCondition = getEventCondition(condition, context);
-        //todo : Check behaviour with important number of profiles
-        Set<String> ids = new HashSet<String>();
-        Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount");
+        Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 1 : (Integer) condition.getParameter("minimumEventCount");
         Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
 
-        Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card", "count"}, "profileId.keyword", Event.ITEM_TYPE);
-        long card = m.get("_card").longValue();
+        final Logger logger = LoggerFactory.getLogger(PastEventConditionESQueryBuilder.class.getName());
+
+        if (condition.getParameter("generatedPropertyKey") != null && condition.getParameter("generatedPropertyKey").equals(segmentService.getGeneratedPropertyKey((Condition) condition.getParameter("eventCondition"), condition))) {
+            // A property is already set on profiles matching the past event condition, use it
+            if (minimumEventCount != 1 || maximumEventCount != Integer.MAX_VALUE) {
+                // Check the number of occurences
+                RangeQueryBuilder builder = QueryBuilders.rangeQuery("systemProperties.pastEvents." + condition.getParameter("generatedPropertyKey"));
+                if (minimumEventCount != 1) {
+                    builder.gte(minimumEventCount);
+                }
+                if (maximumEventCount != Integer.MAX_VALUE) {
+                    builder.lte(minimumEventCount);
+                }
+                return builder;
+            } else {
+                // Simply get profiles who have the property set
+                return QueryBuilders.existsQuery("systemProperties.pastEvents." + condition.getParameter("generatedPropertyKey"));
+            }
+        } else {
+            // No property set - tries to build an idsQuery
+            // Build past event condition
+            Condition eventCondition = getEventCondition(condition, context);
+
+            Set<String> ids = new HashSet<>();
 
-        int numParts = (int) (card / 1000);
-        for (int i = 0; i < numParts; i++) {
-            Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
-            if (eventCountByProfile != null) {
-                for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
-                    if (!entry.getKey().startsWith("_")) {
-                        if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) {
+            // Get full cardinality to partition the terms aggreggation
+            Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
+            long card = m.get("_card").longValue();
+
+            int numParts = (int) (card / termsAggregatePartitionSize);
+            for (int i = 0; i < numParts; i++) {
+                Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+                if (eventCountByProfile != null) {
+                    eventCountByProfile.remove("_filtered");
+                    for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
+                        if (entry.getValue() < minimumEventCount) {
+                            // No more interesting buckets in this partition
+                            break;
+                        } else if (entry.getValue() <= maximumEventCount) {
                             ids.add(entry.getKey());
+
+                            if (ids.size() > maximumIdsQueryCount) {
+                                // Avoid building too big ids query - throw exception instead
+                                throw new UnsupportedOperationException("Too many profiles");
+                            }
                         }
                     }
                 }
             }
-        }
 
-        return QueryBuilders.idsQuery(Profile.ITEM_TYPE).addIds(ids.toArray(new String[ids.size()]));
+            return QueryBuilders.idsQuery(Profile.ITEM_TYPE).addIds(ids.toArray(new String[0]));
+        }
     }
 
     public long count(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
         Condition eventCondition = getEventCondition(condition, context);
 
-        Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount");
+        Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 1 : (Integer) condition.getParameter("minimumEventCount");
         Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
 
-        Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card", "count"}, "profileId.keyword", Event.ITEM_TYPE);
+        // Get full cardinality to partition the terms aggreggation
+        Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
         long card = m.get("_card").longValue();
-        long count = m.get("_count").longValue();
 
-        if (minimumEventCount != 0 || maximumEventCount != Integer.MAX_VALUE) {
+        if (minimumEventCount != 1 || maximumEventCount != Integer.MAX_VALUE) {
+            // Event count specified, must check occurences count for each profile
             int result = 0;
-            int numParts = (int) (card / 1000);
+            int numParts = (int) (card / termsAggregatePartitionSize);
             for (int i = 0; i < numParts; i++) {
                 Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+                int j = 0;
                 if (eventCountByProfile != null) {
+                    eventCountByProfile.remove("_filtered");
                     for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
-                        if (!entry.getKey().startsWith("_")) {
-                            if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) {
-                                result ++;
-                            }
+                        if (entry.getValue() < minimumEventCount) {
+                            // No more interesting buckets in this partition
+                            break;
+                        } else if (entry.getValue() <= maximumEventCount && minimumEventCount == 1) {
+                            // Take all remaining elements
+                            result += eventCountByProfile.size() - j;
+                            break;
+                        } else if (entry.getValue() <= maximumEventCount) {
+                            result++;
                         }
+                        j++;
                     }
                 }
             }
             return result;
+        } else {
+            // Simply get the full number of distinct profiles
+            return card;
         }
-
-        return card;
     }
 
     private Condition getEventCondition(Condition condition, Map<String, Object> context) {

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/ae31c533/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 2cf78ab..9aa9d6c 100644
--- a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -93,6 +93,7 @@
         <bean class="org.apache.unomi.plugins.baseplugin.conditions.PastEventConditionESQueryBuilder">
             <property name="definitionsService" ref="definitionsService"/>
             <property name="persistenceService" ref="persistenceService"/>
+            <property name="segmentService" ref="segmentService"/>
         </bean>
     </service>
 

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/ae31c533/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
----------------------------------------------------------------------
diff --git a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
index 0fe5f2e..c68f691 100644
--- a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
@@ -27,7 +27,6 @@ import org.apache.unomi.api.rules.Rule;
 import org.apache.unomi.api.segments.*;
 import org.apache.unomi.api.services.*;
 import org.apache.unomi.persistence.spi.CustomObjectMapper;
-import org.apache.unomi.persistence.spi.PersistenceService;
 import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
@@ -58,6 +57,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
     private List<Scoring> allScoring;
     private Timer segmentTimer;
     private int segmentUpdateBatchSize = 1000;
+    private int termsAggregatePartitionSize = 1000;
 
     public SegmentServiceImpl() {
         logger.info("Initializing segment service...");
@@ -83,6 +83,10 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         this.segmentUpdateBatchSize = segmentUpdateBatchSize;
     }
 
+    public void setTermsAggregatePartitionSize(int termsAggregatePartitionSize) {
+        this.termsAggregatePartitionSize = termsAggregatePartitionSize;
+    }
+
     public void postConstruct() {
         logger.debug("postConstruct {" + bundleContext.getBundle() + "}");
         loadPredefinedSegments(bundleContext);
@@ -708,13 +712,8 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
     private void getAutoGeneratedRules(Metadata metadata, Condition condition, Condition parentCondition, List<Rule> rules) {
         Set<String> tags = condition.getConditionType().getMetadata().getSystemTags();
         if (tags.contains("eventCondition") && !tags.contains("profileCondition")) {
-            try {
-                Map<String, Object> m = new HashMap<>(3);
-                m.put("scope", metadata.getScope());
-                m.put("condition", condition);
-                m.put("numberOfDays", parentCondition.getParameter("numberOfDays"));
-                String key = CustomObjectMapper.getObjectMapper().writeValueAsString(m);
-                key = "eventTriggered" + getMD5(key);
+            String key = getGeneratedPropertyKey(condition, parentCondition);
+            if (key != null) {
                 parentCondition.setParameter("generatedPropertyKey", key);
                 Rule rule = rulesService.getRule(key);
                 if (rule == null) {
@@ -734,8 +733,6 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                     rule.getLinkedItems().add(metadata.getId());
                     rules.add(rule);
                 }
-            } catch (JsonProcessingException e) {
-                logger.error(e.getMessage(), e);
             }
         } else {
             Collection<Object> values = new ArrayList<>(condition.getParameterValues().values());
@@ -773,18 +770,25 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
             l.add(numberOfDaysCondition);
         }
         String propertyKey = (String) parentCondition.getParameter("generatedPropertyKey");
-        Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE);
-        for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
-            String profileId = entry.getKey();
-            if (!profileId.startsWith("_")) {
-                Map<String, Long> pastEventCounts = new HashMap<>();
-                pastEventCounts.put(propertyKey, entry.getValue());
-                Map<String, Object> systemProperties = new HashMap<>();
-                systemProperties.put("pastEvents", pastEventCounts);
-                try {
-                    persistenceService.update(profileId, null, Profile.class, "systemProperties", systemProperties);
-                } catch (Exception e) {
-                    logger.error("Error updating profile {} past event system properties", profileId, e);
+
+        Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
+        long card = m.get("_card").longValue();
+
+        int numParts = (int) (card / termsAggregatePartitionSize);
+        for (int i = 0; i < numParts; i++) {
+            Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+            for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
+                String profileId = entry.getKey();
+                if (!profileId.startsWith("_")) {
+                    Map<String, Long> pastEventCounts = new HashMap<>();
+                    pastEventCounts.put(propertyKey, entry.getValue());
+                    Map<String, Object> systemProperties = new HashMap<>();
+                    systemProperties.put("pastEvents", pastEventCounts);
+                    try {
+                        persistenceService.update(profileId, null, Profile.class, "systemProperties", systemProperties);
+                    } catch (Exception e) {
+                        logger.error("Error updating profile {} past event system properties", profileId, e);
+                    }
                 }
             }
         }
@@ -792,6 +796,33 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis() - t);
     }
 
+    public String getGeneratedPropertyKey(Condition condition, Condition parentCondition) {
+        try {
+            Map<String, Object> m = new HashMap<>();
+            m.put("condition", condition);
+            m.put("numberOfDays", parentCondition.getParameter("numberOfDays"));
+            String key = CustomObjectMapper.getObjectMapper().writeValueAsString(m);
+            return "eventTriggered" + getMD5(key);
+        } catch (JsonProcessingException e) {
+            logger.error("Cannot generate key",e);
+            return null;
+        }
+    }
+
+    private String getMD5(String md5) {
+        try {
+            MessageDigest md = MessageDigest.getInstance("MD5");
+            byte[] array = md.digest(md5.getBytes());
+            StringBuffer sb = new StringBuffer();
+            for (int i = 0; i < array.length; ++i) {
+                sb.append(Integer.toHexString((array[i] & 0xFF) | 0x100).substring(1, 3));
+            }
+            return sb.toString();
+        } catch (java.security.NoSuchAlgorithmException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private void updateExistingProfilesForSegment(Segment segment) {
         long t = System.currentTimeMillis();
         Condition segmentCondition = new Condition();
@@ -830,6 +861,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
             PartialList<Profile> profilesToAdd = persistenceService.query(profilesToAddCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
 
             while (profilesToAdd.getList().size() > 0) {
+                long t2= System.currentTimeMillis();
                 for (Profile profileToAdd : profilesToAdd.getList()) {
                     profileToAdd.getSegments().add(segment.getItemId());
                     persistenceService.update(profileToAdd.getItemId(), null, Profile.class, "segments", profileToAdd.getSegments());
@@ -838,12 +870,14 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                     eventService.send(profileUpdated);
                     updatedProfileCount++;
                 }
+                logger.info("{} profiles added in segment in {}ms", profilesToAdd.size(), System.currentTimeMillis() - t2);
                 profilesToAdd = persistenceService.continueScrollQuery(Profile.class, profilesToAdd.getScrollIdentifier(), profilesToAdd.getScrollTimeValidity());
                 if (profilesToAdd == null || profilesToAdd.getList().size() == 0) {
                     break;
                 }
             }
             while (profilesToRemove.getList().size() > 0) {
+                long t2= System.currentTimeMillis();
                 for (Profile profileToRemove : profilesToRemove.getList()) {
                     profileToRemove.getSegments().remove(segment.getItemId());
                     persistenceService.update(profileToRemove.getItemId(), null, Profile.class, "segments", profileToRemove.getSegments());
@@ -852,6 +886,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                     eventService.send(profileUpdated);
                     updatedProfileCount++;
                 }
+                logger.info("{} profiles removed from segment in {}ms", profilesToRemove.size(), System.currentTimeMillis() - t2);
                 profilesToRemove = persistenceService.continueScrollQuery(Profile.class, profilesToRemove.getScrollIdentifier(), profilesToRemove.getScrollTimeValidity());
                 if (profilesToRemove == null || profilesToRemove.getList().size() == 0) {
                     break;
@@ -929,20 +964,6 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         logger.info("Profiles updated in {}ms", System.currentTimeMillis() - t);
     }
 
-    private String getMD5(String md5) {
-        try {
-            MessageDigest md = MessageDigest.getInstance("MD5");
-            byte[] array = md.digest(md5.getBytes());
-            StringBuffer sb = new StringBuffer();
-            for (int i = 0; i < array.length; ++i) {
-                sb.append(Integer.toHexString((array[i] & 0xFF) | 0x100).substring(1, 3));
-            }
-            return sb.toString();
-        } catch (java.security.NoSuchAlgorithmException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     public void bundleChanged(BundleEvent event) {
         switch (event.getType()) {
             case BundleEvent.STARTED:


[5/5] incubator-unomi git commit: Merge branch 'UNOMI-204'

Posted by dr...@apache.org.
Merge branch 'UNOMI-204'


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/aa82e46a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/aa82e46a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/aa82e46a

Branch: refs/heads/master
Commit: aa82e46a0600908f0c5bf0a7e0081aac07aa0fdc
Parents: 55bd603 183ffcf
Author: tdraier <dr...@apache.org>
Authored: Thu Oct 18 14:23:32 2018 +0200
Committer: tdraier <dr...@apache.org>
Committed: Thu Oct 18 14:23:32 2018 +0200

----------------------------------------------------------------------
 .../unomi/api/services/SegmentService.java      |  12 +-
 .../ElasticSearchPersistenceServiceImpl.java    |  35 ++++-
 .../conditions/ConditionESQueryBuilder.java     |   3 +
 .../ConditionESQueryBuilderDispatcher.java      |  32 +++++
 ...g.apache.unomi.persistence.elasticsearch.cfg |   3 +
 .../spi/aggregate/TermsAggregate.java           |  18 +++
 .../PastEventConditionESQueryBuilder.java       | 135 ++++++++++++++++---
 .../resources/OSGI-INF/blueprint/blueprint.xml  |  11 ++
 .../services/services/SegmentServiceImpl.java   |  92 ++++++++-----
 .../resources/OSGI-INF/blueprint/blueprint.xml  |   8 ++
 10 files changed, 291 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/aa82e46a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/aa82e46a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------


[3/5] incubator-unomi git commit: UNOMI-204 : removed logger

Posted by dr...@apache.org.
UNOMI-204 : removed logger


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/7aa45d62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/7aa45d62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/7aa45d62

Branch: refs/heads/master
Commit: 7aa45d625a7dc04f0422e13b2790aebfb46cf1fc
Parents: ae31c53
Author: tdraier <dr...@apache.org>
Authored: Fri Oct 12 15:56:05 2018 +0200
Committer: tdraier <dr...@apache.org>
Committed: Fri Oct 12 15:56:05 2018 +0200

----------------------------------------------------------------------
 .../baseplugin/conditions/PastEventConditionESQueryBuilder.java    | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/7aa45d62/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index c891abd..0265a42 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -68,8 +68,6 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
         Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 1 : (Integer) condition.getParameter("minimumEventCount");
         Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
 
-        final Logger logger = LoggerFactory.getLogger(PastEventConditionESQueryBuilder.class.getName());
-
         if (condition.getParameter("generatedPropertyKey") != null && condition.getParameter("generatedPropertyKey").equals(segmentService.getGeneratedPropertyKey((Condition) condition.getParameter("eventCondition"), condition))) {
             // A property is already set on profiles matching the past event condition, use it
             if (minimumEventCount != 1 || maximumEventCount != Integer.MAX_VALUE) {


[4/5] incubator-unomi git commit: UNOMI-204 : added configurations parameters

Posted by dr...@apache.org.
UNOMI-204 : added configurations parameters


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/183ffcf1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/183ffcf1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/183ffcf1

Branch: refs/heads/master
Commit: 183ffcf1f49efce0244e38c100a963687ada1033
Parents: 7aa45d6
Author: tdraier <dr...@apache.org>
Authored: Tue Oct 16 14:10:27 2018 +0200
Committer: tdraier <dr...@apache.org>
Committed: Tue Oct 16 14:10:27 2018 +0200

----------------------------------------------------------------------
 .../ElasticSearchPersistenceServiceImpl.java          |  6 +++---
 .../org.apache.unomi.persistence.elasticsearch.cfg    |  3 +++
 .../conditions/PastEventConditionESQueryBuilder.java  | 14 ++++++--------
 .../main/resources/OSGI-INF/blueprint/blueprint.xml   | 10 ++++++++++
 .../unomi/services/services/SegmentServiceImpl.java   |  8 ++++----
 .../main/resources/OSGI-INF/blueprint/blueprint.xml   |  8 ++++++++
 6 files changed, 34 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/183ffcf1/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 87a46eb..dd8c48f 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -156,7 +156,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     private String minimalElasticSearchVersion = "5.0.0";
     private String maximalElasticSearchVersion = "5.7.0";
 
-    private String aggregateQueryBucketSize = "5000";
+    private int aggregateQueryBucketSize = 5000;
 
     private String transportClientClassName = null;
     private String transportClientProperties = null;
@@ -259,7 +259,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         this.maximalElasticSearchVersion = maximalElasticSearchVersion;
     }
 
-    public void setAggregateQueryBucketSize(String aggregateQueryBucketSize) {
+    public void setAggregateQueryBucketSize(int aggregateQueryBucketSize) {
         this.aggregateQueryBucketSize = aggregateQueryBucketSize;
     }
 
@@ -1578,7 +1578,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         fieldName = getPropertyNameWithData(fieldName, itemType);
                         //default
                         if (fieldName != null) {
-                            bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(Integer.parseInt(aggregateQueryBucketSize));
+                            bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(aggregateQueryBucketSize);
                             if (aggregate instanceof TermsAggregate) {
                                 TermsAggregate termsAggregate = (TermsAggregate) aggregate;
                                 if (termsAggregate.getPartition() > -1 && termsAggregate.getNumPartitions() > -1) {

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/183ffcf1/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index 324a626..dd0d0ec 100644
--- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -45,3 +45,6 @@ maximalElasticSearchVersion=5.7.0
 
 # The following setting is used to set the aggregate query bucket size
 aggregateQueryBucketSize=5000
+
+# Maximum size allowed for an elastic "ids" query
+maximumIdsQueryCount=5000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/183ffcf1/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index 0265a42..c8aeaca 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -30,8 +30,6 @@ import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.RangeQueryBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
@@ -41,8 +39,8 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
     private PersistenceService persistenceService;
     private SegmentService segmentService;
 
-    private int maximumIdsQueryCount = 1000;
-    private int termsAggregatePartitionSize = 1000;
+    private int maximumIdsQueryCount = 5000;
+    private int aggregateQueryBucketSize = 5000;
 
     public void setDefinitionsService(DefinitionsService definitionsService) {
         this.definitionsService = definitionsService;
@@ -56,8 +54,8 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
         this.maximumIdsQueryCount = maximumIdsQueryCount;
     }
 
-    public void setTermsAggregatePartitionSize(int termsAggregatePartitionSize) {
-        this.termsAggregatePartitionSize = termsAggregatePartitionSize;
+    public void setAggregateQueryBucketSize(int aggregateQueryBucketSize) {
+        this.aggregateQueryBucketSize = aggregateQueryBucketSize;
     }
 
     public void setSegmentService(SegmentService segmentService) {
@@ -95,7 +93,7 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
             Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
             long card = m.get("_card").longValue();
 
-            int numParts = (int) (card / termsAggregatePartitionSize);
+            int numParts = (int) (card / aggregateQueryBucketSize) + 2;
             for (int i = 0; i < numParts; i++) {
                 Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
                 if (eventCountByProfile != null) {
@@ -133,7 +131,7 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
         if (minimumEventCount != 1 || maximumEventCount != Integer.MAX_VALUE) {
             // Event count specified, must check occurences count for each profile
             int result = 0;
-            int numParts = (int) (card / termsAggregatePartitionSize);
+            int numParts = (int) (card / aggregateQueryBucketSize) + 2;
             for (int i = 0; i < numParts; i++) {
                 Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
                 int j = 0;

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/183ffcf1/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 9aa9d6c..5a10214 100644
--- a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -29,6 +29,14 @@
         </cm:default-properties>
     </cm:property-placeholder>
 
+    <cm:property-placeholder persistent-id="org.apache.unomi.persistence.elasticsearch"
+                             update-strategy="reload" placeholder-prefix="${es.">
+        <cm:default-properties>
+            <cm:property name="maximumIdsQueryCount" value="5000"/>
+            <cm:property name="aggregateQueryBucketSize" value="5000"/>
+        </cm:default-properties>
+    </cm:property-placeholder>
+
     <reference id="definitionsService" interface="org.apache.unomi.api.services.DefinitionsService"/>
     <reference id="persistenceService" interface="org.apache.unomi.persistence.spi.PersistenceService"/>
     <reference id="profileService" interface="org.apache.unomi.api.services.ProfileService"/>
@@ -94,6 +102,8 @@
             <property name="definitionsService" ref="definitionsService"/>
             <property name="persistenceService" ref="persistenceService"/>
             <property name="segmentService" ref="segmentService"/>
+            <property name="maximumIdsQueryCount" value="${es.maximumIdsQueryCount}"/>
+            <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}"/>
         </bean>
     </service>
 

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/183ffcf1/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
----------------------------------------------------------------------
diff --git a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
index c68f691..ec74525 100644
--- a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
@@ -57,7 +57,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
     private List<Scoring> allScoring;
     private Timer segmentTimer;
     private int segmentUpdateBatchSize = 1000;
-    private int termsAggregatePartitionSize = 1000;
+    private int aggregateQueryBucketSize = 5000;
 
     public SegmentServiceImpl() {
         logger.info("Initializing segment service...");
@@ -83,8 +83,8 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         this.segmentUpdateBatchSize = segmentUpdateBatchSize;
     }
 
-    public void setTermsAggregatePartitionSize(int termsAggregatePartitionSize) {
-        this.termsAggregatePartitionSize = termsAggregatePartitionSize;
+    public void setAggregateQueryBucketSize(int aggregateQueryBucketSize) {
+        this.aggregateQueryBucketSize = aggregateQueryBucketSize;
     }
 
     public void postConstruct() {
@@ -774,7 +774,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
         long card = m.get("_card").longValue();
 
-        int numParts = (int) (card / termsAggregatePartitionSize);
+        int numParts = (int) (card / aggregateQueryBucketSize) + 2;
         for (int i = 0; i < numParts; i++) {
             Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
             for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/183ffcf1/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index eed06f2..979c6be 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -44,6 +44,13 @@
         </cm:default-properties>
     </cm:property-placeholder>
 
+    <cm:property-placeholder persistent-id="org.apache.unomi.persistence.elasticsearch"
+                             update-strategy="reload" placeholder-prefix="${es.">
+        <cm:default-properties>
+            <cm:property name="aggregateQueryBucketSize" value="5000"/>
+        </cm:default-properties>
+    </cm:property-placeholder>
+
     <reference id="persistenceService"
                interface="org.apache.unomi.persistence.spi.PersistenceService"/>
     <reference id="httpService" interface="org.osgi.service.http.HttpService"/>
@@ -144,6 +151,7 @@
         <property name="bundleContext" ref="blueprintBundleContext"/>
         <property name="taskExecutionPeriod" value="86400000"/>
         <property name="segmentUpdateBatchSize" value="${services.segment.update.batchSize}" />
+        <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}" />
     </bean>
     <service id="segmentService" ref="segmentServiceImpl">
         <interfaces>