You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2023/01/11 16:08:17 UTC
[unomi] branch monthlyToRolloverIndicesPOC created (now b77f91f87)
This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a change to branch monthlyToRolloverIndicesPOC
in repository https://gitbox.apache.org/repos/asf/unomi.git
at b77f91f87 UNOMI-724: base POC and implem for rollover system to replace monthly indices
This branch includes the following new commits:
new b77f91f87 UNOMI-724: base POC and implem for rollover system to replace monthly indices
The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[unomi] 01/01: UNOMI-724: base POC and implem for rollover system to replace monthly indices
Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch monthlyToRolloverIndicesPOC
in repository https://gitbox.apache.org/repos/asf/unomi.git
commit b77f91f87b322fb761722d75fc823afe05886344
Author: Kevan <ke...@jahia.com>
AuthorDate: Wed Jan 11 17:07:57 2023 +0100
UNOMI-724: base POC and implem for rollover system to replace monthly indices
---
.../unomi/privacy/internal/PrivacyServiceImpl.java | 1 +
.../ElasticSearchPersistenceServiceImpl.java | 198 +++++++++++----------
.../unomi/persistence/spi/PersistenceService.java | 20 +--
.../actions/MergeProfilesOnPropertyAction.java | 5 +
.../services/impl/profiles/ProfileServiceImpl.java | 11 +-
5 files changed, 122 insertions(+), 113 deletions(-)
diff --git a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
index 9c59f3d2c..25633859c 100644
--- a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
+++ b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
@@ -142,6 +142,7 @@ public class PrivacyServiceImpl implements PrivacyService {
persistenceService.save(session);
List<Event> events = eventService.searchEvents(session.getItemId(), new String[0], null, 0, -1, null).getList();
for (Event event : events) {
+ // TODO dateHint is no longer supported here
persistenceService.update(event, event.getTimeStamp(), Event.class, "profileId", newProfile.getItemId());
}
}
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 63150feec..1b053426d 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -48,10 +48,10 @@ import org.apache.unomi.persistence.spi.aggregate.IpRangeAggregate;
import org.apache.unomi.persistence.spi.aggregate.NumericRangeAggregate;
import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
import org.elasticsearch.ElasticsearchStatusException;
-import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
+import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
@@ -83,6 +83,7 @@ import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.core.MainResponse;
+import org.elasticsearch.client.indexlifecycle.*;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
@@ -452,6 +453,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
throw new Exception("ElasticSearch version is not within [" + minimalVersion + "," + maximalVersion + "), aborting startup !");
}
+ createMonthlyIndexLifecyclePolicy();
+
loadPredefinedMappings(bundleContext, false);
// load predefined mappings and condition dispatchers of any bundles that were started before this one.
@@ -461,8 +464,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
- createMonthlyIndexTemplate();
-
if (client != null && bulkProcessor == null) {
bulkProcessor = getBulkProcessor();
}
@@ -677,7 +678,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
- private void loadPredefinedMappings(BundleContext bundleContext, boolean createMapping) {
+ private void loadPredefinedMappings(BundleContext bundleContext, boolean forceUpdateMapping) {
Enumeration<URL> predefinedMappings = bundleContext.getBundle().findEntries("META-INF/cxs/mappings", "*.json", true);
if (predefinedMappings == null) {
return;
@@ -692,14 +693,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
mappings.put(name, mappingSource);
- String itemIndexName = getIndex(name, new Date());
- if (!client.indices().exists(new GetIndexRequest(itemIndexName), RequestOptions.DEFAULT)) {
- logger.info("{} index doesn't exist yet, creating it...", itemIndexName);
- internalCreateIndex(itemIndexName, mappingSource);
- } else {
- logger.info("Found index {}", itemIndexName);
- if (createMapping) {
- logger.info("Updating mapping for {}", itemIndexName);
+ if (!createIndex(name)) {
+ logger.info("Found index for type {}", name);
+ if (forceUpdateMapping) {
+ logger.info("Updating mapping for {}", name);
createMapping(name, mappingSource);
}
}
@@ -773,7 +770,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
itemType = customItemType;
}
- if (itemsMonthlyIndexed.contains(itemType) && dateHint == null) {
+ if (itemsMonthlyIndexed.contains(itemType)) {
return new MetricAdapter<T>(metricsService, ".loadItemWithQuery") {
@Override
public T execute(Object... args) throws Exception {
@@ -792,12 +789,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}.execute();
} else {
- GetRequest getRequest = new GetRequest(getIndex(itemType, dateHint), itemId);
+ GetRequest getRequest = new GetRequest(getIndex(itemType), itemId);
GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
if (response.isExists()) {
String sourceAsString = response.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- setMetadata(value, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
+ setMetadata(value, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
return value;
} else {
return null;
@@ -820,11 +817,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
- private void setMetadata(Item item, String id, long version, long seqNo, long primaryTerm) {
+ private void setMetadata(Item item, String id, long version, long seqNo, long primaryTerm, String index) {
item.setItemId(id);
item.setVersion(version);
item.setSystemMetadata(SEQ_NO, seqNo);
item.setSystemMetadata(PRIMARY_TERM, primaryTerm);
+ item.setSystemMetadata("index", index);
}
@Override
@@ -858,7 +856,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
className = CustomItem.class.getName() + "." + itemType;
}
String itemId = item.getItemId();
- String index = getIndex(itemType, itemsMonthlyIndexed.contains(itemType) ? ((TimestampedItem) item).getTimeStamp() : null);
+ String index = item.getSystemMetadata("index") != null ?
+ (String) item.getSystemMetadata("index") :
+ getIndex(itemType);
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.id(itemId);
indexRequest.source(source, XContentType.JSON);
@@ -884,7 +884,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
if (bulkProcessor == null || !useBatching) {
indexRequest.setRefreshPolicy(getRefreshPolicy(itemType));
IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
- setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
+ setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
} else {
bulkProcessor.add(indexRequest);
}
@@ -921,11 +921,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
protected Boolean execute(Object... args) throws Exception {
try {
- UpdateRequest updateRequest = createUpdateRequest(clazz, dateHint, item, source, alwaysOverwrite);
+ UpdateRequest updateRequest = createUpdateRequest(clazz, item, source, alwaysOverwrite);
if (bulkProcessor == null || !useBatchingForUpdate) {
UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
- setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
+ setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
} else {
bulkProcessor.add(updateRequest);
}
@@ -942,9 +942,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
- private UpdateRequest createUpdateRequest(Class clazz, Date dateHint, Item item, Map source, boolean alwaysOverwrite) {
+ private UpdateRequest createUpdateRequest(Class clazz, Item item, Map source, boolean alwaysOverwrite) {
String itemType = Item.getItemType(clazz);
- UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), item.getItemId());
+ UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType), item.getItemId());
updateRequest.doc(source);
if (!alwaysOverwrite) {
@@ -970,7 +970,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
BulkRequest bulkRequest = new BulkRequest();
items.forEach((item, source) -> {
- UpdateRequest updateRequest = createUpdateRequest(clazz, dateHint, item, source, alwaysOverwrite);
+ UpdateRequest updateRequest = createUpdateRequest(clazz, item, source, alwaysOverwrite);
bulkRequest.add(updateRequest);
});
@@ -999,7 +999,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
for (int i = 0; i < scripts.length; i++) {
builtScripts[i] = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]);
}
- return updateWithQueryAndScript(dateHint, clazz, builtScripts, conditions);
+ return updateWithQueryAndScript(clazz, builtScripts, conditions);
}
@Override
@@ -1008,16 +1008,16 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
for (int i = 0; i < scripts.length; i++) {
builtScripts[i] = new Script(ScriptType.STORED, null, scripts[i], scriptParams[i]);
}
- return updateWithQueryAndScript(dateHint, clazz, builtScripts, conditions);
+ return updateWithQueryAndScript(clazz, builtScripts, conditions);
}
- private boolean updateWithQueryAndScript(final Date dateHint, final Class<?> clazz, final Script[] scripts, final Condition[] conditions) {
+ private boolean updateWithQueryAndScript(final Class<?> clazz, final Script[] scripts, final Condition[] conditions) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
- String index = getIndex(itemType, dateHint);
+ String index = getIndex(itemType);
for (int i = 0; i < scripts.length; i++) {
RefreshRequest refreshRequest = new RefreshRequest(index);
@@ -1110,7 +1110,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
try {
String itemType = Item.getItemType(clazz);
- String index = getIndex(itemType, dateHint);
+ String index = getIndex(itemType);
Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams);
@@ -1126,7 +1126,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
updateRequest.script(actualScript);
if (bulkProcessor == null) {
UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
- setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
+ setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
} else {
bulkProcessor.add(updateRequest);
}
@@ -1281,40 +1281,23 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
- public boolean createMonthlyIndexTemplate() {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createMonthlyIndexTemplate", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ public boolean createMonthlyIndexLifecyclePolicy() {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createMonthlyIndexLifecyclePolicy", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
protected Boolean execute(Object... args) throws IOException {
- boolean executedSuccessfully = true;
- for (String itemName : itemsMonthlyIndexed) {
- PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest(indexPrefix + "-" + itemName + "-date-template")
- .patterns(Collections.singletonList(getMonthlyIndexForQuery(itemName)))
- .order(1)
- .settings("{\n" +
- " \"index\" : {\n" +
- " \"number_of_shards\" : " + monthlyIndexNumberOfShards + ",\n" +
- " \"number_of_replicas\" : " + monthlyIndexNumberOfReplicas + ",\n" +
- " \"mapping.total_fields.limit\" : " + monthlyIndexMappingTotalFieldsLimit + ",\n" +
- " \"max_docvalue_fields_search\" : " + monthlyIndexMaxDocValueFieldsSearch + "\n" +
- " },\n" +
- " \"analysis\": {\n" +
- " \"analyzer\": {\n" +
- " \"folding\": {\n" +
- " \"type\":\"custom\",\n" +
- " \"tokenizer\": \"keyword\",\n" +
- " \"filter\": [ \"lowercase\", \"asciifolding\" ]\n" +
- " }\n" +
- " }\n" +
- " }\n" +
- "}\n", XContentType.JSON);
- if (mappings.get(itemName) == null) {
- logger.warn("Couldn't find mapping for item {}, won't create monthly index template", itemName);
- return false;
- }
- putIndexTemplateRequest.mapping(mappings.get(itemName), XContentType.JSON);
- AcknowledgedResponse putIndexTemplateResponse = client.indices().putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT);
- executedSuccessfully &= putIndexTemplateResponse.isAcknowledged();
- }
- return executedSuccessfully;
+ // Create the lifecycle policy for monthly indices
+ Map<String, Phase> phases = new HashMap<>();
+ Map<String, LifecycleAction> hotActions = new HashMap<>();
+ // TODO configure the rollover correctly, here it's 50000 bytes to test the rollover (5 sessions should trigger the rollover)
+ hotActions.put(RolloverAction.NAME, new RolloverAction(new ByteSizeValue(50000, ByteSizeUnit.BYTES), null, null));
+ phases.put("hot", new Phase("hot", TimeValue.ZERO, hotActions));
+
+ Map<String, LifecycleAction> deleteActions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction());
+ phases.put("delete", new Phase("delete", new TimeValue(90, TimeUnit.DAYS), deleteActions));
+
+ LifecyclePolicy policy = new LifecyclePolicy("monthly-index-policy", phases);
+ PutLifecyclePolicyRequest request = new PutLifecyclePolicyRequest(policy);
+ org.elasticsearch.client.core.AcknowledgedResponse putLifecyclePolicy = client.indexLifecycle().putLifecyclePolicy(request, RequestOptions.DEFAULT);
+ return putLifecyclePolicy.isAcknowledged();
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
@@ -1325,15 +1308,22 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
public boolean createIndex(final String itemType) {
- String index = getIndex(itemType);
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createIndex", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
protected Boolean execute(Object... args) throws IOException {
+ String index = getIndex(itemType);
GetIndexRequest getIndexRequest = new GetIndexRequest(index);
boolean indexExists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
+
if (!indexExists) {
- internalCreateIndex(index, mappings.get(itemType));
+ if (itemsMonthlyIndexed.contains(itemType)) {
+ internalCreateMonthlyIndexTemplate(itemType);
+ internalCreateMonthlyIndex(index);
+ } else {
+ internalCreateIndex(index, mappings.get(itemType));
+ }
}
+
return !indexExists;
}
}.catchingExecuteInClassLoader(true);
@@ -1367,6 +1357,47 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
+ private void internalCreateMonthlyIndexTemplate(String itemName) throws IOException {
+ String rolloverAlias = indexPrefix + "-" + itemName;
+ PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest(rolloverAlias + "-date-template")
+ .patterns(Collections.singletonList(getMonthlyIndexForQuery(itemName)))
+ .order(1)
+ .settings("{\n" +
+ " \"index\" : {\n" +
+ " \"number_of_shards\" : " + monthlyIndexNumberOfShards + ",\n" +
+ " \"number_of_replicas\" : " + monthlyIndexNumberOfReplicas + ",\n" +
+ " \"mapping.total_fields.limit\" : " + monthlyIndexMappingTotalFieldsLimit + ",\n" +
+ " \"max_docvalue_fields_search\" : " + monthlyIndexMaxDocValueFieldsSearch + ",\n" +
+ " \"lifecycle.name\": \"monthly-index-policy\",\n" +
+ " \"lifecycle.rollover_alias\": \"" + rolloverAlias + "\"" +
+ "" +
+ " },\n" +
+ " \"analysis\": {\n" +
+ " \"analyzer\": {\n" +
+ " \"folding\": {\n" +
+ " \"type\":\"custom\",\n" +
+ " \"tokenizer\": \"keyword\",\n" +
+ " \"filter\": [ \"lowercase\", \"asciifolding\" ]\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ "}\n", XContentType.JSON);
+ if (mappings.get(itemName) == null) {
+ logger.warn("Couldn't find mapping for item {}, won't create monthly index template", itemName);
+ return;
+ }
+ putIndexTemplateRequest.mapping(mappings.get(itemName), XContentType.JSON);
+ client.indices().putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT);
+ }
+
+ private void internalCreateMonthlyIndex(String indexName) throws IOException {
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName + "-000001")
+ .alias(new Alias(indexName).writeIndex(true));
+ CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
+ logger.info("Index created: [{}], acknowledge: [{}], shards acknowledge: [{}]", createIndexResponse.index(),
+ createIndexResponse.isAcknowledged(), createIndexResponse.isShardsAcknowledged());
+ }
+
private void internalCreateIndex(String indexName, String mappingSource) throws IOException {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
createIndexRequest.settings("{\n" +
@@ -1398,16 +1429,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public void createMapping(String type, String source) {
try {
- if (itemsMonthlyIndexed.contains(type)) {
- createMonthlyIndexTemplate();
- String indexName = getIndex(type, new Date());
- GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
- if (client.indices().exists(getIndexRequest, RequestOptions.DEFAULT)) {
- putMapping(source, indexName);
- }
- } else {
- putMapping(source, getIndex(type));
- }
+ putMapping(source, getIndex(type));
} catch (IOException ioe) {
logger.error("Error while creating mapping for type " + type + " and source " + source, ioe);
}
@@ -1611,7 +1633,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
//Index the query = register it in the percolator
try {
logger.info("Saving query : " + queryName);
- String index = getIndex(".percolator", null);
+ String index = getIndex(".percolator");
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.id(queryName);
indexRequest.source(query, XContentType.JSON);
@@ -1645,7 +1667,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
protected Boolean execute(Object... args) throws Exception {
//Index the query = register it in the percolator
try {
- String index = getIndex(".percolator", null);
+ String index = getIndex(".percolator");
DeleteRequest deleteRequest = new DeleteRequest(index);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(deleteRequest, RequestOptions.DEFAULT);
@@ -1884,7 +1906,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
// add hit to results
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm());
+ setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
results.add(value);
}
@@ -1914,7 +1936,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
for (SearchHit searchHit : searchHits) {
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm());
+ setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
results.add(value);
}
}
@@ -1960,7 +1982,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
// add hit to results
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm());
+ setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
results.add(value);
}
}
@@ -2001,7 +2023,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
// add hit to results
String sourceAsString = searchHit.getSourceAsString();
final CustomItem value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, CustomItem.class);
- setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm());
+ setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
results.add(value);
}
}
@@ -2244,7 +2266,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
protected Boolean execute(Object... args) {
try {
String itemType = Item.getItemType(clazz);
- String index = getIndex(itemType, dateHint);
+ String index = getIndex(itemType);
client.indices().refresh(Requests.refreshRequest(index), RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();//TODO manage ES7
@@ -2485,27 +2507,17 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
private String getIndexNameForQuery(String itemType) {
- return itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndexForQuery(itemType) : getIndex(itemType, null);
+ return itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndexForQuery(itemType) : getIndex(itemType);
}
private String getMonthlyIndexForQuery(String itemType) {
- return indexPrefix + "-" + itemType.toLowerCase() + "-" + INDEX_DATE_PREFIX + "*";
- }
-
- private String getIndex(String itemType, Date dateHint) {
- String indexItemTypePart = itemsMonthlyIndexed.contains(itemType) && dateHint != null ? itemType + "-" + getMonthlyIndexPart(dateHint) : itemType;
- return getIndex(indexItemTypePart);
+ return indexPrefix + "-" + itemType.toLowerCase() + "-*";
}
private String getIndex(String indexItemTypePart) {
return (indexPrefix + "-" + indexItemTypePart).toLowerCase();
}
- private String getMonthlyIndexPart(Date date) {
- String d = new SimpleDateFormat("yyyy-MM").format(date);
- return INDEX_DATE_PREFIX + d;
- }
-
private WriteRequest.RefreshPolicy getRefreshPolicy(String itemType) {
if (itemTypeToRefreshPolicy.containsKey(itemType)) {
return itemTypeToRefreshPolicy.get(itemType);
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index 625a5948e..a2770e2d5 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -125,7 +125,7 @@ public interface PersistenceService {
* Updates the item of the specified class and identified by the specified identifier with new property values provided as name - value pairs in the specified Map.
*
* @param item the item we want to update
- * @param dateHint a Date helping in identifying where the item is located
+ * @param dateHint deprecated
* @param clazz the Item subclass of the item to update
* @param source a Map with entries specifying as key the property name to update and as value its new value
* @return {@code true} if the update was successful, {@code false} otherwise
@@ -137,7 +137,7 @@ public interface PersistenceService {
* {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))}
*
* @param item the item we want to update
- * @param dateHint a Date helping in identifying where the item is located
+ * @param dateHint deprecated
* @param clazz the Item subclass of the item to update
* @param propertyName the name of the property to update
* @param propertyValue the new value of the property
@@ -149,7 +149,7 @@ public interface PersistenceService {
* Updates the item of the specified class and identified by the specified identifier with new property values provided as name - value pairs in the specified Map.
*
* @param item the item we want to update
- * @param dateHint a Date helping in identifying where the item is located
+ * @param dateHint deprecated
* @param clazz the Item subclass of the item to update
* @param source a Map with entries specifying as key the property name to update and as value its new value
* @param alwaysOverwrite whether to overwrite a document even if we are holding an old item when saving
@@ -162,7 +162,7 @@ public interface PersistenceService {
* {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))}
*
* @param items A map the consist of item (key) and properties to update (value)
- * @param dateHint a Date helping in identifying where the item is located
+ * @param dateHint deprecated
* @param clazz the Item subclass of the item to update
* @return List of failed Items Ids, if all succesful then returns an empty list. if the whole operation failed then will return null
*/
@@ -173,7 +173,7 @@ public interface PersistenceService {
* {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))}
*
* @param item the item we want to update
- * @param dateHint a Date helping in identifying where the item is located
+ * @param dateHint deprecated
* @param clazz the Item subclass of the item to update
* @param script inline script
* @param scriptParams script params
@@ -185,7 +185,7 @@ public interface PersistenceService {
* Updates the items of the specified class by a query with a new property value for the specified property name
* based on provided scripts and script parameters
*
- * @param dateHint a Date helping in identifying where the item is located
+ * @param dateHint deprecated
* @param clazz the Item subclass of the item to update
* @param scripts inline scripts array
* @param scriptParams script params array
@@ -198,7 +198,7 @@ public interface PersistenceService {
* Updates the items of the specified class by a query with a new property value for the specified property name
* based on provided stored scripts and script parameters
*
- * @param dateHint a Date helping in identifying where the item is located
+ * @param dateHint deprecated
* @param clazz the Item subclass of the item to update
* @param scripts Stored scripts name
* @param scriptParams script params array
@@ -230,7 +230,7 @@ public interface PersistenceService {
*
* @param <T> the type of the Item subclass we want to retrieve
* @param itemId the identifier of the item we want to retrieve
- * @param dateHint a Date helping in identifying where the item is located
+ * @param dateHint deprecated
* @param clazz the {@link Item} subclass of the item we want to retrieve
* @return the item identified with the specified identifier and with the specified Item subclass if it exists, {@code null} otherwise
*/
@@ -239,7 +239,7 @@ public interface PersistenceService {
/**
* Load a custom item type identified by an identifier, an optional date hint and the identifier of the custom item type
* @param itemId the identifier of the custom type we want to retrieve
- * @param dateHint an optional Date object if the custom item types are stored by date
+ * @param dateHint deprecated
* @param customItemType an identifier of the custom item type to load
* @return the CustomItem instance with the specified identifier and the custom item type if it exists, {@code null} otherwise
*/
@@ -610,7 +610,7 @@ public interface PersistenceService {
* Updates the persistence's engine specific index.
*
* @param clazz will use an index by class type
- * @param dateHint for index with time, can be null
+ * @param dateHint deprecated
* @param <T> a class that extends Item
*/
<T extends Item> void refreshIndex(Class<T> clazz, Date dateHint);
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index 752068594..9e64ef075 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -162,6 +162,7 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor {
// Update current event explicitly, as it might not return from search query if there wasn't a refresh in ES
if (!StringUtils.equals(profileId, masterProfileId)) {
if (currentEvent.isPersistent()) {
+ // TODO dateHint is no longer supported here
persistenceService.update(currentEvent, currentEvent.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? null : masterProfileId);
}
}
@@ -169,6 +170,7 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor {
for (Profile profile : profiles) {
String profileId = profile.getItemId();
if (!StringUtils.equals(profileId, masterProfileId)) {
+ // TODO consider update by query and/or script
List<Session> sessions = persistenceService.query("profileId", profileId, null, Session.class);
if (currentSession != null) {
if (masterProfileId.equals(profileId) && !sessions.contains(currentSession)) {
@@ -177,12 +179,15 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor {
}
for (Session session : sessions) {
+ // TODO dateHint is no longer supported here
persistenceService.update(session, session.getTimeStamp(), Session.class, "profileId", anonymousBrowsing ? null : masterProfileId);
}
+ // TODO consider update by query and/or script
List<Event> events = persistenceService.query("profileId", profileId, null, Event.class);
for (Event event : events) {
if (!event.getItemId().equals(currentEvent.getItemId())) {
+ // TODO dateHint is no longer supported here
persistenceService.update(event, event.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? null : masterProfileId);
}
}
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index bc93fd0c5..8ec136ef2 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -848,16 +848,7 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
}
public Session loadSession(String sessionId, Date dateHint) {
- Session s = persistenceService.load(sessionId, dateHint, Session.class);
- if (s == null && dateHint != null) {
- GregorianCalendar gc = new GregorianCalendar();
- gc.setTime(dateHint);
- if (gc.get(Calendar.DAY_OF_MONTH) == 1) {
- gc.add(Calendar.DAY_OF_MONTH, -1);
- s = persistenceService.load(sessionId, gc.getTime(), Session.class);
- }
- }
- return s;
+ return persistenceService.load(sessionId, null, Session.class);
}
public Session saveSession(Session session) {