You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by am...@apache.org on 2017/03/20 12:34:50 UTC

incubator-unomi git commit: UNOMI-87 : Rewrite the queries for the scoring plans update

Repository: incubator-unomi
Updated Branches:
  refs/heads/master cd3bc9ff0 -> 2f687915a


UNOMI-87 : Rewrite the queries for the scoring plans update


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/2f687915
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/2f687915
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/2f687915

Branch: refs/heads/master
Commit: 2f687915a8929af291e43ee2536ba4208e752496
Parents: cd3bc9f
Author: Abdelkader Midani <am...@apache.org>
Authored: Mon Mar 20 12:09:36 2017 +0100
Committer: Abdelkader Midani <am...@apache.org>
Committed: Mon Mar 20 12:09:42 2017 +0100

----------------------------------------------------------------------
 .../ElasticSearchPersistenceServiceImpl.java    | 124 ++++++++++++++-----
 ...g.apache.unomi.persistence.elasticsearch.cfg |   4 +-
 persistence-elasticsearch/pom.xml               |   1 -
 .../persistence/spi/PersistenceService.java     |  12 ++
 .../PropertyConditionESQueryBuilder.java        |   3 +
 .../conditions/PropertyConditionEvaluator.java  |   3 -
 pom.xml                                         |   2 +-
 .../services/services/SegmentServiceImpl.java   | 119 +++++++++---------
 8 files changed, 169 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index e2d1645..4e66418 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -56,7 +56,11 @@ import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.index.reindex.BulkIndexByScrollResponse;
+import org.elasticsearch.index.reindex.UpdateByQueryAction;
+import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder;
 import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptException;
 import org.elasticsearch.script.ScriptType;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
@@ -92,22 +96,21 @@ import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import static org.elasticsearch.index.query.QueryBuilders.termQuery;
+
 @SuppressWarnings("rawtypes")
 public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SynchronousBundleListener {
 
-    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
-
     public static final String NUMBER_OF_SHARDS = "number_of_shards";
     public static final String NUMBER_OF_REPLICAS = "number_of_replicas";
     public static final String CLUSTER_NAME = "cluster.name";
-
     public static final String BULK_PROCESSOR_NAME = "bulkProcessor.name";
     public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = "bulkProcessor.concurrentRequests";
     public static final String BULK_PROCESSOR_BULK_ACTIONS = "bulkProcessor.bulkActions";
     public static final String BULK_PROCESSOR_BULK_SIZE = "bulkProcessor.bulkSize";
     public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval";
     public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy";
-
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
     private TransportClient client;
     private BulkProcessor bulkProcessor;
     private String elasticSearchAddresses;
@@ -123,7 +126,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher;
     private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher;
 
-    private Map<String,String> indexNames;
+    private Map<String, String> indexNames;
     private List<String> itemsMonthlyIndexed;
     private Map<String, String> routingByType;
     private Set<String> existingIndexNames = new TreeSet<String>();
@@ -135,7 +138,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     private String bulkProcessorName = "unomi-bulk";
     private String bulkProcessorConcurrentRequests = "1";
     private String bulkProcessorBulkActions = "1000";
-    private String bulkProcessorBulkSize= "5MB";
+    private String bulkProcessorBulkSize = "5MB";
     private String bulkProcessorFlushInterval = "5s";
     private String bulkProcessorBackoffPolicy = "exponential";
 
@@ -306,8 +309,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 }
                 if (!indexExists) {
                     logger.info("{} index doesn't exist yet, creating it...", indexName);
-                    Map<String,String> indexMappings = new HashMap<String,String>();
-                    indexMappings.put("_default_",mappings.get("_default_"));
+                    Map<String, String> indexMappings = new HashMap<String, String>();
+                    indexMappings.put("_default_", mappings.get("_default_"));
                     for (Map.Entry<String, String> entry : mappings.entrySet()) {
                         if (!itemsMonthlyIndexed.contains(entry.getKey()) && !indexNames.containsKey(entry.getKey())) {
                             indexMappings.put(entry.getKey(), entry.getValue());
@@ -455,7 +458,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos);
                     int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos);
                     TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), BULK_PROCESSOR_BACKOFF_POLICY);
