You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2020/11/11 09:20:59 UTC

[unomi] 13/17: Add optimizations for past event queries (#208)

This is an automated email from the ASF dual-hosted git repository.

shuber pushed a commit to branch unomi-1.5.x
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit 2f8d397f240cd29023ebce29293b82cd477709e0
Author: liatiusim <62...@users.noreply.github.com>
AuthorDate: Tue Nov 10 10:51:05 2020 +0100

    Add optimizations for past event queries (#208)
    
    (cherry picked from commit 628fb77ec77bfff8b82997db124e73899ee49fd8)
---
 .../main/resources/etc/custom.system.properties    |  4 ++
 .../ElasticSearchPersistenceServiceImpl.java       | 81 +++++++++++++++++++---
 .../resources/OSGI-INF/blueprint/blueprint.xml     |  7 +-
 .../org.apache.unomi.persistence.elasticsearch.cfg | 11 +++
 .../unomi/persistence/spi/PersistenceService.java  | 12 ++++
 .../PastEventConditionESQueryBuilder.java          | 78 +++++++++++++--------
 .../resources/OSGI-INF/blueprint/blueprint.xml     |  2 +
 .../services/impl/segments/SegmentServiceImpl.java | 63 +++++++++++------
 .../resources/OSGI-INF/blueprint/blueprint.xml     |  2 +
 9 files changed, 202 insertions(+), 58 deletions(-)

diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties
index f183969..a7b665c 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -90,6 +90,10 @@ org.apache.unomi.elasticsearch.defaultIndex.indexMaxDocValueFieldsSearch=${env:U
 org.apache.unomi.elasticsearch.defaultQueryLimit=${env:UNOMI_ELASTICSEARCH_DEFAULTQUERYLIMIT:-10}
 org.apache.unomi.elasticsearch.aggregateQueryBucketSize=${env:UNOMI_ELASTICSEARCH_AGGREGATEBUCKETSIZE:-5000}
 org.apache.unomi.elasticsearch.maximumIdsQueryCount=${env:UNOMI_ELASTICSEARCH_MAXIMUMIDSQUERYCOUNT:-5000}
+org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:-}
+org.apache.unomi.elasticsearch.pastEventsDisablePartitions=${env:UNOMI_ELASTICSEARCH_PAST_EVENTS_DISABLE_PARTITIONS:-false}
+org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_THROW_ON_MISSING_DOCS:-false}
+org.apache.unomi.elasticsearch.aggQueryMaxResponseSizeHttp=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_MAX_RESPONSE_SIZE_HTTP:-}
 # The following settings control the behavior of the BulkProcessor API. You can find more information about these
 # settings and their behavior here : https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.4/java-docs-bulk-processor.html
 # The values used here are the default values of the API
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 530395b..0bd2f13 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -23,6 +23,7 @@ import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
 import org.apache.http.conn.ssl.NoopHostnameVerifier;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.lucene.search.TotalHits;
@@ -55,7 +56,13 @@ import org.elasticsearch.action.search.SearchScrollRequest;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.*;
+import org.elasticsearch.client.Node;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.core.CountRequest;
 import org.elasticsearch.client.core.CountResponse;
 import org.elasticsearch.client.core.MainResponse;
@@ -67,10 +74,7 @@ import org.elasticsearch.common.unit.DistanceUnit;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexNotFoundException;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.index.query.*;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.UpdateByQueryRequest;
 import org.elasticsearch.rest.RestStatus;
@@ -91,6 +95,7 @@ import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBu
 import org.elasticsearch.search.aggregations.bucket.range.IpRangeAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -182,6 +187,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     private Set<String> itemClassesToCacheSet = new HashSet<>();
     private String itemClassesToCache;
     private boolean useBatchingForSave = false;
+    private boolean aggQueryThrowOnMissingDocs = false;
+    private Integer aggQueryMaxResponseSizeHttp = null;
+    private Integer clientSocketTimeout = null;
+
 
     private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>();
 
@@ -207,6 +216,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 .map(i -> i.trim()).filter(i -> !i.isEmpty()).toArray(String[]::new);
     }
 
+    public void setAggQueryMaxResponseSizeHttp(String aggQueryMaxResponseSizeHttp) {
+        if (StringUtils.isNumeric(aggQueryMaxResponseSizeHttp)) {
+            this.aggQueryMaxResponseSizeHttp = Integer.parseInt(aggQueryMaxResponseSizeHttp);
+        }
+    }
+
     public void setIndexPrefix(String indexPrefix) {
         this.indexPrefix = indexPrefix;
     }
