You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bo...@apache.org on 2013/08/07 16:57:44 UTC

git commit: CAMEL-6444 added explicit ip/port support to camel-elasticsearch

Updated Branches:
  refs/heads/master 1bb756cd2 -> 9ecc122b4


CAMEL-6444 added explicit ip/port support to camel-elasticsearch


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9ecc122b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9ecc122b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9ecc122b

Branch: refs/heads/master
Commit: 9ecc122b45848045a565901772ca1b3888d1cf4a
Parents: 1bb756c
Author: boday <bo...@apache.org>
Authored: Wed Aug 7 07:55:21 2013 -0700
Committer: boday <bo...@apache.org>
Committed: Wed Aug 7 07:55:21 2013 -0700

----------------------------------------------------------------------
 .../ElasticsearchConfiguration.java             | 27 ++++++++++++++--
 .../elasticsearch/ElasticsearchEndpoint.java    | 15 +++++++--
 .../ElasticsearchComponentTest.java             | 33 ++++++++++++++++++--
 3 files changed, 69 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9ecc122b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
index a07ab0a..8ed17d2 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
@@ -19,10 +19,8 @@ package org.apache.camel.component.elasticsearch;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Map;
-
 import org.elasticsearch.node.Node;
 import org.elasticsearch.node.NodeBuilder;
-
 import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
 
 public class ElasticsearchConfiguration {
@@ -37,6 +35,9 @@ public class ElasticsearchConfiguration {
     public static final String PARAM_INDEX_TYPE = "indexType";
     public static final String PROTOCOL = "elasticsearch";
     private static final String LOCAL_NAME = "local";
+    private static final String IP = "ip";
+    private static final String PORT = "port";
+    private static final Integer DEFAULT_PORT = 9300;
 
     private URI uri;
     private String protocolType;
@@ -47,6 +48,8 @@ public class ElasticsearchConfiguration {
     private boolean local;
     private Boolean data;
     private String operation;
+    private String ip;
+    private Integer port;
 
     public ElasticsearchConfiguration(URI uri, Map<String, Object> parameters) throws Exception {
         String protocol = uri.getScheme();
@@ -82,6 +85,9 @@ public class ElasticsearchConfiguration {
         indexName = (String)parameters.remove(PARAM_INDEX_NAME);
         indexType = (String)parameters.remove(PARAM_INDEX_TYPE);
         operation = (String)parameters.remove(PARAM_OPERATION);
+        ip = (String)parameters.remove(IP);
+        String portParam = (String) parameters.remove(PORT);
+        port = portParam == null ? DEFAULT_PORT : Integer.valueOf(portParam);
     }
 
     protected Boolean toBoolean(Object string) {
@@ -181,4 +187,21 @@ public class ElasticsearchConfiguration {
     public String getOperation() {
         return this.operation;
     }
+
+    public String getIp() {
+        return ip;
+    }
+
+    public void setIp(String ip) {
+        this.ip = ip;
+    }
+
+    public Integer getPort() {
+        return port;
+    }
+
+    public void setPort(Integer port) {
+        this.port = port;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9ecc122b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
index f1d074a..4d002ea 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
@@ -18,13 +18,16 @@ package org.apache.camel.component.elasticsearch;
 
 import java.net.URI;
 import java.util.Map;
-
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.node.Node;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,7 +69,15 @@ public class ElasticsearchEndpoint extends DefaultEndpoint {
             LOG.info("Joining ElasticSearch cluster " + config.getClusterName());
         }
         node = config.buildNode();
-        client = node.client();
+        if (config.getIp() != null && !config.isLocal()) {
+            Settings settings = ImmutableSettings.settingsBuilder()
+                    .put("cluster.name", config.getClusterName()).put("node.client", true).build();
+            Client client = new TransportClient(settings)
+                    .addTransportAddress(new InetSocketTransportAddress(config.getIp(), config.getPort()));
+            this.client = client;
+        } else {
+            client = node.client();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/9ecc122b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
index 2f15011..cab6451 100644
--- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
+++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
@@ -18,13 +18,12 @@ package org.apache.camel.component.elasticsearch;
 
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.get.GetResponse;
-
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class ElasticsearchComponentTest extends CamelTestSupport {
@@ -104,6 +103,34 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
     }
 
     @Test
+    @Ignore("need to setup the cluster IP for this test")
+    public void indexWithIp()  throws Exception {
+        Map<String, String> map = new HashMap<String, String>();
+        map.put("content", "test");
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
+        headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
+        headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
+
+        String indexId = template.requestBodyAndHeaders("direct:indexWithIp", map, headers, String.class);
+        assertNotNull("indexId should be set", indexId);
+    }
+
+    @Test
+    @Ignore("need to setup the cluster IP/Port for this test")
+    public void indexWithIpAndPort()  throws Exception {
+        Map<String, String> map = new HashMap<String, String>();
+        map.put("content", "test");
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
+        headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
+        headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
+
+        String indexId = template.requestBodyAndHeaders("direct:indexWithIpAndPort", map, headers, String.class);
+        assertNotNull("indexId should be set", indexId);
+    }
+
+    @Test
     public void testGetWithHeaders() throws Exception {
         //first, INDEX a value
         Map<String, String> map = new HashMap<String, String>();
@@ -162,6 +189,8 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
                 from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
                 from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet");
                 from("direct:delete").to("elasticsearch://local?operation=DELETE&indexName=twitter&indexType=tweet");
+                //from("direct:indexWithIp").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost");
+                //from("direct:indexWithIpAndPort").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost&port=9300");
             }
         };
     }