You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by am...@apache.org on 2017/03/20 12:34:50 UTC
incubator-unomi git commit: UNOMI-87 : Rewrite the queries for the
scroring plans update
Repository: incubator-unomi
Updated Branches:
refs/heads/master cd3bc9ff0 -> 2f687915a
UNOMI-87 : Rewrite the queries for the scroring plans update
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/2f687915
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/2f687915
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/2f687915
Branch: refs/heads/master
Commit: 2f687915a8929af291e43ee2536ba4208e752496
Parents: cd3bc9f
Author: Abdelkader Midani <am...@apache.org>
Authored: Mon Mar 20 12:09:36 2017 +0100
Committer: Abdelkader Midani <am...@apache.org>
Committed: Mon Mar 20 12:09:42 2017 +0100
----------------------------------------------------------------------
.../ElasticSearchPersistenceServiceImpl.java | 124 ++++++++++++++-----
...g.apache.unomi.persistence.elasticsearch.cfg | 4 +-
persistence-elasticsearch/pom.xml | 1 -
.../persistence/spi/PersistenceService.java | 12 ++
.../PropertyConditionESQueryBuilder.java | 3 +
.../conditions/PropertyConditionEvaluator.java | 3 -
pom.xml | 2 +-
.../services/services/SegmentServiceImpl.java | 119 +++++++++---------
8 files changed, 169 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index e2d1645..4e66418 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -56,7 +56,11 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.index.reindex.BulkIndexByScrollResponse;
+import org.elasticsearch.index.reindex.UpdateByQueryAction;
+import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder;
import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptException;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
@@ -92,22 +96,21 @@ import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import static org.elasticsearch.index.query.QueryBuilders.termQuery;
+
@SuppressWarnings("rawtypes")
public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SynchronousBundleListener {
- private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
-
public static final String NUMBER_OF_SHARDS = "number_of_shards";
public static final String NUMBER_OF_REPLICAS = "number_of_replicas";
public static final String CLUSTER_NAME = "cluster.name";
-
public static final String BULK_PROCESSOR_NAME = "bulkProcessor.name";
public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = "bulkProcessor.concurrentRequests";
public static final String BULK_PROCESSOR_BULK_ACTIONS = "bulkProcessor.bulkActions";
public static final String BULK_PROCESSOR_BULK_SIZE = "bulkProcessor.bulkSize";
public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval";
public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy";
-
+ private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
private TransportClient client;
private BulkProcessor bulkProcessor;
private String elasticSearchAddresses;
@@ -123,7 +126,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher;
private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher;
- private Map<String,String> indexNames;
+ private Map<String, String> indexNames;
private List<String> itemsMonthlyIndexed;
private Map<String, String> routingByType;
private Set<String> existingIndexNames = new TreeSet<String>();
@@ -135,7 +138,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private String bulkProcessorName = "unomi-bulk";
private String bulkProcessorConcurrentRequests = "1";
private String bulkProcessorBulkActions = "1000";
- private String bulkProcessorBulkSize= "5MB";
+ private String bulkProcessorBulkSize = "5MB";
private String bulkProcessorFlushInterval = "5s";
private String bulkProcessorBackoffPolicy = "exponential";
@@ -306,8 +309,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
if (!indexExists) {
logger.info("{} index doesn't exist yet, creating it...", indexName);
- Map<String,String> indexMappings = new HashMap<String,String>();
- indexMappings.put("_default_",mappings.get("_default_"));
+ Map<String, String> indexMappings = new HashMap<String, String>();
+ indexMappings.put("_default_", mappings.get("_default_"));
for (Map.Entry<String, String> entry : mappings.entrySet()) {
if (!itemsMonthlyIndexed.contains(entry.getKey()) && !indexNames.containsKey(entry.getKey())) {
indexMappings.put(entry.getKey(), entry.getValue());
@@ -455,7 +458,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos);
int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos);
TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), BULK_PROCESSOR_BACKOFF_POLICY);
- int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos+1, paramEndPos));
+ int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos + 1, paramEndPos));
bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(delay, maxNumberOfRetries));
} else if (backoffPolicyStr.startsWith("exponential")) {
if (!backoffPolicyStr.contains("(")) {
@@ -466,7 +469,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos);
int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos);
TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), BULK_PROCESSOR_BACKOFF_POLICY);
- int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos+1, paramEndPos));
+ int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos + 1, paramEndPos));
bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(delay, maxNumberOfRetries));
}
}
@@ -551,8 +554,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
if (!indexExists) {
logger.info("{} index doesn't exist yet, creating it...", monthlyIndexName);
- Map<String,String> indexMappings = new HashMap<String,String>();
- indexMappings.put("_default_",mappings.get("_default_"));
+ Map<String, String> indexMappings = new HashMap<String, String>();
+ indexMappings.put("_default_", mappings.get("_default_"));
for (Map.Entry<String, String> entry : mappings.entrySet()) {
if (itemsMonthlyIndexed.contains(entry.getKey())) {
indexMappings.put(entry.getKey(), entry.getValue());
@@ -749,6 +752,60 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
@Override
+ public boolean updateWithQueryAndScript(final Date dateHint, final Class<?> clazz, final String[] scripts, final Map<String, Object>[] scriptParams, final Condition[] conditions) {
+ return new InClassLoaderExecute<Boolean>() {
+ protected Boolean execute(Object... args) throws Exception {
+ try {
+ String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
+
+ String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
+ (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
+
+ for (int i = 0; i < scripts.length; i++) {
+ Script actualScript = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]);
+
+ client.admin().indices().prepareRefresh(index).get();
+
+ UpdateByQueryRequestBuilder ubqrb = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
+ ubqrb.source(index).source().setTypes(itemType);
+ BulkIndexByScrollResponse response = ubqrb.setSlices(2)
+ .setMaxRetries(1000).abortOnVersionConflict(false).script(actualScript)
+ .filter(conditionESQueryBuilderDispatcher.buildFilter(conditions[i])).get();
+ if (response.getBulkFailures().size() > 0) {
+ for (BulkItemResponse.Failure failure : response.getBulkFailures()) {
+ logger.error("Failure : cause={} , message={}", failure.getCause(), failure.getMessage());
+ }
+ } else {
+ logger.info("Update By Query has processed {} in {}.", response.getUpdated(), response.getTook().toString());
+ }
+ if (response.isTimedOut()) {
+ logger.error("Update By Query ended with timeout!");
+ }
+ if (response.getVersionConflicts() > 0) {
+ logger.warn("Update By Query ended with {} Version Conflicts!", response.getVersionConflicts());
+ }
+ if (response.getNoops() > 0) {
+ logger.warn("Update By Query ended with {} noops!", response.getNoops());
+ }
+ }
+ return true;
+ } catch (IndexNotFoundException e) {
+ throw new Exception("No index found for itemType=" + clazz.getName(), e);
+ } catch (NoSuchFieldException e) {
+ throw new Exception("Error updating item ", e);
+ } catch (IllegalAccessException e) {
+ throw new Exception("Error updating item ", e);
+ } catch (ScriptException e) {
+ logger.error("Error in the update script : {}\n{}\n{}", e.getScript(), e.getDetailedMessage(), e.getScriptStack());
+ throw new Exception("Error in the update script");
+ } finally {
+ return false;
+ }
+ }
+ }.catchingExecuteInClassLoader(true);
+ }
+
+ @Override
public boolean updateWithScript(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
return new InClassLoaderExecute<Boolean>() {
protected Boolean execute(Object... args) throws Exception {
@@ -758,13 +815,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
(itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
- Script actualScript = new Script(ScriptType.INLINE, "groovy", script, scriptParams);
+ Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams);
+
if (bulkProcessor == null) {
client.prepareUpdate(index, itemType, itemId).setScript(actualScript)
.execute()
.actionGet();
} else {
- UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setScript(actualScript).request();
+ UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setScript(actualScript).
+ setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).request();
bulkProcessor.add(updateRequest);
}
return true;
@@ -852,8 +911,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(indexName).execute().actionGet();
boolean indexExists = indicesExistsResponse.isExists();
if (!indexExists) {
- Map<String,String> indexMappings = new HashMap<String,String>();
- indexMappings.put("_default_",mappings.get("_default_"));
+ Map<String, String> indexMappings = new HashMap<String, String>();
+ indexMappings.put("_default_", mappings.get("_default_"));
for (Map.Entry<String, String> entry : mappings.entrySet()) {
if (indexNames.containsKey(entry.getKey()) && indexNames.get(entry.getKey()).equals(indexName)) {
indexMappings.put(entry.getKey(), entry.getValue());
@@ -880,7 +939,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}.catchingExecuteInClassLoader(true);
}
- private void internalCreateIndex(String indexName, Map<String,String> mappings) {
+ private void internalCreateIndex(String indexName, Map<String, String> mappings) {
CreateIndexRequestBuilder builder = client.admin().indices().prepareCreate(indexName)
.setSettings("{\n" +
" \"index\" : {\n" +
@@ -995,14 +1054,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
private String getPropertyNameWithData(String name, String itemType) {
- Map<String,Object> propertyMapping = getPropertyMapping(name,itemType);
+ Map<String, Object> propertyMapping = getPropertyMapping(name, itemType);
if (propertyMapping == null) {
return null;
}
if (propertyMapping != null
&& "text".equals(propertyMapping.get("type"))
&& propertyMapping.containsKey("fields")
- && ((Map)propertyMapping.get("fields")).containsKey("keyword")) {
+ && ((Map) propertyMapping.get("fields")).containsKey("keyword")) {
name += ".keyword";
}
return name;
@@ -1107,12 +1166,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public <T extends Item> PartialList<T> query(String fieldName, String fieldValue, String sortBy, Class<T> clazz, int offset, int size) {
- return query(QueryBuilders.termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
+ return query(termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
}
@Override
public <T extends Item> PartialList<T> queryFullText(String fieldName, String fieldValue, String fulltext, String sortBy, Class<T> clazz, int offset, int size) {
- return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(QueryBuilders.termQuery(fieldName, fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
+ return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(termQuery(fieldName, fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
}
@Override
@@ -1207,7 +1266,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
requestBuilder = requestBuilder.addSort(distanceSortBuilder.order(SortOrder.ASC));
}
} else {
- String name = getPropertyNameWithData(StringUtils.substringBeforeLast(sortByElement,":"), itemType);
+ String name = getPropertyNameWithData(StringUtils.substringBeforeLast(sortByElement, ":"), itemType);
if (name != null) {
if (sortByElement.endsWith(":desc")) {
requestBuilder = requestBuilder.addSort(name, SortOrder.DESC);
@@ -1451,7 +1510,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
-
@Override
public void refresh() {
new InClassLoaderExecute<Boolean>() {
@@ -1514,7 +1572,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
new InClassLoaderExecute<Void>() {
@Override
protected Void execute(Object... args) {
- QueryBuilder query = QueryBuilders.termQuery("scope", scope);
+ QueryBuilder query = termQuery("scope", scope);
BulkRequestBuilder deleteByScope = client.prepareBulk();
@@ -1610,6 +1668,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
(itemsMonthlyIndexed.contains(itemType) ? indexName + "-*" : indexName);
}
+ private String getConfig(Map<String, String> settings, String key,
+ String defaultValue) {
+ if (settings != null && settings.get(key) != null) {
+ return settings.get(key);
+ }
+ return defaultValue;
+ }
+
public abstract static class InClassLoaderExecute<T> {
protected abstract T execute(Object... args) throws Exception;
@@ -1624,7 +1690,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
- public T catchingExecuteInClassLoader( boolean logError, Object... args) {
+ public T catchingExecuteInClassLoader(boolean logError, Object... args) {
try {
return executeInClassLoader(args);
} catch (Exception e) {
@@ -1634,13 +1700,5 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
- private String getConfig(Map<String,String> settings, String key,
- String defaultValue) {
- if (settings != null && settings.get(key) != null) {
- return settings.get(key);
- }
- return defaultValue;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index 4e52a1c..12980e1 100644
--- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -15,7 +15,7 @@
# limitations under the License.
#
-cluster.name=contextElasticSearch
+cluster.name=contextElasticSearch_amidani
# The elasticSearchAddresses may be a comma seperated list of host names and ports such as
# hostA:9300,hostB:9300
# Note: the port number must be repeated for each host.
@@ -42,4 +42,4 @@ bulkProcessor.backoffPolicy=exponential
# for each node in the ElasticSearch cluster:
# minimalElasticSearchVersion <= ElasticSearch node version < maximalElasticSearchVersion
minimalElasticSearchVersion=5.0.0
-maximalElasticSearchVersion=5.2.0
\ No newline at end of file
+maximalElasticSearchVersion=5.3.0
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/persistence-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/pom.xml b/persistence-elasticsearch/pom.xml
index 5d25fd5..70a85ea 100644
--- a/persistence-elasticsearch/pom.xml
+++ b/persistence-elasticsearch/pom.xml
@@ -32,7 +32,6 @@
<modules>
<module>core</module>
- <module>plugins</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
----------------------------------------------------------------------
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index a6b175f..1397659 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -116,6 +116,18 @@ public interface PersistenceService {
boolean updateWithScript(String itemId, Date dateHint, Class<?> clazz, String script, Map<String, Object> scriptParams);
/**
+ * Updates the items of the specified class by a query with a new property value for the specified property name based on a provided script.
+ *
+ * @param dateHint a Date helping in identifying where the item is located
+ * @param clazz the Item subclass of the item to update
+ * @param scripts inline scripts array
+ * @param scriptParams script params array
+ * @param conditions conditions array
+ * @return {@code true} if the update was successful, {@code false} otherwise
+ */
+ boolean updateWithQueryAndScript(Date dateHint, Class<?> clazz, String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions);
+
+ /**
* Retrieves the item identified with the specified identifier and with the specified Item subclass if it exists.
*
* @param <T> the type of the Item subclass we want to retrieve
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java
index e83bad2..b1aa234 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java
@@ -87,6 +87,9 @@ public class PropertyConditionESQueryBuilder implements ConditionESQueryBuilder
case "contains":
checkRequiredValue(expectedValue, name, comparisonOperator, false);
return QueryBuilders.regexpQuery(name, ".*" + expectedValue + ".*");
+ case "notContains":
+ checkRequiredValue(expectedValue, name, comparisonOperator, false);
+ return QueryBuilders.boolQuery().mustNot(QueryBuilders.regexpQuery(name, ".*" + expectedValue + ".*"));
case "startsWith":
checkRequiredValue(expectedValue, name, comparisonOperator, false);
return QueryBuilders.prefixQuery(name, expectedValue);
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java
----------------------------------------------------------------------
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java
index 1de3ae0..d7c2d2c 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java
@@ -22,7 +22,6 @@ import ognl.Ognl;
import ognl.OgnlContext;
import ognl.OgnlException;
import ognl.enhance.ExpressionAccessor;
-import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.unomi.api.Event;
@@ -52,8 +51,6 @@ public class PropertyConditionEvaluator implements ConditionEvaluator {
private static final SimpleDateFormat yearMonthDayDateFormat = new SimpleDateFormat("yyyyMMdd");
- private BeanUtilsBean beanUtilsBean = BeanUtilsBean.getInstance();
-
private Map<String, Map<String, ExpressionAccessor>> expressionCache = new HashMap<>(64);
private int compare(Object actualValue, String expectedValue, Object expectedValueDate, Object expectedValueInteger, Object expectedValueDateExpr) {
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9a0872c..18bc4c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
<version.karaf>3.0.8</version.karaf>
<version.karaf.cellar>3.0.3</version.karaf.cellar>
<version.pax.exam>4.9.1</version.pax.exam>
- <elasticsearch.version>5.1.1</elasticsearch.version>
+ <elasticsearch.version>5.2.2</elasticsearch.version>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
----------------------------------------------------------------------
diff --git a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
index 770936d..4367b4d 100644
--- a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
@@ -147,7 +147,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
}
private void cancelTimers() {
- if(segmentTimer != null) {
+ if (segmentTimer != null) {
segmentTimer.cancel();
}
logger.info("Segment purge: Purge unscheduled");
@@ -258,7 +258,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
}
private boolean checkSegmentDeletionImpact(Condition condition, String segmentToDeleteId) {
- if(condition != null) {
+ if (condition != null) {
@SuppressWarnings("unchecked")
final List<Condition> subConditions = (List<Condition>) condition.getParameter("subConditions");
if (subConditions != null) {
@@ -283,6 +283,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
* Return an updated condition that do not contain a condition on the segmentId anymore
* it's remove the unnecessary boolean condition (if a condition is the only one of a boolean the boolean will be remove and the subcondition returned)
* it's return null when there is no more condition after (if the condition passed was only a segment condition on the segmentId)
+ *
* @param condition the condition to update
* @param segmentId the segment id to remove in the condition
* @return updated condition
@@ -294,12 +295,12 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
List<Condition> updatedSubConditions = new LinkedList<>();
for (Condition subCondition : subConditions) {
Condition updatedCondition = updateSegmentDependentCondition(subCondition, segmentId);
- if(updatedCondition != null) {
+ if (updatedCondition != null) {
updatedSubConditions.add(updatedCondition);
}
}
- if(!updatedSubConditions.isEmpty()){
- if(updatedSubConditions.size() == 1) {
+ if (!updatedSubConditions.isEmpty()) {
+ if (updatedSubConditions.size() == 1) {
return updatedSubConditions.get(0);
} else {
condition.setParameter("subConditions", updatedSubConditions);
@@ -308,12 +309,12 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
} else {
return null;
}
- } else if("profileSegmentCondition".equals(condition.getConditionTypeId())) {
+ } else if ("profileSegmentCondition".equals(condition.getConditionTypeId())) {
@SuppressWarnings("unchecked")
final List<String> referencedSegmentIds = (List<String>) condition.getParameter("segments");
if (referencedSegmentIds.indexOf(segmentId) >= 0) {
referencedSegmentIds.remove(segmentId);
- if(referencedSegmentIds.isEmpty()) {
+ if (referencedSegmentIds.isEmpty()) {
return null;
} else {
condition.setParameter("segments", referencedSegmentIds);
@@ -379,7 +380,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
for (Segment segment : impactedSegments) {
Condition updatedCondition = updateSegmentDependentCondition(segment.getCondition(), segmentId);
segment.setCondition(updatedCondition);
- if(updatedCondition == null) {
+ if (updatedCondition == null) {
clearAutoGeneratedRules(persistenceService.query("linkedItems", segment.getMetadata().getId(), null, Rule.class), segment.getMetadata().getId());
segment.getMetadata().setEnabled(false);
}
@@ -455,7 +456,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
public SegmentsAndScores getSegmentsAndScoresForProfile(Profile profile) {
Set<String> segments = new HashSet<String>();
- Map<String,Integer> scores = new HashMap<String, Integer>();
+ Map<String, Integer> scores = new HashMap<String, Integer>();
List<Segment> allSegments = this.allSegments;
for (Segment segment : allSegments) {
@@ -542,18 +543,18 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
persistenceService.createMapping(Profile.ITEM_TYPE, String.format(
"{\n" +
- " \"profile\": {\n" +
- " \"properties\" : {\n" +
- " \"scores\": {\n" +
- " \"properties\": {\n" +
- " \"%s\": {\n" +
- " \"type\": \"long\"\n" +
- " }\n" +
- " }\n" +
- " }\n" +
- " }\n" +
- " }\n" +
- "}\n", scoring.getItemId()));
+ " \"profile\": {\n" +
+ " \"properties\" : {\n" +
+ " \"scores\": {\n" +
+ " \"properties\": {\n" +
+ " \"%s\": {\n" +
+ " \"type\": \"long\"\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ "}\n", scoring.getItemId()));
updateExistingProfilesForScoring(scoring);
}
@@ -571,7 +572,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
}
private boolean checkScoringDeletionImpact(Condition condition, String scoringToDeleteId) {
- if(condition != null) {
+ if (condition != null) {
@SuppressWarnings("unchecked")
final List<Condition> subConditions = (List<Condition>) condition.getParameter("subConditions");
if (subConditions != null) {
@@ -593,6 +594,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
* Return an updated condition that do not contain a condition on the scoringId anymore
* it's remove the unnecessary boolean condition (if a condition is the only one of a boolean the boolean will be remove and the subcondition returned)
* it's return null when there is no more condition after (if the condition passed was only a scoring condition on the scoringId)
+ *
* @param condition the condition to update
* @param scoringId the scoring id to remove in the condition
* @return updated condition
@@ -604,12 +606,12 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
List<Condition> updatedSubConditions = new LinkedList<>();
for (Condition subCondition : subConditions) {
Condition updatedCondition = updateScoringDependentCondition(subCondition, scoringId);
- if(updatedCondition != null) {
+ if (updatedCondition != null) {
updatedSubConditions.add(updatedCondition);
}
}
- if(!updatedSubConditions.isEmpty()){
- if(updatedSubConditions.size() == 1) {
+ if (!updatedSubConditions.isEmpty()) {
+ if (updatedSubConditions.size() == 1) {
return updatedSubConditions.get(0);
} else {
condition.setParameter("subConditions", updatedSubConditions);
@@ -671,7 +673,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
for (Segment segment : impactedSegments) {
Condition updatedCondition = updateScoringDependentCondition(segment.getCondition(), scoringId);
segment.setCondition(updatedCondition);
- if(updatedCondition == null) {
+ if (updatedCondition == null) {
clearAutoGeneratedRules(persistenceService.query("linkedItems", segment.getMetadata().getId(), null, Rule.class), segment.getMetadata().getId());
segment.getMetadata().setEnabled(false);
}
@@ -741,8 +743,8 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
Set<String> tags = condition.getConditionType().getMetadata().getTags();
if (tags.contains("eventCondition") && !tags.contains("profileCondition")) {
try {
- Map<String,Object> m = new HashMap<>(3);
- m.put("scope",metadata.getScope());
+ Map<String, Object> m = new HashMap<>(3);
+ m.put("scope", metadata.getScope());
m.put("condition", condition);
m.put("numberOfDays", parentCondition.getParameter("numberOfDays"));
String key = CustomObjectMapper.getObjectMapper().writeValueAsString(m);
@@ -750,7 +752,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
parentCondition.setParameter("generatedPropertyKey", key);
Rule rule = rulesService.getRule(key);
if (rule == null) {
- rule = new Rule(new Metadata(metadata.getScope(), key, "Auto generated rule for "+metadata.getName(), ""));
+ rule = new Rule(new Metadata(metadata.getScope(), key, "Auto generated rule for " + metadata.getName(), ""));
rule.setCondition(condition);
rule.getMetadata().setHidden(true);
final Action action = new Action();
@@ -809,10 +811,10 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
String profileId = entry.getKey();
if (!profileId.startsWith("_")) {
- Map<String,Long> pastEventCounts = new HashMap<>();
+ Map<String, Long> pastEventCounts = new HashMap<>();
pastEventCounts.put(propertyKey, entry.getValue());
- Map<String,Object> systemProperties = new HashMap<>();
- systemProperties.put("pastEvents",pastEventCounts);
+ Map<String, Object> systemProperties = new HashMap<>();
+ systemProperties.put("pastEvents", pastEventCounts);
try {
persistenceService.update(profileId, null, Profile.class, "systemProperties", systemProperties);
} catch (Exception e) {
@@ -821,7 +823,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
}
}
- logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis()-t);
+ logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis() - t);
}
private void updateExistingProfilesForSegment(Segment segment) {
@@ -835,7 +837,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
segmentCondition.setParameter("comparisonOperator", "equals");
segmentCondition.setParameter("propertyValue", segment.getItemId());
- if(segment.getMetadata().isEnabled()) {
+ if (segment.getMetadata().isEnabled()) {
ConditionType booleanConditionType = definitionsService.getConditionType("booleanCondition");
ConditionType notConditionType = definitionsService.getConditionType("notCondition");
@@ -904,7 +906,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
}
}
}
- logger.info("{} profiles updated in {}ms", updatedProfileCount, System.currentTimeMillis()-t);
+ logger.info("{} profiles updated in {}ms", updatedProfileCount, System.currentTimeMillis() - t);
}
private void updateExistingProfilesForScoring(Scoring scoring) {
@@ -913,32 +915,31 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
scoringCondition.setConditionType(definitionsService.getConditionType("profilePropertyCondition"));
scoringCondition.setParameter("propertyName", "scores." + scoring.getItemId());
scoringCondition.setParameter("comparisonOperator", "exists");
- List<Profile> previousProfiles = persistenceService.query(scoringCondition, null, Profile.class);
- HashMap<String, Object> scriptParams = new HashMap<>();
- scriptParams.put("scoringId", scoring.getItemId());
+ String[] scripts = new String[scoring.getElements().size() + 1];
+ HashMap<String, Object>[] scriptParams = new HashMap[scoring.getElements().size() + 1];
+ Condition[] conditions = new Condition[scoring.getElements().size() + 1];
- for (Profile profileToRemove : previousProfiles) {
- persistenceService.updateWithScript(profileToRemove.getItemId(), null, Profile.class, "if (ctx._source.systemProperties.scoreModifiers == null) { ctx._source.systemProperties.scoreModifiers=[:] } ; if (ctx._source.systemProperties.scoreModifiers.containsKey(scoringId)) { ctx._source.scores[scoringId] = ctx._source.systemProperties.scoreModifiers[scoringId] } else { ctx._source.scores.remove(scoringId) }", scriptParams);
- }
- if(scoring.getMetadata().isEnabled()) {
- String script = "if (ctx._source.scores == null) { ctx._source.scores=[:] } ; if (ctx._source.scores.containsKey(scoringId)) { ctx._source.scores[scoringId] += scoringValue } else { ctx._source.scores[scoringId] = scoringValue }";
- Map<String, Event> updatedProfiles = new HashMap<>();
+ scriptParams[0] = new HashMap<String, Object>();
+ scriptParams[0].put("scoringId", scoring.getItemId());
+ scripts[0] = "if( ctx._source.containsKey(\"systemProperties\") && ctx._source.systemProperties.containsKey(\"scoreModifiers\") && ctx._source.systemProperties.scoreModifiers.containsKey(params.scoringId) ) { ctx._source.scores.put(params.scoringId, ctx._source.systemProperties.scoreModifiers.get(params.scoringId)) } else { ctx._source.scores.remove(params.scoringId) }";
+ conditions[0] = scoringCondition;
+
+ if (scoring.getMetadata().isEnabled()) {
+ String scriptToAdd = "if( !ctx._source.containsKey(\"scores\") ){ ctx._source.put(\"scores\", [:])} if( ctx._source.scores.containsKey(params.scoringId) ) { ctx._source.scores.put(params.scoringId, ctx._source.scores.get(params.scoringId)+params.scoringValue) } else { ctx._source.scores.put(params.scoringId, params.scoringValue) }";
+ int idx = 1;
for (ScoringElement element : scoring.getElements()) {
- scriptParams.put("scoringValue", element.getValue());
- for (Profile p : persistenceService.query(element.getCondition(), null, Profile.class)) {
- persistenceService.updateWithScript(p.getItemId(), null, Profile.class, script, scriptParams);
- Event profileUpdated = new Event("profileUpdated", null, p, null, null, p, new Date());
- profileUpdated.setPersistent(false);
- updatedProfiles.put(p.getItemId(), profileUpdated);
- }
- }
- Iterator<Map.Entry<String, Event>> entries = updatedProfiles.entrySet().iterator();
- while (entries.hasNext()) {
- eventService.send(entries.next().getValue());
+ scriptParams[idx] = new HashMap<>();
+ scriptParams[idx].put("scoringId", scoring.getItemId());
+ scriptParams[idx].put("scoringValue", element.getValue());
+ scripts[idx] = scriptToAdd;
+ conditions[idx] = element.getCondition();
+ idx++;
}
}
- logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t);
+
+ persistenceService.updateWithQueryAndScript(null, Profile.class, scripts, scriptParams, conditions);
+ logger.info("Profiles updated in {}ms", System.currentTimeMillis() - t);
}
private void updateExistingProfilesForRemovedScoring(String scoringId) {
@@ -955,7 +956,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
for (Profile profileToRemove : previousProfiles) {
persistenceService.updateWithScript(profileToRemove.getItemId(), null, Profile.class, "ctx._source.scores.remove(scoringId)", scriptParams);
}
- logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t);
+ logger.info("Profiles updated in {}ms", System.currentTimeMillis() - t);
}
private String getMD5(String md5) {
@@ -976,7 +977,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
switch (event.getType()) {
case BundleEvent.STARTED:
processBundleStartup(event.getBundle().getBundleContext());
- break;
+ break;
case BundleEvent.STOPPING:
processBundleStop(event.getBundle().getBundleContext());
break;
@@ -1034,7 +1035,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
}
private <T extends MetadataItem> PartialList<Metadata> getMetadatas(Query query, Class<T> clazz) {
- if(query.isForceRefresh()){
+ if (query.isForceRefresh()) {
persistenceService.refresh();
}
definitionsService.resolveConditionType(query.getCondition());