You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/11/25 09:47:52 UTC
[2/6] camel git commit: Adds support for UPDATE request,
see [CAMEL-9358]
Adds support for UPDATE request, see [CAMEL-9358]
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b169bb23
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b169bb23
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b169bb23
Branch: refs/heads/master
Commit: b169bb23f00b6240ab05809e20d2bf9162d5a413
Parents: b94bdac
Author: acartapanis <ac...@diginext.fr>
Authored: Tue Nov 24 16:30:50 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Nov 25 08:38:07 2015 +0100
----------------------------------------------------------------------
.../ElasticsearchConfiguration.java | 2 +-
.../elasticsearch/ElasticsearchConstants.java | 1 +
.../elasticsearch/ElasticsearchProducer.java | 6 +++
.../ElasticsearchActionRequestConverter.java | 37 ++++++++++++++
.../ElasticsearchComponentTest.java | 54 ++++++++++++++++++++
5 files changed, 99 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b169bb23/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
index b71b925..1069902 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
@@ -43,7 +43,7 @@ public class ElasticsearchConfiguration {
@UriPath @Metadata(required = "true")
private String clusterName;
- @UriParam(enums = "INDEX,BULK,BULK_INDEX,GET_BY_ID,DELETE") @Metadata(required = "true")
+ @UriParam(enums = "INDEX,UPDATE,BULK,BULK_INDEX,GET_BY_ID,DELETE") @Metadata(required = "true")
private String operation;
@UriParam
private String indexName;
http://git-wip-us.apache.org/repos/asf/camel/blob/b169bb23/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
index 66388be..a8f79e1 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java
@@ -24,6 +24,7 @@ public interface ElasticsearchConstants {
String PARAM_OPERATION = "operation";
String OPERATION_INDEX = "INDEX";
+ String OPERATION_UPDATE = "UPDATE";
String OPERATION_BULK = "BULK";
String OPERATION_BULK_INDEX = "BULK_INDEX";
String OPERATION_GET_BY_ID = "GET_BY_ID";
http://git-wip-us.apache.org/repos/asf/camel/blob/b169bb23/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
index 3a1afdb..476f7b7 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
@@ -28,6 +28,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
/**
@@ -60,6 +61,8 @@ public class ElasticsearchProducer extends DefaultProducer {
return ElasticsearchConstants.OPERATION_INDEX;
} else if (request instanceof GetRequest) {
return ElasticsearchConstants.OPERATION_GET_BY_ID;
+ } else if (request instanceof UpdateRequest) {
+ return ElasticsearchConstants.OPERATION_UPDATE;
} else if (request instanceof BulkRequest) {
// do we want bulk or bulk_index?
if ("BULK_INDEX".equals(getEndpoint().getConfig().getOperation())) {
@@ -131,6 +134,9 @@ public class ElasticsearchProducer extends DefaultProducer {
if (ElasticsearchConstants.OPERATION_INDEX.equals(operation)) {
IndexRequest indexRequest = message.getBody(IndexRequest.class);
message.setBody(client.index(indexRequest).actionGet().getId());
+ } else if (ElasticsearchConstants.OPERATION_UPDATE.equals(operation)) {
+ UpdateRequest updateRequest = message.getBody(UpdateRequest.class);
+ message.setBody(client.update(updateRequest).actionGet().getId());
} else if (ElasticsearchConstants.OPERATION_GET_BY_ID.equals(operation)) {
GetRequest getRequest = message.getBody(GetRequest.class);
message.setBody(client.get(getRequest));
http://git-wip-us.apache.org/repos/asf/camel/blob/b169bb23/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
index 3763ff1..09b976e 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
@@ -29,6 +29,7 @@ import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
+import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
@Converter
@@ -37,6 +38,36 @@ public final class ElasticsearchActionRequestConverter {
private ElasticsearchActionRequestConverter() {
}
+ // Update requests
+ /**
+  * Creates an {@link UpdateRequest} whose partial document is taken from the given body and
+  * whose consistency level, replication type, parent, index and type are read from the
+  * headers of the exchange's In message.
+  *
+  * @param document the partial document; a byte[], Map, String or XContentBuilder
+  * @param exchange the exchange whose In message headers configure the request
+  * @return the populated request, or {@code null} if the document type is not supported
+  */
+ private static UpdateRequest createUpdateRequest(Object document, Exchange exchange) {
+     UpdateRequest request = new UpdateRequest();
+     if (document instanceof byte[]) {
+         request.doc((byte[]) document);
+     } else if (document instanceof Map) {
+         request.doc((Map<String, Object>) document);
+     } else if (document instanceof String) {
+         request.doc((String) document);
+     } else if (document instanceof XContentBuilder) {
+         request.doc((XContentBuilder) document);
+     } else {
+         // none of the doc(...) branches above applies to this body type
+         return null;
+     }
+
+     WriteConsistencyLevel consistency = exchange.getIn().getHeader(
+         ElasticsearchConstants.PARAM_CONSISTENCY_LEVEL, WriteConsistencyLevel.class);
+     ReplicationType replication = exchange.getIn().getHeader(
+         ElasticsearchConstants.PARAM_REPLICATION_TYPE, ReplicationType.class);
+     String parent = exchange.getIn().getHeader(
+         ElasticsearchConstants.PARENT, String.class);
+     String indexName = exchange.getIn().getHeader(
+         ElasticsearchConstants.PARAM_INDEX_NAME, String.class);
+     String indexType = exchange.getIn().getHeader(
+         ElasticsearchConstants.PARAM_INDEX_TYPE, String.class);
+
+     return request
+         .consistencyLevel(consistency)
+         .replicationType(replication)
+         .parent(parent)
+         .index(indexName)
+         .type(indexType);
+ }
+
+
+
// Index requests
private static IndexRequest createIndexRequest(Object document, Exchange exchange) {
IndexRequest indexRequest = new IndexRequest();
@@ -72,6 +103,12 @@ public final class ElasticsearchActionRequestConverter {
}
@Converter
+ public static UpdateRequest toUpdateRequest(Object document, Exchange exchange) {
+ return createUpdateRequest(document, exchange)
+ .id(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_ID, String.class));
+ }
+
+ @Converter
public static GetRequest toGetRequest(String id, Exchange exchange) {
return new GetRequest(exchange.getIn().getHeader(
ElasticsearchConstants.PARAM_INDEX_NAME, String.class))
http://git-wip-us.apache.org/repos/asf/camel/blob/b169bb23/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
index 78d3196..0b0cdba 100644
--- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
+++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
@@ -31,6 +31,7 @@ import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequest;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -94,6 +95,20 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
}
@Test
+ // Indexes a document through direct:index, then sends a partial document to direct:update
+ // with the returned id in the PARAM_INDEX_ID header and checks the update reports that id.
+ public void testUpdate() throws Exception {
+     Map<String, String> map = createIndexedData();
+     String indexId = template.requestBody("direct:index", map, String.class);
+     assertNotNull("indexId should be set", indexId);
+
+     Map<String, String> newMap = new HashMap<>();
+     newMap.put(createPrefix() + "key2", createPrefix() + "value2");
+     Map<String, Object> headers = new HashMap<>();
+     headers.put(ElasticsearchConstants.PARAM_INDEX_ID, indexId);
+     // keep the original id in its own variable so the two can be compared
+     String updatedId = template.requestBodyAndHeaders("direct:update", newMap, headers, String.class);
+     assertNotNull("indexId should be set", updatedId);
+     assertEquals("update should return the id of the indexed document", indexId, updatedId);
+ }
+
+ @Test
public void testIndexWithReplication() throws Exception {
Map<String, String> map = createIndexedData();
String indexId = template.requestBody("direct:indexWithReplication", map, String.class);
@@ -203,6 +218,26 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
}
@Test
+ // Sends the same body twice to direct:start with an explicit id header: first with the
+ // INDEX operation, then with the UPDATE operation, expecting the provided id back each time.
+ public void testUpdateWithIDInHeader() throws Exception {
+     final String providedId = "123";
+     Map<String, String> body = createIndexedData();
+
+     Map<String, Object> requestHeaders = new HashMap<String, Object>();
+     requestHeaders.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX);
+     requestHeaders.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter");
+     requestHeaders.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet");
+     requestHeaders.put(ElasticsearchConstants.PARAM_INDEX_ID, providedId);
+
+     String returnedId = template.requestBodyAndHeaders("direct:start", body, requestHeaders, String.class);
+     assertNotNull("indexId should be set", returnedId);
+     assertEquals("indexId should be equals to the provided id", providedId, returnedId);
+
+     // same headers, only the operation switches to UPDATE
+     requestHeaders.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_UPDATE);
+
+     returnedId = template.requestBodyAndHeaders("direct:start", body, requestHeaders, String.class);
+     assertNotNull("indexId should be set", returnedId);
+     assertEquals("indexId should be equals to the provided id", providedId, returnedId);
+ }
+
+ @Test
@Ignore("need to setup the cluster IP for this test")
public void indexWithIp() throws Exception {
Map<String, String> map = createIndexedData();
@@ -318,6 +353,24 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
}
@Test
+ // Sends a ready-made IndexRequest to direct:index, then a ready-made UpdateRequest for the
+ // same index/type/id to direct:update, and checks the update returns that document id.
+ public void updateRequestBody() throws Exception {
+     String prefix = createPrefix();
+     String index = prefix + "foo";
+     String type = prefix + "bar";
+     String id = prefix + "testId";
+
+     // first index data
+     IndexRequest indexRequest = new IndexRequest(index, type, id);
+     indexRequest.source("{\"" + prefix + "content\": \"" + prefix + "hello\"}");
+     template.requestBody("direct:index", indexRequest, String.class);
+
+     // then update
+     UpdateRequest updateRequest = new UpdateRequest(index, type, id);
+     updateRequest.doc("{\"" + prefix + "content2\": \"" + prefix + "hello2\"}");
+     String documentId = template.requestBody("direct:update", updateRequest, String.class);
+
+     assertThat(documentId, equalTo(id));
+ }
+
+ @Test
public void getRequestBody() throws Exception {
String prefix = createPrefix();
@@ -398,6 +451,7 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
public void configure() {
from("direct:start").to("elasticsearch://local");
from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
+ from("direct:update").to("elasticsearch://local?operation=UPDATE&indexName=twitter&indexType=tweet");
from("direct:indexWithReplication").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&replicationType=SYNC");
from("direct:indexWithWriteConsistency").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&consistencyLevel=ONE");
from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet");