You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2023/03/09 16:56:10 UTC

[unomi] branch master updated: UNOMI-741: use new strategy to retrieve session by always asking latest available rollover index (#586)

This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b4ab5e91 UNOMI-741: use new strategy to retrieve session by always asking latest available rollover index (#586)
6b4ab5e91 is described below

commit 6b4ab5e91632dd4ec94389f174026334d5c7a0de
Author: kevan Jahanshahi <jk...@apache.org>
AuthorDate: Thu Mar 9 17:56:03 2023 +0100

    UNOMI-741: use new strategy to retrieve session by always asking latest available rollover index (#586)
---
 .../ElasticSearchPersistenceServiceImpl.java       | 47 ++++++++++++++++------
 1 file changed, 34 insertions(+), 13 deletions(-)

diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index f0929887b..ee3f0fc02 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -50,9 +50,11 @@ import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
 import org.elasticsearch.action.admin.indices.alias.Alias;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
@@ -88,6 +90,7 @@ import org.elasticsearch.client.indices.GetMappingsResponse;
 import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
 import org.elasticsearch.client.indices.PutIndexTemplateRequest;
 import org.elasticsearch.client.indices.PutMappingRequest;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -149,7 +152,6 @@ import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 import java.security.cert.X509Certificate;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@@ -157,18 +159,15 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 @SuppressWarnings("rawtypes")
 public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SynchronousBundleListener {
 
-    public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = "bulkProcessor.concurrentRequests";
-    public static final String BULK_PROCESSOR_BULK_ACTIONS = "bulkProcessor.bulkActions";
     public static final String BULK_PROCESSOR_BULK_SIZE = "bulkProcessor.bulkSize";
     public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval";
     public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy";
-    public static final String MONTHLY_INDEX_ITEMS_MONTHLY_INDEXED = "monthlyIndex.itemsMonthlyIndexedOverride";
-    public static final String INDEX_DATE_PREFIX = "date-";
     public static final String SEQ_NO = "seq_no";
     public static final String PRIMARY_TERM = "primary_term";
 
     private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
     private static final String ROLLOVER_LIFECYCLE_NAME = "unomi-rollover-policy";
+
     private boolean throwExceptions = false;
     private RestHighLevelClient client;
     private BulkProcessor bulkProcessor;
@@ -191,7 +190,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher;
     private List<String> itemsMonthlyIndexed;
     private Map<String, String> routingByType;
-    private final Map<String, String> sessionAffinityCache = new ConcurrentHashMap<>();
 
     private Integer defaultQueryLimit = 10;
     private Integer removeByQueryTimeoutInMinutes = 10;
@@ -203,6 +201,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     private String bulkProcessorBackoffPolicy = "exponential";
 
     // Rollover configuration
+    private String sessionLatestIndex;
     private List<String> rolloverIndices;
     private String rolloverMaxSize;
     private String rolloverMaxAge;
@@ -502,12 +501,24 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     bulkProcessor = getBulkProcessor();
                 }
 
+                // Wait for green
                 logger.info("Waiting for GREEN cluster status...");
-
                 client.cluster().health(new ClusterHealthRequest().waitForGreenStatus(), RequestOptions.DEFAULT);
-
                 logger.info("Cluster status is GREEN");
 
+                // We keep in memory the latest available session index to be able to load session using direct GET access on ES
+                if (isItemTypeRollingOver(Session.ITEM_TYPE)) {
+                    logger.info("Sessions are using rollover indices, loading latest session index available ...");
+                    GetAliasesResponse sessionAliasResponse = client.indices().getAlias(new GetAliasesRequest(getIndex(Session.ITEM_TYPE)), RequestOptions.DEFAULT);
+                    Map<String, Set<AliasMetaData>> aliases = sessionAliasResponse.getAliases();
+                    if (!aliases.isEmpty()) {
+                        sessionLatestIndex = new TreeSet<>(aliases.keySet()).last();
+                        logger.info("Latest available session index found is: {}", sessionLatestIndex);
+                    } else {
+                        throw new IllegalStateException("No index found for sessions");
+                    }
+                }
+
                 return true;
             }
         }.executeInClassLoader();
@@ -814,8 +825,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     }
                     String documentId = getDocumentIDForItemType(itemId, itemType);
 
-                    String affinityIndex = "session".equals(itemType) && sessionAffinityCache.containsKey(documentId) ? sessionAffinityCache.get(documentId) : null;
-                    if (affinityIndex == null && isItemTypeRollingOver(itemType)) {
+                    boolean sessionSpecialDirectAccess = sessionLatestIndex != null && Session.ITEM_TYPE.equals(itemType) ;
+                    if (!sessionSpecialDirectAccess && isItemTypeRollingOver(itemType)) {
                         return new MetricAdapter<T>(metricsService, ".loadItemWithQuery") {
                             @Override
                             public T execute(Object... args) throws Exception {
@@ -834,7 +845,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                             }
                         }.execute();
                     } else {
-                        GetRequest getRequest = new GetRequest(affinityIndex != null ? affinityIndex : getIndex(itemType), documentId);
+                        // Special handling for sessions: query the latest available index directly to speed up session loading
+                        GetRequest getRequest = new GetRequest(sessionSpecialDirectAccess ? sessionLatestIndex : getIndex(itemType), documentId);
                         GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
                         if (response.isExists()) {
                             String sourceAsString = response.getSourceAsString();
@@ -924,7 +936,16 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         if (bulkProcessor == null || !useBatching) {
                             indexRequest.setRefreshPolicy(getRefreshPolicy(itemType));
                             IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
-                            setMetadata(item, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
+                            String responseIndex = response.getIndex();
+                            setMetadata(item, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), responseIndex);
+
+                            // Special handling for sessions: when a new session is created in an index other than the cached latest one (i.e. a rollover happened), update the latest available index
+                            if (Session.ITEM_TYPE.equals(itemType) &&
+                                    sessionLatestIndex != null &&
+                                    response.getResult().equals(DocWriteResponse.Result.CREATED) &&
+                                    !responseIndex.equals(sessionLatestIndex)) {
+                                sessionLatestIndex = responseIndex;
+                            }
                         } else {
                             bulkProcessor.add(indexRequest);
                         }
@@ -1000,7 +1021,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     private UpdateRequest createUpdateRequest(Class clazz, Item item, Map source, boolean alwaysOverwrite) {
         String itemType = Item.getItemType(clazz);
         String documentId = getDocumentIDForItemType(item.getItemId(), itemType);
-        String index = getIndex(itemType);
+        String index =  item.getSystemMetadata("index") != null ? (String) item.getSystemMetadata("index") : getIndex(itemType);
 
         UpdateRequest updateRequest = new UpdateRequest(index, documentId);
         updateRequest.doc(source);