You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2021/02/08 17:17:10 UTC
[unomi] 04/15: UNOMI-371 add optional support for optimistic
concurrency control (if_seq_no) (#223)
This is an automated email from the ASF dual-hosted git repository.
shuber pushed a commit to branch unomi-1.5.x
in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 55c8bcdd930d52ef6bee412e3fa8264357af2b42
Author: giladw <gw...@yotpo.com>
AuthorDate: Tue Jan 5 14:23:55 2021 +0100
UNOMI-371 add optional support for optimistic concurrency control (if_seq_no) (#223)
(cherry picked from commit 714a643efe0e0ccd66f14d4a8cff12545f7f94cd)
---
api/src/main/java/org/apache/unomi/api/Item.java | 10 ++
.../apache/unomi/services/UserListServiceImpl.java | 2 +-
.../unomi/privacy/internal/PrivacyServiceImpl.java | 2 +-
.../itests/ProfileServiceWithoutOverwriteIT.java | 123 +++++++++++++++++++++
.../ElasticSearchPersistenceServiceImpl.java | 110 ++++++++++++++----
.../resources/OSGI-INF/blueprint/blueprint.xml | 4 +
.../org.apache.unomi.persistence.elasticsearch.cfg | 8 +-
.../unomi/persistence/spi/PersistenceService.java | 36 +++++-
.../actions/MergeProfilesOnPropertyAction.java | 6 +-
.../unomi/plugins/mail/actions/SendMailAction.java | 2 +-
.../services/impl/events/EventServiceImpl.java | 2 +-
.../services/impl/rules/RulesServiceImpl.java | 2 +-
.../services/impl/segments/SegmentServiceImpl.java | 12 +-
13 files changed, 276 insertions(+), 43 deletions(-)
diff --git a/api/src/main/java/org/apache/unomi/api/Item.java b/api/src/main/java/org/apache/unomi/api/Item.java
index 72f0ae6..2d5ff71 100644
--- a/api/src/main/java/org/apache/unomi/api/Item.java
+++ b/api/src/main/java/org/apache/unomi/api/Item.java
@@ -21,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -63,6 +64,7 @@ public abstract class Item implements Serializable {
protected String itemType;
protected String scope;
protected Long version;
+ protected Map<String, Object> systemMetadata = new HashMap<>();
public Item() {
this.itemType = getItemType(this.getClass());
@@ -140,4 +142,12 @@ public abstract class Item implements Serializable {
public void setVersion(Long version) {
this.version = version;
}
+
+ public Object getSystemMetadata(String key) {
+ return systemMetadata.get(key);
+ }
+
+ public void setSystemMetadata(String key, Object value) {
+ systemMetadata.put(key, value);
+ }
}
diff --git a/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java b/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java
index dc3bbc8..37ca72e 100644
--- a/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java
+++ b/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java
@@ -94,7 +94,7 @@ public class UserListServiceImpl implements UserListService {
if(index != -1){
((List) profileSystemProperties.get("lists")).remove(index);
profileSystemProperties.put("lastUpdated", new Date());
- persistenceService.update(p.getItemId(), null, Profile.class, "systemProperties", profileSystemProperties);
+ persistenceService.update(p, null, Profile.class, "systemProperties", profileSystemProperties);
}
}
}
diff --git a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
index 1675e11..3d3d68d 100644
--- a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
+++ b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
@@ -137,7 +137,7 @@ public class PrivacyServiceImpl implements PrivacyService {
persistenceService.save(session);
List<Event> events = eventService.searchEvents(session.getItemId(), new String[0], null, 0, -1, null).getList();
for (Event event : events) {
- persistenceService.update(event.getItemId(), event.getTimeStamp(), Event.class, "profileId", newProfile.getItemId());
+ persistenceService.update(event, event.getTimeStamp(), Event.class, "profileId", newProfile.getItemId());
}
}
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceWithoutOverwriteIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceWithoutOverwriteIT.java
new file mode 100644
index 0000000..5c2ed5b
--- /dev/null
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceWithoutOverwriteIT.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+package org.apache.unomi.itests;
+
+import org.apache.unomi.api.Profile;
+import org.apache.unomi.api.services.DefinitionsService;
+import org.apache.unomi.api.services.ProfileService;
+import org.apache.unomi.persistence.spi.PersistenceService;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerSuite;
+import org.ops4j.pax.exam.util.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.ops4j.pax.exam.CoreOptions.systemProperty;
+
+/**
+ * Integration tests for saving profiles when persistence runs with alwaysOverwrite=false and throwExceptions=true (see {@link #config()}).
+ */
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerSuite.class)
+public class ProfileServiceWithoutOverwriteIT extends BaseIT {
+    private final static Logger LOGGER = LoggerFactory.getLogger(ProfileServiceWithoutOverwriteIT.class);
+
+    private final static String TEST_PROFILE_ID = "test-profile-id";
+
+    @Configuration
+    public Option[] config() throws InterruptedException {
+        List<Option> options = new ArrayList<>();
+        options.addAll(Arrays.asList(super.config())); // keep the base configuration, then add the two overrides below
+        options.add(systemProperty("org.apache.unomi.elasticsearch.throwExceptions").value("true"));
+        options.add(systemProperty("org.apache.unomi.elasticsearch.alwaysOverwrite").value("false"));
+        return options.toArray(new Option[0]);
+    }
+
+    @Inject @Filter(timeout = 600000)
+    protected ProfileService profileService;
+
+    @Inject
+    @Filter(timeout = 600000)
+    protected PersistenceService persistenceService;
+
+    @Inject
+    @Filter(timeout = 600000)
+    protected DefinitionsService definitionsService;
+
+    @Before
+    public void setUp() {
+        TestUtils.removeAllProfiles(definitionsService, persistenceService); // start every test without profiles
+    }
+
+    private Profile setupWithoutOverwriteTests() { // saves a fresh profile with the fixed test id and returns the in-memory instance
+        Profile profile = new Profile();
+        profile.setItemId(TEST_PROFILE_ID);
+        profile.setProperty("country", "test-country");
+        profile.setProperty("state", "test-state");
+        profileService.save(profile);
+
+        return profile;
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testSaveProfileWithoutOverwriteSameProfileThrowsException() {
+        Profile profile = setupWithoutOverwriteTests();
+        profile.setProperty("country", "test2-country");
+        profileService.save(profile); // second save of the same instance without reloading is expected to throw
+    }
+
+    @Test
+    public void testSaveProfileWithoutOverwriteSavesAfterReload() throws InterruptedException {
+        Profile profile = setupWithoutOverwriteTests();
+        String profileId = profile.getItemId();
+        Thread.sleep(4000); // presumably gives the backend time to make the save visible - TODO confirm
+
+        Profile updatedProfile = profileService.load(profileId);
+        updatedProfile.setProperty("country", "test2-country");
+        profileService.save(updatedProfile);
+
+        Thread.sleep(4000);
+
+        Profile profileWithNewCountry = profileService.load(profileId);
+        assertEquals("test2-country", profileWithNewCountry.getProperty("country")); // JUnit order: expected, then actual
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testSaveProfileWithoutOverwriteWrongSeqNoThrowsException() throws InterruptedException {
+        Profile profile = setupWithoutOverwriteTests();
+        String profileId = profile.getItemId();
+
+        Thread.sleep(4000);
+
+        Profile updatedProfile = profileService.load(profileId);
+        updatedProfile.setProperty("country", "test2-country");
+        updatedProfile.setSystemMetadata("seq_no", 1L); // deliberately replace the loaded sequence number; Item exposes setSystemMetadata, not setMetadata
+        profileService.save(updatedProfile);
+    }
+}
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 62aaa81..5c4a251 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -23,7 +23,6 @@ import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.lucene.search.TotalHits;
@@ -40,6 +39,7 @@ import org.apache.unomi.persistence.elasticsearch.conditions.*;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.persistence.spi.aggregate.*;
import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
@@ -49,6 +49,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
@@ -63,6 +64,7 @@ import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.core.MainResponse;
@@ -139,6 +141,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval";
public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy";
public static final String INDEX_DATE_PREFIX = "date-";
+ public static final String SEQ_NO = "seq_no";
+ public static final String PRIMARY_TERM = "primary_term";
+
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
private RestHighLevelClient client;
private BulkProcessor bulkProcessor;
@@ -192,6 +197,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private Integer aggQueryMaxResponseSizeHttp = null;
private Integer clientSocketTimeout = null;
+ private boolean alwaysOverwrite = true;
private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>();
@@ -362,6 +368,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public void setAggQueryThrowOnMissingDocs(boolean aggQueryThrowOnMissingDocs) {
this.aggQueryThrowOnMissingDocs = aggQueryThrowOnMissingDocs;
}
+
+ public void setThrowExceptions(boolean throwExceptions) {
+ this.throwExceptions = throwExceptions;
+ }
+ public void setAlwaysOverwrite(boolean alwaysOverwrite) {
+ this.alwaysOverwrite = alwaysOverwrite;
+ }
+
+
public void start() throws Exception {
// on startup
@@ -747,8 +762,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
if (response.isExists()) {
String sourceAsString = response.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- value.setItemId(response.getId());
- value.setVersion(response.getVersion());
+ setMetadata(value, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
putInCache(itemId, value);
return value;
} else {
@@ -772,13 +786,28 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
+ private void setMetadata(Item item, String id, long version, long seqNo, long primaryTerm) {
+ item.setItemId(id);
+ item.setVersion(version);
+ item.setSystemMetadata(SEQ_NO, seqNo);
+ item.setSystemMetadata(PRIMARY_TERM, primaryTerm);
+ }
+
@Override
public boolean save(final Item item) {
- return save(item, useBatchingForSave);
+ return save(item, useBatchingForSave, alwaysOverwrite);
}
@Override
public boolean save(final Item item, final boolean useBatching) {
+ return save(item, useBatching, alwaysOverwrite);
+ }
+
+ @Override
+ public boolean save(final Item item, final Boolean useBatchingOption, final Boolean alwaysOverwriteOption) {
+ final boolean useBatching = useBatchingOption == null ? this.useBatchingForSave : useBatchingOption;
+ final boolean alwaysOverwrite = alwaysOverwriteOption == null ? this.alwaysOverwrite : alwaysOverwriteOption;
+
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveItem", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
@@ -790,13 +819,28 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.id(itemId);
indexRequest.source(source, XContentType.JSON);
+
+ if (!alwaysOverwrite) {
+ Long seqNo = (Long)item.getSystemMetadata(SEQ_NO);
+ Long primaryTerm = (Long)item.getSystemMetadata(PRIMARY_TERM);
+
+ if (seqNo != null && primaryTerm != null) {
+ indexRequest.setIfSeqNo(seqNo);
+ indexRequest.setIfPrimaryTerm(primaryTerm);
+ }
+ else {
+ indexRequest.opType(DocWriteRequest.OpType.CREATE);
+ }
+ }
+
if (routingByType.containsKey(itemType)) {
indexRequest.routing(routingByType.get(itemType));
}
try {
if (bulkProcessor == null || !useBatching) {
- client.index(indexRequest, RequestOptions.DEFAULT);
+ IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
+ setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
} else {
bulkProcessor.add(indexRequest);
}
@@ -819,26 +863,43 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
@Override
- public boolean update(final String itemId, final Date dateHint, final Class clazz, final String propertyName, final Object propertyValue) {
- return update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue));
+ public boolean update(final Item item, final Date dateHint, final Class clazz, final String propertyName, final Object propertyValue) {
+ return update(item, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue));
}
@Override
- public boolean update(final String itemId, final Date dateHint, final Class clazz, final Map source) {
+ public boolean update(final Item item, final Date dateHint, final Class clazz, final Map source) {
+ return update(item, dateHint, clazz, source, alwaysOverwrite);
+ }
+
+ @Override
+ public boolean update(final Item item, final Date dateHint, final Class clazz, final Map source, final boolean alwaysOverwrite) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
- UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), itemId);
+ UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), item.getItemId());
updateRequest.doc(source);
+
+ if (!alwaysOverwrite) {
+ Long seqNo = (Long)item.getSystemMetadata(SEQ_NO);
+ Long primaryTerm = (Long)item.getSystemMetadata(PRIMARY_TERM);
+
+ if (seqNo != null && primaryTerm != null) {
+ updateRequest.setIfSeqNo(seqNo);
+ updateRequest.setIfPrimaryTerm(primaryTerm);
+ }
+ }
+
if (bulkProcessor == null) {
- client.update(updateRequest, RequestOptions.DEFAULT);
+ UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
+ setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
} else {
bulkProcessor.add(updateRequest);
}
return true;
} catch (IndexNotFoundException e) {
- throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
+ throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + item.getItemId(), e);
}
}
}.catchingExecuteInClassLoader(true);
@@ -907,7 +968,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
@Override
- public boolean updateWithScript(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
+ public boolean updateWithScript(final Item item, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithScript", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
@@ -917,17 +978,26 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams);
- UpdateRequest updateRequest = new UpdateRequest(index, itemId);
+ UpdateRequest updateRequest = new UpdateRequest(index, item.getItemId());
+
+ Long seqNo = (Long)item.getSystemMetadata(SEQ_NO);
+ Long primaryTerm = (Long)item.getSystemMetadata(PRIMARY_TERM);
+
+ if (seqNo != null && primaryTerm != null) {
+ updateRequest.setIfSeqNo(seqNo);
+ updateRequest.setIfPrimaryTerm(primaryTerm);
+ }
updateRequest.script(actualScript);
if (bulkProcessor == null) {
- client.update(updateRequest, RequestOptions.DEFAULT);
+ UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
+ setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
} else {
bulkProcessor.add(updateRequest);
}
return true;
} catch (IndexNotFoundException e) {
- throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
+ throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + item.getItemId(), e);
}
}
}.catchingExecuteInClassLoader(true);
@@ -1471,6 +1541,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
SearchRequest searchRequest = new SearchRequest(getIndexNameForQuery(itemType));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.fetchSource(true)
+ .seqNoAndPrimaryTerm(true)
.query(query)
.size(size < 0 ? defaultQueryLimit : size)
.from(offset);
@@ -1528,8 +1599,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
// add hit to results
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- value.setItemId(searchHit.getId());
- value.setVersion(searchHit.getVersion());
+ setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm());
results.add(value);
}
@@ -1559,8 +1629,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
for (SearchHit searchHit : searchHits) {
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- value.setItemId(searchHit.getId());
- value.setVersion(searchHit.getVersion());
+ setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm());
results.add(value);
}
}
@@ -1606,8 +1675,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
// add hit to results
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- value.setItemId(searchHit.getId());
- value.setVersion(searchHit.getVersion());
+ setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm());
results.add(value);
}
}
diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 48402f3..6f40799 100644
--- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -63,6 +63,8 @@
<cm:property name="password" value="" />
<cm:property name="sslEnable" value="false" />
<cm:property name="sslTrustAllCertificates" value="false" />
+ <cm:property name="throwExceptions" value="false" />
+ <cm:property name="alwaysOverwrite" value="true" />
</cm:default-properties>
</cm:property-placeholder>
@@ -136,6 +138,8 @@
<property name="password" value="${es.password}" />
<property name="sslEnable" value="${es.sslEnable}" />
<property name="sslTrustAllCertificates" value="${es.sslTrustAllCertificates}" />
+ <property name="throwExceptions" value="${es.throwExceptions}" />
+ <property name="alwaysOverwrite" value="${es.alwaysOverwrite}" />
</bean>
<!-- We use a listener here because using the list directly for listening to proxies coming from the same bundle didn't seem to work -->
diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index c0e7b46..ac30c91 100644
--- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -26,7 +26,6 @@ monthlyIndex.numberOfShards=${org.apache.unomi.elasticsearch.monthlyIndex.nbShar
monthlyIndex.numberOfReplicas=${org.apache.unomi.elasticsearch.monthlyIndex.nbReplicas:-0}
monthlyIndex.indexMappingTotalFieldsLimit=${org.apache.unomi.elasticsearch.monthlyIndex.indexMappingTotalFieldsLimit:-1000}
monthlyIndex.indexMaxDocValueFieldsSearch=${org.apache.unomi.elasticsearch.monthlyIndex.indexMaxDocValueFieldsSearch:-1000}
-monthlyIndex.itemsMonthlyIndexedOverride=${org.apache.unomi.elasticsearch.monthlyIndex.itemsMonthlyIndexedOverride:-event,session}
numberOfShards=${org.apache.unomi.elasticsearch.defaultIndex.nbShards:-5}
numberOfReplicas=${org.apache.unomi.elasticsearch.defaultIndex.nbReplicas:-0}
indexMappingTotalFieldsLimit=${org.apache.unomi.elasticsearch.defaultIndex.indexMappingTotalFieldsLimit:-1000}
@@ -70,4 +69,9 @@ aggQueryMaxResponseSizeHttp=${org.apache.unomi.elasticsearch.aggQueryMaxResponse
username=${org.apache.unomi.elasticsearch.username:-}
password=${org.apache.unomi.elasticsearch.password:-}
sslEnable=${org.apache.unomi.elasticsearch.sslEnable:-false}
-sslTrustAllCertificates=${org.apache.unomi.elasticsearch.sslTrustAllCertificates:-false}
\ No newline at end of file
+sslTrustAllCertificates=${org.apache.unomi.elasticsearch.sslTrustAllCertificates:-false}
+
+# Errors
+throwExceptions=${org.apache.unomi.elasticsearch.throwExceptions:-false}
+
+alwaysOverwrite=${org.apache.unomi.elasticsearch.alwaysOverwrite:-true}
\ No newline at end of file
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index d85059e..f92d3bb 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -127,41 +127,65 @@ public interface PersistenceService {
boolean save(Item item, boolean useBatching);
/**
+ * Persists the specified Item in the context server.
+ *
+ * @param item the item to persist
+ * @param useBatching whether to use batching or not for saving the item. If activated, there may be a delay between
+ * the call to this method and the actual saving in the persistence backend; {@code null} defers to the implementation's configured default
+ * @param alwaysOverwrite whether to overwrite a document even if we are holding an old item when saving; {@code null} defers to the implementation's configured default
+ *
+ * @return {@code true} if the item was properly persisted, {@code false} otherwise
+ */
+ boolean save(Item item, Boolean useBatching, Boolean alwaysOverwrite);
+
+ /**
* Updates the item of the specified class and identified by the specified identifier with new property values provided as name - value pairs in the specified Map.
*
- * @param itemId the identifier of the item we want to update
+ * @param item the item we want to update
* @param dateHint a Date helping in identifying where the item is located
* @param clazz the Item subclass of the item to update
* @param source a Map with entries specifying as key the property name to update and as value its new value
* @return {@code true} if the update was successful, {@code false} otherwise
*/
- boolean update(String itemId, Date dateHint, Class<?> clazz, Map<?, ?> source);
+ boolean update(Item item, Date dateHint, Class<?> clazz, Map<?, ?> source);
/**
* Updates the item of the specified class and identified by the specified identifier with a new property value for the specified property name. Same as
* {@code update(item, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))}
*
- * @param itemId the identifier of the item we want to update
+ * @param item the item we want to update
* @param dateHint a Date helping in identifying where the item is located
* @param clazz the Item subclass of the item to update
* @param propertyName the name of the property to update
* @param propertyValue the new value of the property
* @return {@code true} if the update was successful, {@code false} otherwise
*/
- boolean update(String itemId, Date dateHint, Class<?> clazz, String propertyName, Object propertyValue);
+ boolean update(Item item, Date dateHint, Class<?> clazz, String propertyName, Object propertyValue);
+
+ /**
+ * Updates the item of the specified class and identified by the specified identifier with new property values provided as name - value pairs in the specified Map.
+ *
+ * @param item the item we want to update
+ * @param dateHint a Date helping in identifying where the item is located
+ * @param clazz the Item subclass of the item to update
+ * @param source a Map with entries specifying as key the property name to update and as value its new value
+ * @param alwaysOverwrite whether to overwrite a document even if we are holding an old item when saving
+ * @return {@code true} if the update was successful, {@code false} otherwise
+ */
+ boolean update(Item item, Date dateHint, Class<?> clazz, Map<?, ?> source, final boolean alwaysOverwrite);
/**
* Updates the specified item of the specified class by executing the provided inline script
* with the provided script parameters.
*
- * @param itemId the identifier of the item we want to update
+ * @param item the item we want to update
* @param dateHint a Date helping in identifying where the item is located
* @param clazz the Item subclass of the item to update
* @param script inline script
* @param scriptParams script params
* @return {@code true} if the update was successful, {@code false} otherwise
*/
- boolean updateWithScript(String itemId, Date dateHint, Class<?> clazz, String script, Map<String, Object> scriptParams);
+ boolean updateWithScript(Item item, Date dateHint, Class<?> clazz, String script, Map<String, Object> scriptParams);
/**
* Updates the items of the specified class by a query with a new property value for the specified property name
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index a496ddb..ffdf626 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -178,12 +178,12 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor {
}
for (Session session : sessions) {
- persistenceService.update(session.getItemId(), session.getTimeStamp(), Session.class, "profileId", anonymousBrowsing ? null : masterProfileId);
+ persistenceService.update(session, session.getTimeStamp(), Session.class, "profileId", anonymousBrowsing ? null : masterProfileId);
}
List<Event> events = persistenceService.query("profileId", profileId, null, Event.class);
for (Event event : events) {
- persistenceService.update(event.getItemId(), event.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? null : masterProfileId);
+ persistenceService.update(event, event.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? null : masterProfileId);
}
// we must mark all the profiles that we merged into the master as merged with the master, and they will
// be deleted upon next load
@@ -192,7 +192,7 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor {
sourceMap.put("mergedWith", masterProfileId);
profile.setSystemProperty("lastUpdated", new Date());
sourceMap.put("systemProperties", profile.getSystemProperties());
- persistenceService.update(profile.getItemId(), null, Profile.class, sourceMap);
+ persistenceService.update(profile, null, Profile.class, sourceMap);
}
}
} catch (Exception e) {
diff --git a/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java b/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java
index 8e03175..acdb1eb 100644
--- a/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java
+++ b/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java
@@ -116,7 +116,7 @@ public class SendMailAction implements ActionExecutor {
event.getProfile().setSystemProperty("notificationAck", profileNotif);
event.getProfile().setSystemProperty("lastUpdated", new Date());
- persistenceService.update(event.getProfile().getItemId(), null, Profile.class, "systemProperties", event.getProfile().getSystemProperties());
+ persistenceService.update(event.getProfile(), null, Profile.class, "systemProperties", event.getProfile().getSystemProperties());
ST stringTemplate = new ST(template, '$', '$');
stringTemplate.add("profile", event.getProfile());
diff --git a/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java
index 80b0449..25db0b5 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java
@@ -152,7 +152,7 @@ public class EventServiceImpl implements EventService {
boolean saveSucceeded = true;
if (event.isPersistent()) {
- saveSucceeded = persistenceService.save(event);
+ saveSucceeded = persistenceService.save(event, null, true);
}
int changes;
diff --git a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
index d77173c..734d856 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
@@ -475,7 +475,7 @@ public class RulesServiceImpl implements RulesService, EventListenerService, Syn
}
allRuleStatistics.put(ruleStatistics.getItemId(), ruleStatistics);
if (mustPersist) {
- persistenceService.save(ruleStatistics);
+ persistenceService.save(ruleStatistics, null, true);
}
}
// now let's iterate over the rules coming from the persistence service, as we may have new ones.
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 36a2a67..e971432 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -365,7 +365,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
sourceMap.put("segments", profileToRemove.getSegments());
profileToRemove.setSystemProperty("lastUpdated", new Date());
sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
- persistenceService.update(profileToRemove.getItemId(), null, Profile.class, sourceMap);
+ persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
updatedProfileCount++;
}
logger.info("Removed segment from {} profiles in {} ms", updatedProfileCount, System.currentTimeMillis() - profileRemovalStartTime);
@@ -724,7 +724,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
// todo remove profile properties ?
persistenceService.remove(previousRule.getItemId(), Rule.class);
} else {
- persistenceService.update(previousRule.getItemId(), null, Rule.class, "linkedItems", previousRule.getLinkedItems());
+ persistenceService.update(previousRule, null, Rule.class, "linkedItems", previousRule.getLinkedItems());
}
}
}
@@ -862,7 +862,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
systemProperties.put("lastUpdated", new Date());
Profile profile = new Profile();
profile.setItemId(profileId);
- persistenceService.update(profile.getItemId(), null, Profile.class, "systemProperties", systemProperties);
+ persistenceService.update(profile, null, Profile.class, "systemProperties", systemProperties);
} catch (Exception e) {
logger.error("Error updating profile {} past event system properties", profileId, e);
}
@@ -930,7 +930,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
sourceMap.put("segments", profileToAdd.getSegments());
profileToAdd.setSystemProperty("lastUpdated", new Date());
sourceMap.put("systemProperties", profileToAdd.getSystemProperties());
- persistenceService.update(profileToAdd.getItemId(), null, Profile.class, sourceMap);
+ persistenceService.update(profileToAdd, null, Profile.class, sourceMap);
Event profileUpdated = new Event("profileUpdated", null, profileToAdd, null, null, profileToAdd, new Date());
profileUpdated.setPersistent(false);
eventService.send(profileUpdated);
@@ -950,7 +950,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
sourceMap.put("segments", profileToRemove.getSegments());
profileToRemove.setSystemProperty("lastUpdated", new Date());
sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
- persistenceService.update(profileToRemove.getItemId(), null, Profile.class, sourceMap);
+ persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
profileUpdated.setPersistent(false);
eventService.send(profileUpdated);
@@ -973,7 +973,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
sourceMap.put("segments", profileToRemove.getSegments());
profileToRemove.setSystemProperty("lastUpdated", new Date());
sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
- persistenceService.update(profileToRemove.getItemId(), null, Profile.class, sourceMap);
+ persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
profileUpdated.setPersistent(false);
eventService.send(profileUpdated);