You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bo...@apache.org on 2012/08/02 06:52:37 UTC
svn commit: r1368351 - in /camel/trunk/components/camel-elasticsearch/src:
main/java/org/apache/camel/component/elasticsearch/
test/java/org/apache/camel/component/elasticsearch/ test/resources/
Author: boday
Date: Thu Aug 2 04:52:36 2012
New Revision: 1368351
URL: http://svn.apache.org/viewvc?rev=1368351&view=rev
Log:
CAMEL-5481 added basic GET_BY_ID support to camel-elasticsearch and fixed some issues with the configuration class
Modified:
camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java
camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java
camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchIndexNameAndTypeInHeaderComponentTest.java
camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java
camel/trunk/components/camel-elasticsearch/src/test/resources/SpringElasticsearchTest.xml
Modified: camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java Thu Aug 2 04:52:36 2012
@@ -28,21 +28,16 @@ import org.apache.camel.impl.DefaultComp
*/
public class ElasticsearchComponent extends DefaultComponent {
- private ElasticsearchConfiguration config;
-
public ElasticsearchComponent() {
super();
- config = new ElasticsearchConfiguration();
}
public ElasticsearchComponent(CamelContext context) {
super(context);
- config = new ElasticsearchConfiguration();
}
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
- config.parseURI(new URI(uri), parameters, this);
- Endpoint endpoint = new ElasticsearchEndpoint(uri, this, config);
+ Endpoint endpoint = new ElasticsearchEndpoint(uri, this, parameters);
setProperties(endpoint, parameters);
return endpoint;
}
Modified: camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java Thu Aug 2 04:52:36 2012
@@ -29,6 +29,10 @@ import static org.elasticsearch.node.Nod
public class ElasticsearchConfiguration {
+ public static final String PARAM_OPERATION = "operation";
+ public static final String OPERATION_INDEX = "INDEX";
+ public static final String OPERATION_GET_BY_ID = "GET_BY_ID";
+ public static final String PARAM_INDEX_ID = "indexId";
public static final String PARAM_DATA = "data";
public static final String PARAM_INDEX_NAME = "indexName";
public static final String PARAM_INDEX_TYPE = "indexType";
@@ -44,16 +48,10 @@ public class ElasticsearchConfiguration
private String indexType;
private boolean local;
private Boolean data;
+ private String operation;
- public ElasticsearchConfiguration() {
- }
-
- public ElasticsearchConfiguration(URI uri) throws Exception {
- this();
- this.uri = uri;
- }
+ public ElasticsearchConfiguration(URI uri, Map<String, Object> parameters) throws Exception {
- public void parseURI(URI uri, Map<String, Object> parameters, ElasticsearchComponent component) throws Exception {
String protocol = uri.getScheme();
if (!protocol.equalsIgnoreCase(PROTOCOL)) {
@@ -86,6 +84,7 @@ public class ElasticsearchConfiguration
indexName = (String)parameters.remove(PARAM_INDEX_NAME);
indexType = (String)parameters.remove(PARAM_INDEX_TYPE);
+ operation = (String)parameters.remove(PARAM_OPERATION);
}
protected Boolean toBoolean(Object string) {
@@ -178,4 +177,11 @@ public class ElasticsearchConfiguration
this.data = data;
}
+ public void setOperation(String operation) {
+ this.operation = operation;
+ }
+
+ public String getOperation() {
+ return this.operation;
+ }
}
Modified: camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java Thu Aug 2 04:52:36 2012
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.elasticsearch;
+import java.net.URI;
+import java.util.Map;
+
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -37,9 +40,9 @@ public class ElasticsearchEndpoint exten
private Client client;
private ElasticsearchConfiguration config;
- public ElasticsearchEndpoint(String uri, ElasticsearchComponent component, ElasticsearchConfiguration config) {
+ public ElasticsearchEndpoint(String uri, ElasticsearchComponent component, Map<String, Object> parameters) throws Exception {
super(uri, component);
- this.config = config;
+ this.config = new ElasticsearchConfiguration(new URI(uri), parameters);
}
public Producer createProducer() throws Exception {
@@ -91,4 +94,8 @@ public class ElasticsearchEndpoint exten
return config;
}
+ public void setOperation(String operation) {
+ config.setOperation(operation);
+ }
+
}
Modified: camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java Thu Aug 2 04:52:36 2012
@@ -23,8 +23,10 @@ import org.apache.camel.ExpectedBodyType
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultProducer;
import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -41,8 +43,46 @@ public class ElasticsearchProducer exten
}
public void process(Exchange exchange) throws Exception {
+
+ String operation = (String) exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_OPERATION);
+ if (operation == null) {
+ operation = endpoint.getConfig().getOperation();
+ }
+
+ if (operation == null) {
+ throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " is missing");
+ }
+
Client client = endpoint.getClient();
- log.debug("indexing " + exchange);
+
+ if (operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_INDEX)) {
+ addToIndex(client, exchange);
+ } else if (operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_GET_BY_ID)) {
+ getById(client, exchange);
+ } else {
+ throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " value '" + operation + "' is not supported");
+ }
+ }
+
+ public void getById(Client client, Exchange exchange) {
+
+ String indexName = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class);
+ if (indexName == null) {
+ indexName = endpoint.getConfig().getIndexName();
+ }
+
+ String indexType = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class);
+ if (indexType == null) {
+ indexType = endpoint.getConfig().getIndexType();
+ }
+
+ String indexId = exchange.getIn().getBody(String.class);
+
+ GetResponse response = client.prepareGet(indexName, indexType, indexId).execute().actionGet();
+ exchange.getIn().setBody(response);
+ }
+
+ public void addToIndex(Client client, Exchange exchange) {
String indexName = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class);
if (indexName == null) {
@@ -55,11 +95,13 @@ public class ElasticsearchProducer exten
}
IndexRequestBuilder prepareIndex = client.prepareIndex(indexName, indexType);
+
if (!setIndexRequestSource(exchange.getIn(), prepareIndex)) {
throw new ExpectedBodyTypeException(exchange, XContentBuilder.class);
}
ListenableActionFuture<IndexResponse> future = prepareIndex.execute();
IndexResponse response = future.actionGet();
+ exchange.getIn().setBody(response.getId());
}
private boolean setIndexRequestSource(Message msg, IndexRequestBuilder builder) {
Modified: camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java Thu Aug 2 04:52:36 2012
@@ -17,31 +17,82 @@
package org.apache.camel.component.elasticsearch;
import java.util.HashMap;
+import java.util.Map;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
+import org.elasticsearch.action.get.GetResponse;
import org.junit.Test;
public class ElasticsearchComponentTest extends CamelTestSupport {
@Test
public void testIndex() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMinimumMessageCount(1);
- sendBody("direct:index", new HashMap<String, String>() {
- {
- put("content", "test");
- }
- });
- assertMockEndpointsSatisfied();
+ HashMap<String, String> map = new HashMap<String, String>();
+ map.put("content", "test");
+ String indexId = (String) template.requestBody("direct:index", map);
+ assertNotNull("indexId should be set", indexId);
+ }
+
+ @Test
+ public void testGet() throws Exception {
+ //first, index a document
+ HashMap<String, String> map = new HashMap<String, String>();
+ map.put("content", "test");
+ sendBody("direct:index", map);
+ String indexId = (String) template.requestBody("direct:index", map);
+ assertNotNull("indexId should be set", indexId);
+
+ //now, verify get succeeded
+ GetResponse response = (GetResponse) template.requestBody("direct:get", indexId);
+ assertNotNull("response should not be null", response);
+ assertNotNull("response source should not be null", response.getSource());
+ }
+
+ @Test
+ public void testIndexWithHeaders() throws Exception {
+
+ HashMap<String, String> map = new HashMap<String, String>();
+ map.put("content", "test");
+
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
+ headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
+ headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
+
+ String indexId = (String) template.requestBodyAndHeaders("direct:start", map, headers);
+ assertNotNull("indexId should be set", indexId);
+ }
+
+ @Test
+ public void testGetWithHeaders() throws Exception {
+
+ //first, INDEX a document
+ HashMap<String, String> map = new HashMap<String, String>();
+ map.put("content", "test");
+
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
+ headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
+ headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
+
+ String indexId = (String) template.requestBodyAndHeaders("direct:start", map, headers);
+
+ //now, verify GET
+ headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_GET_BY_ID);
+ GetResponse response = (GetResponse) template.requestBodyAndHeaders("direct:start", indexId, headers);
+ assertNotNull("response should not be null", response);
+ assertNotNull("response source should not be null", response.getSource());
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() {
- from("direct:index").to("elasticsearch://local?indexName=twitter&indexType=tweet").to("mock:result");
+ from("direct:start").to("elasticsearch://local");
+ from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
+ from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet");
}
};
}
Modified: camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java Thu Aug 2 04:52:36 2012
@@ -31,53 +31,52 @@ public class ElasticsearchConfigurationT
@Test
public void localNode() throws Exception {
- ElasticsearchConfiguration conf = new ElasticsearchConfiguration();
- URI uri = new URI("elasticsearch://local?indexName=twitter&indexType=tweet");
+ URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
Map<String, Object> parameters = URISupport.parseParameters(uri);
- conf.parseURI(uri, parameters, null);
+ ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
assertTrue(conf.isLocal());
assertEquals("twitter", conf.getIndexName());
assertEquals("tweet", conf.getIndexType());
+ assertEquals("INDEX", conf.getOperation());
assertTrue(conf.isData());
assertNull(conf.getClusterName());
}
@Test(expected = IllegalArgumentException.class)
public void localNonDataNodeThrowsIllegalArgumentException() throws Exception {
- ElasticsearchConfiguration conf = new ElasticsearchConfiguration();
- URI uri = new URI("elasticsearch://local?indexName=twitter&indexType=tweet&data=false");
+ URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&data=false");
Map<String, Object> parameters = URISupport.parseParameters(uri);
- conf.parseURI(uri, parameters, null);
+ ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
}
@Test
public void localConfDefaultsToDataNode() throws Exception {
- ElasticsearchConfiguration conf = new ElasticsearchConfiguration();
- URI uri = new URI("elasticsearch://local?indexName=twitter&indexType=tweet");
+ URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
Map<String, Object> parameters = URISupport.parseParameters(uri);
- conf.parseURI(uri, parameters, null);
+ ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+ assertEquals("INDEX", conf.getOperation());
assertTrue(conf.isLocal());
assertTrue(conf.isData());
}
@Test
public void clusterConfDefaultsToNonDataNode() throws Exception {
- ElasticsearchConfiguration conf = new ElasticsearchConfiguration();
- URI uri = new URI("elasticsearch://clustername?indexName=twitter&indexType=tweet");
+ URI uri = new URI("elasticsearch://clustername?operation=INDEX&indexName=twitter&indexType=tweet");
Map<String, Object> parameters = URISupport.parseParameters(uri);
- conf.parseURI(uri, parameters, null);
+ ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
assertEquals("clustername", conf.getClusterName());
+ assertEquals("INDEX", conf.getOperation());
assertFalse(conf.isLocal());
assertFalse(conf.isData());
}
@Test
public void localDataNode() throws Exception {
- ElasticsearchConfiguration conf = new ElasticsearchConfiguration();
- URI uri = new URI("elasticsearch://local?indexName=twitter&indexType=tweet&data=true");
+ URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&data=true");
Map<String, Object> parameters = URISupport.parseParameters(uri);
- conf.parseURI(uri, parameters, null);
+ ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
assertTrue(conf.isLocal());
+ assertEquals("INDEX", conf.getOperation());
assertEquals("twitter", conf.getIndexName());
assertEquals("tweet", conf.getIndexType());
assertTrue(conf.isData());
Modified: camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchIndexNameAndTypeInHeaderComponentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchIndexNameAndTypeInHeaderComponentTest.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchIndexNameAndTypeInHeaderComponentTest.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchIndexNameAndTypeInHeaderComponentTest.java Thu Aug 2 04:52:36 2012
@@ -35,6 +35,7 @@ public class ElasticsearchIndexNameAndTy
}
}, new HashMap() {
{
+ put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
}
Modified: camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java Thu Aug 2 04:52:36 2012
@@ -56,6 +56,7 @@ public class SpringElasticsearchTest ext
}
}, new HashMap() {
{
+ put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
}
Modified: camel/trunk/components/camel-elasticsearch/src/test/resources/SpringElasticsearchTest.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/test/resources/SpringElasticsearchTest.xml?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/test/resources/SpringElasticsearchTest.xml (original)
+++ camel/trunk/components/camel-elasticsearch/src/test/resources/SpringElasticsearchTest.xml Thu Aug 2 04:52:36 2012
@@ -7,7 +7,7 @@
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:index" />
- <to uri="elasticsearch://local?indexName=twitter&indexType=tweet"/>
+ <to uri="elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"/>
<to uri="mock:result"/>
</route>
</camelContext>