You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2016/12/15 20:56:02 UTC

[1/3] incubator-unomi git commit: Further optimizations - Add shell script to build and run without integration and performance tests. - Segment updates should now use a lot less memory since we now use scrolling queries

Repository: incubator-unomi
Updated Branches:
  refs/heads/feature-UNOMI-70-ES5X a94fdb0e0 -> 1cd827a12


Further optimizations
- Add shell script to build and run without integration and performance tests.
- Segment updates should now use a lot less memory since we now use scrolling queries


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/28d7dbde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/28d7dbde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/28d7dbde

Branch: refs/heads/feature-UNOMI-70-ES5X
Commit: 28d7dbde4728ed41eed6cc43022039ad02ddce7e
Parents: d7a7969
Author: Serge Huber <sh...@apache.org>
Authored: Thu Dec 15 16:22:38 2016 +0100
Committer: Serge Huber <sh...@apache.org>
Committed: Thu Dec 15 16:22:38 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/unomi/api/PartialList.java  |  25 +++++
 buildAndRunNoTests.sh                           |  34 ++++++
 .../ElasticSearchPersistenceServiceImpl.java    |  94 +++++++++++++---
 .../persistence/spi/PersistenceService.java     |  33 ++++++
 .../PropertyConditionESQueryBuilder.java        |   3 +-
 .../services/services/SegmentServiceImpl.java   | 111 ++++++++++++-------
 .../resources/OSGI-INF/blueprint/blueprint.xml  |   2 +
 7 files changed, 242 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/28d7dbde/api/src/main/java/org/apache/unomi/api/PartialList.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/unomi/api/PartialList.java b/api/src/main/java/org/apache/unomi/api/PartialList.java
index 59f7e4d..daa69c9 100644
--- a/api/src/main/java/org/apache/unomi/api/PartialList.java
+++ b/api/src/main/java/org/apache/unomi/api/PartialList.java
@@ -37,6 +37,8 @@ public class PartialList<T> implements Serializable {
     private long offset;
     private long pageSize;
     private long totalSize;
+    private String scrollIdentifier = null;
+    private String scrollTimeValidity = null;
 
     /**
      * Instantiates a new PartialList.
@@ -141,4 +143,27 @@ public class PartialList<T> implements Serializable {
         return list.get(index);
     }
 
+    /**
+     * Retrieve the scroll identifier to make it possible to continue a scrolling list query
+     * @return a string containing the scroll identifier, to be sent back in a subsequent request
+     */
+    public String getScrollIdentifier() {
+        return scrollIdentifier;
+    }
+
+    public void setScrollIdentifier(String scrollIdentifier) {
+        this.scrollIdentifier = scrollIdentifier;
+    }
+
+    /**
+     * Retrieve the value of the scroll time validity to make it possible to continue a scrolling list query
+     * @return a string containing a time value for the scroll validity, to be sent back in a subsequent request
+     */
+    public String getScrollTimeValidity() {
+        return scrollTimeValidity;
+    }
+
+    public void setScrollTimeValidity(String scrollTimeValidity) {
+        this.scrollTimeValidity = scrollTimeValidity;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/28d7dbde/buildAndRunNoTests.sh
----------------------------------------------------------------------
diff --git a/buildAndRunNoTests.sh b/buildAndRunNoTests.sh
new file mode 100755
index 0000000..c8f7641
--- /dev/null
+++ b/buildAndRunNoTests.sh
@@ -0,0 +1,34 @@
+#!/bin/sh
+################################################################################
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+################################################################################
+echo Building...
+DIRNAME=`dirname "$0"`
+PROGNAME=`basename "$0"`
+if [ -f "$DIRNAME/setenv.sh" ]; then
+  . "$DIRNAME/setenv.sh"
+fi
+mvn clean install -P \!integration-tests,\!performance-tests,rat
+pushd package/target
+echo Uncompressing Unomi package...
+tar zxvf unomi-$UNOMI_VERSION.tar.gz
+cd unomi-$UNOMI_VERSION/bin
+echo Starting Unomi...
+./karaf debug
+popd
+

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/28d7dbde/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 794b03b..d6a136e 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -695,7 +695,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
     @Override
     public <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, int offset, int size, String sortBy) {
-        return query(QueryBuilders.matchAllQuery(), sortBy, clazz, offset, size, null);
+        return query(QueryBuilders.matchAllQuery(), sortBy, clazz, offset, size, null, null);
     }
 
     @Override
@@ -711,7 +711,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
 
                     if (itemsMonthlyIndexed.contains(itemType) && dateHint == null) {
-                        PartialList<T> r = query(QueryBuilders.idsQuery(itemType).ids(itemId), null, clazz, 0, 1, null);
+                        PartialList<T> r = query(QueryBuilders.idsQuery(itemType).ids(itemId), null, clazz, 0, 1, null, null);
                         if (r.size() > 0) {
                             return r.get(0);
                         }
@@ -1128,12 +1128,17 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
     @Override
     public <T extends Item> PartialList<T> query(final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) {
-        return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, clazz, offset, size, null);
+        return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, clazz, offset, size, null, null);
+    }
+
+    @Override
+    public <T extends Item> PartialList<T> query(final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size, final String scrollTimeValidity) {
+        return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, clazz, offset, size, null, scrollTimeValidity);
     }
 
     @Override
     public <T extends Item> PartialList<T> queryFullText(final String fulltext, final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) {
-        return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(conditionESQueryBuilderDispatcher.getQueryBuilder(query)), sortBy, clazz, offset, size, null);
+        return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(conditionESQueryBuilderDispatcher.getQueryBuilder(query)), sortBy, clazz, offset, size, null, null);
     }
 
     @Override
