You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/02/10 04:21:56 UTC

[camel] branch master updated: CAMEL-12962: camel-elasticsearch - Allow from and size for SearchSource / SearchRequest to be set via URI (#3557)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 872afab  CAMEL-12962: camel-elasticsearch - Allow from and size for SearchSource / SearchRequest to be set via URI (#3557)
872afab is described below

commit 872afab4dca128ce13e7dec0f76b20f3976aff3a
Author: Miguel Caetano Serra <se...@gmail.com>
AuthorDate: Mon Feb 10 04:21:45 2020 +0000

    CAMEL-12962: camel-elasticsearch - Allow from and size for SearchSource / SearchRequest to be set via URI (#3557)
    
    Signed-off-by: Miguel Serra <se...@gmail.com>
---
 .../ElasticsearchEndpointConfigurer.java           |  2 +
 .../elasticsearch/elasticsearch-rest.json          |  2 +
 .../main/docs/elasticsearch-rest-component.adoc    |  4 +-
 .../elasticsearch/ElasticsearchConfiguration.java  | 26 +++++++++
 .../elasticsearch/ElasticsearchConstants.java      |  2 +
 .../elasticsearch/ElasticsearchProducer.java       | 12 ++++-
 .../ElasticsearchActionRequestConverter.java       |  9 +++-
 .../elasticsearch/ElasticsearchSizeLimitTest.java  | 63 ++++++++++++++++++++++
 8 files changed, 117 insertions(+), 3 deletions(-)

diff --git a/components/camel-elasticsearch-rest/src/generated/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpointConfigurer.java b/components/camel-elasticsearch-rest/src/generated/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpointConfigurer.java
index 1a02435..0701c11 100644
--- a/components/camel-elasticsearch-rest/src/generated/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpointConfigurer.java
+++ b/components/camel-elasticsearch-rest/src/generated/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpointConfigurer.java
@@ -22,6 +22,7 @@ public class ElasticsearchEndpointConfigurer extends PropertyConfigurerSupport i
         case "enableSniffer": target.getConfiguration().setEnableSniffer(property(camelContext, boolean.class, value)); return true;
         case "enablessl":
         case "enableSSL": target.getConfiguration().setEnableSSL(property(camelContext, boolean.class, value)); return true;
+        case "from": target.getConfiguration().setFrom(property(camelContext, java.lang.Integer.class, value)); return true;
         case "hostaddresses":
         case "hostAddresses": target.getConfiguration().setHostAddresses(property(camelContext, java.lang.String.class, value)); return true;
         case "indexname":
@@ -33,6 +34,7 @@ public class ElasticsearchEndpointConfigurer extends PropertyConfigurerSupport i
         case "operation": target.getConfiguration().setOperation(property(camelContext, org.apache.camel.component.elasticsearch.ElasticsearchOperation.class, value)); return true;
         case "scrollkeepalivems":
         case "scrollKeepAliveMs": target.getConfiguration().setScrollKeepAliveMs(property(camelContext, int.class, value)); return true;
+        case "size": target.getConfiguration().setSize(property(camelContext, java.lang.Integer.class, value)); return true;
         case "sniffafterfailuredelay":
         case "sniffAfterFailureDelay": target.getConfiguration().setSniffAfterFailureDelay(property(camelContext, int.class, value)); return true;
         case "snifferinterval":
diff --git a/components/camel-elasticsearch-rest/src/generated/resources/org/apache/camel/component/elasticsearch/elasticsearch-rest.json b/components/camel-elasticsearch-rest/src/generated/resources/org/apache/camel/component/elasticsearch/elasticsearch-rest.json
index a14eafe..1f304dd 100644
--- a/components/camel-elasticsearch-rest/src/generated/resources/org/apache/camel/component/elasticsearch/elasticsearch-rest.json
+++ b/components/camel-elasticsearch-rest/src/generated/resources/org/apache/camel/component/elasticsearch/elasticsearch-rest.json
@@ -39,12 +39,14 @@
     "disconnect": { "kind": "parameter", "displayName": "Disconnect", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "configurationClass": "org.apache.camel.component.elasticsearch.ElasticsearchConfiguration", "configurationField": "configuration", "description": "Disconnect after it finish calling the producer" },
     "enableSniffer": { "kind": "parameter", "displayName": "Enable Sniffer", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.elasticsearch.ElasticsearchConfiguration", "configurationField": "configuration", "description": "Enable automatically discover nodes from a running Elasticsearch cluster" },
     "enableSSL": { "kind": "parameter", "displayName": "Enable SSL", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "configurationClass": "org.apache.camel.component.elasticsearch.ElasticsearchConfiguration", "configurationField": "configuration", "description": "Enable SSL" },
+    "from": { "kind": "parameter", "displayName": "From", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.elasticsearch.ElasticsearchConfiguration", "configurationField": "configuration", "description": "Offset of the first search hit to return." },
     "hostAddresses": { "kind": "parameter", "displayName": "Host Addresses", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "secret": false, "configurationClass": "org.apache.camel.component.elasticsearch.ElasticsearchConfiguration", "configurationField": "configuration", "description": "Comma separated list with ip:port formatted remote transport addresses to use." },
     "indexName": { "kind": "parameter", "displayName": "Index Name", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.elasticsearch.ElasticsearchConfiguration", "configurationField": "configuration", "description": "The name of the index to act against" },
     "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the  [...]
     "maxRetryTimeout": { "kind": "parameter", "displayName": "Max Retry Timeout", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "30000", "configurationClass": "org.apache.camel.component.elasticsearch.ElasticsearchConfiguration", "configurationField": "configuration", "description": "The time in ms before retry" },
     "operation": { "kind": "parameter", "displayName": "Operation", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.elasticsearch.ElasticsearchOperation", "enum": [ "Index", "Update", "Bulk", "BulkIndex", "GetById", "MultiGet", "MultiSearch", "Delete", "DeleteIndex", "Search", "Exists", "Ping" ], "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.elasticsearch.ElasticsearchConfiguration",  [...]
     "scrollKeepAliveMs": { "kind": "parameter", "displayName": "Scroll Keep Alive Ms", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "60000", "configurationClass": "org.apache.camel.component.elasticsearch.ElasticsearchConfiguration", "configurationField": "configuration", "description": "Time in ms during which elasticsearch will keep search context alive" },
+    "size": { "kind": "parameter", "displayName": "Size", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.elasticsearch.ElasticsearchConfiguration", "configurationField": "configuration", "description": "Maximum number of search hits to return." },
     "sniffAfterFailureDelay": { "kind": "parameter", "displayName": "Sniff After Failure Delay", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "60000", "configurationClass": "org.apache.camel.component.elasticsearch.ElasticsearchConfiguration", "configurationField": "configuration", "description": "The delay of a sniff execution scheduled after a failure (in milliseconds)" },
     "snifferInterval": { "kind": "parameter", "displayName": "Sniffer Interval", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "300000", "configurationClass": "org.apache.camel.component.elasticsearch.ElasticsearchConfiguration", "configurationField": "configuration", "description": "The interval between consecutive ordinary sniff executions in milliseconds. Will be honoured when sniffOnFai [...]
     "socketTimeout": { "kind": "parameter", "displayName": "Socket Timeout", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "30000", "configurationClass": "org.apache.camel.component.elasticsearch.ElasticsearchConfiguration", "configurationField": "configuration", "description": "The timeout in ms to wait before the socket will timeout." },
diff --git a/components/camel-elasticsearch-rest/src/main/docs/elasticsearch-rest-component.adoc b/components/camel-elasticsearch-rest/src/main/docs/elasticsearch-rest-component.adoc
index c428db0..f23db06 100644
--- a/components/camel-elasticsearch-rest/src/main/docs/elasticsearch-rest-component.adoc
+++ b/components/camel-elasticsearch-rest/src/main/docs/elasticsearch-rest-component.adoc
@@ -81,7 +81,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (17 parameters):
+=== Query Parameters (19 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -91,12 +91,14 @@ with the following path and query parameters:
 | *disconnect* (producer) | Disconnect after it finish calling the producer | false | boolean
 | *enableSniffer* (producer) | Enable automatically discover nodes from a running Elasticsearch cluster | false | boolean
 | *enableSSL* (producer) | Enable SSL | false | boolean
+| *from* (producer) | Offset of the first search hit to return. |  | Integer
 | *hostAddresses* (producer) | *Required* Comma separated list with ip:port formatted remote transport addresses to use. |  | String
 | *indexName* (producer) | The name of the index to act against |  | String
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
 | *maxRetryTimeout* (producer) | The time in ms before retry | 30000 | int
 | *operation* (producer) | What operation to perform. The value can be one of: Index, Update, Bulk, BulkIndex, GetById, MultiGet, MultiSearch, Delete, DeleteIndex, Search, Exists, Ping |  | ElasticsearchOperation
 | *scrollKeepAliveMs* (producer) | Time in ms during which elasticsearch will keep search context alive | 60000 | int
+| *size* (producer) | Maximum number of search hits to return. |  | Integer
 | *sniffAfterFailureDelay* (producer) | The delay of a sniff execution scheduled after a failure (in milliseconds) | 60000 | int
 | *snifferInterval* (producer) | The interval between consecutive ordinary sniff executions in milliseconds. Will be honoured when sniffOnFailure is disabled or when there are no failures between consecutive sniff executions | 300000 | int
 | *socketTimeout* (producer) | The timeout in ms to wait before the socket will timeout. | 30000 | int
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
index 41ba8ad..9c410de 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
@@ -37,6 +37,10 @@ public class ElasticsearchConfiguration {
     @UriParam
     private ElasticsearchOperation operation;
     @UriParam
+    private Integer size;
+    @UriParam
+    private Integer from;
+    @UriParam
     private String indexName;
     @UriParam(defaultValue = "" + ElasticsearchConstants.DEFAULT_FOR_WAIT_ACTIVE_SHARDS)
     private int waitForActiveShards = ElasticsearchConstants.DEFAULT_FOR_WAIT_ACTIVE_SHARDS;
@@ -64,6 +68,28 @@ public class ElasticsearchConfiguration {
     private int sniffAfterFailureDelay = ElasticsearchConstants.DEFAULT_AFTER_FAILURE_DELAY;
 
     /**
+     * Offset of the first search hit to return.
+     */
+    public Integer getFrom() {
+        return from;
+    }
+
+    public void setFrom(Integer from) {
+        this.from = from;
+    }
+
+    /**
+     * Maximum number of search hits to return.
+     */
+    public Integer getSize() {
+        return size;
+    }
+
+    public void setSize(Integer size) {
+        this.size = size;
+    }
+
+    /**
      * Name of the cluster
      */
     public String getClusterName() {
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
index 4d58345..046c44c 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
@@ -24,6 +24,8 @@ public interface ElasticsearchConstants {
     String PARAM_WAIT_FOR_ACTIVE_SHARDS = "waitForActiveShards";
     String PARAM_SCROLL_KEEP_ALIVE_MS = "scrollKeepAliveMs";
     String PARAM_SCROLL = "useScroll";
+    String PARAM_SIZE = "size";
+    String PARAM_FROM = "from";
 
     String PROPERTY_SCROLL_ES_QUERY_COUNT = "CamelElasticsearchScrollQueryCount";
 
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
index fd30e78..9f817d5 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
@@ -145,6 +145,16 @@ public class ElasticsearchProducer extends DefaultProducer {
             configIndexName = true;
         }
 
+        Integer size = message.getHeader(ElasticsearchConstants.PARAM_SIZE, Integer.class);
+        if (size == null) {
+            message.setHeader(ElasticsearchConstants.PARAM_SIZE, configuration.getSize());
+        }
+
+        Integer from = message.getHeader(ElasticsearchConstants.PARAM_FROM, Integer.class);
+        if (from == null) {
+            message.setHeader(ElasticsearchConstants.PARAM_FROM, configuration.getFrom());
+        }
+
         boolean configWaitForActiveShards = false;
         Integer waitForActiveShards = message.getHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class);
         if (waitForActiveShards == null) {
@@ -324,7 +334,7 @@ public class ElasticsearchProducer extends DefaultProducer {
     public RestClient getClient() {
         return client;
     }
-    
+
     private final class HighLevelClient extends RestHighLevelClient {
         private HighLevelClient(RestClient restClient) {
             super(restClient, client -> { }, Collections.emptyList());
diff --git a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
index c3f98e9..0d54600 100644
--- a/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
+++ b/components/camel-elasticsearch-rest/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
@@ -153,6 +153,8 @@ public final class ElasticsearchActionRequestConverter {
         // Only setup the indexName and indexType if the message header has the
         // setting
         String indexName = exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class);
+        Integer size = exchange.getIn().getHeader(ElasticsearchConstants.PARAM_SIZE, Integer.class);
+        Integer from = exchange.getIn().getHeader(ElasticsearchConstants.PARAM_FROM, Integer.class);
         if (ObjectHelper.isNotEmpty(indexName)) {
             searchRequest.indices(indexName);
         }
@@ -186,7 +188,12 @@ public final class ElasticsearchActionRequestConverter {
             LOG.info("Cannot convert queryObject into SearchRequest object");
             return null;
         }
-
+        if (size != null)  {
+            searchSourceBuilder.size(size);
+        }
+        if (from != null)  {
+            searchSourceBuilder.from(from);
+        }
         searchSourceBuilder.query(QueryBuilders.wrapperQuery(queryText));
         searchRequest.source(searchSourceBuilder);
 
diff --git a/components/camel-elasticsearch-rest/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchSizeLimitTest.java b/components/camel-elasticsearch-rest/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchSizeLimitTest.java
new file mode 100644
index 0000000..c419d17
--- /dev/null
+++ b/components/camel-elasticsearch-rest/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchSizeLimitTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.elasticsearch.search.SearchHits;
+import org.junit.Test;
+
+public class ElasticsearchSizeLimitTest extends ElasticsearchBaseTest {
+
+    @Test
+    public void testSize() {
+        // index four documents so the size/from assertions below run against a known total
+        String indexId = template.requestBody("direct:index", getContent("content"), String.class);
+        String indexId1 = template.requestBody("direct:index", getContent("content1"), String.class);
+        String indexId2 = template.requestBody("direct:index", getContent("content2"), String.class);
+        String indexId4 = template.requestBody("direct:index", getContent("content3"), String.class);
+
+        String query = "{\"query\":{\"match_all\": {}}}";
+        SearchHits searchWithSizeTwo = template.requestBody("direct:searchWithSizeTwo", query, SearchHits.class);
+        SearchHits searchFrom3 = template.requestBody("direct:searchFrom3", query, SearchHits.class);
+        assertEquals(2, searchWithSizeTwo.getHits().length);
+        assertEquals(1, searchFrom3.getHits().length);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:index")
+                    .to("elasticsearch-rest://elasticsearch?operation=Index&indexName=twitter&hostAddresses=localhost:" + ES_BASE_HTTP_PORT);
+                from("direct:searchWithSizeTwo")
+                    .to("elasticsearch-rest://elasticsearch?operation=Search&indexName=twitter&size=2&hostAddresses=localhost:" + ES_BASE_HTTP_PORT);
+                from("direct:searchFrom3")
+                    .to("elasticsearch-rest://elasticsearch?operation=Search&indexName=twitter&from=3&hostAddresses=localhost:" + ES_BASE_HTTP_PORT);
+            }
+        };
+    }
+
+    private Map<String, String> getContent(String content) {
+        Map<String, String> map = new HashMap<>();
+        map.put("content", content);
+        return map;
+    }
+}