You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/01/17 07:35:14 UTC

[camel] branch master updated: CAMEL-13071: camel-elasticsearch-rest - Add scroll api support

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 79b2a88  CAMEL-13071: camel-elasticsearch-rest - Add scroll api support
79b2a88 is described below

commit 79b2a8872316823b05c9355f0a4dd0487b798846
Author: Ludovic Boutros <bo...@gmail.com>
AuthorDate: Wed Jan 16 15:49:18 2019 +0100

    CAMEL-13071: camel-elasticsearch-rest - Add scroll api support
---
 .../main/docs/elasticsearch-rest-component.adoc    |  41 +++++-
 .../elasticsearch/ElasticsearchComponent.java      |   2 +-
 .../elasticsearch/ElasticsearchConfiguration.java  |  26 ++++
 .../elasticsearch/ElasticsearchConstants.java      |   5 +
 .../elasticsearch/ElasticsearchEndpoint.java       |   2 +-
 .../elasticsearch/ElasticsearchProducer.java       |  13 +-
 .../ElasticsearchScrollRequestIterator.java        | 143 +++++++++++++++++++
 .../BulkRequestAggregationStrategy.java            |   2 +-
 .../ElasticsearchScrollSearchTest.java             | 152 +++++++++++++++++++++
 9 files changed, 381 insertions(+), 5 deletions(-)

diff --git a/components/camel-elasticsearch-rest/src/main/docs/elasticsearch-rest-component.adoc b/components/camel-elasticsearch-rest/src/main/docs/elasticsearch-rest-component.adoc
index 4d8b495..65c75b3 100644
--- a/components/camel-elasticsearch-rest/src/main/docs/elasticsearch-rest-component.adoc
+++ b/components/camel-elasticsearch-rest/src/main/docs/elasticsearch-rest-component.adoc
@@ -73,7 +73,7 @@ with the following path and query parameters:
 |===
 
 
