You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by dr...@apache.org on 2018/10/18 12:24:27 UTC
[1/5] incubator-unomi git commit: UNOMI-204 : Add count method in
QueryBuilder, adds cardinality/count aggregates,
first optimization for PastEventCondition
Repository: incubator-unomi
Updated Branches:
refs/heads/master 55bd603ca -> aa82e46a0
UNOMI-204 : Add count method in QueryBuilder, adds cardinality/count aggregates, first optimization for PastEventCondition
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/bc549076
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/bc549076
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/bc549076
Branch: refs/heads/master
Commit: bc549076dcebd76511ad0943071c895a9c05d06b
Parents: 95d0030
Author: tdraier <dr...@apache.org>
Authored: Tue Oct 9 18:15:28 2018 +0200
Committer: tdraier <dr...@apache.org>
Committed: Thu Oct 11 15:07:22 2018 +0200
----------------------------------------------------------------------
.../ElasticSearchPersistenceServiceImpl.java | 25 ++++++-
.../conditions/ConditionESQueryBuilder.java | 3 +
.../ConditionESQueryBuilderDispatcher.java | 32 ++++++++
.../spi/aggregate/TermsAggregate.java | 18 +++++
.../PastEventConditionESQueryBuilder.java | 77 +++++++++++++++-----
5 files changed, 137 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bc549076/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 46e7cbb..23cedfb 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -61,6 +61,7 @@ import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
@@ -83,6 +84,8 @@ import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBu
import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
@@ -1306,7 +1309,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public long queryCount(Condition query, String itemType) {
- return queryCount(conditionESQueryBuilderDispatcher.buildFilter(query), itemType);
+ try {
+ return conditionESQueryBuilderDispatcher.count(query);
+ } catch (UnsupportedOperationException e) {
+ QueryBuilder filter = conditionESQueryBuilderDispatcher.buildFilter(query);
+ if (filter instanceof IdsQueryBuilder) {
+ return ((IdsQueryBuilder) filter).ids().size();
+ }
+ return queryCount(filter, itemType);
+ }
}
private long queryCount(final QueryBuilder filter, final String itemType) {
@@ -1564,6 +1575,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
//default
if (fieldName != null) {
bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(Integer.parseInt(aggregateQueryBucketSize));
+ if (aggregate instanceof TermsAggregate) {
+ TermsAggregate termsAggregate = (TermsAggregate) aggregate;
+ if (termsAggregate.getPartition() > -1 && termsAggregate.getNumPartitions() > -1) {
+ ((TermsAggregationBuilder) bucketsAggregation).includeExclude(new IncludeExclude(termsAggregate.getPartition(), termsAggregate.getNumPartitions()));
+ }
+ }
} else {
// field name could be null if no existing data exists
}
@@ -1781,6 +1798,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
case "max":
filterAggregation.subAggregation(AggregationBuilders.max("max").field(field));
break;
+ case "card":
+ filterAggregation.subAggregation(AggregationBuilders.cardinality("card").field(field));
+ break;
+ case "count":
+ filterAggregation.subAggregation(AggregationBuilders.count("count").field(field));
+ break;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bc549076/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
index 78a5cba..7a100d0 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
@@ -26,4 +26,7 @@ public interface ConditionESQueryBuilder {
QueryBuilder buildQuery(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher);
+ default long count(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bc549076/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
index b29a631..d08e283 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
@@ -86,5 +86,37 @@ public class ConditionESQueryBuilderDispatcher {
return QueryBuilders.matchAllQuery();
}
+ public long count(Condition condition) {
+ return count(condition, new HashMap<>());
+ }
+
+ public long count(Condition condition, Map<String, Object> context) {
+ if(condition == null || condition.getConditionType() == null) {
+ throw new IllegalArgumentException("Condition is null or doesn't have type, impossible to build filter");
+ }
+
+ String queryBuilderKey = condition.getConditionType().getQueryBuilder();
+ if (queryBuilderKey == null && condition.getConditionType().getParentCondition() != null) {
+ context.putAll(condition.getParameterValues());
+ return count(condition.getConditionType().getParentCondition(), context);
+ }
+
+ if (queryBuilderKey == null) {
+ throw new UnsupportedOperationException("No query builder defined for : " + condition.getConditionTypeId());
+ }
+ if (queryBuilders.containsKey(queryBuilderKey)) {
+ ConditionESQueryBuilder queryBuilder = queryBuilders.get(queryBuilderKey);
+ Condition contextualCondition = ConditionContextHelper.getContextualCondition(condition, context);
+ if (contextualCondition != null) {
+ return queryBuilder.count(contextualCondition, context, this);
+ }
+ }
+
+ // if no matching
+ if (logger.isDebugEnabled()) {
+ logger.debug("No matching query builder for condition {} and context {}", condition, context);
+ }
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bc549076/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
----------------------------------------------------------------------
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
index a0591f5..3b9d741 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
@@ -18,7 +18,25 @@
package org.apache.unomi.persistence.spi.aggregate;
public class TermsAggregate extends BaseAggregate{
+ private int partition = -1;
+ private int numPartitions = -1;
+
+
public TermsAggregate(String field) {
super(field);
}
+
+ public TermsAggregate(String field, int partition, int numPartitions) {
+ super(field);
+ this.partition = partition;
+ this.numPartitions = numPartitions;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public int getNumPartitions() {
+ return numPartitions;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bc549076/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index e51aaa8..b3c169c 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -44,6 +44,64 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
}
public QueryBuilder buildQuery(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
+ Condition eventCondition = getEventCondition(condition, context);
+ //todo : Check behaviour with important number of profiles
+ Set<String> ids = new HashSet<String>();
+ Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount");
+ Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
+
+ Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card", "count"}, "profileId.keyword", Event.ITEM_TYPE);
+ long card = m.get("_card").longValue();
+
+ int numParts = (int) (card / 1000);
+ for (int i = 0; i < numParts; i++) {
+ Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+ if (eventCountByProfile != null) {
+ for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
+ if (!entry.getKey().startsWith("_")) {
+ if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) {
+ ids.add(entry.getKey());
+ }
+ }
+ }
+ }
+ }
+
+ return QueryBuilders.idsQuery(Profile.ITEM_TYPE).addIds(ids.toArray(new String[ids.size()]));
+ }
+
+ public long count(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
+ Condition eventCondition = getEventCondition(condition, context);
+
+ Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount");
+ Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
+
+ Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card", "count"}, "profileId.keyword", Event.ITEM_TYPE);
+ long card = m.get("_card").longValue();
+ long count = m.get("_count").longValue();
+
+ if (minimumEventCount != 0 || maximumEventCount != Integer.MAX_VALUE) {
+ int result = 0;
+ int numParts = (int) (card / 1000);
+ for (int i = 0; i < numParts; i++) {
+ Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+ if (eventCountByProfile != null) {
+ for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
+ if (!entry.getKey().startsWith("_")) {
+ if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) {
+ result ++;
+ }
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ return card;
+ }
+
+ private Condition getEventCondition(Condition condition, Map<String, Object> context) {
Condition eventCondition;
try {
eventCondition = (Condition) condition.getParameter("eventCondition");
@@ -70,22 +128,7 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
numberOfDaysCondition.setParameter("propertyValueDateExpr", "now-" + numberOfDays + "d");
l.add(numberOfDaysCondition);
}
- //todo : Check behaviour with important number of profiles
- Set<String> ids = new HashSet<String>();
- Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount");
- Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
-
- Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE);
- if (eventCountByProfile != null) {
- for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
- if (!entry.getKey().startsWith("_")) {
- if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) {
- ids.add(entry.getKey());
- }
- }
- }
- }
-
- return QueryBuilders.idsQuery(Profile.ITEM_TYPE).addIds(ids.toArray(new String[ids.size()]));
+ return andCondition;
}
+
}
[2/5] incubator-unomi git commit: UNOMI-204 : More optimizations in
past event count and segment calculation
Posted by dr...@apache.org.
UNOMI-204 : More optimizations in past event count and segment calculation
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/ae31c533
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/ae31c533
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/ae31c533
Branch: refs/heads/master
Commit: ae31c533c2f63820bd275c32cf0af0f5c2e6817c
Parents: bc54907
Author: tdraier <dr...@apache.org>
Authored: Fri Oct 12 14:59:54 2018 +0200
Committer: tdraier <dr...@apache.org>
Committed: Fri Oct 12 14:59:54 2018 +0200
----------------------------------------------------------------------
.../unomi/api/services/SegmentService.java | 12 +-
.../ElasticSearchPersistenceServiceImpl.java | 12 +-
.../PastEventConditionESQueryBuilder.java | 114 ++++++++++++++-----
.../resources/OSGI-INF/blueprint/blueprint.xml | 1 +
.../services/services/SegmentServiceImpl.java | 93 +++++++++------
5 files changed, 164 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/ae31c533/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/unomi/api/services/SegmentService.java b/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
index 4b03e78..1ed152d 100644
--- a/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
@@ -21,6 +21,7 @@ import org.apache.unomi.api.Item;
import org.apache.unomi.api.Metadata;
import org.apache.unomi.api.PartialList;
import org.apache.unomi.api.Profile;
+import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.query.Query;
import org.apache.unomi.api.segments.DependentMetadata;
import org.apache.unomi.api.segments.Scoring;
@@ -50,7 +51,7 @@ public interface SegmentService {
/**
* Retrieves segment metadatas for segments in the specified scope, ordered according to the specified {@code sortBy} String and and paged: only {@code size} of them are
* retrieved, starting with the {@code offset}-th one.
- *
+ * <p>
* TODO: remove?
*
* @param scope the scope for which we want to retrieve segment metadata
@@ -158,7 +159,7 @@ public interface SegmentService {
* Retrieves the set of all scoring metadata.
*
* @param offset the offset
- * @param size the size
+ * @param size the size
* @param sortBy sort by
* @return the set of all scoring metadata
*/
@@ -219,4 +220,11 @@ public interface SegmentService {
*/
DependentMetadata getScoringDependentMetadata(String scoringId);
+ /**
+ * Get generated property key for past event condition
+ * @param condition The event condition
+ * @param parentCondition The past event condition
+ * @return the generated property key
+ */
+ String getGeneratedPropertyKey(Condition condition, Condition parentCondition);
}
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/ae31c533/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 23cedfb..87a46eb 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -1312,11 +1312,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
try {
return conditionESQueryBuilderDispatcher.count(query);
} catch (UnsupportedOperationException e) {
- QueryBuilder filter = conditionESQueryBuilderDispatcher.buildFilter(query);
- if (filter instanceof IdsQueryBuilder) {
- return ((IdsQueryBuilder) filter).ids().size();
+ try {
+ QueryBuilder filter = conditionESQueryBuilderDispatcher.buildFilter(query);
+ if (filter instanceof IdsQueryBuilder) {
+ return ((IdsQueryBuilder) filter).ids().size();
+ }
+ return queryCount(filter, itemType);
+ } catch (UnsupportedOperationException e1) {
+ return -1;
}
- return queryCount(filter, itemType);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/ae31c533/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index b3c169c..c891abd 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -21,6 +21,7 @@ import org.apache.unomi.api.Event;
import org.apache.unomi.api.Profile;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.services.DefinitionsService;
+import org.apache.unomi.api.services.SegmentService;
import org.apache.unomi.persistence.elasticsearch.conditions.ConditionContextHelper;
import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilder;
import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilderDispatcher;
@@ -28,12 +29,20 @@ import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.*;
public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder {
+
private DefinitionsService definitionsService;
private PersistenceService persistenceService;
+ private SegmentService segmentService;
+
+ private int maximumIdsQueryCount = 1000;
+ private int termsAggregatePartitionSize = 1000;
public void setDefinitionsService(DefinitionsService definitionsService) {
this.definitionsService = definitionsService;
@@ -43,62 +52,115 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
this.persistenceService = persistenceService;
}
+ public void setMaximumIdsQueryCount(int maximumIdsQueryCount) {
+ this.maximumIdsQueryCount = maximumIdsQueryCount;
+ }
+
+ public void setTermsAggregatePartitionSize(int termsAggregatePartitionSize) {
+ this.termsAggregatePartitionSize = termsAggregatePartitionSize;
+ }
+
+ public void setSegmentService(SegmentService segmentService) {
+ this.segmentService = segmentService;
+ }
+
public QueryBuilder buildQuery(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
- Condition eventCondition = getEventCondition(condition, context);
- //todo : Check behaviour with important number of profiles
- Set<String> ids = new HashSet<String>();
- Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount");
+ Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 1 : (Integer) condition.getParameter("minimumEventCount");
Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
- Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card", "count"}, "profileId.keyword", Event.ITEM_TYPE);
- long card = m.get("_card").longValue();
+ final Logger logger = LoggerFactory.getLogger(PastEventConditionESQueryBuilder.class.getName());
+
+ if (condition.getParameter("generatedPropertyKey") != null && condition.getParameter("generatedPropertyKey").equals(segmentService.getGeneratedPropertyKey((Condition) condition.getParameter("eventCondition"), condition))) {
+ // A property is already set on profiles matching the past event condition, use it
+ if (minimumEventCount != 1 || maximumEventCount != Integer.MAX_VALUE) {
+ // Check the number of occurrences
+ RangeQueryBuilder builder = QueryBuilders.rangeQuery("systemProperties.pastEvents." + condition.getParameter("generatedPropertyKey"));
+ if (minimumEventCount != 1) {
+ builder.gte(minimumEventCount);
+ }
+ if (maximumEventCount != Integer.MAX_VALUE) {
+ builder.lte(minimumEventCount);
+ }
+ return builder;
+ } else {
+ // Simply get profiles who have the property set
+ return QueryBuilders.existsQuery("systemProperties.pastEvents." + condition.getParameter("generatedPropertyKey"));
+ }
+ } else {
+ // No property set - tries to build an idsQuery
+ // Build past event condition
+ Condition eventCondition = getEventCondition(condition, context);
+
+ Set<String> ids = new HashSet<>();
- int numParts = (int) (card / 1000);
- for (int i = 0; i < numParts; i++) {
- Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
- if (eventCountByProfile != null) {
- for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
- if (!entry.getKey().startsWith("_")) {
- if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) {
+ // Get full cardinality to partition the terms aggregation
+ Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
+ long card = m.get("_card").longValue();
+
+ int numParts = (int) (card / termsAggregatePartitionSize);
+ for (int i = 0; i < numParts; i++) {
+ Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+ if (eventCountByProfile != null) {
+ eventCountByProfile.remove("_filtered");
+ for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
+ if (entry.getValue() < minimumEventCount) {
+ // No more interesting buckets in this partition
+ break;
+ } else if (entry.getValue() <= maximumEventCount) {
ids.add(entry.getKey());
+
+ if (ids.size() > maximumIdsQueryCount) {
+ // Avoid building too big ids query - throw exception instead
+ throw new UnsupportedOperationException("Too many profiles");
+ }
}
}
}
}
- }
- return QueryBuilders.idsQuery(Profile.ITEM_TYPE).addIds(ids.toArray(new String[ids.size()]));
+ return QueryBuilders.idsQuery(Profile.ITEM_TYPE).addIds(ids.toArray(new String[0]));
+ }
}
public long count(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
Condition eventCondition = getEventCondition(condition, context);
- Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount");
+ Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 1 : (Integer) condition.getParameter("minimumEventCount");
Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
- Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card", "count"}, "profileId.keyword", Event.ITEM_TYPE);
+ // Get full cardinality to partition the terms aggregation
+ Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
long card = m.get("_card").longValue();
- long count = m.get("_count").longValue();
- if (minimumEventCount != 0 || maximumEventCount != Integer.MAX_VALUE) {
+ if (minimumEventCount != 1 || maximumEventCount != Integer.MAX_VALUE) {
+ // Event count specified, must check occurrences count for each profile
int result = 0;
- int numParts = (int) (card / 1000);
+ int numParts = (int) (card / termsAggregatePartitionSize);
for (int i = 0; i < numParts; i++) {
Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+ int j = 0;
if (eventCountByProfile != null) {
+ eventCountByProfile.remove("_filtered");
for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
- if (!entry.getKey().startsWith("_")) {
- if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) {
- result ++;
- }
+ if (entry.getValue() < minimumEventCount) {
+ // No more interesting buckets in this partition
+ break;
+ } else if (entry.getValue() <= maximumEventCount && minimumEventCount == 1) {
+ // Take all remaining elements
+ result += eventCountByProfile.size() - j;
+ break;
+ } else if (entry.getValue() <= maximumEventCount) {
+ result++;
}
+ j++;
}
}
}
return result;
+ } else {
+ // Simply get the full number of distinct profiles
+ return card;
}
-
- return card;
}
private Condition getEventCondition(Condition condition, Map<String, Object> context) {
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/ae31c533/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 2cf78ab..9aa9d6c 100644
--- a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -93,6 +93,7 @@
<bean class="org.apache.unomi.plugins.baseplugin.conditions.PastEventConditionESQueryBuilder">
<property name="definitionsService" ref="definitionsService"/>
<property name="persistenceService" ref="persistenceService"/>
+ <property name="segmentService" ref="segmentService"/>
</bean>
</service>
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/ae31c533/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
----------------------------------------------------------------------
diff --git a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
index 0fe5f2e..c68f691 100644
--- a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
@@ -27,7 +27,6 @@ import org.apache.unomi.api.rules.Rule;
import org.apache.unomi.api.segments.*;
import org.apache.unomi.api.services.*;
import org.apache.unomi.persistence.spi.CustomObjectMapper;
-import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
@@ -58,6 +57,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
private List<Scoring> allScoring;
private Timer segmentTimer;
private int segmentUpdateBatchSize = 1000;
+ private int termsAggregatePartitionSize = 1000;
public SegmentServiceImpl() {
logger.info("Initializing segment service...");
@@ -83,6 +83,10 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
this.segmentUpdateBatchSize = segmentUpdateBatchSize;
}
+ public void setTermsAggregatePartitionSize(int termsAggregatePartitionSize) {
+ this.termsAggregatePartitionSize = termsAggregatePartitionSize;
+ }
+
public void postConstruct() {
logger.debug("postConstruct {" + bundleContext.getBundle() + "}");
loadPredefinedSegments(bundleContext);
@@ -708,13 +712,8 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
private void getAutoGeneratedRules(Metadata metadata, Condition condition, Condition parentCondition, List<Rule> rules) {
Set<String> tags = condition.getConditionType().getMetadata().getSystemTags();
if (tags.contains("eventCondition") && !tags.contains("profileCondition")) {
- try {
- Map<String, Object> m = new HashMap<>(3);
- m.put("scope", metadata.getScope());
- m.put("condition", condition);
- m.put("numberOfDays", parentCondition.getParameter("numberOfDays"));
- String key = CustomObjectMapper.getObjectMapper().writeValueAsString(m);
- key = "eventTriggered" + getMD5(key);
+ String key = getGeneratedPropertyKey(condition, parentCondition);
+ if (key != null) {
parentCondition.setParameter("generatedPropertyKey", key);
Rule rule = rulesService.getRule(key);
if (rule == null) {
@@ -734,8 +733,6 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
rule.getLinkedItems().add(metadata.getId());
rules.add(rule);
}
- } catch (JsonProcessingException e) {
- logger.error(e.getMessage(), e);
}
} else {
Collection<Object> values = new ArrayList<>(condition.getParameterValues().values());
@@ -773,18 +770,25 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
l.add(numberOfDaysCondition);
}
String propertyKey = (String) parentCondition.getParameter("generatedPropertyKey");
- Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE);
- for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
- String profileId = entry.getKey();
- if (!profileId.startsWith("_")) {
- Map<String, Long> pastEventCounts = new HashMap<>();
- pastEventCounts.put(propertyKey, entry.getValue());
- Map<String, Object> systemProperties = new HashMap<>();
- systemProperties.put("pastEvents", pastEventCounts);
- try {
- persistenceService.update(profileId, null, Profile.class, "systemProperties", systemProperties);
- } catch (Exception e) {
- logger.error("Error updating profile {} past event system properties", profileId, e);
+
+ Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
+ long card = m.get("_card").longValue();
+
+ int numParts = (int) (card / termsAggregatePartitionSize);
+ for (int i = 0; i < numParts; i++) {
+ Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+ for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
+ String profileId = entry.getKey();
+ if (!profileId.startsWith("_")) {
+ Map<String, Long> pastEventCounts = new HashMap<>();
+ pastEventCounts.put(propertyKey, entry.getValue());
+ Map<String, Object> systemProperties = new HashMap<>();
+ systemProperties.put("pastEvents", pastEventCounts);
+ try {
+ persistenceService.update(profileId, null, Profile.class, "systemProperties", systemProperties);
+ } catch (Exception e) {
+ logger.error("Error updating profile {} past event system properties", profileId, e);
+ }
}
}
}
@@ -792,6 +796,33 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis() - t);
}
+ public String getGeneratedPropertyKey(Condition condition, Condition parentCondition) {
+ try {
+ Map<String, Object> m = new HashMap<>();
+ m.put("condition", condition);
+ m.put("numberOfDays", parentCondition.getParameter("numberOfDays"));
+ String key = CustomObjectMapper.getObjectMapper().writeValueAsString(m);
+ return "eventTriggered" + getMD5(key);
+ } catch (JsonProcessingException e) {
+ logger.error("Cannot generate key",e);
+ return null;
+ }
+ }
+
+ private String getMD5(String md5) {
+ try {
+ MessageDigest md = MessageDigest.getInstance("MD5");
+ byte[] array = md.digest(md5.getBytes());
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < array.length; ++i) {
+ sb.append(Integer.toHexString((array[i] & 0xFF) | 0x100).substring(1, 3));
+ }
+ return sb.toString();
+ } catch (java.security.NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private void updateExistingProfilesForSegment(Segment segment) {
long t = System.currentTimeMillis();
Condition segmentCondition = new Condition();
@@ -830,6 +861,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
PartialList<Profile> profilesToAdd = persistenceService.query(profilesToAddCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
while (profilesToAdd.getList().size() > 0) {
+ long t2= System.currentTimeMillis();
for (Profile profileToAdd : profilesToAdd.getList()) {
profileToAdd.getSegments().add(segment.getItemId());
persistenceService.update(profileToAdd.getItemId(), null, Profile.class, "segments", profileToAdd.getSegments());
@@ -838,12 +870,14 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
eventService.send(profileUpdated);
updatedProfileCount++;
}
+ logger.info("{} profiles added in segment in {}ms", profilesToAdd.size(), System.currentTimeMillis() - t2);
profilesToAdd = persistenceService.continueScrollQuery(Profile.class, profilesToAdd.getScrollIdentifier(), profilesToAdd.getScrollTimeValidity());
if (profilesToAdd == null || profilesToAdd.getList().size() == 0) {
break;
}
}
while (profilesToRemove.getList().size() > 0) {
+ long t2= System.currentTimeMillis();
for (Profile profileToRemove : profilesToRemove.getList()) {
profileToRemove.getSegments().remove(segment.getItemId());
persistenceService.update(profileToRemove.getItemId(), null, Profile.class, "segments", profileToRemove.getSegments());
@@ -852,6 +886,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
eventService.send(profileUpdated);
updatedProfileCount++;
}
+ logger.info("{} profiles removed from segment in {}ms", profilesToRemove.size(), System.currentTimeMillis() - t2);
profilesToRemove = persistenceService.continueScrollQuery(Profile.class, profilesToRemove.getScrollIdentifier(), profilesToRemove.getScrollTimeValidity());
if (profilesToRemove == null || profilesToRemove.getList().size() == 0) {
break;
@@ -929,20 +964,6 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
logger.info("Profiles updated in {}ms", System.currentTimeMillis() - t);
}
- private String getMD5(String md5) {
- try {
- MessageDigest md = MessageDigest.getInstance("MD5");
- byte[] array = md.digest(md5.getBytes());
- StringBuffer sb = new StringBuffer();
- for (int i = 0; i < array.length; ++i) {
- sb.append(Integer.toHexString((array[i] & 0xFF) | 0x100).substring(1, 3));
- }
- return sb.toString();
- } catch (java.security.NoSuchAlgorithmException e) {
- throw new RuntimeException(e);
- }
- }
-
public void bundleChanged(BundleEvent event) {
switch (event.getType()) {
case BundleEvent.STARTED:
[5/5] incubator-unomi git commit: Merge branch 'UNOMI-204'
Posted by dr...@apache.org.
Merge branch 'UNOMI-204'
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/aa82e46a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/aa82e46a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/aa82e46a
Branch: refs/heads/master
Commit: aa82e46a0600908f0c5bf0a7e0081aac07aa0fdc
Parents: 55bd603 183ffcf
Author: tdraier <dr...@apache.org>
Authored: Thu Oct 18 14:23:32 2018 +0200
Committer: tdraier <dr...@apache.org>
Committed: Thu Oct 18 14:23:32 2018 +0200
----------------------------------------------------------------------
.../unomi/api/services/SegmentService.java | 12 +-
.../ElasticSearchPersistenceServiceImpl.java | 35 ++++-
.../conditions/ConditionESQueryBuilder.java | 3 +
.../ConditionESQueryBuilderDispatcher.java | 32 +++++
...g.apache.unomi.persistence.elasticsearch.cfg | 3 +
.../spi/aggregate/TermsAggregate.java | 18 +++
.../PastEventConditionESQueryBuilder.java | 135 ++++++++++++++++---
.../resources/OSGI-INF/blueprint/blueprint.xml | 11 ++
.../services/services/SegmentServiceImpl.java | 92 ++++++++-----
.../resources/OSGI-INF/blueprint/blueprint.xml | 8 ++
10 files changed, 291 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/aa82e46a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/aa82e46a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
[3/5] incubator-unomi git commit: UNOMI-204 : removed logger
Posted by dr...@apache.org.
UNOMI-204 : removed logger
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/7aa45d62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/7aa45d62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/7aa45d62
Branch: refs/heads/master
Commit: 7aa45d625a7dc04f0422e13b2790aebfb46cf1fc
Parents: ae31c53
Author: tdraier <dr...@apache.org>
Authored: Fri Oct 12 15:56:05 2018 +0200
Committer: tdraier <dr...@apache.org>
Committed: Fri Oct 12 15:56:05 2018 +0200
----------------------------------------------------------------------
.../baseplugin/conditions/PastEventConditionESQueryBuilder.java | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/7aa45d62/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index c891abd..0265a42 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -68,8 +68,6 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 1 : (Integer) condition.getParameter("minimumEventCount");
Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
- final Logger logger = LoggerFactory.getLogger(PastEventConditionESQueryBuilder.class.getName());
-
if (condition.getParameter("generatedPropertyKey") != null && condition.getParameter("generatedPropertyKey").equals(segmentService.getGeneratedPropertyKey((Condition) condition.getParameter("eventCondition"), condition))) {
// A property is already set on profiles matching the past event condition, use it
if (minimumEventCount != 1 || maximumEventCount != Integer.MAX_VALUE) {
[4/5] incubator-unomi git commit: UNOMI-204 : added configuration
parameters
Posted by dr...@apache.org.
UNOMI-204 : added configuration parameters
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/183ffcf1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/183ffcf1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/183ffcf1
Branch: refs/heads/master
Commit: 183ffcf1f49efce0244e38c100a963687ada1033
Parents: 7aa45d6
Author: tdraier <dr...@apache.org>
Authored: Tue Oct 16 14:10:27 2018 +0200
Committer: tdraier <dr...@apache.org>
Committed: Tue Oct 16 14:10:27 2018 +0200
----------------------------------------------------------------------
.../ElasticSearchPersistenceServiceImpl.java | 6 +++---
.../org.apache.unomi.persistence.elasticsearch.cfg | 3 +++
.../conditions/PastEventConditionESQueryBuilder.java | 14 ++++++--------
.../main/resources/OSGI-INF/blueprint/blueprint.xml | 10 ++++++++++
.../unomi/services/services/SegmentServiceImpl.java | 8 ++++----
.../main/resources/OSGI-INF/blueprint/blueprint.xml | 8 ++++++++
6 files changed, 34 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/183ffcf1/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 87a46eb..dd8c48f 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -156,7 +156,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private String minimalElasticSearchVersion = "5.0.0";
private String maximalElasticSearchVersion = "5.7.0";
- private String aggregateQueryBucketSize = "5000";
+ private int aggregateQueryBucketSize = 5000;
private String transportClientClassName = null;
private String transportClientProperties = null;
@@ -259,7 +259,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
this.maximalElasticSearchVersion = maximalElasticSearchVersion;
}
- public void setAggregateQueryBucketSize(String aggregateQueryBucketSize) {
+ public void setAggregateQueryBucketSize(int aggregateQueryBucketSize) {
this.aggregateQueryBucketSize = aggregateQueryBucketSize;
}
@@ -1578,7 +1578,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
fieldName = getPropertyNameWithData(fieldName, itemType);
//default
if (fieldName != null) {
- bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(Integer.parseInt(aggregateQueryBucketSize));
+ bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(aggregateQueryBucketSize);
if (aggregate instanceof TermsAggregate) {
TermsAggregate termsAggregate = (TermsAggregate) aggregate;
if (termsAggregate.getPartition() > -1 && termsAggregate.getNumPartitions() > -1) {
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/183ffcf1/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index 324a626..dd0d0ec 100644
--- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -45,3 +45,6 @@ maximalElasticSearchVersion=5.7.0
# The following setting is used to set the aggregate query bucket size
aggregateQueryBucketSize=5000
+
+# Maximum size allowed for an elastic "ids" query
+maximumIdsQueryCount=5000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/183ffcf1/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index 0265a42..c8aeaca 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -30,8 +30,6 @@ import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.*;
@@ -41,8 +39,8 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
private PersistenceService persistenceService;
private SegmentService segmentService;
- private int maximumIdsQueryCount = 1000;
- private int termsAggregatePartitionSize = 1000;
+ private int maximumIdsQueryCount = 5000;
+ private int aggregateQueryBucketSize = 5000;
public void setDefinitionsService(DefinitionsService definitionsService) {
this.definitionsService = definitionsService;
@@ -56,8 +54,8 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
this.maximumIdsQueryCount = maximumIdsQueryCount;
}
- public void setTermsAggregatePartitionSize(int termsAggregatePartitionSize) {
- this.termsAggregatePartitionSize = termsAggregatePartitionSize;
+ public void setAggregateQueryBucketSize(int aggregateQueryBucketSize) {
+ this.aggregateQueryBucketSize = aggregateQueryBucketSize;
}
public void setSegmentService(SegmentService segmentService) {
@@ -95,7 +93,7 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
long card = m.get("_card").longValue();
- int numParts = (int) (card / termsAggregatePartitionSize);
+ int numParts = (int) (card / aggregateQueryBucketSize) + 2;
for (int i = 0; i < numParts; i++) {
Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
if (eventCountByProfile != null) {
@@ -133,7 +131,7 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
if (minimumEventCount != 1 || maximumEventCount != Integer.MAX_VALUE) {
// Event count specified, must check occurences count for each profile
int result = 0;
- int numParts = (int) (card / termsAggregatePartitionSize);
+ int numParts = (int) (card / aggregateQueryBucketSize) + 2;
for (int i = 0; i < numParts; i++) {
Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
int j = 0;
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/183ffcf1/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 9aa9d6c..5a10214 100644
--- a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -29,6 +29,14 @@
</cm:default-properties>
</cm:property-placeholder>
+ <cm:property-placeholder persistent-id="org.apache.unomi.persistence.elasticsearch"
+ update-strategy="reload" placeholder-prefix="${es.">
+ <cm:default-properties>
+ <cm:property name="maximumIdsQueryCount" value="5000"/>
+ <cm:property name="aggregateQueryBucketSize" value="5000"/>
+ </cm:default-properties>
+ </cm:property-placeholder>
+
<reference id="definitionsService" interface="org.apache.unomi.api.services.DefinitionsService"/>
<reference id="persistenceService" interface="org.apache.unomi.persistence.spi.PersistenceService"/>
<reference id="profileService" interface="org.apache.unomi.api.services.ProfileService"/>
@@ -94,6 +102,8 @@
<property name="definitionsService" ref="definitionsService"/>
<property name="persistenceService" ref="persistenceService"/>
<property name="segmentService" ref="segmentService"/>
+ <property name="maximumIdsQueryCount" value="${es.maximumIdsQueryCount}"/>
+ <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}"/>
</bean>
</service>
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/183ffcf1/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
----------------------------------------------------------------------
diff --git a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
index c68f691..ec74525 100644
--- a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
@@ -57,7 +57,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
private List<Scoring> allScoring;
private Timer segmentTimer;
private int segmentUpdateBatchSize = 1000;
- private int termsAggregatePartitionSize = 1000;
+ private int aggregateQueryBucketSize = 5000;
public SegmentServiceImpl() {
logger.info("Initializing segment service...");
@@ -83,8 +83,8 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
this.segmentUpdateBatchSize = segmentUpdateBatchSize;
}
- public void setTermsAggregatePartitionSize(int termsAggregatePartitionSize) {
- this.termsAggregatePartitionSize = termsAggregatePartitionSize;
+ public void setAggregateQueryBucketSize(int aggregateQueryBucketSize) {
+ this.aggregateQueryBucketSize = aggregateQueryBucketSize;
}
public void postConstruct() {
@@ -774,7 +774,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
long card = m.get("_card").longValue();
- int numParts = (int) (card / termsAggregatePartitionSize);
+ int numParts = (int) (card / aggregateQueryBucketSize) + 2;
for (int i = 0; i < numParts; i++) {
Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/183ffcf1/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index eed06f2..979c6be 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -44,6 +44,13 @@
</cm:default-properties>
</cm:property-placeholder>
+ <cm:property-placeholder persistent-id="org.apache.unomi.persistence.elasticsearch"
+ update-strategy="reload" placeholder-prefix="${es.">
+ <cm:default-properties>
+ <cm:property name="aggregateQueryBucketSize" value="5000"/>
+ </cm:default-properties>
+ </cm:property-placeholder>
+
<reference id="persistenceService"
interface="org.apache.unomi.persistence.spi.PersistenceService"/>
<reference id="httpService" interface="org.osgi.service.http.HttpService"/>
@@ -144,6 +151,7 @@
<property name="bundleContext" ref="blueprintBundleContext"/>
<property name="taskExecutionPeriod" value="86400000"/>
<property name="segmentUpdateBatchSize" value="${services.segment.update.batchSize}" />
+ <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}" />
</bean>
<service id="segmentService" ref="segmentServiceImpl">
<interfaces>