-                    int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos+1, paramEndPos));
+                    int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos + 1, paramEndPos));
                     bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(delay, maxNumberOfRetries));
                 } else if (backoffPolicyStr.startsWith("exponential")) {
                     if (!backoffPolicyStr.contains("(")) {
@@ -466,7 +469,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos);
                         int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos);
                         TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), BULK_PROCESSOR_BACKOFF_POLICY);
-                        int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos+1, paramEndPos));
+                        int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos + 1, paramEndPos));
                         bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(delay, maxNumberOfRetries));
                     }
                 }
@@ -551,8 +554,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
             if (!indexExists) {
                 logger.info("{} index doesn't exist yet, creating it...", monthlyIndexName);
 
-                Map<String,String> indexMappings = new HashMap<String,String>();
-                indexMappings.put("_default_",mappings.get("_default_"));
+                Map<String, String> indexMappings = new HashMap<String, String>();
+                indexMappings.put("_default_", mappings.get("_default_"));
                 for (Map.Entry<String, String> entry : mappings.entrySet()) {
                     if (itemsMonthlyIndexed.contains(entry.getKey())) {
                         indexMappings.put(entry.getKey(), entry.getValue());
@@ -749,6 +752,60 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     }
 
     @Override
+    public boolean updateWithQueryAndScript(final Date dateHint, final Class<?> clazz, final String[] scripts, final Map<String, Object>[] scriptParams, final Condition[] conditions) {
+        return new InClassLoaderExecute<Boolean>() {
+            protected Boolean execute(Object... args) throws Exception {
+                try {
+                    String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
+
+                    String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
+                            (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
+
+                    for (int i = 0; i < scripts.length; i++) {
+                        Script actualScript = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]);
+
+                        client.admin().indices().prepareRefresh(index).get();
+
+                        UpdateByQueryRequestBuilder ubqrb = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
+                        ubqrb.source(index).source().setTypes(itemType);
+                        BulkIndexByScrollResponse response = ubqrb.setSlices(2)
+                                .setMaxRetries(1000).abortOnVersionConflict(false).script(actualScript)
+                                .filter(conditionESQueryBuilderDispatcher.buildFilter(conditions[i])).get();
+                        if (response.getBulkFailures().size() > 0) {
+                            for (BulkItemResponse.Failure failure : response.getBulkFailures()) {
+                                logger.error("Failure : cause={} , message={}", failure.getCause(), failure.getMessage());
+                            }
+                        } else {
+                            logger.info("Update By Query has processed {} in {}.", response.getUpdated(), response.getTook().toString());
+                        }
+                        if (response.isTimedOut()) {
+                            logger.error("Update By Query ended with timeout!");
+                        }
+                        if (response.getVersionConflicts() > 0) {
+                            logger.warn("Update By Query ended with {} Version Conflicts!", response.getVersionConflicts());
+                        }
+                        if (response.getNoops() > 0) {
+                            logger.warn("Update By Query ended with {} noops!", response.getNoops());
+                        }
+                    }
+                    return true;
+                } catch (IndexNotFoundException e) {
+                    throw new Exception("No index found for itemType=" + clazz.getName(), e);
+                } catch (NoSuchFieldException e) {
+                    throw new Exception("Error updating item ", e);
+                } catch (IllegalAccessException e) {
+                    throw new Exception("Error updating item ", e);
+                } catch (ScriptException e) {
+                    logger.error("Error in the update script : {}\n{}\n{}", e.getScript(), e.getDetailedMessage(), e.getScriptStack());
+                    throw new Exception("Error in the update script");
+                } finally {
+                    return false;
+                }
+            }
+        }.catchingExecuteInClassLoader(true);
+    }
+
+    @Override
     public boolean updateWithScript(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
         return new InClassLoaderExecute<Boolean>() {
             protected Boolean execute(Object... args) throws Exception {
@@ -758,13 +815,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
                             (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
 
-                    Script actualScript = new Script(ScriptType.INLINE, "groovy", script, scriptParams);
+                    Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams);
+
                     if (bulkProcessor == null) {
                         client.prepareUpdate(index, itemType, itemId).setScript(actualScript)
                                 .execute()
                                 .actionGet();
                     } else {
-                        UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setScript(actualScript).request();
+                        UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setScript(actualScript).
+                                setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).request();
                         bulkProcessor.add(updateRequest);
                     }
                     return true;
@@ -852,8 +911,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(indexName).execute().actionGet();
                 boolean indexExists = indicesExistsResponse.isExists();
                 if (!indexExists) {
-                    Map<String,String> indexMappings = new HashMap<String,String>();
-                    indexMappings.put("_default_",mappings.get("_default_"));
+                    Map<String, String> indexMappings = new HashMap<String, String>();
+                    indexMappings.put("_default_", mappings.get("_default_"));
                     for (Map.Entry<String, String> entry : mappings.entrySet()) {
                         if (indexNames.containsKey(entry.getKey()) && indexNames.get(entry.getKey()).equals(indexName)) {
                             indexMappings.put(entry.getKey(), entry.getValue());
@@ -880,7 +939,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }.catchingExecuteInClassLoader(true);
     }
 
-    private void internalCreateIndex(String indexName, Map<String,String> mappings) {
+    private void internalCreateIndex(String indexName, Map<String, String> mappings) {
         CreateIndexRequestBuilder builder = client.admin().indices().prepareCreate(indexName)
                 .setSettings("{\n" +
                         "    \"index\" : {\n" +
@@ -995,14 +1054,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     }
 
     private String getPropertyNameWithData(String name, String itemType) {
-        Map<String,Object> propertyMapping = getPropertyMapping(name,itemType);
+        Map<String, Object> propertyMapping = getPropertyMapping(name, itemType);
         if (propertyMapping == null) {
             return null;
         }
         if (propertyMapping != null
                 && "text".equals(propertyMapping.get("type"))
                 && propertyMapping.containsKey("fields")
-                && ((Map)propertyMapping.get("fields")).containsKey("keyword")) {
+                && ((Map) propertyMapping.get("fields")).containsKey("keyword")) {
             name += ".keyword";
         }
         return name;
@@ -1107,12 +1166,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
     @Override
     public <T extends Item> PartialList<T> query(String fieldName, String fieldValue, String sortBy, Class<T> clazz, int offset, int size) {
-        return query(QueryBuilders.termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
+        return query(termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
     }
 
     @Override
     public <T extends Item> PartialList<T> queryFullText(String fieldName, String fieldValue, String fulltext, String sortBy, Class<T> clazz, int offset, int size) {
-        return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(QueryBuilders.termQuery(fieldName, fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
+        return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(termQuery(fieldName, fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
     }
 
     @Override
@@ -1207,7 +1266,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                                     requestBuilder = requestBuilder.addSort(distanceSortBuilder.order(SortOrder.ASC));
                                 }
                             } else {
-                                String name = getPropertyNameWithData(StringUtils.substringBeforeLast(sortByElement,":"), itemType);
+                                String name = getPropertyNameWithData(StringUtils.substringBeforeLast(sortByElement, ":"), itemType);
                                 if (name != null) {
                                     if (sortByElement.endsWith(":desc")) {
                                         requestBuilder = requestBuilder.addSort(name, SortOrder.DESC);
@@ -1451,7 +1510,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     }
 
 
-
     @Override
     public void refresh() {
         new InClassLoaderExecute<Boolean>() {
@@ -1514,7 +1572,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         new InClassLoaderExecute<Void>() {
             @Override
             protected Void execute(Object... args) {
-                QueryBuilder query = QueryBuilders.termQuery("scope", scope);
+                QueryBuilder query = termQuery("scope", scope);
 
                 BulkRequestBuilder deleteByScope = client.prepareBulk();
 
@@ -1610,6 +1668,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 (itemsMonthlyIndexed.contains(itemType) ? indexName + "-*" : indexName);
     }
 
+    private String getConfig(Map<String, String> settings, String key,
+                             String defaultValue) {
+        if (settings != null && settings.get(key) != null) {
+            return settings.get(key);
+        }
+        return defaultValue;
+    }
+
     public abstract static class InClassLoaderExecute<T> {
 
         protected abstract T execute(Object... args) throws Exception;
@@ -1624,7 +1690,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
             }
         }
 
-        public T catchingExecuteInClassLoader( boolean logError, Object... args) {
+        public T catchingExecuteInClassLoader(boolean logError, Object... args) {
             try {
                 return executeInClassLoader(args);
             } catch (Exception e) {
@@ -1634,13 +1700,5 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }
     }
 
-    private String getConfig(Map<String,String> settings, String key,
-                             String defaultValue) {
-        if (settings != null && settings.get(key) != null) {
-            return settings.get(key);
-        }
-        return defaultValue;
-    }
-
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index 4e52a1c..12980e1 100644
--- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-cluster.name=contextElasticSearch
+cluster.name=contextElasticSearch_amidani
 # The elasticSearchAddresses may be a comma seperated list of host names and ports such as
 # hostA:9300,hostB:9300
 # Note: the port number must be repeated for each host.
@@ -42,4 +42,4 @@ bulkProcessor.backoffPolicy=exponential
 # for each node in the ElasticSearch cluster:
 #   minimalElasticSearchVersion <= ElasticSearch node version < maximalElasticSearchVersion
 minimalElasticSearchVersion=5.0.0
-maximalElasticSearchVersion=5.2.0
\ No newline at end of file
+maximalElasticSearchVersion=5.3.0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/persistence-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/pom.xml b/persistence-elasticsearch/pom.xml
index 5d25fd5..70a85ea 100644
--- a/persistence-elasticsearch/pom.xml
+++ b/persistence-elasticsearch/pom.xml
@@ -32,7 +32,6 @@
 
     <modules>
         <module>core</module>
-        <module>plugins</module>
     </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
----------------------------------------------------------------------
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index a6b175f..1397659 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -116,6 +116,18 @@ public interface PersistenceService {
     boolean updateWithScript(String itemId, Date dateHint, Class<?> clazz, String script, Map<String, Object> scriptParams);
 
     /**
+     * Updates the items of the specified class by a query with a new property value for the specified property name based on a provided script.
+     *
+     * @param dateHint      a Date helping in identifying where the item is located
+     * @param clazz         the Item subclass of the item to update
+     * @param scripts       inline scripts array
+     * @param scriptParams  script params array
+     * @param conditions    conditions array
+     * @return {@code true} if the update was successful, {@code false} otherwise
+     */
+    boolean updateWithQueryAndScript(Date dateHint, Class<?> clazz, String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions);
+
+    /**
      * Retrieves the item identified with the specified identifier and with the specified Item subclass if it exists.
      *
      * @param <T>    the type of the Item subclass we want to retrieve

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java
index e83bad2..b1aa234 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java
@@ -87,6 +87,9 @@ public class PropertyConditionESQueryBuilder implements ConditionESQueryBuilder
             case "contains":
                 checkRequiredValue(expectedValue, name, comparisonOperator, false);
                 return QueryBuilders.regexpQuery(name, ".*" + expectedValue + ".*");
+            case "notContains":
+                checkRequiredValue(expectedValue, name, comparisonOperator, false);
+                return QueryBuilders.boolQuery().mustNot(QueryBuilders.regexpQuery(name, ".*" + expectedValue + ".*"));
             case "startsWith":
                 checkRequiredValue(expectedValue, name, comparisonOperator, false);
                 return QueryBuilders.prefixQuery(name, expectedValue);

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java
index 1de3ae0..d7c2d2c 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java
@@ -22,7 +22,6 @@ import ognl.Ognl;
 import ognl.OgnlContext;
 import ognl.OgnlException;
 import ognl.enhance.ExpressionAccessor;
-import org.apache.commons.beanutils.BeanUtilsBean;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.unomi.api.Event;
@@ -52,8 +51,6 @@ public class PropertyConditionEvaluator implements ConditionEvaluator {
 
     private static final SimpleDateFormat yearMonthDayDateFormat = new SimpleDateFormat("yyyyMMdd");
 
-    private BeanUtilsBean beanUtilsBean = BeanUtilsBean.getInstance();
-
     private Map<String, Map<String, ExpressionAccessor>> expressionCache = new HashMap<>(64);
 
     private int compare(Object actualValue, String expectedValue, Object expectedValueDate, Object expectedValueInteger, Object expectedValueDateExpr) {

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9a0872c..18bc4c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
         <version.karaf>3.0.8</version.karaf>
         <version.karaf.cellar>3.0.3</version.karaf.cellar>
         <version.pax.exam>4.9.1</version.pax.exam>
-        <elasticsearch.version>5.1.1</elasticsearch.version>
+        <elasticsearch.version>5.2.2</elasticsearch.version>
 
         <maven.compiler.source>1.7</maven.compiler.source>
         <maven.compiler.target>1.7</maven.compiler.target>

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
----------------------------------------------------------------------
diff --git a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
index 770936d..4367b4d 100644
--- a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
@@ -147,7 +147,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
     }
 
     private void cancelTimers() {
-        if(segmentTimer != null) {
+        if (segmentTimer != null) {
             segmentTimer.cancel();
         }
         logger.info("Segment purge: Purge unscheduled");
@@ -258,7 +258,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
     }
 
     private boolean checkSegmentDeletionImpact(Condition condition, String segmentToDeleteId) {
-        if(condition != null) {
+        if (condition != null) {
             @SuppressWarnings("unchecked")
             final List<Condition> subConditions = (List<Condition>) condition.getParameter("subConditions");
             if (subConditions != null) {
@@ -283,6 +283,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
      * Return an updated condition that do not contain a condition on the segmentId anymore
      * it's remove the unnecessary boolean condition (if a condition is the only one of a boolean the boolean will be remove and the subcondition returned)
      * it's return null when there is no more condition after (if the condition passed was only a segment condition on the segmentId)
+     *
      * @param condition the condition to update
      * @param segmentId the segment id to remove in the condition
      * @return updated condition
@@ -294,12 +295,12 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
             List<Condition> updatedSubConditions = new LinkedList<>();
             for (Condition subCondition : subConditions) {
                 Condition updatedCondition = updateSegmentDependentCondition(subCondition, segmentId);
-                if(updatedCondition != null) {
+                if (updatedCondition != null) {
                     updatedSubConditions.add(updatedCondition);
                 }
             }
-            if(!updatedSubConditions.isEmpty()){
-                if(updatedSubConditions.size() == 1) {
+            if (!updatedSubConditions.isEmpty()) {
+                if (updatedSubConditions.size() == 1) {
                     return updatedSubConditions.get(0);
                 } else {
                     condition.setParameter("subConditions", updatedSubConditions);
@@ -308,12 +309,12 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
             } else {
                 return null;
             }
-        } else if("profileSegmentCondition".equals(condition.getConditionTypeId())) {
+        } else if ("profileSegmentCondition".equals(condition.getConditionTypeId())) {
             @SuppressWarnings("unchecked")
             final List<String> referencedSegmentIds = (List<String>) condition.getParameter("segments");
             if (referencedSegmentIds.indexOf(segmentId) >= 0) {
                 referencedSegmentIds.remove(segmentId);
-                if(referencedSegmentIds.isEmpty()) {
+                if (referencedSegmentIds.isEmpty()) {
                     return null;
                 } else {
                     condition.setParameter("segments", referencedSegmentIds);
@@ -379,7 +380,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
             for (Segment segment : impactedSegments) {
                 Condition updatedCondition = updateSegmentDependentCondition(segment.getCondition(), segmentId);
                 segment.setCondition(updatedCondition);
-                if(updatedCondition == null) {
+                if (updatedCondition == null) {
                     clearAutoGeneratedRules(persistenceService.query("linkedItems", segment.getMetadata().getId(), null, Rule.class), segment.getMetadata().getId());
                     segment.getMetadata().setEnabled(false);
                 }
@@ -455,7 +456,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
 
     public SegmentsAndScores getSegmentsAndScoresForProfile(Profile profile) {
         Set<String> segments = new HashSet<String>();
-        Map<String,Integer> scores = new HashMap<String, Integer>();
+        Map<String, Integer> scores = new HashMap<String, Integer>();
 
         List<Segment> allSegments = this.allSegments;
         for (Segment segment : allSegments) {
@@ -542,18 +543,18 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
 
         persistenceService.createMapping(Profile.ITEM_TYPE, String.format(
                 "{\n" +
-                "    \"profile\": {\n" +
-                "        \"properties\" : {\n" +
-                "            \"scores\": {\n" +
-                "                \"properties\": {\n" +
-                "                    \"%s\": {\n" +
-                "                        \"type\": \"long\"\n" +
-                "                    }\n" +
-                "                }\n" +
-                "            }\n" +
-                "        }\n" +
-                "    }\n" +
-                "}\n", scoring.getItemId()));
+                        "    \"profile\": {\n" +
+                        "        \"properties\" : {\n" +
+                        "            \"scores\": {\n" +
+                        "                \"properties\": {\n" +
+                        "                    \"%s\": {\n" +
+                        "                        \"type\": \"long\"\n" +
+                        "                    }\n" +
+                        "                }\n" +
+                        "            }\n" +
+                        "        }\n" +
+                        "    }\n" +
+                        "}\n", scoring.getItemId()));
 
         updateExistingProfilesForScoring(scoring);
     }
@@ -571,7 +572,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
     }
 
     private boolean checkScoringDeletionImpact(Condition condition, String scoringToDeleteId) {
-        if(condition != null) {
+        if (condition != null) {
             @SuppressWarnings("unchecked")
             final List<Condition> subConditions = (List<Condition>) condition.getParameter("subConditions");
             if (subConditions != null) {
@@ -593,6 +594,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
      * Returns an updated condition that no longer contains any condition on the scoringId.
      * Unnecessary boolean conditions are removed (if a condition is the only sub-condition of a boolean condition, the boolean is removed and the sub-condition is returned).
      * Returns null when no condition remains (i.e. the condition passed was only a scoring condition on the scoringId).
+     *
      * @param condition the condition to update
      * @param scoringId the scoring id to remove in the condition
      * @return updated condition
@@ -604,12 +606,12 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
             List<Condition> updatedSubConditions = new LinkedList<>();
             for (Condition subCondition : subConditions) {
                 Condition updatedCondition = updateScoringDependentCondition(subCondition, scoringId);
-                if(updatedCondition != null) {
+                if (updatedCondition != null) {
                     updatedSubConditions.add(updatedCondition);
                 }
             }
-            if(!updatedSubConditions.isEmpty()){
-                if(updatedSubConditions.size() == 1) {
+            if (!updatedSubConditions.isEmpty()) {
+                if (updatedSubConditions.size() == 1) {
                     return updatedSubConditions.get(0);
                 } else {
                     condition.setParameter("subConditions", updatedSubConditions);
@@ -671,7 +673,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
             for (Segment segment : impactedSegments) {
                 Condition updatedCondition = updateScoringDependentCondition(segment.getCondition(), scoringId);
                 segment.setCondition(updatedCondition);
-                if(updatedCondition == null) {
+                if (updatedCondition == null) {
                     clearAutoGeneratedRules(persistenceService.query("linkedItems", segment.getMetadata().getId(), null, Rule.class), segment.getMetadata().getId());
                     segment.getMetadata().setEnabled(false);
                 }
@@ -741,8 +743,8 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
         Set<String> tags = condition.getConditionType().getMetadata().getTags();
         if (tags.contains("eventCondition") && !tags.contains("profileCondition")) {
             try {
-                Map<String,Object> m = new HashMap<>(3);
-                m.put("scope",metadata.getScope());
+                Map<String, Object> m = new HashMap<>(3);
+                m.put("scope", metadata.getScope());
                 m.put("condition", condition);
                 m.put("numberOfDays", parentCondition.getParameter("numberOfDays"));
                 String key = CustomObjectMapper.getObjectMapper().writeValueAsString(m);
@@ -750,7 +752,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
                 parentCondition.setParameter("generatedPropertyKey", key);
                 Rule rule = rulesService.getRule(key);
                 if (rule == null) {
-                    rule = new Rule(new Metadata(metadata.getScope(), key, "Auto generated rule for "+metadata.getName(), ""));
+                    rule = new Rule(new Metadata(metadata.getScope(), key, "Auto generated rule for " + metadata.getName(), ""));
                     rule.setCondition(condition);
                     rule.getMetadata().setHidden(true);
                     final Action action = new Action();
@@ -809,10 +811,10 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
         for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
             String profileId = entry.getKey();
             if (!profileId.startsWith("_")) {
-                Map<String,Long> pastEventCounts = new HashMap<>();
+                Map<String, Long> pastEventCounts = new HashMap<>();
                 pastEventCounts.put(propertyKey, entry.getValue());
-                Map<String,Object> systemProperties = new HashMap<>();
-                systemProperties.put("pastEvents",pastEventCounts);
+                Map<String, Object> systemProperties = new HashMap<>();
+                systemProperties.put("pastEvents", pastEventCounts);
                 try {
                     persistenceService.update(profileId, null, Profile.class, "systemProperties", systemProperties);
                 } catch (Exception e) {
@@ -821,7 +823,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
             }
         }
 
-        logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis()-t);
+        logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis() - t);
     }
 
     private void updateExistingProfilesForSegment(Segment segment) {
@@ -835,7 +837,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
         segmentCondition.setParameter("comparisonOperator", "equals");
         segmentCondition.setParameter("propertyValue", segment.getItemId());
 
-        if(segment.getMetadata().isEnabled()) {
+        if (segment.getMetadata().isEnabled()) {
 
             ConditionType booleanConditionType = definitionsService.getConditionType("booleanCondition");
             ConditionType notConditionType = definitionsService.getConditionType("notCondition");
@@ -904,7 +906,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
                 }
             }
         }
-        logger.info("{} profiles updated in {}ms", updatedProfileCount, System.currentTimeMillis()-t);
+        logger.info("{} profiles updated in {}ms", updatedProfileCount, System.currentTimeMillis() - t);
     }
 
     private void updateExistingProfilesForScoring(Scoring scoring) {
@@ -913,32 +915,31 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
         scoringCondition.setConditionType(definitionsService.getConditionType("profilePropertyCondition"));
         scoringCondition.setParameter("propertyName", "scores." + scoring.getItemId());
         scoringCondition.setParameter("comparisonOperator", "exists");
-        List<Profile> previousProfiles = persistenceService.query(scoringCondition, null, Profile.class);
 
-        HashMap<String, Object> scriptParams = new HashMap<>();
-        scriptParams.put("scoringId", scoring.getItemId());
+        String[] scripts = new String[scoring.getElements().size() + 1];
+        HashMap<String, Object>[] scriptParams = new HashMap[scoring.getElements().size() + 1];
+        Condition[] conditions = new Condition[scoring.getElements().size() + 1];
 
-        for (Profile profileToRemove : previousProfiles) {
-            persistenceService.updateWithScript(profileToRemove.getItemId(), null, Profile.class, "if (ctx._source.systemProperties.scoreModifiers == null) { ctx._source.systemProperties.scoreModifiers=[:] } ; if (ctx._source.systemProperties.scoreModifiers.containsKey(scoringId)) { ctx._source.scores[scoringId] = ctx._source.systemProperties.scoreModifiers[scoringId] } else { ctx._source.scores.remove(scoringId) }", scriptParams);
-        }
-        if(scoring.getMetadata().isEnabled()) {
-            String script = "if (ctx._source.scores == null) { ctx._source.scores=[:] } ; if (ctx._source.scores.containsKey(scoringId)) { ctx._source.scores[scoringId] += scoringValue } else { ctx._source.scores[scoringId] = scoringValue }";
-            Map<String, Event> updatedProfiles = new HashMap<>();
+        scriptParams[0] = new HashMap<String, Object>();
+        scriptParams[0].put("scoringId", scoring.getItemId());
+        scripts[0] = "if( ctx._source.containsKey(\"systemProperties\") && ctx._source.systemProperties.containsKey(\"scoreModifiers\") && ctx._source.systemProperties.scoreModifiers.containsKey(params.scoringId) ) { ctx._source.scores.put(params.scoringId, ctx._source.systemProperties.scoreModifiers.get(params.scoringId)) } else { ctx._source.scores.remove(params.scoringId) }";
+        conditions[0] = scoringCondition;
+
+        if (scoring.getMetadata().isEnabled()) {
+            String scriptToAdd = "if( !ctx._source.containsKey(\"scores\") ){ ctx._source.put(\"scores\", [:])} if( ctx._source.scores.containsKey(params.scoringId) ) { ctx._source.scores.put(params.scoringId, ctx._source.scores.get(params.scoringId)+params.scoringValue) } else { ctx._source.scores.put(params.scoringId, params.scoringValue) }";
+            int idx = 1;
             for (ScoringElement element : scoring.getElements()) {
-                scriptParams.put("scoringValue", element.getValue());
-                for (Profile p : persistenceService.query(element.getCondition(), null, Profile.class)) {
-                    persistenceService.updateWithScript(p.getItemId(), null, Profile.class, script, scriptParams);
-                    Event profileUpdated = new Event("profileUpdated", null, p, null, null, p, new Date());
-                    profileUpdated.setPersistent(false);
-                    updatedProfiles.put(p.getItemId(), profileUpdated);
-                }
-            }
-            Iterator<Map.Entry<String, Event>> entries = updatedProfiles.entrySet().iterator();
-            while (entries.hasNext()) {
-                eventService.send(entries.next().getValue());
+                scriptParams[idx] = new HashMap<>();
+                scriptParams[idx].put("scoringId", scoring.getItemId());
+                scriptParams[idx].put("scoringValue", element.getValue());
+                scripts[idx] = scriptToAdd;
+                conditions[idx] = element.getCondition();
+                idx++;
             }
         }
-        logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t);
+
+        persistenceService.updateWithQueryAndScript(null, Profile.class, scripts, scriptParams, conditions);
+        logger.info("Profiles updated in {}ms", System.currentTimeMillis() - t);
     }
 
     private void updateExistingProfilesForRemovedScoring(String scoringId) {
@@ -955,7 +956,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
         for (Profile profileToRemove : previousProfiles) {
             persistenceService.updateWithScript(profileToRemove.getItemId(), null, Profile.class, "ctx._source.scores.remove(scoringId)", scriptParams);
         }
-        logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t);
+        logger.info("Profiles updated in {}ms", System.currentTimeMillis() - t);
     }
 
     private String getMD5(String md5) {
@@ -976,7 +977,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
         switch (event.getType()) {
             case BundleEvent.STARTED:
                 processBundleStartup(event.getBundle().getBundleContext());
-               break;
+                break;
             case BundleEvent.STOPPING:
                 processBundleStop(event.getBundle().getBundleContext());
                 break;
@@ -1034,7 +1035,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
     }
 
     private <T extends MetadataItem> PartialList<Metadata> getMetadatas(Query query, Class<T> clazz) {
-        if(query.isForceRefresh()){
+        if (query.isForceRefresh()) {
             persistenceService.refresh();
         }
         definitionsService.resolveConditionType(query.getCondition());