You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/04/27 22:09:23 UTC
camel git commit: This closes #507
Repository: camel
Updated Branches:
refs/heads/master 1451499b8 -> fba8aa47c
This closes #507
More parameters are available for the Elasticsearch component,
including replication type, write consistency level, and
multiple transport addresses.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fba8aa47
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fba8aa47
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fba8aa47
Branch: refs/heads/master
Commit: fba8aa47cc3885cdf71d57dec201a48272ea7577
Parents: 1451499
Author: Mauricio Jost <ma...@activeeon.com>
Authored: Mon Apr 27 16:30:46 2015 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon Apr 27 22:05:41 2015 +0200
----------------------------------------------------------------------
.../ElasticsearchConfiguration.java | 88 ++++++++++++++-
.../elasticsearch/ElasticsearchEndpoint.java | 41 ++++---
.../elasticsearch/ElasticsearchProducer.java | 23 ++++
.../ElasticsearchActionRequestConverter.java | 18 ++--
.../ElasticsearchComponentTest.java | 45 ++++++++
.../ElasticsearchConfigurationTest.java | 108 +++++++++++++++++++
6 files changed, 301 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
index 7db78c7..d2568d3 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
@@ -18,12 +18,18 @@ package org.apache.camel.component.elasticsearch;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;
+import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.support.replication.ReplicationType;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
@@ -44,14 +50,22 @@ public class ElasticsearchConfiguration {
public static final String PARAM_DATA = "data";
public static final String PARAM_INDEX_NAME = "indexName";
public static final String PARAM_INDEX_TYPE = "indexType";
+ public static final String PARAM_CONSISTENCY_LEVEL = "consistencyLevel";
+ public static final String PARAM_REPLICATION_TYPE = "replicationType";
+ public static final String TRANSPORT_ADDRESSES = "transportAddresses";
public static final String PROTOCOL = "elasticsearch";
private static final String LOCAL_NAME = "local";
private static final String IP = "ip";
private static final String PORT = "port";
private static final Integer DEFAULT_PORT = 9300;
+ private static final WriteConsistencyLevel DEFAULT_CONSISTENCY_LEVEL = WriteConsistencyLevel.DEFAULT;
+ private static final ReplicationType DEFAULT_REPLICATION_TYPE = ReplicationType.DEFAULT;
+ private static final String TRANSPORT_ADDRESSES_SEPARATOR_REGEX = ",";
+ private static final String IP_PORT_SEPARATOR_REGEX = ":";
private URI uri;
- @UriPath(description = "Name of cluster or use local for local mode") @Metadata(required = "true")
+ @UriPath(description = "Name of cluster or use local for local mode")
+ @Metadata(required = "true")
private String clusterName;
@UriParam
private String protocolType;
@@ -62,6 +76,10 @@ public class ElasticsearchConfiguration {
@UriParam
private String indexType;
@UriParam
+ private WriteConsistencyLevel consistencyLevel;
+ @UriParam
+ private ReplicationType replicationType;
+ @UriParam
private boolean local;
@UriParam
private Boolean data;
@@ -70,6 +88,8 @@ public class ElasticsearchConfiguration {
@UriParam
private String ip;
@UriParam
+ private List<InetSocketTransportAddress> transportAddresses;
+ @UriParam
private Integer port;
public ElasticsearchConfiguration(URI uri, Map<String, Object> parameters) throws Exception {
@@ -106,11 +126,53 @@ public class ElasticsearchConfiguration {
indexName = (String)parameters.remove(PARAM_INDEX_NAME);
indexType = (String)parameters.remove(PARAM_INDEX_TYPE);
operation = (String)parameters.remove(PARAM_OPERATION);
+ consistencyLevel = parseConsistencyLevel(parameters);
+ replicationType = parseReplicationType(parameters);
+
ip = (String)parameters.remove(IP);
+ transportAddresses = parseTransportAddresses((String) parameters.remove(TRANSPORT_ADDRESSES));
+
String portParam = (String) parameters.remove(PORT);
port = portParam == null ? DEFAULT_PORT : Integer.valueOf(portParam);
}
+ private ReplicationType parseReplicationType(Map<String, Object> parameters) {
+ Object replicationTypeParam = parameters.remove(PARAM_REPLICATION_TYPE);
+ if (replicationTypeParam != null) {
+ return ReplicationType.valueOf(replicationTypeParam.toString());
+ } else {
+ return DEFAULT_REPLICATION_TYPE;
+ }
+ }
+
+ private WriteConsistencyLevel parseConsistencyLevel(Map<String, Object> parameters) {
+ Object consistencyLevelParam = parameters.remove(PARAM_CONSISTENCY_LEVEL);
+ if (consistencyLevelParam != null) {
+ return WriteConsistencyLevel.valueOf(consistencyLevelParam.toString());
+ } else {
+ return DEFAULT_CONSISTENCY_LEVEL;
+ }
+ }
+
+ private List<InetSocketTransportAddress> parseTransportAddresses(String ipsString) {
+ if (ipsString == null || ipsString.isEmpty()) {
+ return null;
+ }
+ List<String> addressesStr = Arrays.asList(ipsString.split(TRANSPORT_ADDRESSES_SEPARATOR_REGEX));
+ List<InetSocketTransportAddress> addressesTrAd = new ArrayList<>(addressesStr.size());
+ for (String address : addressesStr) {
+ String[] split = address.split(IP_PORT_SEPARATOR_REGEX);
+ String hostname;
+ if (split.length > 0)
+ hostname = split[0];
+ else
+ throw new IllegalArgumentException();
+ Integer port = (split.length > 1 ? Integer.parseInt(split[1]) : DEFAULT_PORT);
+ addressesTrAd.add(new InetSocketTransportAddress(hostname, port));
+ }
+ return addressesTrAd;
+ }
+
protected Boolean toBoolean(Object string) {
if ("true".equals(string)) {
return true;
@@ -217,6 +279,14 @@ public class ElasticsearchConfiguration {
this.ip = ip;
}
+ public List<InetSocketTransportAddress> getTransportAddresses() {
+ return transportAddresses;
+ }
+
+ public void setTransportAddresses(List<InetSocketTransportAddress> transportAddresses) {
+ this.transportAddresses = transportAddresses;
+ }
+
public Integer getPort() {
return port;
}
@@ -225,4 +295,20 @@ public class ElasticsearchConfiguration {
this.port = port;
}
+ public void setConsistencyLevel(WriteConsistencyLevel consistencyLevel) {
+ this.consistencyLevel = consistencyLevel;
+ }
+
+ public WriteConsistencyLevel getConsistencyLevel() {
+ return consistencyLevel;
+ }
+
+ public void setReplicationType(ReplicationType replicationType) {
+ this.replicationType = replicationType;
+ }
+
+ public ReplicationType getReplicationType() {
+ return replicationType;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
index da49c12..4a582ed 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.elasticsearch;
import java.net.URI;
import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -30,6 +32,7 @@ import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.node.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,27 +76,37 @@ public class ElasticsearchEndpoint extends DefaultEndpoint {
LOG.info("Joining ElasticSearch cluster " + configuration.getClusterName());
}
if (configuration.getIp() != null) {
- LOG.info("REMOTE ELASTICSEARCH: {}", configuration.getIp());
- Settings settings = ImmutableSettings.settingsBuilder()
- // setting the classloader here will allow the underlying elasticsearch-java
- // class to find its names.txt in an OSGi environment (otherwise the thread
- // classloader is used, which won't be able to see the file causing a startup
- // exception).
- .classLoader(Settings.class.getClassLoader())
- .put("cluster.name", configuration.getClusterName())
- .put("client.transport.ignore_cluster_name", false)
- .put("node.client", true)
- .put("client.transport.sniff", true)
- .build();
- Client client = new TransportClient(settings)
+ this.client = new TransportClient(getSettings())
.addTransportAddress(new InetSocketTransportAddress(configuration.getIp(), configuration.getPort()));
- this.client = client;
+
+ } else if (configuration.getTransportAddresses() != null &&
+ !configuration.getTransportAddresses().isEmpty()) {
+ List<TransportAddress> addresses = new ArrayList<>(configuration.getTransportAddresses().size());
+ for (TransportAddress address : configuration.getTransportAddresses()) {
+ addresses.add(address);
+ }
+ this.client = new TransportClient(getSettings())
+ .addTransportAddresses(addresses.toArray(new TransportAddress[0]));
} else {
node = configuration.buildNode();
client = node.client();
}
}
+ private Settings getSettings() {
+ return ImmutableSettings.settingsBuilder()
+ // setting the classloader here will allow the underlying elasticsearch-java
+ // class to find its names.txt in an OSGi environment (otherwise the thread
+ // classloader is used, which won't be able to see the file causing a startup
+ // exception).
+ .classLoader(Settings.class.getClassLoader())
+ .put("cluster.name", configuration.getClusterName())
+ .put("client.transport.ignore_cluster_name", false)
+ .put("node.client", true)
+ .put("client.transport.sniff", true)
+ .build();
+ }
+
@Override
protected void doStop() throws Exception {
if (configuration.isLocal()) {
http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
index 2b432d5..0ededde 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
@@ -113,6 +113,20 @@ public class ElasticsearchProducer extends DefaultProducer {
configIndexType = true;
}
+ boolean configConsistencyLevel = false;
+ String consistencyLevel = message.getHeader(ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL, String.class);
+ if (consistencyLevel == null) {
+ message.setHeader(ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL, getEndpoint().getConfig().getConsistencyLevel());
+ configConsistencyLevel = true;
+ }
+
+ boolean configReplicationType = false;
+ String replicationType = message.getHeader(ElasticsearchConfiguration.PARAM_REPLICATION_TYPE, String.class);
+ if (replicationType == null) {
+ message.setHeader(ElasticsearchConfiguration.PARAM_REPLICATION_TYPE, getEndpoint().getConfig().getReplicationType());
+ configReplicationType = true;
+ }
+
Client client = getEndpoint().getClient();
if (ElasticsearchConfiguration.OPERATION_INDEX.equals(operation)) {
IndexRequest indexRequest = message.getBody(IndexRequest.class);
@@ -155,5 +169,14 @@ public class ElasticsearchProducer extends DefaultProducer {
if (configIndexType) {
message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE);
}
+
+ if (configConsistencyLevel) {
+ message.removeHeader(ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL);
+ }
+
+ if (configReplicationType) {
+ message.removeHeader(ElasticsearchConfiguration.PARAM_REPLICATION_TYPE);
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
index 934258f..a64f843 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
@@ -22,11 +22,13 @@ import java.util.Map;
import org.apache.camel.Converter;
import org.apache.camel.Exchange;
import org.apache.camel.component.elasticsearch.ElasticsearchConfiguration;
+import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.common.xcontent.XContentBuilder;
@Converter
@@ -50,13 +52,15 @@ public final class ElasticsearchActionRequestConverter {
return null;
}
- return indexRequest.index(
- exchange.getIn().getHeader(
- ElasticsearchConfiguration.PARAM_INDEX_NAME,
- String.class)).type(
- exchange.getIn().getHeader(
- ElasticsearchConfiguration.PARAM_INDEX_TYPE,
- String.class));
+ return indexRequest
+ .consistencyLevel(exchange.getIn().getHeader(
+ ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL, WriteConsistencyLevel.class))
+ .replicationType(exchange.getIn().getHeader(
+ ElasticsearchConfiguration.PARAM_REPLICATION_TYPE, ReplicationType.class))
+ .index(exchange.getIn().getHeader(
+ ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class))
+ .type(exchange.getIn().getHeader(
+ ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class));
}
@Converter
http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
index 318bdf5..27fa90f 100644
--- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
+++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
@@ -94,6 +94,20 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
}
@Test
+ public void testIndexWithReplication() throws Exception {
+ Map<String, String> map = createIndexedData();
+ String indexId = template.requestBody("direct:indexWithReplication", map, String.class);
+ assertNotNull("indexId should be set", indexId);
+ }
+
+ @Test
+ public void testIndexWithWriteConsistency() throws Exception {
+ Map<String, String> map = createIndexedData();
+ String indexId = template.requestBody("direct:indexWithWriteConsistency", map, String.class);
+ assertNotNull("indexId should be set", indexId);
+ }
+
+ @Test
public void testBulkIndex() throws Exception {
List<Map<String, String>> documents = new ArrayList<Map<String, String>>();
Map<String, String> document1 = createIndexedData("1");
@@ -215,6 +229,33 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
}
@Test
+ @Ignore("need to setup the cluster with multiple nodes for this test")
+ public void indexWithTransportAddresses() throws Exception {
+ Map<String, String> map = createIndexedData();
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
+ headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
+ headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
+
+ String indexId = template.requestBodyAndHeaders("direct:indexWithTransportAddresses", map, headers, String.class);
+ assertNotNull("indexId should be set", indexId);
+ }
+
+ @Test
+ @Ignore("need to setup the cluster with multiple nodes for this test")
+ public void indexWithIpAndTransportAddresses() throws Exception {
+ Map<String, String> map = createIndexedData();
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
+ headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
+ headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
+
+ //should ignore transport addresses configuration
+ String indexId = template.requestBodyAndHeaders("direct:indexWithIpAndTransportAddresses", map, headers, String.class);
+ assertNotNull("indexId should be set", indexId);
+ }
+
+ @Test
public void testGetWithHeaders() throws Exception {
//first, INDEX a value
Map<String, String> map = createIndexedData();
@@ -357,6 +398,8 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
public void configure() {
from("direct:start").to("elasticsearch://local");
from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
+ from("direct:indexWithReplication").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&replicationType=SYNC");
+ from("direct:indexWithWriteConsistency").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&consistencyLevel=ONE");
from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet");
from("direct:delete").to("elasticsearch://local?operation=DELETE&indexName=twitter&indexType=tweet");
from("direct:search").to("elasticsearch://local?operation=SEARCH&indexName=twitter&indexType=tweet");
@@ -364,6 +407,8 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
from("direct:bulk").to("elasticsearch://local?operation=BULK&indexName=twitter&indexType=tweet");
//from("direct:indexWithIp").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost");
//from("direct:indexWithIpAndPort").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost&port=9300");
+ //from("direct:indexWithTransportAddresses").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&transportAddresses=localhost:9300,localhost:9301");
+ //from("direct:indexWithIpAndTransportAddresses").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost&port=9300&transportAddresses=localhost:4444,localhost:5555");
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java
index f96c164..294cd32 100644
--- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java
+++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java
@@ -21,6 +21,8 @@ import java.util.Map;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.URISupport;
+import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.support.replication.ReplicationType;
import org.junit.Test;
@@ -93,4 +95,110 @@ public class ElasticsearchConfigurationTest extends CamelTestSupport {
assertNull(conf.getClusterName());
}
+ @Test
+ public void writeConsistencyLevelDefaultConfTest() throws Exception {
+ URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
+ Map<String, Object> parameters = URISupport.parseParameters(uri);
+ ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+ assertTrue(conf.isLocal());
+ assertEquals("INDEX", conf.getOperation());
+ assertEquals("twitter", conf.getIndexName());
+ assertEquals("tweet", conf.getIndexType());
+ assertEquals(WriteConsistencyLevel.DEFAULT, conf.getConsistencyLevel());
+ assertNull(conf.getClusterName());
+ }
+
+ @Test
+ public void writeConsistencyLevelConfTest() throws Exception {
+ URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&consistencyLevel=QUORUM");
+ Map<String, Object> parameters = URISupport.parseParameters(uri);
+ ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+ assertTrue(conf.isLocal());
+ assertEquals("INDEX", conf.getOperation());
+ assertEquals("twitter", conf.getIndexName());
+ assertEquals("tweet", conf.getIndexType());
+ assertEquals(WriteConsistencyLevel.QUORUM, conf.getConsistencyLevel());
+ assertNull(conf.getClusterName());
+ }
+
+ @Test
+ public void replicationTypeConfTest() throws Exception {
+ URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&replicationType=ASYNC");
+ Map<String, Object> parameters = URISupport.parseParameters(uri);
+ ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+ assertDefaultConfigurationParameters(conf);
+ assertEquals(ReplicationType.ASYNC, conf.getReplicationType());
+ }
+
+ @Test
+ public void replicationTypeDefaultConfTest() throws Exception {
+ URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
+ Map<String, Object> parameters = URISupport.parseParameters(uri);
+ ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+ assertDefaultConfigurationParameters(conf);
+ assertEquals(ReplicationType.DEFAULT, conf.getReplicationType());
+ }
+
+ @Test
+ public void transportAddressesSimpleHostnameTest() throws Exception {
+ URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" +
+ "indexType=tweet&transportAddresses=127.0.0.1");
+ Map<String, Object> parameters = URISupport.parseParameters(uri);
+ ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+ assertDefaultConfigurationParameters(conf);
+ assertEquals(1, conf.getTransportAddresses().size());
+ assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString());
+ assertEquals(9300, conf.getTransportAddresses().get(0).address().getPort());
+ }
+
+ @Test
+ public void transportAddressesMultipleHostnameTest() throws Exception {
+ URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" +
+ "indexType=tweet&transportAddresses=127.0.0.1,127.0.0.2");
+ Map<String, Object> parameters = URISupport.parseParameters(uri);
+ ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+ assertDefaultConfigurationParameters(conf);
+ assertEquals(2, conf.getTransportAddresses().size());
+ assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString());
+ assertEquals(9300, conf.getTransportAddresses().get(0).address().getPort());
+ assertEquals("127.0.0.2", conf.getTransportAddresses().get(1).address().getHostString());
+ assertEquals(9300, conf.getTransportAddresses().get(1).address().getPort());
+ }
+
+ @Test
+ public void transportAddressesSimpleHostnameAndPortTest() throws Exception {
+ URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" +
+ "indexType=tweet&transportAddresses=127.0.0.1:9305");
+ Map<String, Object> parameters = URISupport.parseParameters(uri);
+ ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+ assertDefaultConfigurationParameters(conf);
+ assertEquals(1, conf.getTransportAddresses().size());
+ assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString());
+ assertEquals(9305, conf.getTransportAddresses().get(0).address().getPort());
+ }
+
+ @Test
+ public void transportAddressesMultipleHostnameAndPortTest() throws Exception {
+ URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" +
+ "indexType=tweet&transportAddresses=127.0.0.1:9400,127.0.0.2,127.0.0.3:9401");
+ Map<String, Object> parameters = URISupport.parseParameters(uri);
+ ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+ assertDefaultConfigurationParameters(conf);
+ assertEquals(3, conf.getTransportAddresses().size());
+ assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString());
+ assertEquals(9400, conf.getTransportAddresses().get(0).address().getPort());
+ assertEquals("127.0.0.2", conf.getTransportAddresses().get(1).address().getHostString());
+ assertEquals(9300, conf.getTransportAddresses().get(1).address().getPort());
+ assertEquals("127.0.0.3", conf.getTransportAddresses().get(2).address().getHostString());
+ assertEquals(9401, conf.getTransportAddresses().get(2).address().getPort());
+ }
+
+ private void assertDefaultConfigurationParameters(ElasticsearchConfiguration conf) {
+ assertTrue(conf.isLocal());
+ assertEquals("INDEX", conf.getOperation());
+ assertEquals("twitter", conf.getIndexName());
+ assertEquals("tweet", conf.getIndexType());
+ assertNull(conf.getClusterName());
+ }
+
}