You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2020/08/20 13:11:01 UTC

[unomi] branch master updated: Expose scroll identifier on profile queries (#181)

This is an automated email from the ASF dual-hosted git repository.

shuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f4f0fc  Expose scroll identifier on profile queries (#181)
2f4f0fc is described below

commit 2f4f0fcdc95abf5efbef494283ccd791a573ccb3
Author: liatiusim <62...@users.noreply.github.com>
AuthorDate: Thu Aug 20 16:10:51 2020 +0300

    Expose scroll identifier on profile queries (#181)
    
    * Expose scroller id on profile queries
    
    * Add scroller itest
    
    * fix profile service it tests
    
    Co-authored-by: Shir Bromberg <sb...@yotpo.com>
---
 .../java/org/apache/unomi/api/query/Query.java     | 19 +++++++
 .../org/apache/unomi/itests/ProfileServiceIT.java  | 60 ++++++++++++++++++++++
 .../ElasticSearchPersistenceServiceImpl.java       |  9 +++-
 .../unomi/persistence/spi/PersistenceService.java  | 20 ++++++++
 .../services/impl/profiles/ProfileServiceImpl.java |  7 ++-
 5 files changed, 111 insertions(+), 4 deletions(-)

diff --git a/api/src/main/java/org/apache/unomi/api/query/Query.java b/api/src/main/java/org/apache/unomi/api/query/Query.java
index ef74b72..48e0c1a 100644
--- a/api/src/main/java/org/apache/unomi/api/query/Query.java
+++ b/api/src/main/java/org/apache/unomi/api/query/Query.java
@@ -33,6 +33,8 @@ public class Query implements Serializable {
     private String sortby;
     private Condition condition;
     private boolean forceRefresh;
+    private String scrollTimeValidity;
+    private String scrollIdentifier;
 
     /**
      * Instantiates a new Query.
@@ -150,4 +152,21 @@ public class Query implements Serializable {
     public void setForceRefresh(boolean forceRefresh) {
         this.forceRefresh = forceRefresh;
     }
+
+    public String getScrollIdentifier() {
+        return scrollIdentifier;
+    }
+
+    public void setScrollIdentifier(String scrollIdentifier) {
+        this.scrollIdentifier = scrollIdentifier;
+    }
+
+    public String getScrollTimeValidity() {
+        return scrollTimeValidity;
+    }
+
+    public void setScrollTimeValidity(String scrollTimeValidity) {
+        this.scrollTimeValidity = scrollTimeValidity;
+    }
+
 }
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index 2c996b7..db4e735 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -17,9 +17,17 @@
 package org.apache.unomi.itests;
 
 import org.apache.unomi.api.Profile;
+import org.apache.unomi.api.query.Query;
 import org.apache.unomi.api.services.ProfileService;
+import org.apache.unomi.persistence.spi.PersistenceService;
+import org.apache.unomi.api.services.DefinitionsService;
+import org.apache.unomi.api.PartialList;
+
+import static org.junit.Assert.assertEquals;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.junit.Before;
+
 import org.ops4j.pax.exam.junit.PaxExam;
 import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
 import org.ops4j.pax.exam.spi.reactors.PerSuite;
@@ -42,6 +50,19 @@ public class ProfileServiceIT extends BaseIT {
     @Inject @Filter(timeout = 600000)
     protected ProfileService profileService;
 
+    @Inject
+    @Filter(timeout = 600000)
+    protected PersistenceService persistenceService;
+
+    @Inject
+    @Filter(timeout = 600000)
+    protected DefinitionsService definitionsService;
+
+    @Before
+    public void setUp() {
+        TestUtils.removeAllProfiles(definitionsService, persistenceService);
+    }
+
     @Test
     public void testProfileDelete() {
         Profile profile = new Profile();
@@ -52,4 +73,43 @@ public class ProfileServiceIT extends BaseIT {
         LOGGER.info("Profile deleted successfully.");
     }
 
+    @Test
+    public void testGetProfileWithScrolling() throws InterruptedException {
+        final String profileIdOne = "test-profile-id-one";
+        final String profileIdTwo = "test-profile-id-two";
+        final String profileIdThree = "test-profile-id-three";
+
+        Profile profileOne = new Profile();
+        Profile profileTwo = new Profile();
+        Profile profileThree = new Profile();
+
+        profileOne.setItemId(profileIdOne);
+        profileTwo.setItemId(profileIdTwo);
+        profileThree.setItemId(profileIdThree);
+
+        profileService.save(profileOne);
+        profileService.save(profileTwo);
+        profileService.save(profileThree);
+
+        Thread.sleep(4000); // Make sure Elastic is updated
+
+        Query query = new Query();
+        query.setLimit(2);
+        query.setScrollTimeValidity("10m");
+
+        PartialList<Profile> profiles = profileService.search(query, Profile.class);
+        assertEquals(2, profiles.getList().size());
+
+        Query queryCont = new Query();
+        queryCont.setScrollTimeValidity("10m");
+        queryCont.setScrollIdentifier(profiles.getScrollIdentifier());
+
+        profiles = profileService.search(queryCont, Profile.class);
+        assertEquals(1, profiles.getList().size());
+
+        queryCont.setScrollIdentifier(profiles.getScrollIdentifier());
+        profiles = profileService.search(queryCont, Profile.class);
+        assertEquals(0, profiles.getList().size());
+    }
+
 }
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index c697431..57a3a28 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -684,15 +684,19 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
     @Override
     public <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, int offset, int size, String sortBy) {
+        return getAllItems(clazz, offset, size, sortBy, null);
+    }
+
+    @Override
+    public <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, int offset, int size, String sortBy, String scrollTimeValidity) {
         long startTime = System.currentTimeMillis();
         try {
-            return query(QueryBuilders.matchAllQuery(), sortBy, clazz, offset, size, null, null);
+            return query(QueryBuilders.matchAllQuery(), sortBy, clazz, offset, size, null, scrollTimeValidity);
         } finally {
             if (metricsService != null && metricsService.isActivated()) {
                 metricsService.updateTimer(this.getClass().getName() + ".getAllItems", startTime);
             }
         }
-
     }
 
     @Override
@@ -1418,6 +1422,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }
     }
 
+
     @Override
     public <T extends Item> List<T> query(final Condition query, String sortBy, final Class<T> clazz) {
         return query(query, sortBy, clazz, 0, -1).getList();
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index d7606a9..876ea4a 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -61,6 +61,26 @@ public interface PersistenceService {
     <T extends Item> PartialList<T> getAllItems(Class<T> clazz, int offset, int size, String sortBy);
 
     /**
+     * Retrieves all known items of the specified class, ordered according to the specified {@code sortBy} String and paged: only {@code size} of them are retrieved,
+     * starting with the {@code offset}-th one.
+     *
+     * TODO: use a Query object instead of distinct parameters?
+     *
+     * @param <T>    the type of the {@link Item}s we want to retrieve
+     * @param clazz  the {@link Item} subclass of entities we want to retrieve
+     * @param offset zero or a positive integer specifying the position of the first item in the total ordered collection of matching items
+     * @param size   a positive integer specifying how many matching items should be retrieved or {@code -1} if all of them should be retrieved
+     * @param sortBy an optional ({@code null} if no sorting is required) String of comma ({@code ,}) separated property names on which ordering should be performed, ordering
+     *               elements according to the property order in the
+     *               String, considering each in turn and moving on to the next one in case of equality of all preceding ones. Each property name is optionally followed by
+     *               a colon ({@code :}) and an order specifier: {@code asc} or {@code desc}.
+     * @param scrollTimeValidity  the time the scrolling query should stay valid. This must contain a time unit value supported by ElasticSearch, such as
+     *                            the ones declared here: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units
+     * @return a {@link PartialList} of paged items with the given type
+     */
+    <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, int offset, int size, String sortBy, String scrollTimeValidity);
+
+    /**
      * Persists the specified Item in the context server.
      *
      * @param item the item to persist
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index ffca878..12740a6 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -406,17 +406,20 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
     }
 
     private <T extends Item> PartialList<T> doSearch(Query query, Class<T> clazz) {
+        if (query.getScrollIdentifier() != null) {
+            return persistenceService.continueScrollQuery(clazz, query.getScrollIdentifier(), query.getScrollTimeValidity());
+        }
         if (query.getCondition() != null && definitionsService.resolveConditionType(query.getCondition())) {
             if (StringUtils.isNotBlank(query.getText())) {
                 return persistenceService.queryFullText(query.getText(), query.getCondition(), query.getSortby(), clazz, query.getOffset(), query.getLimit());
             } else {
-                return persistenceService.query(query.getCondition(), query.getSortby(), clazz, query.getOffset(), query.getLimit());
+                return persistenceService.query(query.getCondition(), query.getSortby(), clazz, query.getOffset(), query.getLimit(), query.getScrollTimeValidity());
             }
         } else {
             if (StringUtils.isNotBlank(query.getText())) {
                 return persistenceService.queryFullText(query.getText(), query.getSortby(), clazz, query.getOffset(), query.getLimit());
             } else {
-                return persistenceService.getAllItems(clazz, query.getOffset(), query.getLimit(), query.getSortby());
+                return persistenceService.getAllItems(clazz, query.getOffset(), query.getLimit(), query.getSortby(), query.getScrollTimeValidity());
             }
         }
     }