@@ -1143,22 +1148,22 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
     @Override
     public <T extends Item> List<T> query(final String fieldName, final String[] fieldValues, String sortBy, final Class<T> clazz) {
-        return query(QueryBuilders.termsQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValues)), sortBy, clazz, 0, -1, getRouting(fieldName, fieldValues, clazz)).getList();
+        return query(QueryBuilders.termsQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValues)), sortBy, clazz, 0, -1, getRouting(fieldName, fieldValues, clazz), null).getList();
     }
 
     @Override
     public <T extends Item> PartialList<T> query(String fieldName, String fieldValue, String sortBy, Class<T> clazz, int offset, int size) {
-        return query(QueryBuilders.termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz));
+        return query(QueryBuilders.termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
     }
 
     @Override
     public <T extends Item> PartialList<T> queryFullText(String fieldName, String fieldValue, String fulltext, String sortBy, Class<T> clazz, int offset, int size) {
-        return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(QueryBuilders.termQuery(fieldName, fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz));
+        return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(QueryBuilders.termQuery(fieldName, fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
     }
 
     @Override
     public <T extends Item> PartialList<T> queryFullText(String fulltext, String sortBy, Class<T> clazz, int offset, int size) {
-        return query(QueryBuilders.queryStringQuery(fulltext).defaultField("_all"), sortBy, clazz, offset, size, getRouting("_all", new String[]{fulltext}, clazz));
+        return query(QueryBuilders.queryStringQuery(fulltext).defaultField("_all"), sortBy, clazz, offset, size, getRouting("_all", new String[]{fulltext}, clazz), null);
     }
 
     @Override
@@ -1166,7 +1171,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         RangeQueryBuilder builder = QueryBuilders.rangeQuery(fieldName);
         builder.from(from);
         builder.to(to);
-        return query(builder, sortBy, clazz, offset, size, null);
+        return query(builder, sortBy, clazz, offset, size, null, null);
     }
 
     @Override
@@ -1193,21 +1198,35 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }.executeInClassLoader();
     }
 
