You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/01/17 07:35:14 UTC
[camel] branch master updated: CAMEL-13071:
camel-elasticsearch-rest - Add scroll api support
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 79b2a88 CAMEL-13071: camel-elasticsearch-rest - Add scroll api support
79b2a88 is described below
commit 79b2a8872316823b05c9355f0a4dd0487b798846
Author: Ludovic Boutros <bo...@gmail.com>
AuthorDate: Wed Jan 16 15:49:18 2019 +0100
CAMEL-13071: camel-elasticsearch-rest - Add scroll api support
---
.../main/docs/elasticsearch-rest-component.adoc | 41 +++++-
.../elasticsearch/ElasticsearchComponent.java | 2 +-
.../elasticsearch/ElasticsearchConfiguration.java | 26 ++++
.../elasticsearch/ElasticsearchConstants.java | 5 +
.../elasticsearch/ElasticsearchEndpoint.java | 2 +-
.../elasticsearch/ElasticsearchProducer.java | 13 +-
.../ElasticsearchScrollRequestIterator.java | 143 +++++++++++++++++++
.../BulkRequestAggregationStrategy.java | 2 +-
.../ElasticsearchScrollSearchTest.java | 152 +++++++++++++++++++++
9 files changed, 381 insertions(+), 5 deletions(-)
diff --git a/components/camel-elasticsearch-rest/src/main/docs/elasticsearch-rest-component.adoc b/components/camel-elasticsearch-rest/src/main/docs/elasticsearch-rest-component.adoc
index 4d8b495..65c75b3 100644
--- a/components/camel-elasticsearch-rest/src/main/docs/elasticsearch-rest-component.adoc
+++ b/components/camel-elasticsearch-rest/src/main/docs/elasticsearch-rest-component.adoc
@@ -73,7 +73,7 @@ with the following path and query parameters:
|===
-==== Query Parameters (11 parameters):
+==== Query Parameters (13 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -87,7 +87,9 @@ with the following path and query parameters:
| *indexType* (producer) | The type of the index to act against | | String
| *maxRetryTimeout* (producer) | The time in ms before retry | 30000 | int
| *operation* (producer) | What operation to perform | | ElasticsearchOperation
+| *scrollKeepAliveMs* (producer) | Time in ms during which elasticsearch will keep search context alive | 60000 | int
| *socketTimeout* (producer) | The timeout in ms to wait before the socket will timeout. | 30000 | int
+| *useScroll* (producer) | Enable scroll usage | false | boolean
| *waitForActiveShards* (producer) | Index creation waits for the write consistency number of shards to be available | 1 | int
| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
|===
@@ -262,6 +264,43 @@ SearchHits response = template.requestBody("direct:search", query, SearchHits.cl
----
+Search using Elasticsearch scroll api in order to fetch all results.
+
+[source,java]
+----
+from("direct:search")
+ .to("elasticsearch-rest://elasticsearch?operation=Search&indexName=twitter&indexType=tweet&useScroll=true&scrollKeepAliveMs=30000");
+----
+
+[source,xml]
+----
+<route>
+ <from uri="direct:search" />
+ <to uri="elasticsearch-rest://elasticsearch?operation=Search&amp;indexName=twitter&amp;indexType=tweet&amp;useScroll=true&amp;scrollKeepAliveMs=30000"/>
+</route>
+----
+
+[source,java]
+----
+String query = "{\"query\":{\"match\":{\"content\":\"new release of ApacheCamel\"}}}";
+try (ElasticsearchScrollRequestIterator response = template.requestBody("direct:search", query, ElasticsearchScrollRequestIterator.class)) {
+ // do something smart with results
+}
+----
+
+link:split-eip.html[Split EIP] can also be used.
+
+[source,java]
+----
+from("direct:search")
+ .to("elasticsearch-rest://elasticsearch?operation=Search&indexName=twitter&indexType=tweet&useScroll=true&scrollKeepAliveMs=30000")
+ .split()
+ .body()
+ .streaming()
+ .to("mock:output")
+ .end();
+----
+
=== MultiSearch Example
MultiSearching on specific field(s) and value use the Operation ´MultiSearch´.
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java
index e26f2cf..9b1d44a 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java
@@ -24,9 +24,9 @@ import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
+import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.DefaultComponent;
-import org.apache.camel.spi.Metadata;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
index 8adf6ff..f046f63 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
@@ -51,6 +51,10 @@ public class ElasticsearchConfiguration {
private boolean disconnect;
@UriParam(defaultValue = "false")
private boolean enableSSL;
+ @UriParam(defaultValue = "false")
+ private boolean useScroll;
+ @UriParam(defaultValue = "" + ElasticsearchConstants.DEFAULT_SCROLL_KEEP_ALIVE_MS)
+ private int scrollKeepAliveMs = ElasticsearchConstants.DEFAULT_SCROLL_KEEP_ALIVE_MS;
private String user;
private String password;
@@ -243,4 +247,26 @@ public class ElasticsearchConfiguration {
public void setSniffAfterFailureDelay(int sniffAfterFailureDelay) {
this.sniffAfterFailureDelay = sniffAfterFailureDelay;
}
+
+ /**
+ * Enable scroll usage
+ */
+ public boolean getUseScroll() {
+     // plain accessor for the useScroll option
+     return this.useScroll;
+ }
+
+ /**
+  * Enables or disables scroll usage.
+  *
+  * @param useScroll the new value of the useScroll option
+  */
+ public void setUseScroll(boolean useScroll) {
+ this.useScroll = useScroll;
+ }
+
+ /**
+ * Time in ms during which elasticsearch will keep search context alive
+ */
+ public int getScrollKeepAliveMs() {
+     // plain accessor for the scrollKeepAliveMs option (milliseconds)
+     return this.scrollKeepAliveMs;
+ }
+
+ /**
+  * Sets the time in ms during which elasticsearch will keep the search context alive.
+  *
+  * @param scrollKeepAliveMs the new value of the scrollKeepAliveMs option, in milliseconds
+  */
+ public void setScrollKeepAliveMs(int scrollKeepAliveMs) {
+ this.scrollKeepAliveMs = scrollKeepAliveMs;
+ }
}
\ No newline at end of file
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
index 4bc349b..c8b71a3 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
@@ -23,6 +23,10 @@ public interface ElasticsearchConstants {
String PARAM_INDEX_NAME = "indexName";
String PARAM_INDEX_TYPE = "indexType";
String PARAM_WAIT_FOR_ACTIVE_SHARDS = "waitForActiveShards";
+ String PARAM_SCROLL_KEEP_ALIVE_MS = "scrollKeepAliveMs";
+ String PARAM_SCROLL = "useScroll";
+
+ String PROPERTY_SCROLL_ES_QUERY_COUNT = "CamelElasticsearchScrollQueryCount";
int DEFAULT_PORT = 9200;
int DEFAULT_FOR_WAIT_ACTIVE_SHARDS = 1; // Meaning only wait for the primary shard
@@ -31,5 +35,6 @@ public interface ElasticsearchConstants {
int DEFAULT_CONNECTION_TIMEOUT = 30000; // Meaning how many seconds before it timeout when establish connection
int DEFAULT_SNIFFER_INTERVAL = 60000 * 5; // Meaning how often it should search for elasticsearch nodes
int DEFAULT_AFTER_FAILURE_DELAY = 60000; // Meaning when should the sniff execution scheduled after a failure
+ int DEFAULT_SCROLL_KEEP_ALIVE_MS = 60000; // Meaning how many milliseconds elasticsearch will keep the search context
}
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
index ada370c..f9e9c14 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
@@ -19,9 +19,9 @@ package org.apache.camel.component.elasticsearch;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
+import org.apache.camel.support.DefaultEndpoint;
import org.elasticsearch.client.RestClient;
/**
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
index 09d01d7..0db6adc 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
@@ -47,6 +47,9 @@ import org.elasticsearch.client.sniff.SnifferBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
+import static org.apache.camel.component.elasticsearch.ElasticsearchConstants.PARAM_SCROLL;
+import static org.apache.camel.component.elasticsearch.ElasticsearchConstants.PARAM_SCROLL_KEEP_ALIVE_MS;
+
/**
* Represents an Elasticsearch producer.
@@ -191,7 +194,15 @@ public class ElasticsearchProducer extends DefaultProducer {
}
} else if (operation == ElasticsearchOperation.Search) {
SearchRequest searchRequest = message.getBody(SearchRequest.class);
- message.setBody(restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT).getHits());
+ // is it a scroll request ?
+ boolean useScroll = message.getHeader(PARAM_SCROLL, configuration.getUseScroll(), Boolean.class);
+ if (useScroll) {
+ int scrollKeepAliveMs = message.getHeader(PARAM_SCROLL_KEEP_ALIVE_MS, configuration.getScrollKeepAliveMs(), Integer.class);
+ ElasticsearchScrollRequestIterator scrollRequestIterator = new ElasticsearchScrollRequestIterator(searchRequest, restHighLevelClient, scrollKeepAliveMs, exchange);
+ exchange.getIn().setBody(scrollRequestIterator);
+ } else {
+ message.setBody(restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT).getHits());
+ }
} else if (operation == ElasticsearchOperation.MultiSearch) {
MultiSearchRequest searchRequest = message.getBody(MultiSearchRequest.class);
message.setBody(restHighLevelClient.msearch(searchRequest, RequestOptions.DEFAULT).getResponses());
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchScrollRequestIterator.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchScrollRequestIterator.java
new file mode 100644
index 0000000..bf2f9e5
--- /dev/null
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchScrollRequestIterator.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.camel.Exchange;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.search.Scroll;
+import org.elasticsearch.search.SearchHit;
+
+import static org.apache.camel.component.elasticsearch.ElasticsearchConstants.PROPERTY_SCROLL_ES_QUERY_COUNT;
+
+
+/**
+ * {@link Iterator} over all the hits of an Elasticsearch search, fetched page by page
+ * through the scroll api.
+ * <p>
+ * The first page is requested by the constructor. Following pages are requested lazily,
+ * once the current page is exhausted. Call {@link #close()} (for instance through
+ * try-with-resources) to clear the scroll and to publish the number of search/scroll
+ * requests sent as the {@code CamelElasticsearchScrollQueryCount} exchange property.
+ * <p>
+ * I/O failures of the underlying client are rethrown as {@link IllegalStateException}.
+ * This class performs no synchronization of its own.
+ */
+public class ElasticsearchScrollRequestIterator implements Iterator<SearchHit>, Closeable {
+    private final SearchRequest searchRequest;
+    private final RestHighLevelClient restHighLevelClient;
+    private final int scrollKeepAliveMs;
+    private final Exchange exchange;
+    private Iterator<SearchHit> currentSearchHits;
+    private String scrollId;
+    private boolean closed;
+    private int requestCount;
+
+    /**
+     * Creates the iterator and immediately executes the first search request.
+     *
+     * @param searchRequest       the search to execute; its scroll option is set by this constructor
+     * @param restHighLevelClient the client used for the search, scroll and clear-scroll requests
+     * @param scrollKeepAliveMs   the scroll keep-alive, in milliseconds, sent with every request
+     * @param exchange            the exchange receiving the request count property on {@link #close()}
+     * @throws IOException           kept in the signature for existing callers
+     * @throws IllegalStateException if the first search request fails with an I/O error
+     */
+    public ElasticsearchScrollRequestIterator(SearchRequest searchRequest, RestHighLevelClient restHighLevelClient, int scrollKeepAliveMs, Exchange exchange) throws IOException {
+        this.searchRequest = searchRequest;
+        this.restHighLevelClient = restHighLevelClient;
+        this.scrollKeepAliveMs = scrollKeepAliveMs;
+        this.exchange = exchange;
+        this.closed = false;
+        this.requestCount = 0;
+
+        // add scroll option on the first query
+        searchRequest.scroll(TimeValue.timeValueMillis(scrollKeepAliveMs));
+
+        setFirstCurrentSearchHits();
+    }
+
+    /**
+     * Tells whether another hit is available, requesting the next scroll page when the
+     * current one is exhausted.
+     *
+     * @return {@code false} once closed or when the next page contains no hit
+     */
+    @Override
+    public boolean hasNext() {
+        if (closed) {
+            return false;
+        }
+
+        boolean hasNext = currentSearchHits.hasNext();
+        if (!hasNext) {
+            updateCurrentSearchHits();
+
+            hasNext = currentSearchHits.hasNext();
+        }
+
+        return hasNext;
+    }
+
+    /**
+     * Returns the next hit, fetching the next scroll page first if needed.
+     *
+     * @return the next search hit
+     * @throws java.util.NoSuchElementException if the iterator is closed or has no more hits
+     */
+    @Override
+    public SearchHit next() {
+        // Going through hasNext() honours the Iterator contract: the next page is fetched when
+        // the current one is exhausted, and an exhausted or closed iterator never yields null.
+        if (!hasNext()) {
+            throw new java.util.NoSuchElementException("No more search hits available (closed=" + closed + ")");
+        }
+        return currentSearchHits.next();
+    }
+
+    /**
+     * Execute next Elasticsearch scroll request and update the current scroll result.
+     */
+    private void updateCurrentSearchHits() {
+        SearchResponse searchResponse = scrollSearch();
+        this.currentSearchHits = searchResponse.getHits().iterator();
+
+        // The scroll id may change between requests; only the most recent one should be used.
+        String latestScrollId = searchResponse.getScrollId();
+        if (latestScrollId != null) {
+            this.scrollId = latestScrollId;
+        }
+    }
+
+    /**
+     * Execute the first search request and keep its hits and scroll id.
+     */
+    private void setFirstCurrentSearchHits() {
+        SearchResponse searchResponse = firstSearch();
+        this.currentSearchHits = searchResponse.getHits().iterator();
+        this.scrollId = searchResponse.getScrollId();
+    }
+
+    /**
+     * Sends the initial search request and counts it.
+     */
+    private SearchResponse firstSearch() {
+        SearchResponse searchResponse;
+        try {
+            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
+            requestCount++;
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return searchResponse;
+    }
+
+    /**
+     * Sends a scroll request for the current scroll id and counts it.
+     */
+    private SearchResponse scrollSearch() {
+        SearchResponse searchResponse;
+        try {
+            SearchScrollRequest searchScrollRequest = new SearchScrollRequest()
+                .scroll(new Scroll(TimeValue.timeValueMillis(scrollKeepAliveMs)))
+                .scrollId(scrollId);
+
+            searchResponse = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
+            requestCount++;
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return searchResponse;
+    }
+
+    /**
+     * Clears the scroll and stores the request count in the
+     * {@code CamelElasticsearchScrollQueryCount} exchange property. Calling it again is a no-op.
+     *
+     * @throws IllegalStateException if the clear-scroll request fails with an I/O error; the
+     *                               iterator is nevertheless marked closed and the property is set
+     */
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+
+        // Mark as closed before talking to the server so that a failing clear-scroll
+        // request cannot leave the iterator usable.
+        closed = true;
+        try {
+            if (scrollId != null) {
+                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+                clearScrollRequest.addScrollId(scrollId);
+
+                restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
+            }
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        } finally {
+            exchange.setProperty(PROPERTY_SCROLL_ES_QUERY_COUNT, requestCount);
+        }
+    }
+
+    /**
+     * @return the number of search and scroll requests sent so far (clear-scroll is not counted)
+     */
+    public int getRequestCount() {
+        return requestCount;
+    }
+
+    /**
+     * @return {@code true} once {@link #close()} has been called
+     */
+    public boolean isClosed() {
+        return closed;
+    }
+}
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
index dd0d7da..b199550 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
@@ -16,9 +16,9 @@
*/
package org.apache.camel.component.elasticsearch.aggregation;
+import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.InvalidPayloadRuntimeException;
-import org.apache.camel.AggregationStrategy;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
diff --git a/components/camel-elasticsearch-rest/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchScrollSearchTest.java b/components/camel-elasticsearch-rest/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchScrollSearchTest.java
new file mode 100644
index 0000000..09c5000
--- /dev/null
+++ b/components/camel-elasticsearch-rest/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchScrollSearchTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.AggregationStrategies;
+import org.apache.camel.builder.ExchangeBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.junit.Test;
+
+import static org.apache.camel.component.elasticsearch.ElasticsearchConstants.*;
+
+/**
+ * Tests the Search operation with the scroll api, both by consuming the returned
+ * {@link ElasticsearchScrollRequestIterator} directly and by streaming it through a splitter.
+ * <p>
+ * Each test indexes 10 documents and searches with a page size of 1, so 11 requests are
+ * expected: the first search plus 10 scroll requests, the last of which returns no hit.
+ */
+public class ElasticsearchScrollSearchTest extends ElasticsearchBaseTest {
+
+    private static final String TWITTER_ES_INDEX_NAME = "twitter";
+    private static final String SPLIT_TWITTER_ES_INDEX_NAME = "split-" + TWITTER_ES_INDEX_NAME;
+
+    /**
+     * Scroll is enabled through message headers; the iterator is consumed and closed by the test.
+     */
+    @Test
+    public void testScrollSearch() throws IOException {
+        // add some documents
+        for (int i = 0; i < 10; i++) {
+            Map<String, String> map = createIndexedData();
+            String indexId = template.requestBody("direct:scroll-index", map, String.class);
+            assertNotNull("indexId should be set", indexId);
+        }
+
+        // perform a refresh
+        Response refreshResponse = getClient().performRequest(new Request("post", "/" + TWITTER_ES_INDEX_NAME + "/_refresh"));
+        assertEquals("Cannot perform a refresh", 200, refreshResponse.getStatusLine().getStatusCode());
+
+        SearchRequest req = getScrollSearchRequest(TWITTER_ES_INDEX_NAME);
+
+        Exchange exchange = ExchangeBuilder.anExchange(context)
+            .withHeader(PARAM_SCROLL_KEEP_ALIVE_MS, 50000)
+            .withHeader(PARAM_SCROLL, true)
+            .withBody(req)
+            .build();
+
+        exchange = template.send("direct:scroll-search", exchange);
+
+        try (ElasticsearchScrollRequestIterator scrollRequestIterator = exchange.getIn().getBody(ElasticsearchScrollRequestIterator.class)) {
+            assertNotNull("response should not be null", scrollRequestIterator);
+
+            // parameterized instead of raw types; only the number of hits matters here
+            List<Object> result = new ArrayList<>();
+            scrollRequestIterator.forEachRemaining(result::add);
+
+            assertEquals("response hits should be == 10", 10, result.size());
+            assertEquals("11 request should have been send to Elasticsearch", 11, scrollRequestIterator.getRequestCount());
+        }
+
+        // the try-with-resources above must have closed the iterator and published the count
+        ElasticsearchScrollRequestIterator scrollRequestIterator = exchange.getIn().getBody(ElasticsearchScrollRequestIterator.class);
+        assertTrue("iterator should be closed", scrollRequestIterator.isClosed());
+        assertEquals("11 request should have been send to Elasticsearch", 11, (int) exchange.getProperty(PROPERTY_SCROLL_ES_QUERY_COUNT, Integer.class));
+    }
+
+    /**
+     * Scroll is enabled through endpoint options; the hits are split in streaming mode and
+     * aggregated back into a single exchange.
+     */
+    @Test
+    public void testScrollAndSplitSearch() throws IOException, InterruptedException {
+        // add some documents
+        for (int i = 0; i < 10; i++) {
+            Map<String, String> map = createIndexedData();
+            String indexId = template.requestBody("direct:scroll-n-split-index", map, String.class);
+            assertNotNull("indexId should be set", indexId);
+        }
+
+        // perform a refresh
+        Response refreshResponse = getClient().performRequest(new Request("post", "/" + SPLIT_TWITTER_ES_INDEX_NAME + "/_refresh"));
+        assertEquals("Cannot perform a refresh", 200, refreshResponse.getStatusLine().getStatusCode());
+
+        MockEndpoint mock = getMockEndpoint("mock:output");
+        mock.expectedMessageCount(1);
+        mock.setResultWaitTime(8000);
+
+        SearchRequest req = getScrollSearchRequest(SPLIT_TWITTER_ES_INDEX_NAME);
+
+        Exchange exchange = ExchangeBuilder.anExchange(context).withBody(req).build();
+        exchange = template.send("direct:scroll-n-split-search", exchange);
+
+        // wait for aggregation
+        mock.assertIsSatisfied();
+        Iterator<Exchange> iterator = mock.getReceivedExchanges().iterator();
+        assertTrue("response should contain 1 exchange", iterator.hasNext());
+        // wildcard instead of a raw type; only the size is inspected
+        Collection<?> aggregatedExchanges = iterator.next().getIn().getBody(Collection.class);
+
+        assertEquals("response hits should be == 10", 10, aggregatedExchanges.size());
+
+        ElasticsearchScrollRequestIterator scrollRequestIterator = exchange.getIn().getBody(ElasticsearchScrollRequestIterator.class);
+        assertTrue("iterator should be closed", scrollRequestIterator.isClosed());
+        assertEquals("11 request should have been send to Elasticsearch", 11, scrollRequestIterator.getRequestCount());
+        assertEquals("11 request should have been send to Elasticsearch", 11, (int) exchange.getProperty(PROPERTY_SCROLL_ES_QUERY_COUNT, Integer.class));
+    }
+
+    /**
+     * Builds a match-all search on the given index with a page size of 1, so that every
+     * document requires its own request.
+     */
+    private SearchRequest getScrollSearchRequest(String indexName) {
+        SearchRequest req = new SearchRequest().indices(indexName).types("tweet");
+
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.query(QueryBuilders.matchAllQuery()).size(1);
+        req.source(searchSourceBuilder);
+        return req;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:scroll-index")
+                    .to("elasticsearch-rest://elasticsearch?operation=Index&indexName=" + TWITTER_ES_INDEX_NAME + "&indexType=tweet&hostAddresses=localhost:" + ES_BASE_HTTP_PORT);
+                // scroll options are supplied as headers by the test for this route
+                from("direct:scroll-search")
+                    .to("elasticsearch-rest://elasticsearch?operation=Search&indexName=" + TWITTER_ES_INDEX_NAME + "&indexType=tweet&hostAddresses=localhost:" + ES_BASE_HTTP_PORT);
+
+                from("direct:scroll-n-split-index")
+                    .to("elasticsearch-rest://elasticsearch?operation=Index&indexName=" + SPLIT_TWITTER_ES_INDEX_NAME + "&indexType=tweet&hostAddresses=localhost:" + ES_BASE_HTTP_PORT);
+                // scroll options are supplied as endpoint parameters for this route
+                from("direct:scroll-n-split-search")
+                    .to("elasticsearch-rest://elasticsearch?"
+                        + "useScroll=true&scrollKeepAliveMs=50000&operation=Search&indexName=" + SPLIT_TWITTER_ES_INDEX_NAME + "&indexType=tweet&hostAddresses=localhost:" + ES_BASE_HTTP_PORT)
+                    .split()
+                    .body()
+                    .streaming()
+                    .parallelProcessing()
+                    .threads(12)
+                    .aggregate(AggregationStrategies.groupedExchange())
+                    .constant(true)
+                    .completionSize(20)
+                    .completionTimeout(2000)
+                    .to("mock:output")
+                    .end();
+            }
+        };
+    }
+}