You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by dr...@apache.org on 2018/10/12 13:00:27 UTC

[14/15] incubator-unomi git commit: UNOMI-204 : Add count method in QueryBuilder, adds cardinality/count aggregates, first optimization for PastEventCondition

UNOMI-204 : Add count method in QueryBuilder, adds cardinality/count aggregates, first optimization for PastEventCondition


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/bc549076
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/bc549076
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/bc549076

Branch: refs/heads/UNOMI-204
Commit: bc549076dcebd76511ad0943071c895a9c05d06b
Parents: 95d0030
Author: tdraier <dr...@apache.org>
Authored: Tue Oct 9 18:15:28 2018 +0200
Committer: tdraier <dr...@apache.org>
Committed: Thu Oct 11 15:07:22 2018 +0200

----------------------------------------------------------------------
 .../ElasticSearchPersistenceServiceImpl.java    | 25 ++++++-
 .../conditions/ConditionESQueryBuilder.java     |  3 +
 .../ConditionESQueryBuilderDispatcher.java      | 32 ++++++++
 .../spi/aggregate/TermsAggregate.java           | 18 +++++
 .../PastEventConditionESQueryBuilder.java       | 77 +++++++++++++++-----
 5 files changed, 137 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bc549076/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 46e7cbb..23cedfb 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -61,6 +61,7 @@ import org.elasticsearch.common.unit.DistanceUnit;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.RangeQueryBuilder;
@@ -83,6 +84,8 @@ import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBu
 import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
 import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
 import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