-    private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final int offset, final int size, final String[] routing) {
+    private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final int offset, final int size, final String[] routing, final String scrollTimeValidity) {
         return new InClassLoaderExecute<PartialList<T>>() {
 
             @Override
             protected PartialList<T> execute(Object... args) {
                 List<T> results = new ArrayList<T>();
+                String scrollIdentifier = null;
                 long totalHits = 0;
                 try {
                     String itemType = getItemType(clazz);
+                    TimeValue keepAlive = TimeValue.timeValueHours(1);
+                    SearchRequestBuilder requestBuilder = null;
+                    if (scrollTimeValidity != null) {
+                        keepAlive = TimeValue.parseTimeValue(scrollTimeValidity, TimeValue.timeValueHours(1), "scrollTimeValidity");
+                        requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType))
+                                .setTypes(itemType)
+                                .setFetchSource(true)
+                                .setScroll(keepAlive)
+                                .setFrom(offset)
+                                .setQuery(query)
+                                .setSize(size);
+                    } else {
+                        requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType))
+                                .setTypes(itemType)
+                                .setFetchSource(true)
+                                .setQuery(query)
+                                .setFrom(offset);
+                    }
 
-                    SearchRequestBuilder requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType))
-                            .setTypes(itemType)
-                            .setFetchSource(true)
-                            .setQuery(query)
-                            .setFrom(offset);
                     if (size == Integer.MIN_VALUE) {
                         requestBuilder.setSize(defaultQueryLimit);
                     } else if (size != -1) {
@@ -1244,6 +1263,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                             .execute()
                             .actionGet();
                     SearchHits searchHits = response.getHits();
+                    scrollIdentifier = response.getScrollId();
                     totalHits = searchHits.getTotalHits();
                     for (SearchHit searchHit : searchHits) {
                         String sourceAsString = searchHit.getSourceAsString();
@@ -1255,7 +1275,49 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     logger.error("Error loading itemType=" + clazz.getName() + " query=" + query + " sortBy=" + sortBy, t);
                 }
 
-                return new PartialList<T>(results, offset, size, totalHits);
+                PartialList<T> result = new PartialList<T>(results, offset, size, totalHits);
+                if (scrollIdentifier != null && totalHits != 0) {
+                    result.setScrollIdentifier(scrollIdentifier);
+                    result.setScrollTimeValidity(scrollTimeValidity);
+                }
+                return result;
+            }
+        }.executeInClassLoader();
+    }
+
+    @Override
+    public <T extends Item> PartialList<T> continueScrollQuery(final Class<T> clazz, final String scrollIdentifier, final String scrollTimeValidity) {
+        return new InClassLoaderExecute<PartialList<T>>() {
+
+            @Override
+            protected PartialList<T> execute(Object... args) {
+                List<T> results = new ArrayList<T>();
+                long totalHits = 0;
+                try {
+                    TimeValue keepAlive = TimeValue.parseTimeValue(scrollTimeValidity, TimeValue.timeValueMinutes(10), "scrollTimeValidity");
+                    SearchResponse response = client.prepareSearchScroll(scrollIdentifier).setScroll(keepAlive).execute().actionGet();
+
+                    if (response.getHits().getHits().length == 0) {
+                        client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet();
+                    } else {
+                        for (SearchHit searchHit : response.getHits().getHits()) {
+                            // add hit to results
+                            String sourceAsString = searchHit.getSourceAsString();
+                            final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
+                            value.setItemId(searchHit.getId());
+                            results.add(value);
+                        }
+                    }
+                    PartialList<T> result = new PartialList<T>(results, 0, response.getHits().getHits().length, response.getHits().getTotalHits());
+                    if (scrollIdentifier != null) {
+                        result.setScrollIdentifier(scrollIdentifier);
+                        result.setScrollTimeValidity(scrollTimeValidity);
+                    }
+                    return result;
+                } catch (Exception t) {
+                    logger.error("Error continuing scrolling query for itemType=" + clazz.getName() + " scrollIdentifier=" + scrollIdentifier + " scrollTimeValidity=" + scrollTimeValidity, t);
+                }
+                return null;
             }
         }.executeInClassLoader();
     }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/28d7dbde/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
