You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/11/25 09:47:52 UTC

[2/6] camel git commit: Adds support for UPDATE request, see [CAMEL-9358]

Adds support for UPDATE request, see [CAMEL-9358]


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b169bb23
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b169bb23
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b169bb23

Branch: refs/heads/master
Commit: b169bb23f00b6240ab05809e20d2bf9162d5a413
Parents: b94bdac
Author: acartapanis <ac...@diginext.fr>
Authored: Tue Nov 24 16:30:50 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Nov 25 08:38:07 2015 +0100

----------------------------------------------------------------------
 .../ElasticsearchConfiguration.java             |  2 +-
 .../elasticsearch/ElasticsearchConstants.java   |  1 +
 .../elasticsearch/ElasticsearchProducer.java    |  6 +++
 .../ElasticsearchActionRequestConverter.java    | 37 ++++++++++++++
 .../ElasticsearchComponentTest.java             | 54 ++++++++++++++++++++
 5 files changed, 99 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b169bb23/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
index b71b925..1069902 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
@@ -43,7 +43,7 @@ public class ElasticsearchConfiguration {
 
     @UriPath @Metadata(required = "true")
     private String clusterName;
-    @UriParam(enums = "INDEX,BULK,BULK_INDEX,GET_BY_ID,DELETE") @Metadata(required = "true")
+    @UriParam(enums = "INDEX,UPDATE,BULK,BULK_INDEX,GET_BY_ID,DELETE") @Metadata(required = "true")
     private String operation;
     @UriParam
     private String indexName;

http://git-wip-us.apache.org/repos/asf/camel/blob/b169bb23/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
index 66388be..a8f79e1 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
@@ -24,6 +24,7 @@ public interface ElasticsearchConstants {
 
     String PARAM_OPERATION = "operation";
     String OPERATION_INDEX = "INDEX";
+    String OPERATION_UPDATE = "UPDATE";
     String OPERATION_BULK = "BULK";
     String OPERATION_BULK_INDEX = "BULK_INDEX";
     String OPERATION_GET_BY_ID = "GET_BY_ID";

http://git-wip-us.apache.org/repos/asf/camel/blob/b169bb23/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
index 3a1afdb..476f7b7 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
@@ -28,6 +28,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Client;
 
 /**
@@ -60,6 +61,8 @@ public class ElasticsearchProducer extends DefaultProducer {
             return ElasticsearchConstants.OPERATION_INDEX;
         } else if (request instanceof GetRequest) {
             return ElasticsearchConstants.OPERATION_GET_BY_ID;
+        } else if (request instanceof UpdateRequest) {
+            return ElasticsearchConstants.OPERATION_UPDATE;
         } else if (request instanceof BulkRequest) {
             // do we want bulk or bulk_index?
             if ("BULK_INDEX".equals(getEndpoint().getConfig().getOperation())) {
@@ -131,6 +134,9 @@ public class ElasticsearchProducer extends DefaultProducer {
         if (ElasticsearchConstants.OPERATION_INDEX.equals(operation)) {
             IndexRequest indexRequest = message.getBody(IndexRequest.class);
             message.setBody(client.index(indexRequest).actionGet().getId());
+        } else if (ElasticsearchConstants.OPERATION_UPDATE.equals(operation)) {
+            UpdateRequest updateRequest = message.getBody(UpdateRequest.class);
+            message.setBody(client.update(updateRequest).actionGet().getId());
         } else if (ElasticsearchConstants.OPERATION_GET_BY_ID.equals(operation)) {
             GetRequest getRequest = message.getBody(GetRequest.class);
             message.setBody(client.get(getRequest));

http://git-wip-us.apache.org/repos/asf/camel/blob/b169bb23/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
index 3763ff1..09b976e 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
@@ -29,6 +29,7 @@ import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.replication.ReplicationType;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
 @Converter
@@ -37,6 +38,36 @@ public final class ElasticsearchActionRequestConverter {
     private ElasticsearchActionRequestConverter() {
     }
 
+    // Update requests
+    // Builds an UpdateRequest whose doc is the given body; returns null when the body is not a
+    // byte[], Map, String or XContentBuilder. All other settings come from IN message headers.
+    private static UpdateRequest createUpdateRequest(Object document, Exchange exchange) {
+        UpdateRequest request = new UpdateRequest();
+        if (document instanceof byte[]) {
+            request.doc((byte[]) document);
+        } else if (document instanceof Map) {
+            request.doc((Map<String, Object>) document);
+        } else if (document instanceof String) {
+            request.doc((String) document);
+        } else if (document instanceof XContentBuilder) {
+            request.doc((XContentBuilder) document);
+        } else {
+            return null;
+        }
+
+        // look every setting up first, then apply them in one chain
+        WriteConsistencyLevel consistency = exchange.getIn().getHeader(ElasticsearchConstants.PARAM_CONSISTENCY_LEVEL, WriteConsistencyLevel.class);
+        ReplicationType replication = exchange.getIn().getHeader(ElasticsearchConstants.PARAM_REPLICATION_TYPE, ReplicationType.class);
+        String parent = exchange.getIn().getHeader(ElasticsearchConstants.PARENT, String.class);
+        String indexName = exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class);
+        String indexType = exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, String.class);
+        return request
+                .consistencyLevel(consistency).replicationType(replication)
+                .parent(parent).index(indexName).type(indexType);
+    }
+
+
+
     // Index requests
     private static IndexRequest createIndexRequest(Object document, Exchange exchange) {
         IndexRequest indexRequest = new IndexRequest();
@@ -72,6 +103,12 @@ public final class ElasticsearchActionRequestConverter {
     }
 
     @Converter
+    public static UpdateRequest toUpdateRequest(Object document, Exchange exchange) {
+        UpdateRequest updateRequest = createUpdateRequest(document, exchange); // null when the body type is unsupported
+        return updateRequest == null ? null : updateRequest.id(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_ID, String.class)); // null = no conversion, not an NPE
+    }
+
+    @Converter
     public static GetRequest toGetRequest(String id, Exchange exchange) {
         return new GetRequest(exchange.getIn().getHeader(
                 ElasticsearchConstants.PARAM_INDEX_NAME, String.class))

http://git-wip-us.apache.org/repos/asf/camel/blob/b169bb23/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
index 78d3196..0b0cdba 100644
--- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
+++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
@@ -31,6 +31,7 @@ import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -94,6 +95,20 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
     }
 
     @Test
+    public void testUpdate() throws Exception {
+        // index a document first so that there is something to update
+        String createdId = template.requestBody("direct:index", createIndexedData(), String.class);
+        assertNotNull("indexId should be set", createdId);
+        // send a second document to the UPDATE route, passing the first document's id as a header
+        Map<String, String> updateDoc = new HashMap<>();
+        updateDoc.put(createPrefix() + "key2", createPrefix() + "value2");
+        Map<String, Object> updateHeaders = new HashMap<>();
+        updateHeaders.put(ElasticsearchConstants.PARAM_INDEX_ID, createdId);
+        String updatedId = template.requestBodyAndHeaders("direct:update", updateDoc, updateHeaders, String.class);
+        assertNotNull("indexId should be set", updatedId);
+    }
+
+    @Test
     public void testIndexWithReplication() throws Exception {
         Map<String, String> map = createIndexedData();
         String indexId = template.requestBody("direct:indexWithReplication", map, String.class);
@@ -203,6 +218,26 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
     }
 
     @Test
+    public void testUpdateWithIDInHeader() throws Exception {
+        Map<String, String> document = createIndexedData();
+        Map<String, Object> requestHeaders = new HashMap<String, Object>();
+        requestHeaders.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter");
+        requestHeaders.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet");
+        requestHeaders.put(ElasticsearchConstants.PARAM_INDEX_ID, "123");
+        // first pass: the operation header asks for INDEX under the explicit id
+        requestHeaders.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX);
+        String returnedId = template.requestBodyAndHeaders("direct:start", document, requestHeaders, String.class);
+        assertNotNull("indexId should be set", returnedId);
+        assertEquals("indexId should be equals to the provided id", "123", returnedId);
+
+        // second pass: same body and headers, with only the operation header switched to UPDATE
+        requestHeaders.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_UPDATE);
+        returnedId = template.requestBodyAndHeaders("direct:start", document, requestHeaders, String.class);
+        assertNotNull("indexId should be set", returnedId);
+        assertEquals("indexId should be equals to the provided id", "123", returnedId);
+    }
+
+    @Test
     @Ignore("need to setup the cluster IP for this test")
     public void indexWithIp()  throws Exception {
         Map<String, String> map = createIndexedData();
@@ -318,6 +353,24 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
     }
 
     @Test
+    public void updateRequestBody() throws Exception {
+        String prefix = createPrefix();
+        String indexName = prefix + "foo";
+        String typeName = prefix + "bar";
+        String docId = prefix + "testId";
+
+        // seed the index with a document sent as a ready-made IndexRequest
+        IndexRequest seed = new IndexRequest(indexName, typeName, docId);
+        seed.source("{\"" + prefix + "content\": \"" + prefix + "hello\"}");
+        template.requestBody("direct:index", seed, String.class);
+        // then send a ready-made UpdateRequest addressing the same index, type and id
+        UpdateRequest update = new UpdateRequest(indexName, typeName, docId);
+        update.doc("{\"" + prefix + "content2\": \"" + prefix + "hello2\"}");
+        String documentId = template.requestBody("direct:update", update, String.class);
+        assertThat(documentId, equalTo(prefix + "testId"));
+    }
+
+    @Test
     public void getRequestBody() throws Exception {
         String prefix = createPrefix();
 
@@ -398,6 +451,7 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
             public void configure() {
                 from("direct:start").to("elasticsearch://local");
                 from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
+                from("direct:update").to("elasticsearch://local?operation=UPDATE&indexName=twitter&indexType=tweet");
                 from("direct:indexWithReplication").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&replicationType=SYNC");
                 from("direct:indexWithWriteConsistency").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&consistencyLevel=ONE");
                 from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet");