You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bo...@apache.org on 2012/08/02 06:52:37 UTC

svn commit: r1368351 - in /camel/trunk/components/camel-elasticsearch/src: main/java/org/apache/camel/component/elasticsearch/ test/java/org/apache/camel/component/elasticsearch/ test/resources/

Author: boday
Date: Thu Aug  2 04:52:36 2012
New Revision: 1368351

URL: http://svn.apache.org/viewvc?rev=1368351&view=rev
Log:
CAMEL-5481 added basic GET_BY_ID support to camel-elasticsearch and fixed some issues with the configuration class

Modified:
    camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java
    camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
    camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
    camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
    camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
    camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java
    camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchIndexNameAndTypeInHeaderComponentTest.java
    camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java
    camel/trunk/components/camel-elasticsearch/src/test/resources/SpringElasticsearchTest.xml

Modified: camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java Thu Aug  2 04:52:36 2012
@@ -28,21 +28,16 @@ import org.apache.camel.impl.DefaultComp
  */
 public class ElasticsearchComponent extends DefaultComponent {
 
-    private ElasticsearchConfiguration config;
-
     public ElasticsearchComponent() {
         super();
-        config = new ElasticsearchConfiguration();
     }
 
     public ElasticsearchComponent(CamelContext context) {
         super(context);
-        config = new ElasticsearchConfiguration();
     }
 
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
-        config.parseURI(new URI(uri), parameters, this);
-        Endpoint endpoint = new ElasticsearchEndpoint(uri, this, config);
+        Endpoint endpoint = new ElasticsearchEndpoint(uri, this, parameters);
         setProperties(endpoint, parameters);
         return endpoint;
     }

Modified: camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java Thu Aug  2 04:52:36 2012
@@ -29,6 +29,10 @@ import static org.elasticsearch.node.Nod
 
 public class ElasticsearchConfiguration {
 
+    public static final String PARAM_OPERATION = "operation";
+    public static final String OPERATION_INDEX = "INDEX";
+    public static final String OPERATION_GET_BY_ID = "GET_BY_ID";
+    public static final String PARAM_INDEX_ID = "indexId";
     public static final String PARAM_DATA = "data";
     public static final String PARAM_INDEX_NAME = "indexName";
     public static final String PARAM_INDEX_TYPE = "indexType";
@@ -44,16 +48,10 @@ public class ElasticsearchConfiguration 
     private String indexType;
     private boolean local;
     private Boolean data;
+    private String operation;
 
-    public ElasticsearchConfiguration() {
-    }
-
-    public ElasticsearchConfiguration(URI uri) throws Exception {
-        this();
-        this.uri = uri;
-    }
+    public ElasticsearchConfiguration(URI uri, Map<String, Object> parameters) throws Exception {
 
-    public void parseURI(URI uri, Map<String, Object> parameters, ElasticsearchComponent component) throws Exception {
         String protocol = uri.getScheme();
 
         if (!protocol.equalsIgnoreCase(PROTOCOL)) {
@@ -86,6 +84,7 @@ public class ElasticsearchConfiguration 
 
         indexName = (String)parameters.remove(PARAM_INDEX_NAME);
         indexType = (String)parameters.remove(PARAM_INDEX_TYPE);
+        operation = (String)parameters.remove(PARAM_OPERATION);
     }
 
     protected Boolean toBoolean(Object string) {
@@ -178,4 +177,11 @@ public class ElasticsearchConfiguration 
         this.data = data;
     }
 
+    public void setOperation(String operation) {
+        this.operation = operation;
+    }
+
+    public String getOperation() {
+        return this.operation;
+    }
 }

Modified: camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java Thu Aug  2 04:52:36 2012
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.elasticsearch;
 
+import java.net.URI;
+import java.util.Map;
+
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -37,9 +40,9 @@ public class ElasticsearchEndpoint exten
     private Client client;
     private ElasticsearchConfiguration config;
 
-    public ElasticsearchEndpoint(String uri, ElasticsearchComponent component, ElasticsearchConfiguration config) {
+    public ElasticsearchEndpoint(String uri, ElasticsearchComponent component, Map<String, Object> parameters) throws Exception {
         super(uri, component);
-        this.config = config;
+        this.config = new ElasticsearchConfiguration(new URI(uri), parameters);
     }
 
     public Producer createProducer() throws Exception {
@@ -91,4 +94,8 @@ public class ElasticsearchEndpoint exten
         return config;
     }
 
+    public void setOperation(String operation) {
+        config.setOperation(operation);
+    }
+
 }

Modified: camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java Thu Aug  2 04:52:36 2012
@@ -23,8 +23,10 @@ import org.apache.camel.ExpectedBodyType
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultProducer;
 import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.support.replication.ReplicationType;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
@@ -41,8 +43,46 @@ public class ElasticsearchProducer exten
     }
 
     public void process(Exchange exchange) throws Exception {
+
+        String operation = (String) exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_OPERATION);
+        if (operation == null) {
+            operation = endpoint.getConfig().getOperation();
+        }
+
+        if (operation == null) {
+            throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " is missing");
+        }
+
         Client client = endpoint.getClient();