@@ -295,6 +310,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         this.aggregateQueryBucketSize = aggregateQueryBucketSize;
     }
 
+    public void setClientSocketTimeout(String clientSocketTimeout) {
+        if (StringUtils.isNumeric(clientSocketTimeout)) {
+            this.clientSocketTimeout = Integer.parseInt(clientSocketTimeout);
+        }
+    }
+
     public void setMetricsService(MetricsService metricsService) {
         this.metricsService = metricsService;
     }
@@ -336,6 +357,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         this.sslTrustAllCertificates = sslTrustAllCertificates;
     }
 
+
+    public void setAggQueryThrowOnMissingDocs(boolean aggQueryThrowOnMissingDocs) {
+        this.aggQueryThrowOnMissingDocs = aggQueryThrowOnMissingDocs;
+    }
     public void start() throws Exception {
 
         // on startup
@@ -416,6 +441,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
         RestClientBuilder clientBuilder = RestClient.builder(nodeList.toArray(new Node[nodeList.size()]));
 
+        if (clientSocketTimeout != null) {
+            clientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
+                requestConfigBuilder.setSocketTimeout(clientSocketTimeout);
+                return requestConfigBuilder;
+            });
+        }
+
         clientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
             if (sslTrustAllCertificates) {
                 try {
@@ -1466,6 +1498,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     searchSourceBuilder.version(true);
                     searchRequest.source(searchSourceBuilder);
                     SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
+
                     if (size == -1) {
                         // Scroll until no more hits are returned
                         while (true) {
@@ -1576,16 +1609,21 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     @Deprecated
     @Override
     public Map<String, Long> aggregateQuery(Condition filter, BaseAggregate aggregate, String itemType) {
-        return aggregateQuery(filter, aggregate, itemType, false);
+        return aggregateQuery(filter, aggregate, itemType, false, aggregateQueryBucketSize);
     }
 
     @Override
     public Map<String, Long> aggregateWithOptimizedQuery(Condition filter, BaseAggregate aggregate, String itemType) {
-        return aggregateQuery(filter, aggregate, itemType, true);
+        return aggregateQuery(filter, aggregate, itemType, true, aggregateQueryBucketSize);
+    }
+
+    @Override
+    public Map<String, Long> aggregateWithOptimizedQuery(Condition filter, BaseAggregate aggregate, String itemType, int size) {
+        return aggregateQuery(filter, aggregate, itemType, true, size);
     }
 
     private Map<String, Long> aggregateQuery(final Condition filter, final BaseAggregate aggregate, final String itemType,
-            final boolean optimizedQuery) {
+            final boolean optimizedQuery, int queryBucketSize) {
         return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".aggregateQuery", this.bundleContext, this.fatalIllegalStateErrors) {
 
             @Override
@@ -1647,7 +1685,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         fieldName = getPropertyNameWithData(fieldName, itemType);
                         //default
                         if (fieldName != null) {
-                            bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(aggregateQueryBucketSize);
+                            bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(queryBucketSize);
                             if (aggregate instanceof TermsAggregate) {
                                 TermsAggregate termsAggregate = (TermsAggregate) aggregate;
                                 if (termsAggregate.getPartition() > -1 && termsAggregate.getNumPartitions() > -1) {
@@ -1696,9 +1734,21 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 }
 
                 searchRequest.source(searchSourceBuilder);
-                SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
+
+                RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
+
+                if (aggQueryMaxResponseSizeHttp != null) {
+                    builder.setHttpAsyncResponseConsumerFactory(
+                            new HttpAsyncResponseConsumerFactory
+                                    .HeapBufferedResponseConsumerFactory(aggQueryMaxResponseSizeHttp));
+                }
+
+                SearchResponse response = client.search(searchRequest, builder.build());
                 Aggregations aggregations = response.getAggregations();
+
+
                 if (aggregations != null) {
+
                     if (optimizedQuery) {
                         if (response.getHits() != null) {
                             results.put("_filtered", response.getHits().getTotalHits().value);
@@ -1715,6 +1765,17 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         }
                     }
                     if (aggregations.get("buckets") != null) {
+
+                        if (aggQueryThrowOnMissingDocs) {
+                            if (aggregations.get("buckets") instanceof Terms) {
+                                Terms terms = aggregations.get("buckets");
+                                if (terms.getDocCountError() > 0 || terms.getSumOfOtherDocCounts() > 0) {
+                                    throw new UnsupportedOperationException("Some docs are missing in aggregation query. docCountError is:" +
+                                            terms.getDocCountError() + " sumOfOtherDocCounts:" + terms.getSumOfOtherDocCounts());
+                                }
+                            }
+                        }
+
                         long totalDocCount = 0;
                         MultiBucketsAggregation terms = aggregations.get("buckets");
                         for (MultiBucketsAggregation.Bucket bucket : terms.getBuckets()) {
diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 5888c26..48402f3 100644
--- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -53,7 +53,9 @@
             <cm:property name="maximalElasticSearchVersion" value="8.0.0" />
 
             <cm:property name="aggregateQueryBucketSize" value="5000" />
-
+            <cm:property name="clientSocketTimeout" value="" />
+            <cm:property name="aggQueryMaxResponseSizeHttp" value="" />
+            <cm:property name="aggQueryThrowOnMissingDocs" value="false" />
             <cm:property name="itemClassesToCache" value="" />
             <cm:property name="useBatchingForSave" value="false" />
 
@@ -121,6 +123,9 @@
         <property name="maximalElasticSearchVersion" value="${es.maximalElasticSearchVersion}" />
 
         <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}" />
+        <property name="aggQueryMaxResponseSizeHttp" value="${es.aggQueryMaxResponseSizeHttp}" />
+        <property name="aggQueryThrowOnMissingDocs" value="${es.aggQueryThrowOnMissingDocs}" />
+        <property name="clientSocketTimeout" value="${es.clientSocketTimeout}" />
 
         <property name="metricsService" ref="metricsService" />
         <property name="hazelcastInstance" ref="hazelcastInstance" />
diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index c6205ed..c0e7b46 100644
--- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -55,6 +55,17 @@ aggregateQueryBucketSize=${org.apache.unomi.elasticsearch.aggregateQueryBucketSi
 # Maximum size allowed for an elastic "ids" query
 maximumIdsQueryCount=${org.apache.unomi.elasticsearch.maximumIdsQueryCount:-5000}
 
+# Disable partitions on aggregation queries for past events.
+pastEventsDisablePartitions=${org.apache.unomi.elasticsearch.pastEventsDisablePartitions:-false}
+
+# Maximum client socket timeout, in milliseconds (leave empty for the default)
+clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:-}
+
+# Return an error if docs are missing in the ES aggregation calculation
+aggQueryThrowOnMissingDocs=${org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs:-false}
+
+aggQueryMaxResponseSizeHttp=${org.apache.unomi.elasticsearch.aggQueryMaxResponseSizeHttp:-}
+
 # Authentication
 username=${org.apache.unomi.elasticsearch.username:-}
 password=${org.apache.unomi.elasticsearch.password:-}
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index 0e41df3..ce7d28e 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -458,6 +458,18 @@ public interface PersistenceService {
     Map<String, Long> aggregateWithOptimizedQuery(Condition filter, BaseAggregate aggregate, String itemType);
 
     /**
+     * Retrieves the number of items with the specified type as defined by the Item subclass public field {@code ITEM_TYPE} matching the optional specified condition and
+     * aggregated according to the specified {@link BaseAggregate}.
+     *
+     * @param filter    the condition the items must match or {@code null} if no filtering is needed
+     * @param aggregate an aggregate specifying how matching items must be bundled
+     * @param itemType  the String representation of the item type we want to retrieve the count of, as defined by its class' {@code ITEM_TYPE} field
+     * @param size      size of returned buckets in the response
+     * @return a Map associating aggregation dimension name as key and cardinality for that dimension as value
+     */
+    Map<String, Long> aggregateWithOptimizedQuery(Condition filter, BaseAggregate aggregate, String itemType, int size);
+
+    /**
      * Updates the persistence's engine indices if needed.
      */
     void refresh();
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index 6afffb1..3c676d4 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -33,6 +33,7 @@ import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.RangeQueryBuilder;
 
 import java.util.*;
+import java.util.stream.Collectors;
 
 public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder {
 
@@ -43,6 +44,7 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
 
     private int maximumIdsQueryCount = 5000;
     private int aggregateQueryBucketSize = 5000;
+    private boolean pastEventsDisablePartitions = false;
 
     public void setDefinitionsService(DefinitionsService definitionsService) {
         this.definitionsService = definitionsService;
@@ -64,6 +66,10 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
         this.aggregateQueryBucketSize = aggregateQueryBucketSize;
     }
 
+    public void setPastEventsDisablePartitions(boolean pastEventsDisablePartitions) {
+        this.pastEventsDisablePartitions = pastEventsDisablePartitions;
+    }
+
     public void setSegmentService(SegmentService segmentService) {
         this.segmentService = segmentService;
     }
@@ -95,25 +101,34 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
 
             Set<String> ids = new HashSet<>();
 
-            // Get full cardinality to partition the terms aggreggation
-            Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
-            long card = m.get("_card").longValue();
-
-            int numParts = (int) (card / aggregateQueryBucketSize) + 2;
-            for (int i = 0; i < numParts; i++) {
-                Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
-                if (eventCountByProfile != null) {
-                    eventCountByProfile.remove("_filtered");
-                    for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
-                        if (entry.getValue() < minimumEventCount) {
-                            // No more interesting buckets in this partition
-                            break;
-                        } else if (entry.getValue() <= maximumEventCount) {
-                            ids.add(entry.getKey());
-
-                            if (ids.size() > maximumIdsQueryCount) {
-                                // Avoid building too big ids query - throw exception instead
-                                throw new UnsupportedOperationException("Too many profiles");
+            if (pastEventsDisablePartitions) {
+                Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount);
+                ids = eventCountByProfile.entrySet().stream()
+                        .filter(x -> !x.getKey().equals("_filtered"))
+                        .filter(x -> x.getValue() >= minimumEventCount && x.getValue() <= maximumEventCount)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+            } else {
+                // Get full cardinality to partition the terms aggregation
+                Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
+                long card = m.get("_card").longValue();
+
+                int numParts = (int) (card / aggregateQueryBucketSize) + 2;
+                for (int i = 0; i < numParts; i++) {
+                    Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+                    if (eventCountByProfile != null) {
+                        eventCountByProfile.remove("_filtered");
+                        for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
+                            if (entry.getValue() < minimumEventCount) {
+                                // No more interesting buckets in this partition
+                                break;
+                            } else if (entry.getValue() <= maximumEventCount) {
+                                ids.add(entry.getKey());
+
+                                if (ids.size() > maximumIdsQueryCount) {
+                                    // Avoid building too big ids query - throw exception instead
+                                    throw new UnsupportedOperationException("Too many profiles");
+                                }
                             }
                         }
                     }
@@ -126,16 +141,28 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
 
     public long count(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
         Condition eventCondition = getEventCondition(condition, context);
+        Map<String, Double> aggResult = null;
 
         Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 1 : (Integer) condition.getParameter("minimumEventCount");
         Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
 
-        // Get full cardinality to partition the terms aggreggation
-        Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
-        long card = m.get("_card").longValue();
+        // No count filter - simply get the full number of distinct profiles
+        if (minimumEventCount == 1 && maximumEventCount == Integer.MAX_VALUE) {
+            aggResult = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
+            return aggResult.get("_card").longValue();
+        }
 
-        if (minimumEventCount != 1 || maximumEventCount != Integer.MAX_VALUE) {
-            // Event count specified, must check occurences count for each profile
+        if (pastEventsDisablePartitions) {
+            Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount);
+            return eventCountByProfile.entrySet().stream()
+                    .filter(x -> x.getKey().equals("_filtered"))
+                    .filter(x -> x.getValue() >= minimumEventCount && x.getValue() <= maximumEventCount)
+                    .count();
+        } else {
+            // Get full cardinality to partition the terms aggregation
+            aggResult = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
+            long card = aggResult.get("_card").longValue();
+             // Event count specified, must check occurrences count for each profile
             int result = 0;
             int numParts = (int) (card / aggregateQueryBucketSize) + 2;
             for (int i = 0; i < numParts; i++) {
@@ -159,9 +186,6 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
                 }
             }
             return result;
-        } else {
-            // Simply get the full number of distinct profiles
-            return card;
         }
     }
 
diff --git a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index cd75ce1..9f7169a 100644
--- a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -34,6 +34,7 @@
         <cm:default-properties>
             <cm:property name="maximumIdsQueryCount" value="5000"/>
             <cm:property name="aggregateQueryBucketSize" value="5000"/>
+            <cm:property name="pastEventsDisablePartitions" value="false"/>
         </cm:default-properties>
     </cm:property-placeholder>
 
@@ -105,6 +106,7 @@
             <property name="segmentService" ref="segmentService"/>
             <property name="scriptExecutor" ref="scriptExecutor" />
             <property name="maximumIdsQueryCount" value="${es.maximumIdsQueryCount}"/>
+            <property name="pastEventsDisablePartitions" value="${es.pastEventsDisablePartitions}"/>
             <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}"/>
         </bean>
     </service>
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 03400f1..36a2a67 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -66,6 +66,9 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
     private long segmentRefreshInterval = 1000;
     private int aggregateQueryBucketSize = 5000;
 
+    private int maximumIdsQueryCount = 5000;
+    private boolean pastEventsDisablePartitions = false;
+
     public SegmentServiceImpl() {
         logger.info("Initializing segment service...");
     }
@@ -94,6 +97,14 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         this.aggregateQueryBucketSize = aggregateQueryBucketSize;
     }
 
+    public void setMaximumIdsQueryCount(int maximumIdsQueryCount) {
+        this.maximumIdsQueryCount = maximumIdsQueryCount;
+    }
+
+    public void setPastEventsDisablePartitions(boolean pastEventsDisablePartitions) {
+        this.pastEventsDisablePartitions = pastEventsDisablePartitions;
+    }
+
     public void setSegmentRefreshInterval(long segmentRefreshInterval) {
         this.segmentRefreshInterval = segmentRefreshInterval;
     }
@@ -797,28 +808,19 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
             endDateCondition.setParameter("propertyValueDate", toDate);
             l.add(endDateCondition);
         }
-        String propertyKey = (String) parentCondition.getParameter("generatedPropertyKey");
 
-        Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
-        long card = m.get("_card").longValue();
+        String propertyKey = (String) parentCondition.getParameter("generatedPropertyKey");
 
-        int numParts = (int) (card / aggregateQueryBucketSize) + 2;
-        for (int i = 0; i < numParts; i++) {
-            Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
-            for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
-                String profileId = entry.getKey();
-                if (!profileId.startsWith("_")) {
-                    Map<String, Long> pastEventCounts = new HashMap<>();
-                    pastEventCounts.put(propertyKey, entry.getValue());
-                    Map<String, Object> systemProperties = new HashMap<>();
-                    systemProperties.put("pastEvents", pastEventCounts);
-                    try {
-                        systemProperties.put("lastUpdated", new Date());
-                        persistenceService.update(profileId, null, Profile.class, "systemProperties", systemProperties);
-                    } catch (Exception e) {
-                        logger.error("Error updating profile {} past event system properties", profileId, e);
-                    }
-                }
+        if(pastEventsDisablePartitions) {
+            Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount);
+            updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+        } else {
+            Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
+            long card = m.get("_card").longValue();
+            int numParts = (int) (card / aggregateQueryBucketSize) + 2;
+            for (int i = 0; i < numParts; i++) {
+                Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+                updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
             }
         }
 
@@ -848,6 +850,27 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         }
     }
 
+    private void updateProfilesWithPastEventProperty(Map<String, Long> eventCountByProfile, String propertyKey) {
+            for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
+                String profileId = entry.getKey();
+                if (!profileId.startsWith("_")) {
+                    Map<String, Long> pastEventCounts = new HashMap<>();
+                    pastEventCounts.put(propertyKey, entry.getValue());
+                    Map<String, Object> systemProperties = new HashMap<>();
+                    systemProperties.put("pastEvents", pastEventCounts);
+                    try {
+                        systemProperties.put("lastUpdated", new Date());
+                        Profile profile = new Profile();
+                        profile.setItemId(profileId);
+                        persistenceService.update(profile.getItemId(), null, Profile.class, "systemProperties", systemProperties);
+                    } catch (Exception e) {
+                        logger.error("Error updating profile {} past event system properties", profileId, e);
+                    }
+                }
+            }
+
+    }
+
     private String getMD5(String md5) {
         try {
             MessageDigest md = MessageDigest.getInstance("MD5");
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index b2cf0c0..3e7b0eb 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -172,6 +172,8 @@
         <property name="schedulerService" ref="schedulerServiceImpl"/>
         <property name="segmentRefreshInterval" value="${services.segment.refresh.interval}"/>
         <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}" />
+        <property name="pastEventsDisablePartitions" value="${es.pastEventsDisablePartitions}"/>
+        <property name="maximumIdsQueryCount" value="${es.maximumIdsQueryCount}"/>
     </bean>
     <service id="segmentService" ref="segmentServiceImpl">
         <interfaces>