-==== Query Parameters (11 parameters):
+==== Query Parameters (13 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -87,7 +87,9 @@ with the following path and query parameters:
 | *indexType* (producer) | The type of the index to act against |  | String
 | *maxRetryTimeout* (producer) | The time in ms before retry | 30000 | int
 | *operation* (producer) | What operation to perform |  | ElasticsearchOperation
+| *scrollKeepAliveMs* (producer) | Time in ms during which elasticsearch will keep search context alive | 60000 | int
 | *socketTimeout* (producer) | The timeout in ms to wait before the socket will timeout. | 30000 | int
+| *useScroll* (producer) | Enable scroll usage | false | boolean
 | *waitForActiveShards* (producer) | Index creation waits for the write consistency number of shards to be available | 1 | int
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
 |===
@@ -262,6 +264,43 @@ SearchHits response = template.requestBody("direct:search", query, SearchHits.cl
 
 ----
 
+Search using the Elasticsearch scroll API in order to fetch all results.
+
+[source,java]
+----
+from("direct:search")
+  .to("elasticsearch-rest://elasticsearch?operation=Search&indexName=twitter&indexType=tweet&useScroll=true&scrollKeepAliveMs=30000");
+----
+
+[source,xml]
+----
+<route>
+    <from uri="direct:search" />
+    <to uri="elasticsearch-rest://elasticsearch?operation=Search&indexName=twitter&indexType=tweet&useScroll=true&scrollKeepAliveMs=30000"/>
+</route>
+----
+
+[source,java]
+----
+String query = "{\"query\":{\"match\":{\"content\":\"new release of ApacheCamel\"}}}";
+try (ElasticsearchScrollRequestIterator response = template.requestBody("direct:search", query, ElasticsearchScrollRequestIterator.class)) {
+    // do something smart with results
+}
+----
+
+link:split-eip.html[Split EIP] can also be used.
+
+[source,java]
+----
+from("direct:search")
+  .to("elasticsearch-rest://elasticsearch?operation=Search&indexName=twitter&indexType=tweet&useScroll=true&scrollKeepAliveMs=30000")
+  .split()
+  .body()
+  .streaming()
+  .to("mock:output")
+  .end();
+----
+
 === MultiSearch Example
 
 MultiSearching on specific field(s) and value use the Operation ´MultiSearch´.
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java
index e26f2cf..9b1d44a 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java
@@ -24,9 +24,9 @@ import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
+import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
-import org.apache.camel.spi.Metadata;
 import org.apache.http.HttpHost;
 import org.elasticsearch.client.RestClient;
 
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
index 8adf6ff..f046f63 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
@@ -51,6 +51,10 @@ public class ElasticsearchConfiguration {
     private boolean disconnect;
     @UriParam(defaultValue = "false")
     private boolean enableSSL;
+    @UriParam(defaultValue = "false")
+    private boolean useScroll;
+    @UriParam(defaultValue = "" + ElasticsearchConstants.DEFAULT_SCROLL_KEEP_ALIVE_MS)
+    private int scrollKeepAliveMs = ElasticsearchConstants.DEFAULT_SCROLL_KEEP_ALIVE_MS;
 
     private String user;
     private String password;
@@ -243,4 +247,26 @@ public class ElasticsearchConfiguration {
     public void setSniffAfterFailureDelay(int sniffAfterFailureDelay) {
         this.sniffAfterFailureDelay = sniffAfterFailureDelay;
     }
+
+    /**
+     * Enable scroll usage
+     *
+     * @return {@code true} when search operations should page through results with the scroll API
+     */
+    public boolean getUseScroll() {
+        return this.useScroll;
+    }
+
+    /**
+     * Enable scroll usage
+     *
+     * @param useScroll {@code true} to have search operations use the scroll API
+     */
+    public void setUseScroll(boolean useScroll) {
+        this.useScroll = useScroll;
+    }
+
+    /**
+     * Time in ms during which elasticsearch will keep search context alive
+     *
+     * @return the scroll keep-alive in milliseconds
+     */
+    public int getScrollKeepAliveMs() {
+        return this.scrollKeepAliveMs;
+    }
+
+    /**
+     * Time in ms during which elasticsearch will keep search context alive
+     *
+     * @param scrollKeepAliveMs the scroll keep-alive in milliseconds
+     */
+    public void setScrollKeepAliveMs(int scrollKeepAliveMs) {
+        this.scrollKeepAliveMs = scrollKeepAliveMs;
+    }
 }
\ No newline at end of file
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
index 4bc349b..c8b71a3 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
@@ -23,6 +23,10 @@ public interface ElasticsearchConstants {
     String PARAM_INDEX_NAME = "indexName";
     String PARAM_INDEX_TYPE = "indexType";
     String PARAM_WAIT_FOR_ACTIVE_SHARDS = "waitForActiveShards";
+    String PARAM_SCROLL_KEEP_ALIVE_MS = "scrollKeepAliveMs";
+    String PARAM_SCROLL = "useScroll";
+
+    String PROPERTY_SCROLL_ES_QUERY_COUNT = "CamelElasticsearchScrollQueryCount";
 
     int    DEFAULT_PORT = 9200;
     int    DEFAULT_FOR_WAIT_ACTIVE_SHARDS = 1; // Meaning only wait for the primary shard
@@ -31,5 +35,6 @@ public interface ElasticsearchConstants {
     int    DEFAULT_CONNECTION_TIMEOUT = 30000; // Meaning how many seconds before it timeout when establish connection
     int    DEFAULT_SNIFFER_INTERVAL = 60000 * 5; // Meaning how often it should search for elasticsearch nodes
     int    DEFAULT_AFTER_FAILURE_DELAY = 60000; // Meaning when should the sniff execution scheduled after a failure
+    int    DEFAULT_SCROLL_KEEP_ALIVE_MS = 60000; // Meaning how many milliseconds elasticsearch will keep the search context
 
 }
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
index ada370c..f9e9c14 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
@@ -19,9 +19,9 @@ package org.apache.camel.component.elasticsearch;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.support.DefaultEndpoint;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
+import org.apache.camel.support.DefaultEndpoint;
 import org.elasticsearch.client.RestClient;
 
 /**
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
index 09d01d7..0db6adc 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
@@ -47,6 +47,9 @@ import org.elasticsearch.client.sniff.SnifferBuilder;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 
+import static org.apache.camel.component.elasticsearch.ElasticsearchConstants.PARAM_SCROLL;
+import static org.apache.camel.component.elasticsearch.ElasticsearchConstants.PARAM_SCROLL_KEEP_ALIVE_MS;
+
 
 /**
  * Represents an Elasticsearch producer.
@@ -191,7 +194,15 @@ public class ElasticsearchProducer extends DefaultProducer {
             }
         } else if (operation == ElasticsearchOperation.Search) {
             SearchRequest searchRequest = message.getBody(SearchRequest.class);
-            message.setBody(restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT).getHits());
+            // is it a scroll request ?
+            boolean useScroll = message.getHeader(PARAM_SCROLL, configuration.getUseScroll(), Boolean.class);
+            if (useScroll) {
+                int scrollKeepAliveMs = message.getHeader(PARAM_SCROLL_KEEP_ALIVE_MS, configuration.getScrollKeepAliveMs(), Integer.class);
+                ElasticsearchScrollRequestIterator scrollRequestIterator = new ElasticsearchScrollRequestIterator(searchRequest, restHighLevelClient, scrollKeepAliveMs, exchange);
+                exchange.getIn().setBody(scrollRequestIterator);
+            } else {
+                message.setBody(restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT).getHits());
+            }
         } else if (operation == ElasticsearchOperation.MultiSearch) {
             MultiSearchRequest searchRequest = message.getBody(MultiSearchRequest.class);
             message.setBody(restHighLevelClient.msearch(searchRequest, RequestOptions.DEFAULT).getResponses());
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchScrollRequestIterator.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchScrollRequestIterator.java
new file mode 100644
index 0000000..bf2f9e5
--- /dev/null
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchScrollRequestIterator.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.camel.Exchange;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.search.Scroll;
+import org.elasticsearch.search.SearchHit;
+
+import static org.apache.camel.component.elasticsearch.ElasticsearchConstants.PROPERTY_SCROLL_ES_QUERY_COUNT;
+
+
+/**
+ * Iterates over the hits of an Elasticsearch search by paging through the results with the scroll API.
+ * <p>
+ * The first page is fetched by the constructor. Further pages are requested lazily, one scroll request
+ * at a time, whenever the current page has been consumed. {@link #close()} clears the scroll context and
+ * stores the number of requests sent as the {@code CamelElasticsearchScrollQueryCount} exchange property.
+ * <p>
+ * This class performs no synchronization of its own.
+ */
+public class ElasticsearchScrollRequestIterator implements Iterator<SearchHit>, Closeable {
+    private final SearchRequest searchRequest;
+    private final RestHighLevelClient restHighLevelClient;
+    private final int scrollKeepAliveMs;
+    private final Exchange exchange;
+    private Iterator<SearchHit> currentSearchHits;
+    private String scrollId;
+    // set once a scroll request came back with an empty page, i.e. there is nothing left to fetch
+    private boolean exhausted;
+    private boolean closed;
+    private int requestCount;
+
+    /**
+     * Enables scrolling on the given request and immediately executes it to load the first page.
+     *
+     * @param searchRequest       the search to run; its scroll keep-alive is set by this constructor
+     * @param restHighLevelClient the client used for the search, scroll and clear-scroll requests
+     * @param scrollKeepAliveMs   keep-alive of the search context in milliseconds, renewed on every scroll request
+     * @param exchange            the exchange that receives the request count property on {@link #close()}
+     * @throws IOException declared for callers; I/O failures of the first search are currently
+     *                     rethrown wrapped in an {@link IllegalStateException}
+     */
+    public ElasticsearchScrollRequestIterator(SearchRequest searchRequest, RestHighLevelClient restHighLevelClient, int scrollKeepAliveMs, Exchange exchange) throws IOException {
+        this.searchRequest = searchRequest;
+        this.restHighLevelClient = restHighLevelClient;
+        this.scrollKeepAliveMs = scrollKeepAliveMs;
+        this.exchange = exchange;
+        this.exhausted = false;
+        this.closed = false;
+        this.requestCount = 0;
+
+        // add scroll option on the first query
+        searchRequest.scroll(TimeValue.timeValueMillis(scrollKeepAliveMs));
+
+        setFirstCurrentSearchHits();
+    }
+
+    /**
+     * Tells whether another hit is available, fetching the next page from Elasticsearch if the
+     * current one has been consumed.
+     *
+     * @return {@code false} once the iterator is closed or the last scroll request returned no hits
+     * @throws IllegalStateException if the scroll request fails with an {@link IOException}
+     */
+    @Override
+    public boolean hasNext() {
+        if (closed) {
+            return false;
+        }
+        if (currentSearchHits.hasNext()) {
+            return true;
+        }
+        if (exhausted) {
+            // an empty page was already received: do not hit Elasticsearch again on repeated calls
+            return false;
+        }
+
+        updateCurrentSearchHits();
+        return currentSearchHits.hasNext();
+    }
+
+    /**
+     * Returns the next hit, transparently moving on to the next page when required.
+     *
+     * @throws NoSuchElementException if the iterator is closed or all hits have been returned
+     * @throws IllegalStateException  if a scroll request fails with an {@link IOException}
+     */
+    @Override
+    public SearchHit next() {
+        // going through hasNext() honours the Iterator contract and also loads the next page
+        // for callers that invoke next() without calling hasNext() first
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        return currentSearchHits.next();
+    }
+
+    /**
+     * Execute next Elasticsearch scroll request and update the current scroll result.
+     */
+    private void updateCurrentSearchHits() {
+        SearchResponse searchResponse = scrollSearch();
+        this.currentSearchHits = searchResponse.getHits().iterator();
+        this.exhausted = !currentSearchHits.hasNext();
+
+        // the scroll id may change between requests; always continue with the most recent one
+        String latestScrollId = searchResponse.getScrollId();
+        if (latestScrollId != null) {
+            this.scrollId = latestScrollId;
+        }
+    }
+
+    /**
+     * Execute the initial search request and remember its hits and scroll id.
+     */
+    private void setFirstCurrentSearchHits() {
+        SearchResponse searchResponse = firstSearch();
+        this.currentSearchHits = searchResponse.getHits().iterator();
+        this.scrollId = searchResponse.getScrollId();
+    }
+
+    private SearchResponse firstSearch() {
+        SearchResponse searchResponse;
+        try {
+            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
+            requestCount++;
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return searchResponse;
+    }
+
+    private SearchResponse scrollSearch() {
+        SearchResponse searchResponse;
+        try {
+            SearchScrollRequest searchScrollRequest = new SearchScrollRequest()
+                    .scroll(new Scroll(TimeValue.timeValueMillis(scrollKeepAliveMs)))
+                    .scrollId(scrollId);
+
+            searchResponse = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
+            requestCount++;
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return searchResponse;
+    }
+
+    /**
+     * Clears the scroll context on Elasticsearch and publishes the request count on the exchange.
+     * Calling this method again after the first call has no effect.
+     *
+     * @throws IllegalStateException if the clear-scroll request fails with an {@link IOException};
+     *                               the iterator is nevertheless marked as closed
+     */
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+
+        // mark as closed and publish the count before talking to Elasticsearch, so that a failing
+        // clear-scroll request cannot leave the iterator half-open
+        closed = true;
+        exchange.setProperty(PROPERTY_SCROLL_ES_QUERY_COUNT, requestCount);
+
+        if (scrollId == null) {
+            // no search context was reported, nothing to clear
+            return;
+        }
+
+        try {
+            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+            clearScrollRequest.addScrollId(scrollId);
+
+            restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * @return the number of search and scroll requests completed so far (clear-scroll is not counted)
+     */
+    public int getRequestCount() {
+        return requestCount;
+    }
+
+    /**
+     * @return {@code true} once {@link #close()} has been called
+     */
+    public boolean isClosed() {
+        return closed;
+    }
+}
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
index dd0d7da..b199550 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
@@ -16,9 +16,9 @@
  */
 package org.apache.camel.component.elasticsearch.aggregation;
 
+import org.apache.camel.AggregationStrategy;
 import org.apache.camel.Exchange;
 import org.apache.camel.InvalidPayloadRuntimeException;
-import org.apache.camel.AggregationStrategy;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
diff --git a/components/camel-elasticsearch-rest/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchScrollSearchTest.java b/components/camel-elasticsearch-rest/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchScrollSearchTest.java
new file mode 100644
index 0000000..09c5000
--- /dev/null
+++ b/components/camel-elasticsearch-rest/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchScrollSearchTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.AggregationStrategies;
+import org.apache.camel.builder.ExchangeBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.junit.Test;
+
+import static org.apache.camel.component.elasticsearch.ElasticsearchConstants.*;
+
+/**
+ * Tests the scroll support of the Search operation, both when the iterator is consumed directly
+ * and when it is consumed by a streaming splitter.
+ */
+public class ElasticsearchScrollSearchTest extends ElasticsearchBaseTest {
+
+    private static final String TWITTER_ES_INDEX_NAME = "twitter";
+    private static final String SPLIT_TWITTER_ES_INDEX_NAME = "split-" + TWITTER_ES_INDEX_NAME;
+
+    /**
+     * Scroll is enabled through message headers and the returned iterator is drained by the test itself.
+     */
+    @Test
+    public void testScrollSearch() throws IOException {
+        // add some documents
+        for (int i = 0; i < 10; i++) {
+            Map<String, String> map = createIndexedData();
+            String indexId = template.requestBody("direct:scroll-index", map, String.class);
+            assertNotNull("indexId should be set", indexId);
+        }
+
+        // perform a refresh
+        Response refreshResponse = getClient().performRequest(new Request("post", "/" + TWITTER_ES_INDEX_NAME + "/_refresh"));
+        assertEquals("Cannot perform a refresh", 200, refreshResponse.getStatusLine().getStatusCode());
+
+        SearchRequest req = getScrollSearchRequest(TWITTER_ES_INDEX_NAME);
+
+        // the endpoint itself does not enable scroll: both options are passed as headers
+        Exchange exchange = ExchangeBuilder.anExchange(context)
+                .withHeader(PARAM_SCROLL_KEEP_ALIVE_MS, 50000)
+                .withHeader(PARAM_SCROLL, true)
+                .withBody(req)
+                .build();
+
+        exchange = template.send("direct:scroll-search", exchange);
+
+        try (ElasticsearchScrollRequestIterator scrollRequestIterator = exchange.getIn().getBody(ElasticsearchScrollRequestIterator.class)) {
+            assertNotNull("response should not be null", scrollRequestIterator);
+
+            List<Object> result = new ArrayList<>();
+            scrollRequestIterator.forEachRemaining(result::add);
+
+            assertEquals("response hits should be == 10", 10, result.size());
+            // page size is 1: one initial search plus ten scroll requests, the last one returning no hit
+            assertEquals("11 request should have been send to Elasticsearch", 11, scrollRequestIterator.getRequestCount());
+        }
+
+        // leaving the try-with-resources block must have closed the iterator and set the exchange property
+        ElasticsearchScrollRequestIterator scrollRequestIterator = exchange.getIn().getBody(ElasticsearchScrollRequestIterator.class);
+        assertTrue("iterator should be closed", scrollRequestIterator.isClosed());
+        assertEquals("11 request should have been send to Elasticsearch", 11, (int) exchange.getProperty(PROPERTY_SCROLL_ES_QUERY_COUNT, Integer.class));
+    }
+
+    /**
+     * Scroll is enabled on the endpoint URI and the iterator is consumed by a streaming splitter.
+     */
+    @Test
+    public void testScrollAndSplitSearch() throws IOException, InterruptedException {
+        // add some documents
+        for (int i = 0; i < 10; i++) {
+            Map<String, String> map = createIndexedData();
+            String indexId = template.requestBody("direct:scroll-n-split-index", map, String.class);
+            assertNotNull("indexId should be set", indexId);
+        }
+
+        // perform a refresh
+        Response refreshResponse = getClient().performRequest(new Request("post", "/" + SPLIT_TWITTER_ES_INDEX_NAME + "/_refresh"));
+        assertEquals("Cannot perform a refresh", 200, refreshResponse.getStatusLine().getStatusCode());
+
+        MockEndpoint mock = getMockEndpoint("mock:output");
+        mock.expectedMessageCount(1);
+        mock.setResultWaitTime(8000);
+
+        SearchRequest req = getScrollSearchRequest(SPLIT_TWITTER_ES_INDEX_NAME);
+
+        Exchange exchange = ExchangeBuilder.anExchange(context).withBody(req).build();
+        exchange = template.send("direct:scroll-n-split-search", exchange);
+
+        // wait for aggregation
+        mock.assertIsSatisfied();
+        Iterator<Exchange> iterator = mock.getReceivedExchanges().iterator();
+        assertTrue("response should contain 1 exchange", iterator.hasNext());
+        Collection<?> aggregatedExchanges = iterator.next().getIn().getBody(Collection.class);
+
+        assertEquals("response hits should be == 10", 10, aggregatedExchanges.size());
+
+        ElasticsearchScrollRequestIterator scrollRequestIterator = exchange.getIn().getBody(ElasticsearchScrollRequestIterator.class);
+        assertTrue("iterator should be closed", scrollRequestIterator.isClosed());
+        assertEquals("11 request should have been send to Elasticsearch", 11, scrollRequestIterator.getRequestCount());
+        assertEquals("11 request should have been send to Elasticsearch", 11, (int) exchange.getProperty(PROPERTY_SCROLL_ES_QUERY_COUNT, Integer.class));
+    }
+
+    /**
+     * Builds a match-all search on the given index with a page size of 1, so that every
+     * document requires its own request.
+     */
+    private SearchRequest getScrollSearchRequest(String indexName) {
+        SearchRequest req = new SearchRequest().indices(indexName).types("tweet");
+
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.query(QueryBuilders.matchAllQuery()).size(1);
+        req.source(searchSourceBuilder);
+        return req;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:scroll-index")
+                        .to("elasticsearch-rest://elasticsearch?operation=Index&indexName=" + TWITTER_ES_INDEX_NAME + "&indexType=tweet&hostAddresses=localhost:" + ES_BASE_HTTP_PORT);
+                from("direct:scroll-search")
+                        .to("elasticsearch-rest://elasticsearch?operation=Search&indexName=" + TWITTER_ES_INDEX_NAME + "&indexType=tweet&hostAddresses=localhost:" + ES_BASE_HTTP_PORT);
+
+                from("direct:scroll-n-split-index")
+                        .to("elasticsearch-rest://elasticsearch?operation=Index&indexName=" + SPLIT_TWITTER_ES_INDEX_NAME + "&indexType=tweet&hostAddresses=localhost:" + ES_BASE_HTTP_PORT);
+                // scroll is enabled on the endpoint; each hit is split out, then all are grouped again
+                from("direct:scroll-n-split-search")
+                        .to("elasticsearch-rest://elasticsearch?"
+                                + "useScroll=true&scrollKeepAliveMs=50000&operation=Search&indexName=" + SPLIT_TWITTER_ES_INDEX_NAME + "&indexType=tweet&hostAddresses=localhost:" + ES_BASE_HTTP_PORT)
+                        .split()
+                        .body()
+                        .streaming()
+                        .parallelProcessing()
+                        .threads(12)
+                        .aggregate(AggregationStrategies.groupedExchange())
+                        .constant(true)
+                        .completionSize(20)
+                        .completionTimeout(2000)
+                        .to("mock:output")
+                        .end();
+            }
+        };
+    }
+}