You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2023/03/08 11:11:09 UTC
[unomi] branch master updated: UNOMI-737: indices reduction migration (ES document id suffixed to av… (#582)
This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new f5eed2ccb UNOMI-737: indices reduction migration (ES document id suffixed to av… (#582)
f5eed2ccb is described below
commit f5eed2ccbe3b970ca91e1762e5a3e4722096fb9a
Author: kevan Jahanshahi <jk...@apache.org>
AuthorDate: Wed Mar 8 12:11:03 2023 +0100
UNOMI-737: indices reduction migration (ES document id suffixed to av… (#582)
* UNOMI-737: indices reduction migration (ES document id suffixed to avoid conflicts)
* UNOMI-737: add integration tests
* UNOMI-737: add integration test
* UNOMI-737: un-skip a test about purge
---
.../services/impl/GroovyActionsServiceImpl.java | 25 +-
.../test/java/org/apache/unomi/itests/BaseIT.java | 6 -
.../unomi/itests/GroovyActionsServiceIT.java | 2 +-
.../org/apache/unomi/itests/ProfileServiceIT.java | 1 -
.../unomi/itests/migration/Migrate16xTo220IT.java | 45 +++-
.../ElasticSearchPersistenceServiceImpl.java | 298 ++++++++++-----------
.../META-INF/cxs/mappings/personaSession.json | 41 +++
.../unomi/persistence/spi/PersistenceService.java | 25 +-
.../services/impl/profiles/ProfileServiceImpl.java | 46 +---
.../services/impl/rules/RulesServiceImpl.java | 22 +-
.../migrate-2.2.0-05-indicesReduction.groovy | 58 ++--
11 files changed, 264 insertions(+), 305 deletions(-)
diff --git a/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java b/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java
index 5761cc1c5..7b215c187 100644
--- a/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java
+++ b/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java
@@ -79,7 +79,6 @@ public class GroovyActionsServiceImpl implements GroovyActionsService {
private static final Logger logger = LoggerFactory.getLogger(GroovyActionsServiceImpl.class.getName());
private static final String BASE_SCRIPT_NAME = "BaseScript";
- private static final String GROOVY_SOURCE_CODE_ID_SUFFIX = "-groovySourceCode";
private DefinitionsService definitionsService;
private PersistenceService persistenceService;
@@ -217,21 +216,20 @@ public class GroovyActionsServiceImpl implements GroovyActionsService {
@Override
public void remove(String id) {
- String groovySourceCodeId = getGroovyCodeSourceIdForActionId(id);
- if (groovyCodeSourceMap.containsKey(groovySourceCodeId)) {
+ if (groovyCodeSourceMap.containsKey(id)) {
try {
definitionsService.removeActionType(
- groovyShell.parse(groovyCodeSourceMap.get(groovySourceCodeId)).getClass().getMethod("execute").getAnnotation(Action.class).id());
+ groovyShell.parse(groovyCodeSourceMap.get(id)).getClass().getMethod("execute").getAnnotation(Action.class).id());
} catch (NoSuchMethodException e) {
logger.error("Failed to delete the action type for the id {}", id, e);
}
- persistenceService.remove(groovySourceCodeId, GroovyAction.class);
+ persistenceService.remove(id, GroovyAction.class);
}
}
@Override
public GroovyCodeSource getGroovyCodeSource(String id) {
- return groovyCodeSourceMap.get(getGroovyCodeSourceIdForActionId(id));
+ return groovyCodeSourceMap.get(id);
}
/**
@@ -245,21 +243,10 @@ public class GroovyActionsServiceImpl implements GroovyActionsService {
return new GroovyCodeSource(groovyScript, actionName, "/groovy/script");
}
- /**
- * We use a suffix for avoiding id conflict between the actionType and the groovyAction in ElasticSearch
- * Since those items are now stored in the same ES index
- * @param actionName name/id of the actionType
- * @return id of the groovyAction source code for query/save/storage usage.
- */
- private String getGroovyCodeSourceIdForActionId(String actionName) {
- return actionName + GROOVY_SOURCE_CODE_ID_SUFFIX;
- }
-
private void saveScript(String actionName, String script) {
- String groovyName = getGroovyCodeSourceIdForActionId(actionName);
- GroovyAction groovyScript = new GroovyAction(groovyName, script);
+ GroovyAction groovyScript = new GroovyAction(actionName, script);
persistenceService.save(groovyScript);
- logger.info("The script {} has been persisted.", groovyName);
+ logger.info("The script {} has been persisted.", actionName);
}
private void refreshGroovyActions() {
diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
index 1f5ebf1a4..7e4e20cfa 100644
--- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
@@ -214,12 +214,6 @@ public abstract class BaseIT extends KarafTestSupport {
refreshPersistence();
}
- protected void recreateIndex(final String itemType) {
- if (persistenceService.removeIndex(itemType)) {
- persistenceService.createIndex(itemType);
- }
- }
-
protected void refreshPersistence() throws InterruptedException {
persistenceService.refresh();
Thread.sleep(1000);
diff --git a/itests/src/test/java/org/apache/unomi/itests/GroovyActionsServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/GroovyActionsServiceIT.java
index 7fecea183..f86ff7f7e 100644
--- a/itests/src/test/java/org/apache/unomi/itests/GroovyActionsServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/GroovyActionsServiceIT.java
@@ -123,7 +123,7 @@ public class GroovyActionsServiceIT extends BaseIT {
GroovyCodeSource groovyCodeSource = keepTrying("Failed waiting for the creation of the GroovyAction for the save test",
() -> groovyActionsService.getGroovyCodeSource(UPDATE_ADDRESS_ACTION), Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
- Assert.assertEquals(UPDATE_ADDRESS_ACTION + "-groovySourceCode", groovyActionsService.getGroovyCodeSource(UPDATE_ADDRESS_ACTION).getName());
+ Assert.assertEquals(UPDATE_ADDRESS_ACTION, groovyActionsService.getGroovyCodeSource(UPDATE_ADDRESS_ACTION).getName());
Assert.assertTrue(actionType.getMetadata().getId().contains(UPDATE_ADDRESS_GROOVY_ACTION));
Assert.assertEquals(2, actionType.getMetadata().getSystemTags().size());
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index 455ab9073..67cc5c16e 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -331,7 +331,6 @@ public class ProfileServiceIT extends BaseIT {
}
@Test
- @Ignore // TODO - fix test https://issues.apache.org/jira/browse/UNOMI-726
public void testMonthlyIndicesPurge() throws Exception {
Date currentDate = new Date();
LocalDateTime minus10Months = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusMonths(10);
diff --git a/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xTo220IT.java b/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xTo220IT.java
index 8ae93daf4..7dc321268 100644
--- a/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xTo220IT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xTo220IT.java
@@ -40,7 +40,10 @@ public class Migrate16xTo220IT extends BaseIT {
private int sessionCount = 0;
private static final int NUMBER_DUPLICATE_SESSIONS = 3;
- private static final int NUMBER_PERSONA_SESSIONS = 2;
+ private static final List<String> oldSystemItemsIndices = Arrays.asList("context-actiontype", "context-campaign", "context-campaignevent", "context-goal",
+ "context-userlist", "context-propertytype", "context-scope", "context-conditiontype", "context-rule", "context-scoring", "context-segment", "context-groovyaction", "context-topic",
+ "context-patch", "context-jsonschema", "context-importconfig", "context-exportconfig", "context-rulestats");
+
@Override
@Before
public void waitForStartup() throws InterruptedException {
@@ -56,8 +59,9 @@ public class Migrate16xTo220IT extends BaseIT {
}
// Restore the snapshot
HttpUtils.executePostRequest(httpClient, "http://localhost:9400/_snapshot/snapshots_repository/snapshot_1.6.x/_restore?wait_for_completion=true", "{}", null);
- fillNumberEventAndSessionBeforeMigration(httpClient);
+ // Get initial counts of items to compare after migration
+ initCounts(httpClient);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -93,6 +97,7 @@ public class Migrate16xTo220IT extends BaseIT {
checkEventTypesNotPersistedAnymore();
checkForMappingUpdates();
checkEventSessionRollover2_2_0();
+ checkIndexReductions2_2_0();
}
/**
@@ -107,17 +112,25 @@ public class Migrate16xTo220IT extends BaseIT {
int newEventcount = 0;
for (String eventIndex : MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-event-0")) {
- JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + eventIndex + "/_count", resourceAsString("migration/match_all_body_request.json"), null));
- newEventcount += jsonNode.get("count").asInt();
+ newEventcount += countItems(httpClient, eventIndex, null);
}
int newSessioncount = 0;
for (String sessionIndex : MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-session-0")) {
- JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + sessionIndex + "/_count", resourceAsString("migration/match_all_body_request.json"), null));
- newSessioncount += jsonNode.get("count").asInt();
+ newSessioncount += countItems(httpClient, sessionIndex, null);
}
Assert.assertEquals(eventCount, newEventcount);
- Assert.assertEquals(sessionCount - NUMBER_DUPLICATE_SESSIONS + NUMBER_PERSONA_SESSIONS, newSessioncount);
+ Assert.assertEquals(sessionCount - NUMBER_DUPLICATE_SESSIONS, newSessioncount);
+ }
+
+ private void checkIndexReductions2_2_0() throws IOException {
+ // new index for system items:
+ Assert.assertTrue(MigrationUtils.indexExists(httpClient, "http://localhost:9400", "context-systemitems"));
+
+ // old indices should be removed:
+ for (String oldSystemItemsIndex : oldSystemItemsIndices) {
+ Assert.assertFalse(MigrationUtils.indexExists(httpClient, "http://localhost:9400", oldSystemItemsIndex));
+ }
}
/**
@@ -298,21 +311,25 @@ public class Migrate16xTo220IT extends BaseIT {
Assert.assertNull(persistenceService.load(masterProfile, ProfileAlias.class));
}
- private void fillNumberEventAndSessionBeforeMigration(CloseableHttpClient httpClient) {
+ private void initCounts(CloseableHttpClient httpClient) {
try {
for (String eventIndex : MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-event-date")) {
- JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + eventIndex + "/_count", resourceAsString("migration/must_not_match_some_eventype_body.json"), null));
- eventCount += jsonNode.get("count").asInt();
+ eventCount += countItems(httpClient, eventIndex, resourceAsString("migration/must_not_match_some_eventype_body.json"));
}
- for (String eventIndex : MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-session-date")) {
- JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + eventIndex + "/_count", resourceAsString("migration/match_all_body_request.json"), null));
- sessionCount += jsonNode.get("count").asInt();
+ for (String sessionIndex : MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-session-date")) {
+ sessionCount += countItems(httpClient, sessionIndex, null);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
+ }
-
+ private int countItems(CloseableHttpClient httpClient, String index, String requestBody) throws IOException { // POSTs requestBody to <index>/_count and returns the "count" field
+ if (requestBody == null) { // null means "no filter": count every document in the index
+ requestBody = resourceAsString("migration/match_all_body_request.json");
+ }
+ JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + index + "/_count", requestBody, null));
+ return jsonNode.get("count").asInt();
 }
}
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index d14d284d9..f0929887b 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -148,22 +148,9 @@ import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@@ -249,31 +236,16 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>();
private static final Map<String, String> itemTypeIndexNameMap = new HashMap<>();
+ private static final Collection<String> systemItems = Arrays.asList("actionType", "campaign", "campaignevent", "goal",
+ "userList", "propertyType", "scope", "conditionType", "rule", "scoring", "segment", "groovyAction", "topic",
+ "patch", "jsonSchema", "importConfig", "exportConfig", "rulestats");
static {
- itemTypeIndexNameMap.put("actionType", "systemItems");
- itemTypeIndexNameMap.put("campaign", "systemItems");
- itemTypeIndexNameMap.put("campaignevent", "systemItems");
- itemTypeIndexNameMap.put("goal", "systemItems");
- itemTypeIndexNameMap.put("userList", "systemItems");
- itemTypeIndexNameMap.put("propertyType", "systemItems");
- itemTypeIndexNameMap.put("scope", "systemItems");
- itemTypeIndexNameMap.put("conditionType", "systemItems");
- itemTypeIndexNameMap.put("rule", "systemItems");
- itemTypeIndexNameMap.put("scoring", "systemItems");
- itemTypeIndexNameMap.put("segment", "systemItems");
- itemTypeIndexNameMap.put("groovyAction", "systemItems");
- itemTypeIndexNameMap.put("topic", "systemItems");
- itemTypeIndexNameMap.put("patch", "systemItems");
- itemTypeIndexNameMap.put("jsonSchema", "systemItems");
- itemTypeIndexNameMap.put("importConfig", "systemItems");
- itemTypeIndexNameMap.put("exportConfig", "systemItems");
- itemTypeIndexNameMap.put("rulestats", "systemItems");
+ for (String systemItem : systemItems) {
+ itemTypeIndexNameMap.put(systemItem, "systemItems");
+ }
itemTypeIndexNameMap.put("profile", "profile");
itemTypeIndexNameMap.put("persona", "profile");
-
- itemTypeIndexNameMap.put("session", "session");
- itemTypeIndexNameMap.put("personaSession", "session");
}
public void setBundleContext(BundleContext bundleContext) {
@@ -840,19 +812,20 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
if (customItemType != null) {
itemType = customItemType;
}
+ String documentId = getDocumentIDForItemType(itemId, itemType);
- String affinityIndex = "session".equals(itemType) && sessionAffinityCache.containsKey(itemId) ? sessionAffinityCache.get(itemId) : null;
+ String affinityIndex = "session".equals(itemType) && sessionAffinityCache.containsKey(documentId) ? sessionAffinityCache.get(documentId) : null;
if (affinityIndex == null && isItemTypeRollingOver(itemType)) {
return new MetricAdapter<T>(metricsService, ".loadItemWithQuery") {
@Override
public T execute(Object... args) throws Exception {
if (customItemType == null) {
- PartialList<T> r = query(QueryBuilders.idsQuery().addIds(itemId), null, clazz, 0, 1, null, null);
+ PartialList<T> r = query(QueryBuilders.idsQuery().addIds(documentId), null, clazz, 0, 1, null, null);
if (r.size() > 0) {
return r.get(0);
}
} else {
- PartialList<CustomItem> r = query(QueryBuilders.idsQuery().addIds(itemId), null, customItemType, 0, 1, null, null);
+ PartialList<CustomItem> r = query(QueryBuilders.idsQuery().addIds(documentId), null, customItemType, 0, 1, null, null);
if (r.size() > 0) {
return (T) r.get(0);
}
@@ -861,12 +834,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}.execute();
} else {
- GetRequest getRequest = new GetRequest(affinityIndex != null ? affinityIndex : getIndex(itemType), itemId);
+ GetRequest getRequest = new GetRequest(affinityIndex != null ? affinityIndex : getIndex(itemType), documentId);
GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
if (response.isExists()) {
String sourceAsString = response.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- setMetadata(value, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
+ setMetadata(value, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
return value;
} else {
return null;
@@ -889,15 +862,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
- private void setMetadata(Item item, String id, long version, long seqNo, long primaryTerm, String index) {
- item.setItemId(id);
+ private void setMetadata(Item item, long version, long seqNo, long primaryTerm, String index) {
item.setVersion(version);
item.setSystemMetadata(SEQ_NO, seqNo);
item.setSystemMetadata(PRIMARY_TERM, primaryTerm);
item.setSystemMetadata("index", index);
- if (item.getItemType().equals("session") && !sessionAffinityCache.containsKey(id)) {
- sessionAffinityCache.put(id, index);
- }
}
@Override
@@ -925,17 +894,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
try {
String source = ESCustomObjectMapper.getObjectMapper().writeValueAsString(item);
String itemType = item.getItemType();
- String className = item.getClass().getName();
if (item instanceof CustomItem) {
itemType = ((CustomItem) item).getCustomItemType();
- className = CustomItem.class.getName() + "." + itemType;
}
- String itemId = item.getItemId();
- String index = item.getSystemMetadata("index") != null ?
- (String) item.getSystemMetadata("index") :
- getIndex(itemType);
+ String documentId = getDocumentIDForItemType(item.getItemId(), itemType);
+ String index = item.getSystemMetadata("index") != null ? (String) item.getSystemMetadata("index") : getIndex(itemType);
+
IndexRequest indexRequest = new IndexRequest(index);
- indexRequest.id(itemId);
+ indexRequest.id(documentId);
indexRequest.source(source, XContentType.JSON);
if (!alwaysOverwrite) {
@@ -958,13 +924,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
if (bulkProcessor == null || !useBatching) {
indexRequest.setRefreshPolicy(getRefreshPolicy(itemType));
IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
- setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
+ setMetadata(item, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
} else {
bulkProcessor.add(indexRequest);
}
} catch (IndexNotFoundException e) {
- logger.error("Could not find index {}, could not register item type {} with id {} ",
- index, itemType, itemId, e);
+ logger.error("Could not find index {}, could not register item type {} with id {} ", index, itemType, item.getItemId(), e);
return false;
}
return true;
@@ -1015,7 +980,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
if (bulkProcessor == null || !useBatchingForUpdate) {
UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
- setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
+ setMetadata(item, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
} else {
bulkProcessor.add(updateRequest);
}
@@ -1034,7 +999,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private UpdateRequest createUpdateRequest(Class clazz, Item item, Map source, boolean alwaysOverwrite) {
String itemType = Item.getItemType(clazz);
- UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType), item.getItemId());
+ String documentId = getDocumentIDForItemType(item.getItemId(), itemType);
+ String index = getIndex(itemType);
+
+ UpdateRequest updateRequest = new UpdateRequest(index, documentId);
updateRequest.doc(source);
if (!alwaysOverwrite) {
@@ -1115,7 +1083,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
-
String index = getIndex(itemType);
for (int i = 0; i < scripts.length; i++) {
@@ -1213,12 +1180,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
-
String index = getIndex(itemType);
+ String documentId = getDocumentIDForItemType(item.getItemId(), itemType);
Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams);
- UpdateRequest updateRequest = new UpdateRequest(index, item.getItemId());
+ UpdateRequest updateRequest = new UpdateRequest(index, documentId);
Long seqNo = (Long) item.getSystemMetadata(SEQ_NO);
Long primaryTerm = (Long) item.getSystemMetadata(PRIMARY_TERM);
@@ -1230,7 +1197,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
updateRequest.script(actualScript);
if (bulkProcessor == null) {
UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
- setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
+ setMetadata(item, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
} else {
bulkProcessor.add(updateRequest);
}
@@ -1266,8 +1233,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
if (customItemType != null) {
itemType = customItemType;
}
+ String documentId = getDocumentIDForItemType(itemId, itemType);
+ String index = getIndexNameForQuery(itemType);
- DeleteRequest deleteRequest = new DeleteRequest(getIndexNameForQuery(itemType), itemId);
+ DeleteRequest deleteRequest = new DeleteRequest(index, documentId);
client.delete(deleteRequest, RequestOptions.DEFAULT);
return true;
} catch (Exception e) {
@@ -1285,78 +1254,81 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public <T extends Item> boolean removeByQuery(final Condition query, final Class<T> clazz) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeByQuery", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
protected Boolean execute(Object... args) throws Exception {
- try {
- String itemType = Item.getItemType(clazz);
- QueryBuilder queryBuilder = conditionESQueryBuilderDispatcher.getQueryBuilder(query);
- final DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(getIndexNameForQuery(itemType))
- .setQuery(isItemTypeSharingIndex(itemType) ? wrapWithItemTypeQuery(itemType, queryBuilder) : queryBuilder)
- // Setting slices to auto will let Elasticsearch choose the number of slices to use.
- // This setting will use one slice per shard, up to a certain limit.
- // The delete request will be more efficient and faster than no slicing.
- .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
- // Elasticsearch takes a snapshot of the index when you hit delete by query request and uses the _version of the documents to process the request.
- // If a document gets updated in the meantime, it will result in a version conflict error and the delete operation will fail.
- // So we explicitly set the conflict strategy to proceed in case of version conflict.
- .setAbortOnVersionConflict(false)
- // Remove by Query is mostly used for purge and cleaning up old data
- // It's mostly used in jobs/timed tasks so we don't really care about long request
- // So we increase default timeout of 1min to 10min
- .setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes));
-
- BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
-
- if (bulkByScrollResponse == null) {
- logger.error("Remove by query: no response returned for query: {}", query);
- return false;
- }
+ QueryBuilder queryBuilder = conditionESQueryBuilderDispatcher.getQueryBuilder(query);
+ return removeByQuery(queryBuilder, clazz);
+ }
+ }.catchingExecuteInClassLoader(true);
+ if (result == null) {
+ return false;
+ } else {
+ return result;
+ }
+ }
- if (bulkByScrollResponse.isTimedOut()) {
- logger.warn("Remove by query: timed out because took more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, query);
- }
+ public <T extends Item> boolean removeByQuery(QueryBuilder queryBuilder, final Class<T> clazz) throws Exception {
+ try {
+ String itemType = Item.getItemType(clazz);
+ final DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(getIndexNameForQuery(itemType))
+ .setQuery(isItemTypeSharingIndex(itemType) ? wrapWithItemTypeQuery(itemType, queryBuilder) : queryBuilder)
+ // Setting slices to auto will let Elasticsearch choose the number of slices to use.
+ // This setting will use one slice per shard, up to a certain limit.
+ // The delete request will be more efficient and faster than no slicing.
+ .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
+ // Elasticsearch takes a snapshot of the index when you hit delete by query request and uses the _version of the documents to process the request.
+ // If a document gets updated in the meantime, it will result in a version conflict error and the delete operation will fail.
+ // So we explicitly set the conflict strategy to proceed in case of version conflict.
+ .setAbortOnVersionConflict(false)
+ // Remove by Query is mostly used for purge and cleaning up old data
+ // It's mostly used in jobs/timed tasks so we don't really care about long request
+ // So we increase default timeout of 1min to 10min
+ .setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes));
+
+ BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
+
+ if (bulkByScrollResponse == null) {
+ logger.error("Remove by query: no response returned for query: {}", queryBuilder);
+ return false;
+ }
- if ((bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) ||
- bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) {
- logger.warn("Remove by query: we found some failure during the process of query: {}", query);
+ if (bulkByScrollResponse.isTimedOut()) {
+ logger.warn("Remove by query: timed out because took more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, queryBuilder);
+ }
- if (bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) {
- for (ScrollableHitSource.SearchFailure searchFailure : bulkByScrollResponse.getSearchFailures()) {
- logger.warn("Remove by query, search failure: {}", searchFailure.toString());
- }
- }
+ if ((bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) ||
+ bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) {
+ logger.warn("Remove by query: we found some failure during the process of query: {}", queryBuilder);
- if (bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) {
- for (BulkItemResponse.Failure bulkFailure : bulkByScrollResponse.getBulkFailures()) {
- logger.warn("Remove by query, bulk failure: {}", bulkFailure.toString());
- }
- }
+ if (bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) {
+ for (ScrollableHitSource.SearchFailure searchFailure : bulkByScrollResponse.getSearchFailures()) {
+ logger.warn("Remove by query, search failure: {}", searchFailure.toString());
}
+ }
- if (logger.isDebugEnabled()) {
- logger.debug("Remove by query: took {}, deleted docs: {}, batches executed: {}, skipped docs: {}, version conflicts: {}, search retries: {}, bulk retries: {}, for query: {}",
- bulkByScrollResponse.getTook().toHumanReadableString(1),
- bulkByScrollResponse.getDeleted(),
- bulkByScrollResponse.getBatches(),
- bulkByScrollResponse.getNoops(),
- bulkByScrollResponse.getVersionConflicts(),
- bulkByScrollResponse.getSearchRetries(),
- bulkByScrollResponse.getBulkRetries(),
- query);
+ if (bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) {
+ for (BulkItemResponse.Failure bulkFailure : bulkByScrollResponse.getBulkFailures()) {
+ logger.warn("Remove by query, bulk failure: {}", bulkFailure.toString());
}
-
- return true;
- } catch (Exception e) {
- throw new Exception("Cannot remove by query", e);
}
}
- }.catchingExecuteInClassLoader(true);
- if (result == null) {
- return false;
- } else {
- return result;
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Remove by query: took {}, deleted docs: {}, batches executed: {}, skipped docs: {}, version conflicts: {}, search retries: {}, bulk retries: {}, for query: {}",
+ bulkByScrollResponse.getTook().toHumanReadableString(1),
+ bulkByScrollResponse.getDeleted(),
+ bulkByScrollResponse.getBatches(),
+ bulkByScrollResponse.getNoops(),
+ bulkByScrollResponse.getVersionConflicts(),
+ bulkByScrollResponse.getSearchRetries(),
+ bulkByScrollResponse.getBulkRetries(),
+ queryBuilder);
+ }
+
+ return true;
+ } catch (Exception e) {
+ throw new Exception("Cannot remove by query", e);
}
}
-
public boolean indexTemplateExists(final String templateName) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".indexTemplateExists", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
protected Boolean execute(Object... args) throws IOException {
@@ -1444,8 +1416,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
- public boolean removeIndex(final String itemType, boolean addPrefix){
- String index = addPrefix ? getIndex(itemType) : itemType;
+ public boolean removeIndex(final String itemType) {
+ String index = getIndex(itemType);
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
protected Boolean execute(Object... args) throws IOException {
@@ -1465,9 +1437,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
return result;
}
}
- public boolean removeIndex(final String itemType) {
- return removeIndex(itemType, true);
- }
private void internalCreateRolloverTemplate(String itemName) throws IOException {
String rolloverAlias = indexPrefix + "-" + itemName;
@@ -1830,9 +1799,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
try {
final Class<? extends Item> clazz = item.getClass();
String itemType = Item.getItemType(clazz);
+ String documentId = getDocumentIDForItemType(item.getItemId(), itemType);
QueryBuilder builder = QueryBuilders.boolQuery()
- .must(QueryBuilders.idsQuery().addIds(item.getItemId()))
+ .must(QueryBuilders.idsQuery().addIds(documentId))
.must(conditionESQueryBuilderDispatcher.buildFilter(query));
return queryCount(builder, itemType) > 0;
} finally {
@@ -1918,29 +1888,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
- @Override
- public Map<String, Long> docCountPerIndex(String... indexes) {
- return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".docCountPerIndex", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
- @Override
- protected Map<String, Long> execute(Object... args) throws IOException {
- List<String> indexesForQuery = Stream.of(indexes).map(index -> getIndexNameForQuery(index)).collect(Collectors.toList());
- String[] itemsArray = new String[indexesForQuery.size()];
- itemsArray = indexesForQuery.toArray(itemsArray);
- GetIndexRequest request = new GetIndexRequest(itemsArray);
- GetIndexResponse getIndexResponse = client.indices().get(request, RequestOptions.DEFAULT);
-
- Map<String, Long> countPerIndex = new HashMap<>();
-
- for (String index : getIndexResponse.getIndices()) {
- CountRequest countRequest = new CountRequest(index);
- CountResponse response = client.count(countRequest, RequestOptions.DEFAULT);
- countPerIndex.put(index, response.getCount());
- }
- return countPerIndex;
- }
- }.catchingExecuteInClassLoader(true);
- }
-
private long queryCount(final QueryBuilder filter, final String itemType) {
return new InClassLoaderExecute<Long>(metricsService, this.getClass().getName() + ".queryCount", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
@@ -2041,7 +1988,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
// add hit to results
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
+ setMetadata(value, searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
results.add(value);
}
@@ -2071,7 +2018,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
for (SearchHit searchHit : searchHits) {
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
+ setMetadata(value, searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
results.add(value);
}
}
@@ -2117,7 +2064,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
// add hit to results
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
+ setMetadata(value, searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
results.add(value);
}
}
@@ -2158,7 +2105,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
// add hit to results
String sourceAsString = searchHit.getSourceAsString();
final CustomItem value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, CustomItem.class);
- setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
+ setMetadata(value, searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
results.add(value);
}
}
@@ -2420,6 +2367,42 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public void purge(final Date date) {
+ // Intentionally a no-op: this method is deprecated since 2.2.0 (purgeTimeBasedItems is the replacement)
+ }
+
+ @Override
+ public <T extends Item> void purgeTimeBasedItems(int existsNumberOfDays, Class<T> clazz) {
+ new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".purgeTimeBasedItems", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ protected Boolean execute(Object... args) throws Exception {
+ String itemType = Item.getItemType(clazz);
+
+ if (existsNumberOfDays > 0 && isItemTypeRollingOver(itemType)) {
+ // Step 1: delete every document whose timeStamp is at least existsNumberOfDays days old
+ removeByQuery(QueryBuilders.rangeQuery("timeStamp").lte("now-" + existsNumberOfDays + "d"), clazz);
+
+ // Step 2: collect the document count of each index returned for this item type (the TreeMap keeps them sorted by index name)
+ TreeMap<String, Long> countsPerIndex = new TreeMap<>();
+ GetIndexResponse getIndexResponse = client.indices().get(new GetIndexRequest(getIndexNameForQuery(itemType)), RequestOptions.DEFAULT);
+ for (String index : getIndexResponse.getIndices()) {
+ countsPerIndex.put(index, client.count(new CountRequest(index), RequestOptions.DEFAULT).getCount());
+ }
+
+ // Step 3: delete the indices that no longer contain any document
+ if (countsPerIndex.size() >= 1) {
+ // Drop the last entry (highest index name) from the candidates: it's the index used to write documents
+ countsPerIndex.pollLastEntry();
+
+ for (Map.Entry<String, Long> indexCount : countsPerIndex.entrySet()) {
+ if (indexCount.getValue() == 0) {
+ client.indices().delete(new DeleteIndexRequest(indexCount.getKey()), RequestOptions.DEFAULT);
+ }
+ }
+ }
+ }
+
+ return true;
+ }
+ }.catchingExecuteInClassLoader(true);
}
@Override
@@ -2632,6 +2615,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
return itemTypeIndexNameMap.getOrDefault(itemType, itemType);
}
+ private String getDocumentIDForItemType(String itemId, String itemType) { // returns itemId, suffixed with "_<lowercased itemType>" when itemType is listed in systemItems
+ return systemItems.contains(itemType) ? (itemId + "_" + itemType.toLowerCase(java.util.Locale.ROOT)) : itemId; // Locale.ROOT: the suffix must not vary with the JVM default locale
+ }
+
private QueryBuilder wrapWithItemTypeQuery(String itemType, QueryBuilder originalQuery) {
BoolQueryBuilder wrappedQuery = QueryBuilders.boolQuery();
wrappedQuery.must(getItemTypeQueryBuilder(itemType));
@@ -2657,5 +2644,4 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
return WriteRequest.RefreshPolicy.NONE;
}
-
}
diff --git a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/personaSession.json b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/personaSession.json
new file mode 100644
index 000000000..c635e0285
--- /dev/null
+++ b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/personaSession.json
@@ -0,0 +1,41 @@
+{
+ "dynamic_templates": [
+ {
+ "all": {
+ "match": "*",
+ "match_mapping_type": "string",
+ "mapping": {
+ "type": "text",
+ "analyzer": "folding",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ }
+ ],
+ "properties": {
+ "duration": {
+ "type": "long"
+ },
+ "timeStamp": {
+ "type": "date"
+ },
+ "lastEventDate": {
+ "type": "date"
+ },
+ "properties": {
+ "properties": {
+ "location": {
+ "type": "geo_point"
+ }
+ }
+ },
+ "size": {
+ "type": "long"
+ }
+ }
+}
\ No newline at end of file
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index 751a34c85..29c196a2b 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -292,6 +292,7 @@ public interface PersistenceService {
/**
* @deprecated use {@link #loadCustomItem(String, String)}
*/
+ @Deprecated
CustomItem loadCustomItem(String itemId, Date dateHint, String customItemType);
/**
@@ -676,22 +677,19 @@ public interface PersistenceService {
<T extends Item> void refreshIndex(Class<T> clazz, Date dateHint);
/**
- * Purges all data in the context server up to the specified date, not included.
- *
- * @param date the date (not included) before which we want to erase all data
+ * @deprecated use {@link #purgeTimeBasedItems(int, Class)} instead
*/
@Deprecated
void purge(Date date);
/**
- * Retrieves the number of document per indexes.
- * If the index is a rollover index, each rollover index will be return with its own number of document
- * For example: with "event" as parameter, the indexes named ...-event-000001, ...-event-000002 and so one will be returned
+ * Purges time-based data in the context server that is older than the specified number of days.
+ * (This only works for time-based data stored in rollover indices; it has no effect on other item types.)
*
- * @param indexes names of the indexes to count the documents
- * @return Map where the key in the index name and the value is the number of document for this index
+ * @param existsNumberOfDays the number of days
+ * @param clazz the item type to be purged
*/
- Map<String, Long> docCountPerIndex(String... indexes);
+ <T extends Item> void purgeTimeBasedItems(int existsNumberOfDays, Class<T> clazz);
/**
* Retrieves all items of the specified Item subclass which specified ranged property is within the specified bounds, ordered according to the specified {@code sortBy} String
@@ -744,15 +742,6 @@ public interface PersistenceService {
*/
boolean removeIndex(final String itemType);
- /**
- * Removes the index for the specified item type.
- *
- * @param itemType the item type
- * @param addPrefix should add the index prefix to the itemType passed as parameter
- * @return {@code true} if the operation was successful, {@code false} otherwise
- */
- boolean removeIndex(final String itemType, boolean addPrefix);
-
/**
* Removes all data associated with the provided scope.
*
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index 7912cc6de..8221f19ff 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -370,40 +370,11 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
}
}
- private <T extends Item> void purgeRolloverItems(int existsNumberOfDays, Class<T> clazz) {
- if (existsNumberOfDays > 0) {
- String conditionType = null;
- String itemType = null;
-
- if (clazz.getName().equals(Event.class.getName())) {
- conditionType = "eventPropertyCondition";
- itemType = Event.ITEM_TYPE;
- } else if (clazz.getName().equals(Session.class.getName())) {
- conditionType = "sessionPropertyCondition";
- itemType = Session.ITEM_TYPE;
- }
-
- ConditionType propertyConditionType = definitionsService.getConditionType(conditionType);
- if (propertyConditionType == null) {
- // definition service not yet fully instantiate
- return;
- }
-
- Condition condition = new Condition(propertyConditionType);
-
- condition.setParameter("propertyName", "timeStamp");
- condition.setParameter("comparisonOperator", "lessThanOrEqualTo");
- condition.setParameter("propertyValueDateExpr", "now-" + existsNumberOfDays + "d");
- persistenceService.removeByQuery(condition, clazz);
- deleteEmptyRolloverIndex(itemType);
- }
- }
-
@Override
public void purgeSessionItems(int existsNumberOfDays) {
if (existsNumberOfDays > 0) {
logger.info("Purging: Sessions created since more than {} days", existsNumberOfDays);
- purgeRolloverItems(existsNumberOfDays, Session.class);
+ persistenceService.purgeTimeBasedItems(existsNumberOfDays, Session.class);
}
}
@@ -411,7 +382,7 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
public void purgeEventItems(int existsNumberOfDays) {
if (existsNumberOfDays > 0) {
logger.info("Purging: Events created since more than {} days", existsNumberOfDays);
- purgeRolloverItems(existsNumberOfDays, Event.class);
+ persistenceService.purgeTimeBasedItems(existsNumberOfDays, Event.class);
}
}
@@ -421,19 +392,6 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
}
- public void deleteEmptyRolloverIndex(String indexName) {
- TreeMap<String, Long> countsPerIndex = new TreeMap<>(persistenceService.docCountPerIndex(indexName));
- if (countsPerIndex.size() >= 1) {
- // do not check the last index, because it's the one used to write documents
- countsPerIndex.pollLastEntry();
- countsPerIndex.forEach((index, count) -> {
- if (count == 0) {
- persistenceService.removeIndex(index, false);
- }
- });
- }
- }
-
private void initializePurge() {
logger.info("Purge: Initializing");
diff --git a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
index fafec7bcf..ea6109064 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
@@ -46,8 +46,6 @@ import java.util.stream.Collectors;
public class RulesServiceImpl implements RulesService, EventListenerService, SynchronousBundleListener {
- public static final String RULE_QUERY_PREFIX = "rule_";
- private static final String RULE_STAT_ID_SUFFIX = "-stat";
public static final String TRACKED_PARAMETER = "trackedConditionParameters";
private static final Logger logger = LoggerFactory.getLogger(RulesServiceImpl.class.getName());
@@ -253,10 +251,9 @@ public class RulesServiceImpl implements RulesService, EventListenerService, Syn
}
private RuleStatistics getLocalRuleStatistics(Rule rule) {
- String ruleStatisticsId = getRuleStatisticId(rule.getItemId());
- RuleStatistics ruleStatistics = this.allRuleStatistics.get(ruleStatisticsId);
+ RuleStatistics ruleStatistics = this.allRuleStatistics.get(rule.getItemId());
if (ruleStatistics == null) {
- ruleStatistics = new RuleStatistics(ruleStatisticsId);
+ ruleStatistics = new RuleStatistics(rule.getItemId());
}
return ruleStatistics;
}
@@ -267,10 +264,6 @@ public class RulesServiceImpl implements RulesService, EventListenerService, Syn
allRuleStatistics.put(ruleStatistics.getItemId(), ruleStatistics);
}
- private String getRuleStatisticId(String ruleID) {
- return ruleID + RULE_STAT_ID_SUFFIX;
- }
-
public void refreshRules() {
try {
// we use local variables to make sure we quickly switch the collections since the refresh is called often
@@ -344,17 +337,14 @@ public class RulesServiceImpl implements RulesService, EventListenerService, Syn
@Override
public RuleStatistics getRuleStatistics(String ruleId) {
- String ruleStatisticsId = getRuleStatisticId(ruleId);
- if (allRuleStatistics.containsKey(ruleStatisticsId)) {
- return allRuleStatistics.get(ruleStatisticsId);
+ if (allRuleStatistics.containsKey(ruleId)) {
+ return allRuleStatistics.get(ruleId);
}
- return persistenceService.load(ruleStatisticsId, RuleStatistics.class);
+ return persistenceService.load(ruleId, RuleStatistics.class);
}
public Map<String, RuleStatistics> getAllRuleStatistics() {
- return allRuleStatistics.keySet().stream()
- .collect(Collectors.toMap(key -> key.endsWith(RULE_STAT_ID_SUFFIX) ?
- key.substring(0, key.length() - RULE_STAT_ID_SUFFIX.length()) : key, allRuleStatistics::get));
+ return allRuleStatistics;
}
@Override
diff --git a/tools/shell-commands/src/main/resources/META-INF/cxs/migration/migrate-2.2.0-05-indicesReduction.groovy b/tools/shell-commands/src/main/resources/META-INF/cxs/migration/migrate-2.2.0-05-indicesReduction.groovy
index 8a07a5278..631f0bced 100644
--- a/tools/shell-commands/src/main/resources/META-INF/cxs/migration/migrate-2.2.0-05-indicesReduction.groovy
+++ b/tools/shell-commands/src/main/resources/META-INF/cxs/migration/migrate-2.2.0-05-indicesReduction.groovy
@@ -23,6 +23,28 @@ MigrationContext context = migrationContext
String esAddress = context.getConfigString("esAddress")
String indexPrefix = context.getConfigString("indexPrefix")
String baseSettings = MigrationUtils.resourceAsString(bundleContext, "requestBody/2.0.0/base_index_mapping.json")
+def indicesToReduce = [
+ actiontype: [reduceTo: "systemitems", renameId: true],
+ campaign: [reduceTo: "systemitems", renameId: true],
+ campaignevent: [reduceTo: "systemitems", renameId: true],
+ goal: [reduceTo: "systemitems", renameId: true],
+ userlist: [reduceTo: "systemitems", renameId: true],
+ propertytype: [reduceTo: "systemitems", renameId: true],
+ scope: [reduceTo: "systemitems", renameId: true],
+ conditiontype: [reduceTo: "systemitems", renameId: true],
+ rule: [reduceTo: "systemitems", renameId: true],
+ scoring: [reduceTo: "systemitems", renameId: true],
+ segment: [reduceTo: "systemitems", renameId: true],
+ topic: [reduceTo: "systemitems", renameId: true],
+ patch: [reduceTo: "systemitems", renameId: true],
+ jsonschema: [reduceTo: "systemitems", renameId: true],
+ importconfig: [reduceTo: "systemitems", renameId: true],
+ exportconfig: [reduceTo: "systemitems", renameId: true],
+ rulestats: [reduceTo: "systemitems", renameId: true],
+ groovyaction: [reduceTo: "systemitems", renameId: true],
+
+ persona: [reduceTo: "profile", renameId: false]
+]
context.performMigrationStep("2.2.0-create-systemItems-index", () -> {
if (!MigrationUtils.indexExists(context.getHttpClient(), esAddress, "${indexPrefix}-systemitems")) {
@@ -32,44 +54,20 @@ context.performMigrationStep("2.2.0-create-systemItems-index", () -> {
}
})
-def indicesToReduce = [
- actiontype: "systemitems",
- campaign: "systemitems",
- campaignevent: "systemitems",
- goal: "systemitems",
- userlist: "systemitems",
- propertytype: "systemitems",
- scope: "systemitems",
- conditiontype: "systemitems",
- rule: "systemitems",
- scoring: "systemitems",
- segment: "systemitems",
- topic: "systemitems",
- patch: "systemitems",
- jsonschema: "systemitems",
- importconfig: "systemitems",
- exportconfig: "systemitems",
- rulestats: "systemitems",
- groovyaction: "systemitems",
- persona: "profile",
- personasession: "session"
-]
-def indicesToSuffixIds = [
- rulestats: "-stat",
- groovyaction: "-groovySourceCode"
-]
indicesToReduce.each { indexToReduce ->
context.performMigrationStep("2.2.0-reduce-${indexToReduce.key}", () -> {
if (MigrationUtils.indexExists(context.getHttpClient(), esAddress, "${indexPrefix}-${indexToReduce.key}")) {
def painless = null
// check if we need to update the ids of those items first
- if (indicesToSuffixIds.containsKey(indexToReduce.key)) {
- painless = MigrationUtils.getFileWithoutComments(bundleContext, "requestBody/2.2.0/suffix_ids.painless").replace("#ID_SUFFIX", indicesToSuffixIds.get(indexToReduce.key))
+ if (indexToReduce.value.renameId) {
+ painless = MigrationUtils.getFileWithoutComments(bundleContext, "requestBody/2.2.0/suffix_ids.painless").replace("#ID_SUFFIX", "_${indexToReduce.key}")
}
// move items
- MigrationUtils.moveToIndex(context.getHttpClient(), bundleContext, esAddress, "${indexPrefix}-${indexToReduce.key}", "${indexPrefix}-${indexToReduce.value}", painless)
+ def reduceToIndex = "${indexPrefix}-${indexToReduce.value.reduceTo}"
+ MigrationUtils.moveToIndex(context.getHttpClient(), bundleContext, esAddress, "${indexPrefix}-${indexToReduce.key}", reduceToIndex, painless)
MigrationUtils.deleteIndex(context.getHttpClient(), esAddress, "${indexPrefix}-${indexToReduce.key}")
- HttpUtils.executePostRequest(context.getHttpClient(), esAddress + "/${indexPrefix}-${indexToReduce.value}/_refresh", null, null);
+
+ HttpUtils.executePostRequest(context.getHttpClient(), esAddress + "/${reduceToIndex}/_refresh", null, null);
MigrationUtils.waitForYellowStatus(context.getHttpClient(), esAddress, context);
}
})