@@ -1306,7 +1309,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
     @Override
     public long queryCount(Condition query, String itemType) {
-        return queryCount(conditionESQueryBuilderDispatcher.buildFilter(query), itemType);
+        try {
+            return conditionESQueryBuilderDispatcher.count(query);
+        } catch (UnsupportedOperationException e) {
+            QueryBuilder filter = conditionESQueryBuilderDispatcher.buildFilter(query);
+            if (filter instanceof IdsQueryBuilder) {
+                return ((IdsQueryBuilder) filter).ids().size();
+            }
+            return queryCount(filter, itemType);
+        }
     }
 
     private long queryCount(final QueryBuilder filter, final String itemType) {
@@ -1564,6 +1575,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         //default
                         if (fieldName != null) {
                             bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(Integer.parseInt(aggregateQueryBucketSize));
+                            if (aggregate instanceof TermsAggregate) {
+                                TermsAggregate termsAggregate = (TermsAggregate) aggregate;
+                                if (termsAggregate.getPartition() > -1 && termsAggregate.getNumPartitions() > -1) {
+                                    ((TermsAggregationBuilder) bucketsAggregation).includeExclude(new IncludeExclude(termsAggregate.getPartition(), termsAggregate.getNumPartitions()));
+                                }
+                            }
                         } else {
                             // field name could be null if no existing data exists
                         }
@@ -1781,6 +1798,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                             case "max":
                                 filterAggregation.subAggregation(AggregationBuilders.max("max").field(field));
                                 break;
+                            case "card":
+                                filterAggregation.subAggregation(AggregationBuilders.cardinality("card").field(field));
+                                break;
+                            case "count":
+                                filterAggregation.subAggregation(AggregationBuilders.count("count").field(field));
+                                break;
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bc549076/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
index 78a5cba..7a100d0 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
@@ -26,4 +26,7 @@ public interface ConditionESQueryBuilder {
 
     QueryBuilder buildQuery(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher);
 
+    default long count(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bc549076/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
index b29a631..d08e283 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
@@ -86,5 +86,37 @@ public class ConditionESQueryBuilderDispatcher {
         return QueryBuilders.matchAllQuery();
     }
 
+    public long count(Condition condition) {
+        return count(condition, new HashMap<>());
+    }
+
+    public long count(Condition condition, Map<String, Object> context) {
+        if(condition == null || condition.getConditionType() == null) {
+            throw new IllegalArgumentException("Condition is null or doesn't have type, impossible to build filter");
+        }
+
+        String queryBuilderKey = condition.getConditionType().getQueryBuilder();
+        if (queryBuilderKey == null && condition.getConditionType().getParentCondition() != null) {
+            context.putAll(condition.getParameterValues());
+            return count(condition.getConditionType().getParentCondition(), context);
+        }
+
+        if (queryBuilderKey == null) {
+            throw new UnsupportedOperationException("No query builder defined for : " + condition.getConditionTypeId());
+        }
 
+        if (queryBuilders.containsKey(queryBuilderKey)) {
+            ConditionESQueryBuilder queryBuilder = queryBuilders.get(queryBuilderKey);
+            Condition contextualCondition = ConditionContextHelper.getContextualCondition(condition, context);
+            if (contextualCondition != null) {
+                return queryBuilder.count(contextualCondition, context, this);
+            }
+        }
+
+        // if no matching
+        if (logger.isDebugEnabled()) {
+            logger.debug("No matching query builder for condition {} and context {}", condition, context);
+        }
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bc549076/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
----------------------------------------------------------------------
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
index a0591f5..3b9d741 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
@@ -18,7 +18,25 @@
 package org.apache.unomi.persistence.spi.aggregate;
 
 public class TermsAggregate extends BaseAggregate{
+    private int partition = -1;
+    private int numPartitions = -1;
+
+
     public TermsAggregate(String field) {
         super(field);
     }
+
+    public TermsAggregate(String field, int partition, int numPartitions) {
+        super(field);
+        this.partition = partition;
+        this.numPartitions = numPartitions;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public int getNumPartitions() {
+        return numPartitions;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bc549076/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index e51aaa8..b3c169c 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -44,6 +44,64 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
     }
 
     public QueryBuilder buildQuery(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
+        Condition eventCondition = getEventCondition(condition, context);
+        //todo : Check behaviour with important number of profiles
+        Set<String> ids = new HashSet<String>();
+        Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount");
+        Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
+
+        Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card", "count"}, "profileId.keyword", Event.ITEM_TYPE);
+        long card = m.get("_card").longValue();
+
+        int numParts = (int) (card / 1000);
+        for (int i = 0; i < numParts; i++) {
+            Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+            if (eventCountByProfile != null) {
+                for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
+                    if (!entry.getKey().startsWith("_")) {
+                        if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) {
+                            ids.add(entry.getKey());
+                        }
+                    }
+                }
+            }
+        }
+
+        return QueryBuilders.idsQuery(Profile.ITEM_TYPE).addIds(ids.toArray(new String[ids.size()]));
+    }
+
+    public long count(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
+        Condition eventCondition = getEventCondition(condition, context);
+
+        Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount");
+        Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
+
+        Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card", "count"}, "profileId.keyword", Event.ITEM_TYPE);
+        long card = m.get("_card").longValue();
+        long count = m.get("_count").longValue();
+
+        if (minimumEventCount != 0 || maximumEventCount != Integer.MAX_VALUE) {
+            int result = 0;
+            int numParts = (int) (card / 1000);
+            for (int i = 0; i < numParts; i++) {
+                Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+                if (eventCountByProfile != null) {
+                    for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
+                        if (!entry.getKey().startsWith("_")) {
+                            if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) {
+                                result ++;
+                            }
+                        }
+                    }
+                }
+            }
+            return result;
+        }
+
+        return card;
+    }
+
+    private Condition getEventCondition(Condition condition, Map<String, Object> context) {
         Condition eventCondition;
         try {
             eventCondition = (Condition) condition.getParameter("eventCondition");
@@ -70,22 +128,7 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
             numberOfDaysCondition.setParameter("propertyValueDateExpr", "now-" + numberOfDays + "d");
             l.add(numberOfDaysCondition);
         }
-        //todo : Check behaviour with important number of profiles
-        Set<String> ids = new HashSet<String>();
-        Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount");
-        Integer maximumEventCount = condition.getParameter("maximumEventCount") == null  ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
-
-        Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE);
-        if (eventCountByProfile != null) {
-            for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
-                if (!entry.getKey().startsWith("_")) {
-                    if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) {
-                        ids.add(entry.getKey());
-                    }
-                }
-            }
-        }
-
-        return QueryBuilders.idsQuery(Profile.ITEM_TYPE).addIds(ids.toArray(new String[ids.size()]));
+        return andCondition;
     }
+
 }