You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2017/01/09 16:21:54 UTC
[20/50] [abbrv] incubator-unomi git commit: UNOMI-70 Upgrade to ElasticSearch 5.x - Use scroll API to get unlimited results
UNOMI-70 Upgrade to ElasticSearch 5.x
- Use scroll API to get unlimited results
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/a94fdb0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/a94fdb0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/a94fdb0e
Branch: refs/heads/master
Commit: a94fdb0e02b3606c012fa5f3d6d35212a34b489d
Parents: dafa9f6
Author: Serge Huber <sh...@apache.org>
Authored: Thu Dec 8 09:14:14 2016 +0100
Committer: Serge Huber <sh...@apache.org>
Committed: Thu Dec 8 09:14:14 2016 +0100
----------------------------------------------------------------------
.../ElasticSearchPersistenceServiceImpl.java | 47 ++++++++++++++++----
1 file changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a94fdb0e/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 0dc3899..fa4b5e1 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -898,6 +898,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
break;
}
}
+ client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet();
// we're done with the scrolling, delete now
if (deleteByScope.numberOfActions() > 0) {
@@ -1188,6 +1189,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
long totalHits = 0;
try {
String itemType = getItemType(clazz);
+ final TimeValue keepAlive = TimeValue.timeValueHours(1);
SearchRequestBuilder requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType))
.setTypes(itemType)
@@ -1199,7 +1201,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
} else if (size != -1) {
requestBuilder.setSize(size);
} else {
- // requestBuilder.setSize(Integer.MAX_VALUE);
+ // size == -1, use scroll query to retrieve all the results
+ requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType))
+ .setTypes(itemType)
+ .setFetchSource(true)
+ .setScroll(keepAlive)
+ .setFrom(offset)
+ .setQuery(query)
+ .setSize(100);
}
if (routing != null) {
requestBuilder.setRouting(routing);
@@ -1229,13 +1238,35 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
SearchResponse response = requestBuilder
.execute()
.actionGet();
- SearchHits searchHits = response.getHits();
- totalHits = searchHits.getTotalHits();
- for (SearchHit searchHit : searchHits) {
- String sourceAsString = searchHit.getSourceAsString();
- final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- value.setItemId(searchHit.getId());
- results.add(value);
+ if (size == -1) {
+ // Scroll until no more hits are returned
+ while (true) {
+
+ for (SearchHit searchHit : response.getHits().getHits()) {
+ // add hit to results
+ String sourceAsString = searchHit.getSourceAsString();
+ final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
+ value.setItemId(searchHit.getId());
+ results.add(value);
+ }
+
+ response = client.prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet();
+
+ // If we have no more hits, exit
+ if (response.getHits().getHits().length == 0) {
+ break;
+ }
+ }
+ client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet();
+ } else {
+ SearchHits searchHits = response.getHits();
+ totalHits = searchHits.getTotalHits();
+ for (SearchHit searchHit : searchHits) {
+ String sourceAsString = searchHit.getSourceAsString();
+ final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
+ value.setItemId(searchHit.getId());
+ results.add(value);
+ }
}
} catch (Exception t) {
logger.error("Error loading itemType=" + clazz.getName() + " query=" + query + " sortBy=" + sortBy, t);