You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2023/02/09 15:41:22 UTC

[unomi] 01/01: UNOMI-741: PoC regarding sessionId/indexName affinity cache to improve perf on rollover sessions

This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch PoCSessionIdIndexAffinityCache
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit bb3b638101fb98d44263f82160ffb2c62368af84
Author: Kevan <ke...@jahia.com>
AuthorDate: Thu Feb 9 16:41:04 2023 +0100

    UNOMI-741: PoC regarding sessionId/indexName affinity cache to improve perf on rollover sessions
---
 .../elasticsearch/ElasticSearchPersistenceServiceImpl.java     | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 864548f3e..eaeeb68cc 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -168,6 +168,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@@ -209,6 +210,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher;
     private List<String> itemsMonthlyIndexed;
     private Map<String, String> routingByType;
+    private final Map<String, String> sessionAffinityCache = new ConcurrentHashMap<>();
 
     private Integer defaultQueryLimit = 10;
     private Integer removeByQueryTimeoutInMinutes = 10;
@@ -813,7 +815,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         itemType = customItemType;
                     }
 
-                    if (isItemTypeRollingOver(itemType)) {
+                    String affinityIndex = "session".equals(itemType) && sessionAffinityCache.containsKey(itemId) ? sessionAffinityCache.get(itemId) : null;
+                    if (affinityIndex == null && isItemTypeRollingOver(itemType)) {
                         return new MetricAdapter<T>(metricsService, ".loadItemWithQuery") {
                             @Override
                             public T execute(Object... args) throws Exception {
@@ -832,7 +835,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                             }
                         }.execute();
                     } else {
-                        GetRequest getRequest = new GetRequest(getIndex(itemType), itemId);
+                        GetRequest getRequest = new GetRequest(affinityIndex != null ? affinityIndex : getIndex(itemType), itemId);
                         GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
                         if (response.isExists()) {
                             String sourceAsString = response.getSourceAsString();
@@ -866,6 +869,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         item.setSystemMetadata(SEQ_NO, seqNo);
         item.setSystemMetadata(PRIMARY_TERM, primaryTerm);
         item.setSystemMetadata("index", index);
+        if (item.getItemType().equals("session") && !sessionAffinityCache.containsKey(id)) {
+            sessionAffinityCache.put(id, index);
+        }
     }
 
     @Override