-        log.debug("indexing " + exchange);
+
+        if (operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_INDEX)) {
+            addToIndex(client, exchange);
+        } else if (operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_GET_BY_ID)) {
+            getById(client, exchange);
+        } else {
+            throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " value '" + operation + "' is not supported");
+        }
+    }
+
+    public void getById(Client client, Exchange exchange) {
+
+        String indexName = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class);
+        if (indexName == null) {
+            indexName = endpoint.getConfig().getIndexName();
+        }
+
+        String indexType = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class);
+        if (indexType == null) {
+            indexType = endpoint.getConfig().getIndexType();
+        }
+
+        String indexId = exchange.getIn().getBody(String.class);
+
+        GetResponse response = client.prepareGet(indexName, indexType, indexId).execute().actionGet();
+        exchange.getIn().setBody(response);
+    }
+
+    public void addToIndex(Client client, Exchange exchange) {
 
         String indexName = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class);
         if (indexName == null) {
@@ -55,11 +95,13 @@ public class ElasticsearchProducer exten
         }
 
         IndexRequestBuilder prepareIndex = client.prepareIndex(indexName, indexType);
+
         if (!setIndexRequestSource(exchange.getIn(), prepareIndex)) {
             throw new ExpectedBodyTypeException(exchange, XContentBuilder.class);
         }
         ListenableActionFuture<IndexResponse> future = prepareIndex.execute();
         IndexResponse response = future.actionGet();
+        exchange.getIn().setBody(response.getId());
     }
 
     private boolean setIndexRequestSource(Message msg, IndexRequestBuilder builder) {

Modified: camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java Thu Aug  2 04:52:36 2012
@@ -17,31 +17,82 @@
 package org.apache.camel.component.elasticsearch;
 
 import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.elasticsearch.action.get.GetResponse;
 import org.junit.Test;
 
 public class ElasticsearchComponentTest extends CamelTestSupport {
 
     @Test
     public void testIndex() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMinimumMessageCount(1);
-        sendBody("direct:index", new HashMap<String, String>() {
-            {
-                put("content", "test");
-            }
-        });
-        assertMockEndpointsSatisfied();
+        HashMap<String, String> map = new HashMap<String, String>();
+        map.put("content", "test");
+        String indexId = (String) template.requestBody("direct:index", map);
+        assertNotNull("indexId should be set", indexId);
+    }
+
+    @Test
+    public void testGet() throws Exception {
+        //first, index a document
+        HashMap<String, String> map = new HashMap<String, String>();
+        map.put("content", "test");
+        sendBody("direct:index", map);
+        String indexId = (String) template.requestBody("direct:index", map);
+        assertNotNull("indexId should be set", indexId);
+
+        //now, verify get succeeded
+        GetResponse response = (GetResponse) template.requestBody("direct:get", indexId);
+        assertNotNull("response should not be null", response);
+        assertNotNull("response source should not be null", response.getSource());
+    }
+
+    @Test
+    public void testIndexWithHeaders() throws Exception {
+
+        HashMap<String, String> map = new HashMap<String, String>();
+        map.put("content", "test");
+
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
+        headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
+        headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
+
+        String indexId = (String) template.requestBodyAndHeaders("direct:start", map, headers);
+        assertNotNull("indexId should be set", indexId);
+    }
+
+    @Test
+    public void testGetWithHeaders() throws Exception {
+
+        //first, INDEX a document
+        HashMap<String, String> map = new HashMap<String, String>();
+        map.put("content", "test");
+
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
+        headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
+        headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
+
+        String indexId = (String) template.requestBodyAndHeaders("direct:start", map, headers);
+
+        //now, verify GET
+        headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_GET_BY_ID);
+        GetResponse response = (GetResponse) template.requestBodyAndHeaders("direct:start", indexId, headers);
+        assertNotNull("response should not be null", response);
+        assertNotNull("response source should not be null", response.getSource());
     }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:index").to("elasticsearch://local?indexName=twitter&indexType=tweet").to("mock:result");
+                from("direct:start").to("elasticsearch://local");
+                from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
+                from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet");
             }
         };
     }

