You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bo...@apache.org on 2013/08/07 16:57:44 UTC
git commit: CAMEL-6444 added explicit ip/port support to
camel-elasticsearch
Updated Branches:
refs/heads/master 1bb756cd2 -> 9ecc122b4
CAMEL-6444 added explicit ip/port support to camel-elasticsearch
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9ecc122b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9ecc122b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9ecc122b
Branch: refs/heads/master
Commit: 9ecc122b45848045a565901772ca1b3888d1cf4a
Parents: 1bb756c
Author: boday <bo...@apache.org>
Authored: Wed Aug 7 07:55:21 2013 -0700
Committer: boday <bo...@apache.org>
Committed: Wed Aug 7 07:55:21 2013 -0700
----------------------------------------------------------------------
.../ElasticsearchConfiguration.java | 27 ++++++++++++++--
.../elasticsearch/ElasticsearchEndpoint.java | 15 +++++++--
.../ElasticsearchComponentTest.java | 33 ++++++++++++++++++--
3 files changed, 69 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9ecc122b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
index a07ab0a..8ed17d2 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
@@ -19,10 +19,8 @@ package org.apache.camel.component.elasticsearch;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
-
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
-
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
public class ElasticsearchConfiguration {
@@ -37,6 +35,9 @@ public class ElasticsearchConfiguration {
public static final String PARAM_INDEX_TYPE = "indexType";
public static final String PROTOCOL = "elasticsearch";
private static final String LOCAL_NAME = "local";
+ private static final String IP = "ip";
+ private static final String PORT = "port";
+ private static final Integer DEFAULT_PORT = 9300;
private URI uri;
private String protocolType;
@@ -47,6 +48,8 @@ public class ElasticsearchConfiguration {
private boolean local;
private Boolean data;
private String operation;
+ private String ip;
+ private Integer port;
public ElasticsearchConfiguration(URI uri, Map<String, Object> parameters) throws Exception {
String protocol = uri.getScheme();
@@ -82,6 +85,9 @@ public class ElasticsearchConfiguration {
indexName = (String)parameters.remove(PARAM_INDEX_NAME);
indexType = (String)parameters.remove(PARAM_INDEX_TYPE);
operation = (String)parameters.remove(PARAM_OPERATION);
+ ip = (String)parameters.remove(IP);
+ String portParam = (String) parameters.remove(PORT);
+ port = portParam == null ? DEFAULT_PORT : Integer.valueOf(portParam);
}
protected Boolean toBoolean(Object string) {
@@ -181,4 +187,21 @@ public class ElasticsearchConfiguration {
public String getOperation() {
return this.operation;
}
+
+ public String getIp() {
+ return ip;
+ }
+
+ public void setIp(String ip) {
+ this.ip = ip;
+ }
+
+ public Integer getPort() {
+ return port;
+ }
+
+ public void setPort(Integer port) {
+ this.port = port;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9ecc122b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
index f1d074a..4d002ea 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
@@ -18,13 +18,16 @@ package org.apache.camel.component.elasticsearch;
import java.net.URI;
import java.util.Map;
-
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultEndpoint;
import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.node.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +69,15 @@ public class ElasticsearchEndpoint extends DefaultEndpoint {
LOG.info("Joining ElasticSearch cluster " + config.getClusterName());
}
node = config.buildNode();
- client = node.client();
+ if (config.getIp() != null && !config.isLocal()) {
+ Settings settings = ImmutableSettings.settingsBuilder()
+ .put("cluster.name", config.getClusterName()).put("node.client", true).build();
+ Client client = new TransportClient(settings)
+ .addTransportAddress(new InetSocketTransportAddress(config.getIp(), config.getPort()));
+ this.client = client;
+ } else {
+ client = node.client();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/9ecc122b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
index 2f15011..cab6451 100644
--- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
+++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
@@ -18,13 +18,12 @@ package org.apache.camel.component.elasticsearch;
import java.util.HashMap;
import java.util.Map;
-
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
-
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
public class ElasticsearchComponentTest extends CamelTestSupport {
@@ -104,6 +103,34 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
}
@Test
+ @Ignore("need to setup the cluster IP for this test")
+ public void indexWithIp() throws Exception {
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("content", "test");
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
+ headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
+ headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
+
+ String indexId = template.requestBodyAndHeaders("direct:indexWithIp", map, headers, String.class);
+ assertNotNull("indexId should be set", indexId);
+ }
+
+ @Test
+ @Ignore("need to setup the cluster IP/Port for this test")
+ public void indexWithIpAndPort() throws Exception {
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("content", "test");
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
+ headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
+ headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
+
+ String indexId = template.requestBodyAndHeaders("direct:indexWithIpAndPort", map, headers, String.class);
+ assertNotNull("indexId should be set", indexId);
+ }
+
+ @Test
public void testGetWithHeaders() throws Exception {
//first, INDEX a value
Map<String, String> map = new HashMap<String, String>();
@@ -162,6 +189,8 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet");
from("direct:delete").to("elasticsearch://local?operation=DELETE&indexName=twitter&indexType=tweet");
+ //from("direct:indexWithIp").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost");
+ //from("direct:indexWithIpAndPort").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost&port=9300");
}
};
}