You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2023/03/09 16:56:10 UTC
[unomi] branch master updated: UNOMI-741: use new strategy to retrieve session by always asking latest available rollover index (#586)
This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new 6b4ab5e91 UNOMI-741: use new strategy to retrieve session by always asking latest available rollover index (#586)
6b4ab5e91 is described below
commit 6b4ab5e91632dd4ec94389f174026334d5c7a0de
Author: kevan Jahanshahi <jk...@apache.org>
AuthorDate: Thu Mar 9 17:56:03 2023 +0100
UNOMI-741: use new strategy to retrieve session by always asking latest available rollover index (#586)
---
.../ElasticSearchPersistenceServiceImpl.java | 47 ++++++++++++++++------
1 file changed, 34 insertions(+), 13 deletions(-)
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index f0929887b..ee3f0fc02 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -50,9 +50,11 @@ import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
@@ -88,6 +90,7 @@ import org.elasticsearch.client.indices.GetMappingsResponse;
import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -149,7 +152,6 @@ import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@@ -157,18 +159,15 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@SuppressWarnings("rawtypes")
public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SynchronousBundleListener {
- public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = "bulkProcessor.concurrentRequests";
- public static final String BULK_PROCESSOR_BULK_ACTIONS = "bulkProcessor.bulkActions";
public static final String BULK_PROCESSOR_BULK_SIZE = "bulkProcessor.bulkSize";
public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval";
public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy";
- public static final String MONTHLY_INDEX_ITEMS_MONTHLY_INDEXED = "monthlyIndex.itemsMonthlyIndexedOverride";
- public static final String INDEX_DATE_PREFIX = "date-";
public static final String SEQ_NO = "seq_no";
public static final String PRIMARY_TERM = "primary_term";
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
private static final String ROLLOVER_LIFECYCLE_NAME = "unomi-rollover-policy";
+
private boolean throwExceptions = false;
private RestHighLevelClient client;
private BulkProcessor bulkProcessor;
@@ -191,7 +190,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher;
private List<String> itemsMonthlyIndexed;
private Map<String, String> routingByType;
- private final Map<String, String> sessionAffinityCache = new ConcurrentHashMap<>();
private Integer defaultQueryLimit = 10;
private Integer removeByQueryTimeoutInMinutes = 10;
@@ -203,6 +201,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private String bulkProcessorBackoffPolicy = "exponential";
// Rollover configuration
+ private String sessionLatestIndex;
private List<String> rolloverIndices;
private String rolloverMaxSize;
private String rolloverMaxAge;
@@ -502,12 +501,24 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
bulkProcessor = getBulkProcessor();
}
+ // Wait for green
logger.info("Waiting for GREEN cluster status...");
-
client.cluster().health(new ClusterHealthRequest().waitForGreenStatus(), RequestOptions.DEFAULT);
-
logger.info("Cluster status is GREEN");
+ // Keep the latest available session index in memory so that sessions can be loaded with a direct GET request on ES
+ if (isItemTypeRollingOver(Session.ITEM_TYPE)) {
+ logger.info("Sessions are using rollover indices, loading latest session index available ...");
+ GetAliasesResponse sessionAliasResponse = client.indices().getAlias(new GetAliasesRequest(getIndex(Session.ITEM_TYPE)), RequestOptions.DEFAULT);
+ Map<String, Set<AliasMetaData>> aliases = sessionAliasResponse.getAliases();
+ if (!aliases.isEmpty()) {
+ sessionLatestIndex = new TreeSet<>(aliases.keySet()).last();
+ logger.info("Latest available session index found is: {}", sessionLatestIndex);
+ } else {
+ throw new IllegalStateException("No index found for sessions");
+ }
+ }
+
return true;
}
}.executeInClassLoader();
@@ -814,8 +825,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
String documentId = getDocumentIDForItemType(itemId, itemType);
- String affinityIndex = "session".equals(itemType) && sessionAffinityCache.containsKey(documentId) ? sessionAffinityCache.get(documentId) : null;
- if (affinityIndex == null && isItemTypeRollingOver(itemType)) {
+ boolean sessionSpecialDirectAccess = sessionLatestIndex != null && Session.ITEM_TYPE.equals(itemType) ;
+ if (!sessionSpecialDirectAccess && isItemTypeRollingOver(itemType)) {
return new MetricAdapter<T>(metricsService, ".loadItemWithQuery") {
@Override
public T execute(Object... args) throws Exception {
@@ -834,7 +845,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}.execute();
} else {
- GetRequest getRequest = new GetRequest(affinityIndex != null ? affinityIndex : getIndex(itemType), documentId);
+ // Special handling for sessions: we query the latest available index directly to speed up session loading
+ GetRequest getRequest = new GetRequest(sessionSpecialDirectAccess ? sessionLatestIndex : getIndex(itemType), documentId);
GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
if (response.isExists()) {
String sourceAsString = response.getSourceAsString();
@@ -924,7 +936,16 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
if (bulkProcessor == null || !useBatching) {
indexRequest.setRefreshPolicy(getRefreshPolicy(itemType));
IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
- setMetadata(item, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
+ String responseIndex = response.getIndex();
+ setMetadata(item, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), responseIndex);
+
+ // Special handling for sessions: when a new session is created, check whether a rollover happened and, if so, update the latest available index
+ if (Session.ITEM_TYPE.equals(itemType) &&
+ sessionLatestIndex != null &&
+ response.getResult().equals(DocWriteResponse.Result.CREATED) &&
+ !responseIndex.equals(sessionLatestIndex)) {
+ sessionLatestIndex = responseIndex;
+ }
} else {
bulkProcessor.add(indexRequest);
}
@@ -1000,7 +1021,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private UpdateRequest createUpdateRequest(Class clazz, Item item, Map source, boolean alwaysOverwrite) {
String itemType = Item.getItemType(clazz);
String documentId = getDocumentIDForItemType(item.getItemId(), itemType);
- String index = getIndex(itemType);
+ String index = item.getSystemMetadata("index") != null ? (String) item.getSystemMetadata("index") : getIndex(itemType);
UpdateRequest updateRequest = new UpdateRequest(index, documentId);
updateRequest.doc(source);