You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2021/02/08 17:17:10 UTC

[unomi] 04/15: UNOMI-371 add optional support for optimistic concurrency control (if_seq_no) (#223)

This is an automated email from the ASF dual-hosted git repository.

shuber pushed a commit to branch unomi-1.5.x
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit 55c8bcdd930d52ef6bee412e3fa8264357af2b42
Author: giladw <gw...@yotpo.com>
AuthorDate: Tue Jan 5 14:23:55 2021 +0100

    UNOMI-371 add optional support for optimistic concurrency control (if_seq_no) (#223)
    
    (cherry picked from commit 714a643efe0e0ccd66f14d4a8cff12545f7f94cd)
---
 api/src/main/java/org/apache/unomi/api/Item.java   |  10 ++
 .../apache/unomi/services/UserListServiceImpl.java |   2 +-
 .../unomi/privacy/internal/PrivacyServiceImpl.java |   2 +-
 .../itests/ProfileServiceWithoutOverwriteIT.java   | 123 +++++++++++++++++++++
 .../ElasticSearchPersistenceServiceImpl.java       | 110 ++++++++++++++----
 .../resources/OSGI-INF/blueprint/blueprint.xml     |   4 +
 .../org.apache.unomi.persistence.elasticsearch.cfg |   8 +-
 .../unomi/persistence/spi/PersistenceService.java  |  36 +++++-
 .../actions/MergeProfilesOnPropertyAction.java     |   6 +-
 .../unomi/plugins/mail/actions/SendMailAction.java |   2 +-
 .../services/impl/events/EventServiceImpl.java     |   2 +-
 .../services/impl/rules/RulesServiceImpl.java      |   2 +-
 .../services/impl/segments/SegmentServiceImpl.java |  12 +-
 13 files changed, 276 insertions(+), 43 deletions(-)

diff --git a/api/src/main/java/org/apache/unomi/api/Item.java b/api/src/main/java/org/apache/unomi/api/Item.java
index 72f0ae6..2d5ff71 100644
--- a/api/src/main/java/org/apache/unomi/api/Item.java
+++ b/api/src/main/java/org/apache/unomi/api/Item.java
@@ -21,6 +21,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -63,6 +64,7 @@ public abstract class Item implements Serializable {
     protected String itemType;
     protected String scope;
     protected Long version;
+    protected Map<String, Object> systemMetadata = new HashMap<>();
 
     public Item() {
         this.itemType = getItemType(this.getClass());
@@ -140,4 +142,12 @@ public abstract class Item implements Serializable {
     public void setVersion(Long version) {
         this.version = version;
     }
+
+    public Object getSystemMetadata(String key) {
+        return systemMetadata.get(key);
+    }
+
+    public void setSystemMetadata(String key, Object value) {
+        systemMetadata.put(key, value);
+    }
 }
diff --git a/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java b/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java
index dc3bbc8..37ca72e 100644
--- a/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java
+++ b/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java
@@ -94,7 +94,7 @@ public class UserListServiceImpl implements UserListService {
                 if(index != -1){
                     ((List) profileSystemProperties.get("lists")).remove(index);
                     profileSystemProperties.put("lastUpdated", new Date());
-                    persistenceService.update(p.getItemId(), null, Profile.class, "systemProperties", profileSystemProperties);
+                    persistenceService.update(p, null, Profile.class, "systemProperties", profileSystemProperties);
                 }
             }
         }
diff --git a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
index 1675e11..3d3d68d 100644
--- a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
+++ b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
@@ -137,7 +137,7 @@ public class PrivacyServiceImpl implements PrivacyService {
             persistenceService.save(session);
             List<Event> events = eventService.searchEvents(session.getItemId(), new String[0], null, 0, -1, null).getList();
             for (Event event : events) {
-                persistenceService.update(event.getItemId(), event.getTimeStamp(), Event.class, "profileId", newProfile.getItemId());
+                persistenceService.update(event, event.getTimeStamp(), Event.class, "profileId", newProfile.getItemId());
             }
         }
 
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceWithoutOverwriteIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceWithoutOverwriteIT.java
new file mode 100644
index 0000000..5c2ed5b
--- /dev/null
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceWithoutOverwriteIT.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+package org.apache.unomi.itests;
+
+import org.apache.unomi.api.Profile;
+import org.apache.unomi.api.services.DefinitionsService;
+import org.apache.unomi.api.services.ProfileService;
+import org.apache.unomi.persistence.spi.PersistenceService;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerSuite;
+import org.ops4j.pax.exam.util.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.ops4j.pax.exam.CoreOptions.systemProperty;
+
+/**
+ * An integration test for the profile service
+ */
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerSuite.class)
+public class ProfileServiceWithoutOverwriteIT extends BaseIT {
+    private final static Logger LOGGER = LoggerFactory.getLogger(ProfileServiceWithoutOverwriteIT.class);
+
+    private final static String TEST_PROFILE_ID = "test-profile-id";
+
+    @Configuration
+    public Option[] config() throws InterruptedException {
+        List<Option> options = new ArrayList<>();
+        options.addAll(Arrays.asList(super.config()));
+        options.add(systemProperty("org.apache.unomi.elasticsearch.throwExceptions").value("true"));
+        options.add(systemProperty("org.apache.unomi.elasticsearch.alwaysOverwrite").value("false"));
+        return options.toArray(new Option[0]);
+    }
+
+    @Inject @Filter(timeout = 600000)
+    protected ProfileService profileService;
+
+    @Inject
+    @Filter(timeout = 600000)
+    protected PersistenceService persistenceService;
+
+    @Inject
+    @Filter(timeout = 600000)
+    protected DefinitionsService definitionsService;
+
+    @Before
+    public void setUp() {
+        TestUtils.removeAllProfiles(definitionsService, persistenceService);
+    }
+
+    private Profile setupWithoutOverwriteTests() {
+        Profile profile = new Profile();
+        profile.setItemId(TEST_PROFILE_ID);
+        profile.setProperty("country", "test-country");
+        profile.setProperty("state", "test-state");
+        profileService.save(profile);
+
+        return profile;
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testSaveProfileWithoutOverwriteSameProfileThrowsException() {
+        Profile profile = setupWithoutOverwriteTests();
+        profile.setProperty("country", "test2-country");
+        profileService.save(profile);
+    }
+
+    @Test
+    public void testSaveProfileWithoutOverwriteSavesAfterReload() throws InterruptedException {
+        Profile profile = setupWithoutOverwriteTests();
+        String profileId = profile.getItemId();
+        Thread.sleep(4000);
+
+        Profile updatedProfile = profileService.load(profileId);
+        updatedProfile.setProperty("country", "test2-country");
+        profileService.save(updatedProfile);
+
+        Thread.sleep(4000);
+
+        Profile profileWithNewCountry = profileService.load(profileId);
+        assertEquals(profileWithNewCountry.getProperty("country"), "test2-country");
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testSaveProfileWithoutOverwriteWrongSeqNoThrowsException() throws InterruptedException {
+        Profile profile = setupWithoutOverwriteTests();
+        String profileId = profile.getItemId();
+
+        Thread.sleep(4000);
+
+        Profile updatedProfile = profileService.load(profileId);
+        updatedProfile.setProperty("country", "test2-country");
+        updatedProfile.setMetadata("seq_no", 1L);
+        profileService.save(updatedProfile);
+    }
+}
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 62aaa81..5c4a251 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -23,7 +23,6 @@ import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
 import org.apache.http.conn.ssl.NoopHostnameVerifier;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.lucene.search.TotalHits;
@@ -40,6 +39,7 @@ import org.apache.unomi.persistence.elasticsearch.conditions.*;
 import org.apache.unomi.persistence.spi.PersistenceService;
 import org.apache.unomi.persistence.spi.aggregate.*;
 import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
@@ -49,6 +49,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.ClearScrollRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
@@ -63,6 +64,7 @@ import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.client.core.CountRequest;
 import org.elasticsearch.client.core.CountResponse;
 import org.elasticsearch.client.core.MainResponse;
@@ -139,6 +141,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval";
     public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy";
     public static final String INDEX_DATE_PREFIX = "date-";
+    public static final String SEQ_NO = "seq_no";
+    public static final String PRIMARY_TERM = "primary_term";
+
     private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
     private RestHighLevelClient client;
     private BulkProcessor bulkProcessor;
@@ -192,6 +197,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     private Integer aggQueryMaxResponseSizeHttp = null;
     private Integer clientSocketTimeout = null;
 
+    private boolean alwaysOverwrite = true;
 
     private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>();
 
@@ -362,6 +368,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     public void setAggQueryThrowOnMissingDocs(boolean aggQueryThrowOnMissingDocs) {
         this.aggQueryThrowOnMissingDocs = aggQueryThrowOnMissingDocs;
     }
+
+    public void setThrowExceptions(boolean throwExceptions) {
+        this.throwExceptions = throwExceptions;
+    }
+    public void setAlwaysOverwrite(boolean alwaysOverwrite) {
+        this.alwaysOverwrite = alwaysOverwrite;
+    }
+
+
     public void start() throws Exception {
 
         // on startup
@@ -747,8 +762,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         if (response.isExists()) {
                             String sourceAsString = response.getSourceAsString();
                             final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
-                            value.setItemId(response.getId());
-                            value.setVersion(response.getVersion());
+                            setMetadata(value, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
                             putInCache(itemId, value);
                             return value;
                         } else {
@@ -772,13 +786,28 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
     }
 
+    private void setMetadata(Item item, String id, long version, long seqNo, long primaryTerm) {
+        item.setItemId(id);
+        item.setVersion(version);
+        item.setSystemMetadata(SEQ_NO, seqNo);
+        item.setSystemMetadata(PRIMARY_TERM, primaryTerm);
+    }
+
     @Override
     public boolean save(final Item item) {
-        return save(item, useBatchingForSave);
+        return save(item, useBatchingForSave, alwaysOverwrite);
     }
 
     @Override
     public boolean save(final Item item, final boolean useBatching) {
+        return save(item, useBatching, alwaysOverwrite);
+    }
+
+    @Override
+    public boolean save(final Item item, final Boolean useBatchingOption, final Boolean alwaysOverwriteOption) {
+        final boolean useBatching = useBatchingOption == null ? this.useBatchingForSave : useBatchingOption;
+        final boolean alwaysOverwrite = alwaysOverwriteOption == null ? this.alwaysOverwrite : alwaysOverwriteOption;
+
         Boolean result =  new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveItem", this.bundleContext, this.fatalIllegalStateErrors) {
             protected Boolean execute(Object... args) throws Exception {
                 try {
@@ -790,13 +819,28 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     IndexRequest indexRequest = new IndexRequest(index);
                     indexRequest.id(itemId);
                     indexRequest.source(source, XContentType.JSON);
+
+                    if (!alwaysOverwrite) {
+                        Long seqNo = (Long)item.getSystemMetadata(SEQ_NO);
+                        Long primaryTerm = (Long)item.getSystemMetadata(PRIMARY_TERM);
+
+                        if (seqNo != null && primaryTerm != null) {
+                            indexRequest.setIfSeqNo(seqNo);
+                            indexRequest.setIfPrimaryTerm(primaryTerm);
+                        }
+                        else {
+                            indexRequest.opType(DocWriteRequest.OpType.CREATE);
+                        }
+                    }
+
                     if (routingByType.containsKey(itemType)) {
                         indexRequest.routing(routingByType.get(itemType));
                     }
 
                     try {
                         if (bulkProcessor == null || !useBatching) {
-                            client.index(indexRequest, RequestOptions.DEFAULT);
+                            IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
+                            setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
                         } else {
                             bulkProcessor.add(indexRequest);
                         }
@@ -819,26 +863,43 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     }
 
     @Override
-    public boolean update(final String itemId, final Date dateHint, final Class clazz, final String propertyName, final Object propertyValue) {
-        return update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue));
+    public boolean update(final Item item, final Date dateHint, final Class clazz, final String propertyName, final Object propertyValue) {
+        return update(item, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue));
     }
 
     @Override
-    public boolean update(final String itemId, final Date dateHint, final Class clazz, final Map source) {
+    public boolean update(final Item item, final Date dateHint, final Class clazz, final Map source) {
+        return update(item, dateHint, clazz, source, alwaysOverwrite);
+    }
+
+    @Override
+    public boolean update(final Item item, final Date dateHint, final Class clazz, final Map source, final boolean alwaysOverwrite) {
         Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem", this.bundleContext, this.fatalIllegalStateErrors) {
             protected Boolean execute(Object... args) throws Exception {
                 try {
                     String itemType = Item.getItemType(clazz);
-                    UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), itemId);
+                    UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), item.getItemId());
                     updateRequest.doc(source);
+
+                    if (!alwaysOverwrite) {
+                        Long seqNo = (Long)item.getSystemMetadata(SEQ_NO);
+                        Long primaryTerm = (Long)item.getSystemMetadata(PRIMARY_TERM);
+
+                        if (seqNo != null && primaryTerm != null) {
+                            updateRequest.setIfSeqNo(seqNo);
+                            updateRequest.setIfPrimaryTerm(primaryTerm);
+                        }
+                    }
+
                     if (bulkProcessor == null) {
-                        client.update(updateRequest, RequestOptions.DEFAULT);
+                        UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
+                        setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
                     } else {
                         bulkProcessor.add(updateRequest);
                     }
                     return true;
                 } catch (IndexNotFoundException e) {
-                    throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
+                    throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + item.getItemId(), e);
                 }
             }
         }.catchingExecuteInClassLoader(true);
@@ -907,7 +968,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     }
 
     @Override
-    public boolean updateWithScript(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
+    public boolean updateWithScript(final Item item, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
         Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithScript", this.bundleContext, this.fatalIllegalStateErrors) {
             protected Boolean execute(Object... args) throws Exception {
                 try {
@@ -917,17 +978,26 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
                     Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams);
 
-                    UpdateRequest updateRequest = new UpdateRequest(index, itemId);
+                    UpdateRequest updateRequest = new UpdateRequest(index, item.getItemId());
+
+                    Long seqNo = (Long)item.getSystemMetadata(SEQ_NO);
+                    Long primaryTerm = (Long)item.getSystemMetadata(PRIMARY_TERM);
+
+                    if (seqNo != null && primaryTerm != null) {
+                        updateRequest.setIfSeqNo(seqNo);
+                        updateRequest.setIfPrimaryTerm(primaryTerm);
+                    }
                     updateRequest.script(actualScript);
                     if (bulkProcessor == null) {
-                        client.update(updateRequest, RequestOptions.DEFAULT);
+                        UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
+                        setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
                     } else {
                         bulkProcessor.add(updateRequest);
                     }
 
                     return true;
                 } catch (IndexNotFoundException e) {
-                    throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
+                    throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + item.getItemId(), e);
                 }
             }
         }.catchingExecuteInClassLoader(true);
@@ -1471,6 +1541,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     SearchRequest searchRequest = new SearchRequest(getIndexNameForQuery(itemType));
                     SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                             .fetchSource(true)
+                            .seqNoAndPrimaryTerm(true)
                             .query(query)
                             .size(size < 0 ? defaultQueryLimit : size)
                             .from(offset);
@@ -1528,8 +1599,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                                 // add hit to results
                                 String sourceAsString = searchHit.getSourceAsString();
                                 final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
-                                value.setItemId(searchHit.getId());
-                                value.setVersion(searchHit.getVersion());
+                                setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm());
                                 results.add(value);
                             }
 
@@ -1559,8 +1629,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         for (SearchHit searchHit : searchHits) {
                             String sourceAsString = searchHit.getSourceAsString();
                             final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
-                            value.setItemId(searchHit.getId());
-                            value.setVersion(searchHit.getVersion());
+                            setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm());
                             results.add(value);
                         }
                     }
@@ -1606,8 +1675,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                             // add hit to results
                             String sourceAsString = searchHit.getSourceAsString();
                             final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
-                            value.setItemId(searchHit.getId());
-                            value.setVersion(searchHit.getVersion());
+                            setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm());
                             results.add(value);
                         }
                     }
diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 48402f3..6f40799 100644
--- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -63,6 +63,8 @@
             <cm:property name="password" value="" />
             <cm:property name="sslEnable" value="false" />
             <cm:property name="sslTrustAllCertificates" value="false" />
+            <cm:property name="throwExceptions" value="false" />
+            <cm:property name="alwaysOverwrite" value="true" />
         </cm:default-properties>
     </cm:property-placeholder>
 
@@ -136,6 +138,8 @@
         <property name="password" value="${es.password}" />
         <property name="sslEnable" value="${es.sslEnable}" />
         <property name="sslTrustAllCertificates" value="${es.sslTrustAllCertificates}" />
+        <property name="throwExceptions" value="${es.throwExceptions}" />
+        <property name="alwaysOverwrite" value="${es.alwaysOverwrite}" />
     </bean>
 
     <!-- We use a listener here because using the list directly for listening to proxies coming from the same bundle didn't seem to work -->
diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index c0e7b46..ac30c91 100644
--- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -26,7 +26,6 @@ monthlyIndex.numberOfShards=${org.apache.unomi.elasticsearch.monthlyIndex.nbShar
 monthlyIndex.numberOfReplicas=${org.apache.unomi.elasticsearch.monthlyIndex.nbReplicas:-0}
 monthlyIndex.indexMappingTotalFieldsLimit=${org.apache.unomi.elasticsearch.monthlyIndex.indexMappingTotalFieldsLimit:-1000}
 monthlyIndex.indexMaxDocValueFieldsSearch=${org.apache.unomi.elasticsearch.monthlyIndex.indexMaxDocValueFieldsSearch:-1000}
-monthlyIndex.itemsMonthlyIndexedOverride=${org.apache.unomi.elasticsearch.monthlyIndex.itemsMonthlyIndexedOverride:-event,session}
 numberOfShards=${org.apache.unomi.elasticsearch.defaultIndex.nbShards:-5}
 numberOfReplicas=${org.apache.unomi.elasticsearch.defaultIndex.nbReplicas:-0}
 indexMappingTotalFieldsLimit=${org.apache.unomi.elasticsearch.defaultIndex.indexMappingTotalFieldsLimit:-1000}
@@ -70,4 +69,9 @@ aggQueryMaxResponseSizeHttp=${org.apache.unomi.elasticsearch.aggQueryMaxResponse
 username=${org.apache.unomi.elasticsearch.username:-}
 password=${org.apache.unomi.elasticsearch.password:-}
 sslEnable=${org.apache.unomi.elasticsearch.sslEnable:-false}
-sslTrustAllCertificates=${org.apache.unomi.elasticsearch.sslTrustAllCertificates:-false}
\ No newline at end of file
+sslTrustAllCertificates=${org.apache.unomi.elasticsearch.sslTrustAllCertificates:-false}
+
+# Errors
+throwExceptions=${org.apache.unomi.elasticsearch.throwExceptions:-false}
+
+alwaysOverwrite=${org.apache.unomi.elasticsearch.alwaysOverwrite:-true}
\ No newline at end of file
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index d85059e..f92d3bb 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -127,41 +127,65 @@ public interface PersistenceService {
     boolean save(Item item, boolean useBatching);
 
     /**
+     * Persists the specified Item in the context server.
+     *
+     * @param item the item to persist
+     * @param useBatching whether to use batching or not for saving the item. If activating there may be a delay between
+     *                 the call to this method and the actual saving in the persistence backend
+     * @param alwaysOverwrite whether to overwrite a document even if we are holding an old item when saving
+     *
+     * @return {@code true} if the item was properly persisted, {@code false} otherwise
+     */
+    boolean save(Item item, Boolean useBatching, Boolean alwaysOverwrite);
+
+    /**
      * Updates the item of the specified class and identified by the specified identifier with new property values provided as name - value pairs in the specified Map.
      *
-     * @param itemId   the identifier of the item we want to update
+     * @param item     the item we want to update
      * @param dateHint a Date helping in identifying where the item is located
      * @param clazz    the Item subclass of the item to update
      * @param source   a Map with entries specifying as key the property name to update and as value its new value
      * @return {@code true} if the update was successful, {@code false} otherwise
      */
-    boolean update(String itemId, Date dateHint, Class<?> clazz, Map<?, ?> source);
+    boolean update(Item item, Date dateHint, Class<?> clazz, Map<?, ?> source);
 
     /**
      * Updates the item of the specified class and identified by the specified identifier with a new property value for the specified property name. Same as
      * {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))}
      *
-     * @param itemId        the identifier of the item we want to update
+     * @param item          the item we want to update
      * @param dateHint      a Date helping in identifying where the item is located
      * @param clazz         the Item subclass of the item to update
      * @param propertyName  the name of the property to update
      * @param propertyValue the new value of the property
      * @return {@code true} if the update was successful, {@code false} otherwise
      */
-    boolean update(String itemId, Date dateHint, Class<?> clazz, String propertyName, Object propertyValue);
+    boolean update(Item item, Date dateHint, Class<?> clazz, String propertyName, Object propertyValue);
+
+    /**
+     * Updates the item of the specified class and identified by the specified identifier with new property values provided as name - value pairs in the specified Map.
+     *
+     * @param item     the item we want to update
+     * @param dateHint a Date helping in identifying where the item is located
+     * @param clazz    the Item subclass of the item to update
+     * @param source   a Map with entries specifying as key the property name to update and as value its new value
+     * @param alwaysOverwrite whether to overwrite a document even if we are holding an old item when saving
+     * @return {@code true} if the update was successful, {@code false} otherwise
+     */
+    boolean update(Item item, Date dateHint, Class<?> clazz, Map<?, ?> source, final boolean alwaysOverwrite);
 
     /**
      * Updates the item of the specified class and identified by the specified identifier with a new property value for the specified property name. Same as
      * {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))}
      *
-     * @param itemId        the identifier of the item we want to update
+     * @param item          the item we want to update
      * @param dateHint      a Date helping in identifying where the item is located
      * @param clazz         the Item subclass of the item to update
      * @param script        inline script
      * @param scriptParams  script params
      * @return {@code true} if the update was successful, {@code false} otherwise
      */
-    boolean updateWithScript(String itemId, Date dateHint, Class<?> clazz, String script, Map<String, Object> scriptParams);
+    boolean updateWithScript(Item item, Date dateHint, Class<?> clazz, String script, Map<String, Object> scriptParams);
 
     /**
      * Updates the items of the specified class by a query with a new property value for the specified property name
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index a496ddb..ffdf626 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -178,12 +178,12 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor {
                                     }
 
                                     for (Session session : sessions) {
-                                        persistenceService.update(session.getItemId(), session.getTimeStamp(), Session.class, "profileId", anonymousBrowsing ? null : masterProfileId);
+                                        persistenceService.update(session, session.getTimeStamp(), Session.class, "profileId", anonymousBrowsing ? null : masterProfileId);
                                     }
 
                                     List<Event> events = persistenceService.query("profileId", profileId, null, Event.class);
                                     for (Event event : events) {
-                                        persistenceService.update(event.getItemId(), event.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? null : masterProfileId);
+                                        persistenceService.update(event, event.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? null : masterProfileId);
                                     }
                                     // we must mark all the profiles that we merged into the master as merged with the master, and they will
                                     // be deleted upon next load
@@ -192,7 +192,7 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor {
                                     sourceMap.put("mergedWith", masterProfileId);
                                     profile.setSystemProperty("lastUpdated", new Date());
                                     sourceMap.put("systemProperties", profile.getSystemProperties());
-                                    persistenceService.update(profile.getItemId(), null, Profile.class, sourceMap);
+                                    persistenceService.update(profile, null, Profile.class, sourceMap);
                                 }
                             }
                         } catch (Exception e) {
diff --git a/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java b/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java
index 8e03175..acdb1eb 100644
--- a/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java
+++ b/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java
@@ -116,7 +116,7 @@ public class SendMailAction implements ActionExecutor {
         event.getProfile().setSystemProperty("notificationAck", profileNotif);
         event.getProfile().setSystemProperty("lastUpdated", new Date());
 
-        persistenceService.update(event.getProfile().getItemId(), null, Profile.class, "systemProperties", event.getProfile().getSystemProperties());
+        persistenceService.update(event.getProfile(), null, Profile.class, "systemProperties", event.getProfile().getSystemProperties());
 
         ST stringTemplate = new ST(template, '$', '$');
         stringTemplate.add("profile", event.getProfile());
diff --git a/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java
index 80b0449..25db0b5 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java
@@ -152,7 +152,7 @@ public class EventServiceImpl implements EventService {
 
         boolean saveSucceeded = true;
         if (event.isPersistent()) {
-            saveSucceeded = persistenceService.save(event);
+            saveSucceeded = persistenceService.save(event, null, true);
         }
 
         int changes;
diff --git a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
index d77173c..734d856 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
@@ -475,7 +475,7 @@ public class RulesServiceImpl implements RulesService, EventListenerService, Syn
             }
             allRuleStatistics.put(ruleStatistics.getItemId(), ruleStatistics);
             if (mustPersist) {
-                persistenceService.save(ruleStatistics);
+                persistenceService.save(ruleStatistics, null, true);
             }
         }
         // now let's iterate over the rules coming from the persistence service, as we may have new ones.
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 36a2a67..e971432 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -365,7 +365,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                 sourceMap.put("segments", profileToRemove.getSegments());
                 profileToRemove.setSystemProperty("lastUpdated", new Date());
                 sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
-                persistenceService.update(profileToRemove.getItemId(), null, Profile.class, sourceMap);
+                persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
                 updatedProfileCount++;
             }
             logger.info("Removed segment from {} profiles in {} ms", updatedProfileCount, System.currentTimeMillis() - profileRemovalStartTime);
@@ -724,7 +724,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                 // todo remove profile properties ?
                 persistenceService.remove(previousRule.getItemId(), Rule.class);
             } else {
-                persistenceService.update(previousRule.getItemId(), null, Rule.class, "linkedItems", previousRule.getLinkedItems());
+                persistenceService.update(previousRule, null, Rule.class, "linkedItems", previousRule.getLinkedItems());
             }
         }
     }
@@ -862,7 +862,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                         systemProperties.put("lastUpdated", new Date());
                         Profile profile = new Profile();
                         profile.setItemId(profileId);
-                        persistenceService.update(profile.getItemId(), null, Profile.class, "systemProperties", systemProperties);
+                        persistenceService.update(profile, null, Profile.class, "systemProperties", systemProperties);
                     } catch (Exception e) {
                         logger.error("Error updating profile {} past event system properties", profileId, e);
                     }
@@ -930,7 +930,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                     sourceMap.put("segments", profileToAdd.getSegments());
                     profileToAdd.setSystemProperty("lastUpdated", new Date());
                     sourceMap.put("systemProperties", profileToAdd.getSystemProperties());
-                    persistenceService.update(profileToAdd.getItemId(), null, Profile.class, sourceMap);
+                    persistenceService.update(profileToAdd, null, Profile.class, sourceMap);
                     Event profileUpdated = new Event("profileUpdated", null, profileToAdd, null, null, profileToAdd, new Date());
                     profileUpdated.setPersistent(false);
                     eventService.send(profileUpdated);
@@ -950,7 +950,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                     sourceMap.put("segments", profileToRemove.getSegments());
                     profileToRemove.setSystemProperty("lastUpdated", new Date());
                     sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
-                    persistenceService.update(profileToRemove.getItemId(), null, Profile.class, sourceMap);
+                    persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
                     Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
                     profileUpdated.setPersistent(false);
                     eventService.send(profileUpdated);
@@ -973,7 +973,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                     sourceMap.put("segments", profileToRemove.getSegments());
                     profileToRemove.setSystemProperty("lastUpdated", new Date());
                     sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
-                    persistenceService.update(profileToRemove.getItemId(), null, Profile.class, sourceMap);
+                    persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
                     Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
                     profileUpdated.setPersistent(false);
                     eventService.send(profileUpdated);