Modified: camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java Thu Aug  2 04:52:36 2012
@@ -31,53 +31,52 @@ public class ElasticsearchConfigurationT
 
     @Test
     public void localNode() throws Exception {
-        ElasticsearchConfiguration conf = new ElasticsearchConfiguration();
-        URI uri = new URI("elasticsearch://local?indexName=twitter&indexType=tweet");
+        URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
         Map<String, Object> parameters = URISupport.parseParameters(uri);
-        conf.parseURI(uri, parameters, null);
+        ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
         assertTrue(conf.isLocal());
         assertEquals("twitter", conf.getIndexName());
         assertEquals("tweet", conf.getIndexType());
+        assertEquals("INDEX", conf.getOperation());
         assertTrue(conf.isData());
         assertNull(conf.getClusterName());
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void localNonDataNodeThrowsIllegalArgumentException() throws Exception {
-        ElasticsearchConfiguration conf = new ElasticsearchConfiguration();
-        URI uri = new URI("elasticsearch://local?indexName=twitter&indexType=tweet&data=false");
+        URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&data=false");
         Map<String, Object> parameters = URISupport.parseParameters(uri);
-        conf.parseURI(uri, parameters, null);
+        ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
     }
 
     @Test
     public void localConfDefaultsToDataNode() throws Exception {
-        ElasticsearchConfiguration conf = new ElasticsearchConfiguration();
-        URI uri = new URI("elasticsearch://local?indexName=twitter&indexType=tweet");
+        URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
         Map<String, Object> parameters = URISupport.parseParameters(uri);
-        conf.parseURI(uri, parameters, null);
+        ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+        assertEquals("INDEX", conf.getOperation());
         assertTrue(conf.isLocal());
         assertTrue(conf.isData());
     }
 
     @Test
     public void clusterConfDefaultsToNonDataNode() throws Exception {
-        ElasticsearchConfiguration conf = new ElasticsearchConfiguration();
-        URI uri = new URI("elasticsearch://clustername?indexName=twitter&indexType=tweet");
+        URI uri = new URI("elasticsearch://clustername?operation=INDEX&indexName=twitter&indexType=tweet");
         Map<String, Object> parameters = URISupport.parseParameters(uri);
-        conf.parseURI(uri, parameters, null);
+        ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
         assertEquals("clustername", conf.getClusterName());
+        assertEquals("INDEX", conf.getOperation());
         assertFalse(conf.isLocal());
         assertFalse(conf.isData());
     }
 
     @Test
     public void localDataNode() throws Exception {
-        ElasticsearchConfiguration conf = new ElasticsearchConfiguration();
-        URI uri = new URI("elasticsearch://local?indexName=twitter&indexType=tweet&data=true");
+        URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&data=true");
         Map<String, Object> parameters = URISupport.parseParameters(uri);
-        conf.parseURI(uri, parameters, null);
+        ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
         assertTrue(conf.isLocal());
+        assertEquals("INDEX", conf.getOperation());
         assertEquals("twitter", conf.getIndexName());
         assertEquals("tweet", conf.getIndexType());
         assertTrue(conf.isData());

Modified: camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchIndexNameAndTypeInHeaderComponentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchIndexNameAndTypeInHeaderComponentTest.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchIndexNameAndTypeInHeaderComponentTest.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchIndexNameAndTypeInHeaderComponentTest.java Thu Aug  2 04:52:36 2012
@@ -35,6 +35,7 @@ public class ElasticsearchIndexNameAndTy
             }
         }, new HashMap() {
             {
+                put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
                 put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
                 put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
             }

Modified: camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java (original)
+++ camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java Thu Aug  2 04:52:36 2012
@@ -56,6 +56,7 @@ public class SpringElasticsearchTest ext
             }
         }, new HashMap() {
             {
+                put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
                 put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
                 put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
             }

Modified: camel/trunk/components/camel-elasticsearch/src/test/resources/SpringElasticsearchTest.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/test/resources/SpringElasticsearchTest.xml?rev=1368351&r1=1368350&r2=1368351&view=diff
==============================================================================
--- camel/trunk/components/camel-elasticsearch/src/test/resources/SpringElasticsearchTest.xml (original)
+++ camel/trunk/components/camel-elasticsearch/src/test/resources/SpringElasticsearchTest.xml Thu Aug  2 04:52:36 2012
@@ -7,7 +7,7 @@
     <camelContext xmlns="http://camel.apache.org/schema/spring">
         <route>
             <from uri="direct:index" />
-            <to uri="elasticsearch://local?indexName=twitter&amp;indexType=tweet"/>
+            <to uri="elasticsearch://local?operation=INDEX&amp;indexName=twitter&amp;indexType=tweet"/>
             <to uri="mock:result"/>
         </route>
     </camelContext>