You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2017/01/09 16:21:54 UTC

[20/50] [abbrv] incubator-unomi git commit: UNOMI-70 Upgrade to ElasticSearch 5.x - Use scroll API to get unlimited results

UNOMI-70 Upgrade to ElasticSearch 5.x
- Use scroll API to get unlimited results


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/a94fdb0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/a94fdb0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/a94fdb0e

Branch: refs/heads/master
Commit: a94fdb0e02b3606c012fa5f3d6d35212a34b489d
Parents: dafa9f6
Author: Serge Huber <sh...@apache.org>
Authored: Thu Dec 8 09:14:14 2016 +0100
Committer: Serge Huber <sh...@apache.org>
Committed: Thu Dec 8 09:14:14 2016 +0100

----------------------------------------------------------------------
 .../ElasticSearchPersistenceServiceImpl.java    | 47 ++++++++++++++++----
 1 file changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a94fdb0e/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 0dc3899..fa4b5e1 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -898,6 +898,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                             break;
                         }
                     }
+                    client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet();
 
                     // we're done with the scrolling, delete now
                     if (deleteByScope.numberOfActions() > 0) {
@@ -1188,6 +1189,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 long totalHits = 0;
                 try {
                     String itemType = getItemType(clazz);
+                    final TimeValue keepAlive = TimeValue.timeValueHours(1);
 
                     SearchRequestBuilder requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType))
                             .setTypes(itemType)
@@ -1199,7 +1201,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     } else if (size != -1) {
                         requestBuilder.setSize(size);
                     } else {
-                        // requestBuilder.setSize(Integer.MAX_VALUE);
+                        // size == -1, use scroll query to retrieve all the results
+                        requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType))
+                                .setTypes(itemType)
+                                .setFetchSource(true)
+                                .setScroll(keepAlive)
+                                .setFrom(offset)
+                                .setQuery(query)
+                                .setSize(100);
                     }
                     if (routing != null) {
                         requestBuilder.setRouting(routing);
@@ -1229,13 +1238,35 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     SearchResponse response = requestBuilder
                             .execute()
                             .actionGet();
-                    SearchHits searchHits = response.getHits();
-                    totalHits = searchHits.getTotalHits();
-                    for (SearchHit searchHit : searchHits) {
-                        String sourceAsString = searchHit.getSourceAsString();
-                        final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
-                        value.setItemId(searchHit.getId());
-                        results.add(value);
+                    if (size == -1) {
+                        // Scroll until no more hits are returned
+                        while (true) {
+
+                            for (SearchHit searchHit : response.getHits().getHits()) {
+                                // add hit to results
+                                String sourceAsString = searchHit.getSourceAsString();
+                                final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
+                                value.setItemId(searchHit.getId());
+                                results.add(value);
+                            }
+
+                            response = client.prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet();
+
+                            // If we have no more hits, exit
+                            if (response.getHits().getHits().length == 0) {
+                                break;
+                            }
+                        }
+                        client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet();
+                    } else {
+                        SearchHits searchHits = response.getHits();
+                        totalHits = searchHits.getTotalHits();
+                        for (SearchHit searchHit : searchHits) {
+                            String sourceAsString = searchHit.getSourceAsString();
+                            final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
+                            value.setItemId(searchHit.getId());
+                            results.add(value);
+                        }
                     }
                 } catch (Exception t) {
                     logger.error("Error loading itemType=" + clazz.getName() + " query=" + query + " sortBy=" + sortBy, t);