You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2020/08/20 13:11:01 UTC
[unomi] branch master updated: Expose scroll identifier on profile
queries (#181)
This is an automated email from the ASF dual-hosted git repository.
shuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new 2f4f0fc Expose scroll identifier on profile queries (#181)
2f4f0fc is described below
commit 2f4f0fcdc95abf5efbef494283ccd791a573ccb3
Author: liatiusim <62...@users.noreply.github.com>
AuthorDate: Thu Aug 20 16:10:51 2020 +0300
Expose scroll identifier on profile queries (#181)
* Expose scroller id on profile queries
* Add scroller itest
* fix profile service it tests
Co-authored-by: Shir Bromberg <sb...@yotpo.com>
---
.../java/org/apache/unomi/api/query/Query.java | 19 +++++++
.../org/apache/unomi/itests/ProfileServiceIT.java | 60 ++++++++++++++++++++++
.../ElasticSearchPersistenceServiceImpl.java | 9 +++-
.../unomi/persistence/spi/PersistenceService.java | 20 ++++++++
.../services/impl/profiles/ProfileServiceImpl.java | 7 ++-
5 files changed, 111 insertions(+), 4 deletions(-)
diff --git a/api/src/main/java/org/apache/unomi/api/query/Query.java b/api/src/main/java/org/apache/unomi/api/query/Query.java
index ef74b72..48e0c1a 100644
--- a/api/src/main/java/org/apache/unomi/api/query/Query.java
+++ b/api/src/main/java/org/apache/unomi/api/query/Query.java
@@ -33,6 +33,8 @@ public class Query implements Serializable {
private String sortby;
private Condition condition;
private boolean forceRefresh;
+ private String scrollTimeValidity;
+ private String scrollIdentifier;
/**
* Instantiates a new Query.
@@ -150,4 +152,21 @@ public class Query implements Serializable {
public void setForceRefresh(boolean forceRefresh) {
this.forceRefresh = forceRefresh;
}
+
+ public String getScrollIdentifier() {
+ return scrollIdentifier;
+ }
+
+ public void setScrollIdentifier(String scrollIdentifier) {
+ this.scrollIdentifier = scrollIdentifier;
+ }
+
+ public String getScrollTimeValidity() {
+ return scrollTimeValidity;
+ }
+
+ public void setScrollTimeValidity(String scrollTimeValidity) {
+ this.scrollTimeValidity = scrollTimeValidity;
+ }
+
}
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index 2c996b7..db4e735 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -17,9 +17,17 @@
package org.apache.unomi.itests;
import org.apache.unomi.api.Profile;
+import org.apache.unomi.api.query.Query;
import org.apache.unomi.api.services.ProfileService;
+import org.apache.unomi.persistence.spi.PersistenceService;
+import org.apache.unomi.api.services.DefinitionsService;
+import org.apache.unomi.api.PartialList;
+
+import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.junit.Before;
+
import org.ops4j.pax.exam.junit.PaxExam;
import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
import org.ops4j.pax.exam.spi.reactors.PerSuite;
@@ -42,6 +50,19 @@ public class ProfileServiceIT extends BaseIT {
@Inject @Filter(timeout = 600000)
protected ProfileService profileService;
+ @Inject
+ @Filter(timeout = 600000)
+ protected PersistenceService persistenceService;
+
+ @Inject
+ @Filter(timeout = 600000)
+ protected DefinitionsService definitionsService;
+
+ @Before
+ public void setUp() {
+ TestUtils.removeAllProfiles(definitionsService, persistenceService);
+ }
+
@Test
public void testProfileDelete() {
Profile profile = new Profile();
@@ -52,4 +73,43 @@ public class ProfileServiceIT extends BaseIT {
LOGGER.info("Profile deleted successfully.");
}
+ @Test
+ public void testGetProfileWithScrolling() throws InterruptedException {
+ final String profileIdOne = "test-profile-id-one";
+ final String profileIdTwo = "test-profile-id-two";
+ final String profileIdThree = "test-profile-id-three";
+
+ Profile profileOne = new Profile();
+ Profile profileTwo = new Profile();
+ Profile profileThree = new Profile();
+
+ profileOne.setItemId(profileIdOne);
+ profileTwo.setItemId(profileIdTwo);
+ profileThree.setItemId(profileIdThree);
+
+ profileService.save(profileOne);
+ profileService.save(profileTwo);
+ profileService.save(profileThree);
+
+ Thread.sleep(4000); // Make sure Elastic is updated
+
+ Query query = new Query();
+ query.setLimit(2);
+ query.setScrollTimeValidity("10m");
+
+ PartialList<Profile> profiles = profileService.search(query, Profile.class);
+ assertEquals(2, profiles.getList().size());
+
+ Query queryCont = new Query();
+ queryCont.setScrollTimeValidity("10m");
+ queryCont.setScrollIdentifier(profiles.getScrollIdentifier());
+
+ profiles = profileService.search(queryCont, Profile.class);
+ assertEquals(1, profiles.getList().size());
+
+ queryCont.setScrollIdentifier(profiles.getScrollIdentifier());
+ profiles = profileService.search(queryCont, Profile.class);
+ assertEquals(0, profiles.getList().size());
+ }
+
}
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index c697431..57a3a28 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -684,15 +684,19 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, int offset, int size, String sortBy) {
+ return getAllItems(clazz, offset, size, sortBy, null);
+ }
+
+ @Override
+ public <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, int offset, int size, String sortBy, String scrollTimeValidity) {
long startTime = System.currentTimeMillis();
try {
- return query(QueryBuilders.matchAllQuery(), sortBy, clazz, offset, size, null, null);
+ return query(QueryBuilders.matchAllQuery(), sortBy, clazz, offset, size, null, scrollTimeValidity);
} finally {
if (metricsService != null && metricsService.isActivated()) {
metricsService.updateTimer(this.getClass().getName() + ".getAllItems", startTime);
}
}
-
}
@Override
@@ -1418,6 +1422,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
+
@Override
public <T extends Item> List<T> query(final Condition query, String sortBy, final Class<T> clazz) {
return query(query, sortBy, clazz, 0, -1).getList();
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index d7606a9..876ea4a 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -61,6 +61,26 @@ public interface PersistenceService {
<T extends Item> PartialList<T> getAllItems(Class<T> clazz, int offset, int size, String sortBy);
/**
+ * Retrieves all known items of the specified class, ordered according to the specified {@code sortBy} String and paged: only {@code size} of them are retrieved,
+ * starting with the {@code offset}-th one.
+ *
+ * TODO: use a Query object instead of distinct parameters?
+ *
+ * @param <T> the type of the {@link Item}s we want to retrieve
+ * @param clazz the {@link Item} subclass of entities we want to retrieve
+ * @param offset zero or a positive integer specifying the position of the first item in the total ordered collection of matching items
+ * @param size a positive integer specifying how many matching items should be retrieved or {@code -1} if all of them should be retrieved
+ * @param sortBy an optional ({@code null} if no sorting is required) String of comma ({@code ,}) separated property names on which ordering should be performed, ordering
+ * elements according to the property order in the
+ * String, considering each in turn and moving on to the next one in case of equality of all preceding ones. Each property name is optionally followed by
+ * a colon ({@code :}) and an order specifier: {@code asc} or {@code desc}.
+ * @param scrollTimeValidity the time the scrolling query should stay valid. This must contain a time unit value supported by ElasticSearch, such as
+ * the ones declared here: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units
+ * @return a {@link PartialList} of paged items with the given type
+ */
+ <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, int offset, int size, String sortBy, String scrollTimeValidity);
+
+ /**
* Persists the specified Item in the context server.
*
* @param item the item to persist
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index ffca878..12740a6 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -406,17 +406,20 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
}
private <T extends Item> PartialList<T> doSearch(Query query, Class<T> clazz) {
+ if (query.getScrollIdentifier() != null) {
+ return persistenceService.continueScrollQuery(clazz, query.getScrollIdentifier(), query.getScrollTimeValidity());
+ }
if (query.getCondition() != null && definitionsService.resolveConditionType(query.getCondition())) {
if (StringUtils.isNotBlank(query.getText())) {
return persistenceService.queryFullText(query.getText(), query.getCondition(), query.getSortby(), clazz, query.getOffset(), query.getLimit());
} else {
- return persistenceService.query(query.getCondition(), query.getSortby(), clazz, query.getOffset(), query.getLimit());
+ return persistenceService.query(query.getCondition(), query.getSortby(), clazz, query.getOffset(), query.getLimit(), query.getScrollTimeValidity());
}
} else {
if (StringUtils.isNotBlank(query.getText())) {
return persistenceService.queryFullText(query.getText(), query.getSortby(), clazz, query.getOffset(), query.getLimit());
} else {
- return persistenceService.getAllItems(clazz, query.getOffset(), query.getLimit(), query.getSortby());
+ return persistenceService.getAllItems(clazz, query.getOffset(), query.getLimit(), query.getSortby(), query.getScrollTimeValidity());
}
}
}