----------------------------------------------------------------------
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index 90b0efc..881c395 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -294,6 +294,39 @@ public interface PersistenceService {
     <T extends Item> PartialList<T> query(Condition query, String sortBy, Class<T> clazz, int offset, int size);
 
     /**
+     * Retrieves a list of items satisfying the specified {@link Condition}, ordered according to the specified {@code sortBy} String and paged: only {@code size} of them
+     * are retrieved, starting with the {@code offset}-th one. If a scroll identifier and time validity are specified, they will be used to perform a scrolling query, meaning
+     * that only partial results will be returned, but the scrolling can be continued.
+     *
+     * @param <T>    the type of the Item subclass we want to retrieve
+     * @param query  the {@link Condition} the items must satisfy to be retrieved
+     * @param sortBy an optional ({@code null} if no sorting is required) String of comma ({@code ,}) separated property names on which ordering should be performed, ordering
+     *               elements according to the property order in the
+     *               String, considering each in turn and moving on to the next one in case of equality of all preceding ones. Each property name is optionally followed by
+     *               a colon ({@code :}) and an order specifier: {@code asc} or {@code desc}.
+     * @param clazz  the {@link Item} subclass of the items we want to retrieve
+     * @param offset zero or a positive integer specifying the position of the first item in the total ordered collection of matching items
+     * @param size   a positive integer specifying how many matching items should be retrieved or {@code -1} if all of them should be retrieved. In the case of a scroll query
+     *               this will be used as the scrolling window size.
+     * @param scrollTimeValidity the time the scrolling query should stay valid. This must contain a time unit value such as the ones supported by ElasticSearch, such as
+     *                           the ones declared here : https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units
+     * @return a {@link PartialList} of items matching the specified criteria, with a scroll identifier and the scroll validity used if a scroll query was requested.
+     */
+    <T extends Item> PartialList<T> query(Condition query, String sortBy, Class<T> clazz, int offset, int size, String scrollTimeValidity);
+
+    /**
+     * Continues the execution of a scroll query, to retrieve the next results. If there are no more results the scroll query is also cleared.
+     * @param clazz  the {@link Item} subclass of the items we want to retrieve
+     * @param scrollIdentifier a scroll identifier obtained by the execution of a first query and returned in the {@link PartialList} object
+     * @param scrollTimeValidity a scroll time validity value for the scroll query to stay valid. This must contain a time unit value such as the ones supported by ElasticSearch, such as
+     *                           the ones declared here : https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units
+     * @param <T>    the type of the Item subclass we want to retrieve
+     * @return a {@link PartialList} of items matching the specified criteria, with a scroll identifier and the scroll validity used if a scroll query was requested. Note that if
+     * there are no more results the list will be empty but not null.
+     */
+    <T extends Item> PartialList<T> continueScrollQuery(Class<T> clazz, String scrollIdentifier, String scrollTimeValidity);
+
+    /**
      * Retrieves the same items as {@code query(query, sortBy, clazz, 0, -1)} with the added constraints that the matching elements must also have at least a field matching the
      * specified full text query.
      *

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/28d7dbde/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java
index 7013581..c36722b 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java
@@ -129,8 +129,9 @@ public class PropertyConditionESQueryBuilder implements ConditionESQueryBuilder
             case "isNotDay":
                 checkRequiredValue(value, name, op, false);
                 return QueryBuilders.boolQuery().mustNot(getIsSameDayRange(value, name));
+            default:
+                throw new IllegalArgumentException("Impossible to build ES filter, unrecognized op=" + op);
         }
-        return null;
     }
 
     private void checkRequiredValuesSize(List<?> values, String name, String operator, int expectedSize) {

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/28d7dbde/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
----------------------------------------------------------------------
diff --git a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
index 3a64be1..2275d60 100644
--- a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.unomi.api.*;
 import org.apache.unomi.api.actions.Action;
 import org.apache.unomi.api.conditions.Condition;
+import org.apache.unomi.api.conditions.ConditionType;
 import org.apache.unomi.api.query.Query;
 import org.apache.unomi.api.rules.Rule;
 import org.apache.unomi.api.segments.*;
@@ -62,6 +63,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
     private List<Segment> allSegments;
     private List<Scoring> allScoring;
     private Timer segmentTimer;
+    private int segmentUpdateBatchSize = 1000;
 
     public SegmentServiceImpl() {
         logger.info("Initializing segment service...");
@@ -119,6 +121,10 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
         this.rulesService = rulesService;
     }
 
+    public void setSegmentUpdateBatchSize(int segmentUpdateBatchSize) {
+        this.segmentUpdateBatchSize = segmentUpdateBatchSize;
+    }
+
     public void postConstruct() {
         logger.debug("postConstruct {" + bundleContext.getBundle() + "}");
         loadPredefinedSegments(bundleContext);
@@ -819,64 +825,83 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
         long t = System.currentTimeMillis();
         Condition segmentCondition = new Condition();
 
+        long updatedProfileCount = 0;
+
         segmentCondition.setConditionType(definitionsService.getConditionType("profilePropertyCondition"));
         segmentCondition.setParameter("propertyName", "segments");
         segmentCondition.setParameter("comparisonOperator", "equals");
         segmentCondition.setParameter("propertyValue", segment.getItemId());
 
         if(segment.getMetadata().isEnabled()) {
-            // the following list can grow really big if the segments are large.
-            // We might want to replace this with scrolling if it becomes huge
-            // (100million profiles)
-            List<Profile> previousProfiles = persistenceService.query(segmentCondition, null, Profile.class);
-            List<Profile> newProfiles = persistenceService.query(segment.getCondition(), null, Profile.class);
-
-            // we use sets instead of lists to speed up contains() calls that are very expensive on lists.
-
-            // we use to use removeAll calls but these are expensive because they require lots of copies upon element
-            // removal so we implemented them with adds instead.
-            //profilesToAdd.removeAll(previousProfiles);
-            //profilesToRemove.removeAll(newProfiles);
 
-            Set<Profile> newProfilesSet = new HashSet<>(newProfiles);
-            Set<Profile> previousProfilesSet = new HashSet<>(previousProfiles);
-            Set<Profile> profilesToAdd = new HashSet<>(newProfilesSet.size() / 2);
-            for (Profile newProfile : newProfilesSet) {
-                if (!previousProfilesSet.contains(newProfile)) {
-                    profilesToAdd.add(newProfile);
+            ConditionType booleanConditionType = definitionsService.getConditionType("booleanCondition");
+            ConditionType notConditionType = definitionsService.getConditionType("notCondition");
+
+            Condition profilesToAddCondition = new Condition(booleanConditionType);
+            profilesToAddCondition.setParameter("operator", "and");
+            List<Condition> profilesToAddSubConditions = new ArrayList<>();
+            profilesToAddSubConditions.add(segment.getCondition());
+            Condition notOldSegmentCondition = new Condition(notConditionType);
+            notOldSegmentCondition.setParameter("subCondition", segmentCondition);
+            profilesToAddSubConditions.add(notOldSegmentCondition);
+            profilesToAddCondition.setParameter("subConditions", profilesToAddSubConditions);
+
+            Condition profilesToRemoveCondition = new Condition(booleanConditionType);
+            profilesToRemoveCondition.setParameter("operator", "and");
+            List<Condition> profilesToRemoveSubConditions = new ArrayList<>();
+            profilesToRemoveSubConditions.add(segmentCondition);
+            Condition notNewSegmentCondition = new Condition(notConditionType);
+            notNewSegmentCondition.setParameter("subCondition", segment.getCondition());
+            profilesToRemoveSubConditions.add(notNewSegmentCondition);
+            profilesToRemoveCondition.setParameter("subConditions", profilesToRemoveSubConditions);
+
+            PartialList<Profile> profilesToRemove = persistenceService.query(profilesToRemoveCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
+            PartialList<Profile> profilesToAdd = persistenceService.query(profilesToAddCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
+
+            while (profilesToAdd.getList().size() > 0) {
+                for (Profile profileToAdd : profilesToAdd.getList()) {
+                    profileToAdd.getSegments().add(segment.getItemId());
+                    persistenceService.update(profileToAdd.getItemId(), null, Profile.class, "segments", profileToAdd.getSegments());
+                    Event profileUpdated = new Event("profileUpdated", null, profileToAdd, null, null, profileToAdd, new Date());
+                    profileUpdated.setPersistent(false);
+                    eventService.send(profileUpdated);
+                    updatedProfileCount++;
                 }
-            }
-            Set<Profile> profilesToRemove = new HashSet<>(previousProfilesSet.size() / 2);
-            for (Profile previousProfile : previousProfilesSet) {
-                if (!newProfilesSet.contains(previousProfile)) {
-                    profilesToRemove.add(previousProfile);
+                profilesToAdd = persistenceService.continueScrollQuery(Profile.class, profilesToAdd.getScrollIdentifier(), profilesToAdd.getScrollTimeValidity());
+                if (profilesToAdd == null || profilesToAdd.getList().size() == 0) {
+                    break;
                 }
             }
-
-
-            for (Profile profileToAdd : profilesToAdd) {
-                profileToAdd.getSegments().add(segment.getItemId());
-                persistenceService.update(profileToAdd.getItemId(), null, Profile.class, "segments", profileToAdd.getSegments());
-                Event profileUpdated = new Event("profileUpdated", null, profileToAdd, null, null, profileToAdd, new Date());
-                profileUpdated.setPersistent(false);
-                eventService.send(profileUpdated);
-            }
-            for (Profile profileToRemove : profilesToRemove) {
-                profileToRemove.getSegments().remove(segment.getItemId());
-                persistenceService.update(profileToRemove.getItemId(), null, Profile.class, "segments", profileToRemove.getSegments());
-                Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
-                profileUpdated.setPersistent(false);
-                eventService.send(profileUpdated);
+            while (profilesToRemove.getList().size() > 0) {
+                for (Profile profileToRemove : profilesToRemove.getList()) {
+                    profileToRemove.getSegments().remove(segment.getItemId());
+                    persistenceService.update(profileToRemove.getItemId(), null, Profile.class, "segments", profileToRemove.getSegments());
+                    Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
+                    profileUpdated.setPersistent(false);
+                    eventService.send(profileUpdated);
+                    updatedProfileCount++;
+                }
+                profilesToRemove = persistenceService.continueScrollQuery(Profile.class, profilesToRemove.getScrollIdentifier(), profilesToRemove.getScrollTimeValidity());
+                if (profilesToRemove == null || profilesToRemove.getList().size() == 0) {
+                    break;
+                }
             }
 
         } else {
-            List<Profile> previousProfiles = persistenceService.query(segmentCondition, null, Profile.class);
-            for (Profile profileToRemove : previousProfiles) {
-                profileToRemove.getSegments().remove(segment.getItemId());
-                persistenceService.update(profileToRemove.getItemId(), null, Profile.class, "segments", profileToRemove.getSegments());
+            PartialList<Profile> profilesToRemove = persistenceService.query(segmentCondition, null, Profile.class, 0, 200, "10m");
+            while (profilesToRemove.getList().size() > 0) {
+                for (Profile profileToRemove : profilesToRemove.getList()) {
+                    profileToRemove.getSegments().remove(segment.getItemId());
+                    persistenceService.update(profileToRemove.getItemId(), null, Profile.class, "segments", profileToRemove.getSegments());
+                    updatedProfileCount++;
+                }
+                profilesToRemove = persistenceService.continueScrollQuery(Profile.class, profilesToRemove.getScrollIdentifier(), profilesToRemove.getScrollTimeValidity());
+                if (profilesToRemove == null || profilesToRemove.getList().size() == 0) {
+                    break;
+                }
             }
         }
-        logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t);
+        logger.info("{} profiles updated in {}ms", updatedProfileCount, System.currentTimeMillis()-t);
     }
 
     private void updateExistingProfilesForScoring(Scoring scoring) {

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/28d7dbde/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 5564e94..3176639 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -29,6 +29,7 @@
             <cm:property name="profile.purge.inactiveTime" value="30"/>
             <cm:property name="profile.purge.existTime" value="-1"/>
             <cm:property name="event.purge.existTime" value="12"/>
+            <cm:property name="segment.update.batchSize" value="1000"/>
         </cm:default-properties>
     </cm:property-placeholder>
 
@@ -104,6 +105,7 @@
         <property name="rulesService" ref="rulesServiceImpl"/>
         <property name="bundleContext" ref="blueprintBundleContext"/>
         <property name="taskExecutionPeriod" value="86400000"/>
+        <property name="segmentUpdateBatchSize" value="${services.segment.update.batchSize}" />
     </bean>
     <service id="segmentService" ref="segmentServiceImpl" auto-export="interfaces"/>
 


[3/3] incubator-unomi git commit: Upgrade to ElasticSearch 5.1.1

Posted by sh...@apache.org.
Upgrade to ElasticSearch 5.1.1


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/1cd827a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/1cd827a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/1cd827a1

Branch: refs/heads/feature-UNOMI-70-ES5X
Commit: 1cd827a1252c07e9083aee9ec81104ab66b2881f
Parents: 923e491
Author: Serge Huber <sh...@apache.org>
Authored: Thu Dec 15 21:55:52 2016 +0100
Committer: Serge Huber <sh...@apache.org>
Committed: Thu Dec 15 21:55:52 2016 +0100

----------------------------------------------------------------------
 .../elasticsearch/ElasticSearchPersistenceServiceImpl.java    | 4 ++--
 .../baseplugin/conditions/PropertyConditionEvaluator.java     | 7 +++----
 pom.xml                                                       | 2 +-
 3 files changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/1cd827a1/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 0651c3f..020d1d4 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -63,7 +63,7 @@ import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.script.Script;
-import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.script.ScriptType;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.*;
@@ -827,7 +827,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
                             (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
 
-                    Script actualScript = new Script(script, ScriptService.ScriptType.INLINE, null, scriptParams);
+                    Script actualScript = new Script(ScriptType.INLINE, "groovy", script, scriptParams);
                     if (bulkProcessor == null) {
                         client.prepareUpdate(index, itemType, itemId).setScript(actualScript)
                                 .execute()

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/1cd827a1/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java
index 67404a8..68e58a6 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import java.text.SimpleDateFormat;
 import java.util.*;
-import java.util.concurrent.Callable;
+import java.util.function.LongSupplier;
 import java.util.regex.Pattern;
 
 /**
@@ -282,9 +282,8 @@ public class PropertyConditionEvaluator implements ConditionEvaluator {
         } else {
             DateMathParser parser = new DateMathParser(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER);
             try {
-                return new Date(parser.parse(value.toString(), new Callable<Long>() {
-                    @Override
-                    public Long call() throws Exception {
+                return new Date(parser.parse(value.toString(), new LongSupplier() {
+                    public long getAsLong() {
                         return System.currentTimeMillis();
                     }
                 }));

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/1cd827a1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 040e382..1aba4e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,7 +69,7 @@
         <version.karaf>3.0.8</version.karaf>
         <version.karaf.cellar>3.0.3</version.karaf.cellar>
         <version.pax.exam>4.9.1</version.pax.exam>
-        <elasticsearch.version>5.0.1</elasticsearch.version>
+        <elasticsearch.version>5.1.1</elasticsearch.version>
 
         <maven.compiler.source>1.7</maven.compiler.source>
         <maven.compiler.target>1.7</maven.compiler.target>


[2/3] incubator-unomi git commit: Merge branch 'feature-UNOMI-28-ES2X' into feature-UNOMI-70-ES5X

Posted by sh...@apache.org.
Merge branch 'feature-UNOMI-28-ES2X' into feature-UNOMI-70-ES5X

# Conflicts:
#	persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/923e4917
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/923e4917
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/923e4917

Branch: refs/heads/feature-UNOMI-70-ES5X
Commit: 923e49171fae139e2197b55ef4b5102d61221edc
Parents: a94fdb0 28d7dbd
Author: Serge Huber <sh...@apache.org>
Authored: Thu Dec 15 16:40:41 2016 +0100
Committer: Serge Huber <sh...@apache.org>
Committed: Thu Dec 15 16:40:41 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/unomi/api/PartialList.java  |  25 +++++
 buildAndRunNoTests.sh                           |  34 ++++++
 .../ElasticSearchPersistenceServiceImpl.java    |  95 +++++++++++++---
 .../persistence/spi/PersistenceService.java     |  33 ++++++
 .../PropertyConditionESQueryBuilder.java        |   3 +-
 .../services/services/SegmentServiceImpl.java   | 111 ++++++++++++-------
 .../resources/OSGI-INF/blueprint/blueprint.xml  |   2 +
 7 files changed, 242 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/923e4917/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --cc persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index fa4b5e1,d6a136e..0651c3f
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@@ -699,7 -711,7 +699,7 @@@ public class ElasticSearchPersistenceSe
                      String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
  
                      if (itemsMonthlyIndexed.contains(itemType) && dateHint == null) {
-                         PartialList<T> r = query(QueryBuilders.idsQuery(itemType).addIds(itemId), null, clazz, 0, 1, null);
 -                        PartialList<T> r = query(QueryBuilders.idsQuery(itemType).ids(itemId), null, clazz, 0, 1, null, null);
++                        PartialList<T> r = query(QueryBuilders.idsQuery(itemType).addIds(itemId), null, clazz, 0, 1, null, null);
                          if (r.size() > 0) {
                              return r.get(0);
                          }
@@@ -1238,35 -1262,14 +1256,36 @@@
                      SearchResponse response = requestBuilder
                              .execute()
                              .actionGet();
 -                    SearchHits searchHits = response.getHits();
 -                    scrollIdentifier = response.getScrollId();
 -                    totalHits = searchHits.getTotalHits();
 -                    for (SearchHit searchHit : searchHits) {
 -                        String sourceAsString = searchHit.getSourceAsString();
 -                        final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
 -                        value.setItemId(searchHit.getId());
 -                        results.add(value);
 +                    if (size == -1) {
 +                        // Scroll until no more hits are returned
 +                        while (true) {
 +
 +                            for (SearchHit searchHit : response.getHits().getHits()) {
 +                                // add hit to results
 +                                String sourceAsString = searchHit.getSourceAsString();
 +                                final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
 +                                value.setItemId(searchHit.getId());
 +                                results.add(value);
 +                            }
 +
 +                            response = client.prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet();
 +
 +                            // If we have no more hits, exit
 +                            if (response.getHits().getHits().length == 0) {
 +                                break;
 +                            }
 +                        }
 +                        client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet();
 +                    } else {
 +                        SearchHits searchHits = response.getHits();
++                        scrollIdentifier = response.getScrollId();
 +                        totalHits = searchHits.getTotalHits();
 +                        for (SearchHit searchHit : searchHits) {
 +                            String sourceAsString = searchHit.getSourceAsString();
 +                            final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
 +                            value.setItemId(searchHit.getId());
 +                            results.add(value);
 +                        }
                      }
                  } catch (Exception t) {
                      logger.error("Error loading itemType=" + clazz.getName() + " query=" + query + " sortBy=" + sortBy, t);

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/923e4917/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
----------------------------------------------------------------------