You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/04/27 22:09:23 UTC

camel git commit: This closes #507

Repository: camel
Updated Branches:
  refs/heads/master 1451499b8 -> fba8aa47c


This closes #507

More parameters are available for Elasticsearch component,
including replication type, write consistency level and
multiple transport addresses.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fba8aa47
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fba8aa47
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fba8aa47

Branch: refs/heads/master
Commit: fba8aa47cc3885cdf71d57dec201a48272ea7577
Parents: 1451499
Author: Mauricio Jost <ma...@activeeon.com>
Authored: Mon Apr 27 16:30:46 2015 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon Apr 27 22:05:41 2015 +0200

----------------------------------------------------------------------
 .../ElasticsearchConfiguration.java             |  88 ++++++++++++++-
 .../elasticsearch/ElasticsearchEndpoint.java    |  41 ++++---
 .../elasticsearch/ElasticsearchProducer.java    |  23 ++++
 .../ElasticsearchActionRequestConverter.java    |  18 ++--
 .../ElasticsearchComponentTest.java             |  45 ++++++++
 .../ElasticsearchConfigurationTest.java         | 108 +++++++++++++++++++
 6 files changed, 301 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
index 7db78c7..d2568d3 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
@@ -18,12 +18,18 @@ package org.apache.camel.component.elasticsearch;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
+import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.support.replication.ReplicationType;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.node.NodeBuilder;
 
@@ -44,14 +50,22 @@ public class ElasticsearchConfiguration {
     public static final String PARAM_DATA = "data";
     public static final String PARAM_INDEX_NAME = "indexName";
     public static final String PARAM_INDEX_TYPE = "indexType";
+    public static final String PARAM_CONSISTENCY_LEVEL = "consistencyLevel";
+    public static final String PARAM_REPLICATION_TYPE = "replicationType";
+    public static final String TRANSPORT_ADDRESSES = "transportAddresses";
     public static final String PROTOCOL = "elasticsearch";
     private static final String LOCAL_NAME = "local";
     private static final String IP = "ip";
     private static final String PORT = "port";
     private static final Integer DEFAULT_PORT = 9300;
+    private static final WriteConsistencyLevel DEFAULT_CONSISTENCY_LEVEL = WriteConsistencyLevel.DEFAULT;
+    private static final ReplicationType DEFAULT_REPLICATION_TYPE = ReplicationType.DEFAULT;
+    private static final String TRANSPORT_ADDRESSES_SEPARATOR_REGEX = ",";
+    private static final String IP_PORT_SEPARATOR_REGEX = ":";
 
     private URI uri;
-    @UriPath(description = "Name of cluster or use local for local mode") @Metadata(required = "true")
+    @UriPath(description = "Name of cluster or use local for local mode")
+    @Metadata(required = "true")
     private String clusterName;
     @UriParam
     private String protocolType;
@@ -62,6 +76,10 @@ public class ElasticsearchConfiguration {
     @UriParam
     private String indexType;
     @UriParam
+    private WriteConsistencyLevel consistencyLevel;
+    @UriParam
+    private ReplicationType replicationType;
+    @UriParam
     private boolean local;
     @UriParam
     private Boolean data;
@@ -70,6 +88,8 @@ public class ElasticsearchConfiguration {
     @UriParam
     private String ip;
     @UriParam
+    private List<InetSocketTransportAddress> transportAddresses;
+    @UriParam
     private Integer port;
 
     public ElasticsearchConfiguration(URI uri, Map<String, Object> parameters) throws Exception {
@@ -106,11 +126,53 @@ public class ElasticsearchConfiguration {
         indexName = (String)parameters.remove(PARAM_INDEX_NAME);
         indexType = (String)parameters.remove(PARAM_INDEX_TYPE);
         operation = (String)parameters.remove(PARAM_OPERATION);
+        consistencyLevel = parseConsistencyLevel(parameters);
+        replicationType = parseReplicationType(parameters);
+
         ip = (String)parameters.remove(IP);
+        transportAddresses = parseTransportAddresses((String) parameters.remove(TRANSPORT_ADDRESSES));
+
         String portParam = (String) parameters.remove(PORT);
         port = portParam == null ? DEFAULT_PORT : Integer.valueOf(portParam);
     }
 
+    private ReplicationType parseReplicationType(Map<String, Object> parameters) {
+        Object replicationTypeParam = parameters.remove(PARAM_REPLICATION_TYPE);
+        if (replicationTypeParam != null) {
+            return ReplicationType.valueOf(replicationTypeParam.toString());
+        } else {
+            return DEFAULT_REPLICATION_TYPE;
+        }
+    }
+
+    private WriteConsistencyLevel parseConsistencyLevel(Map<String, Object> parameters) {
+        Object consistencyLevelParam = parameters.remove(PARAM_CONSISTENCY_LEVEL);
+        if (consistencyLevelParam != null) {
+            return WriteConsistencyLevel.valueOf(consistencyLevelParam.toString());
+        } else {
+            return DEFAULT_CONSISTENCY_LEVEL;
+        }
+    }
+
+    private List<InetSocketTransportAddress> parseTransportAddresses(String ipsString) {
+        if (ipsString == null || ipsString.isEmpty()) {
+            return null;
+        }
+        List<String> addressesStr = Arrays.asList(ipsString.split(TRANSPORT_ADDRESSES_SEPARATOR_REGEX));
+        List<InetSocketTransportAddress> addressesTrAd = new ArrayList<>(addressesStr.size());
+        for (String address : addressesStr) {
+            String[] split = address.split(IP_PORT_SEPARATOR_REGEX);
+            String hostname;
+            if (split.length > 0)
+                hostname = split[0];
+            else
+                throw new IllegalArgumentException();
+            Integer port = (split.length > 1 ? Integer.parseInt(split[1]) : DEFAULT_PORT);
+            addressesTrAd.add(new InetSocketTransportAddress(hostname, port));
+        }
+        return addressesTrAd;
+    }
+
     protected Boolean toBoolean(Object string) {
         if ("true".equals(string)) {
             return true;
@@ -217,6 +279,14 @@ public class ElasticsearchConfiguration {
         this.ip = ip;
     }
 
+    public List<InetSocketTransportAddress> getTransportAddresses() {
+        return transportAddresses;
+    }
+
+    public void setTransportAddresses(List<InetSocketTransportAddress> transportAddresses) {
+        this.transportAddresses = transportAddresses;
+    }
+
     public Integer getPort() {
         return port;
     }
@@ -225,4 +295,20 @@ public class ElasticsearchConfiguration {
         this.port = port;
     }
 
+    public void setConsistencyLevel(WriteConsistencyLevel consistencyLevel) {
+        this.consistencyLevel = consistencyLevel;
+    }
+
+    public WriteConsistencyLevel getConsistencyLevel() {
+        return consistencyLevel;
+    }
+
+    public void setReplicationType(ReplicationType replicationType) {
+        this.replicationType = replicationType;
+    }
+
+    public ReplicationType getReplicationType() {
+        return replicationType;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
index da49c12..4a582ed 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.elasticsearch;
 
 import java.net.URI;
 import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -30,6 +32,7 @@ import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.node.Node;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,27 +76,37 @@ public class ElasticsearchEndpoint extends DefaultEndpoint {
             LOG.info("Joining ElasticSearch cluster " + configuration.getClusterName());
         }
         if (configuration.getIp() != null) {
-            LOG.info("REMOTE ELASTICSEARCH: {}", configuration.getIp());
-            Settings settings = ImmutableSettings.settingsBuilder()
-                    // setting the classloader here will allow the underlying elasticsearch-java
-                    // class to find its names.txt in an OSGi environment (otherwise the thread
-                    // classloader is used, which won't be able to see the file causing a startup
-                    // exception).
-                    .classLoader(Settings.class.getClassLoader())
-                    .put("cluster.name", configuration.getClusterName())
-                    .put("client.transport.ignore_cluster_name", false)
-                    .put("node.client", true)
-                    .put("client.transport.sniff", true)
-                    .build();
-            Client client = new TransportClient(settings)
+            this.client = new TransportClient(getSettings())
                     .addTransportAddress(new InetSocketTransportAddress(configuration.getIp(), configuration.getPort()));
-            this.client = client;
+
+        } else if (configuration.getTransportAddresses() != null &&
+                !configuration.getTransportAddresses().isEmpty()) {
+            List<TransportAddress> addresses = new ArrayList<>(configuration.getTransportAddresses().size());
+            for (TransportAddress address : configuration.getTransportAddresses()) {
+                addresses.add(address);
+            }
+            this.client = new TransportClient(getSettings())
+                   .addTransportAddresses(addresses.toArray(new TransportAddress[0]));
         } else {
             node = configuration.buildNode();
             client = node.client();
         }
     }
 
+    private Settings getSettings() {
+        return ImmutableSettings.settingsBuilder()
+                        // setting the classloader here will allow the underlying elasticsearch-java
+                        // class to find its names.txt in an OSGi environment (otherwise the thread
+                        // classloader is used, which won't be able to see the file causing a startup
+                // exception).
+                .classLoader(Settings.class.getClassLoader())
+                .put("cluster.name", configuration.getClusterName())
+                .put("client.transport.ignore_cluster_name", false)
+                        .put("node.client", true)
+                .put("client.transport.sniff", true)
+                        .build();
+    }
+
     @Override
     protected void doStop() throws Exception {
         if (configuration.isLocal()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
index 2b432d5..0ededde 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
@@ -113,6 +113,20 @@ public class ElasticsearchProducer extends DefaultProducer {
             configIndexType = true;
         }
 
+        boolean configConsistencyLevel = false;
+        String consistencyLevel = message.getHeader(ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL, String.class);
+        if (consistencyLevel == null) {
+            message.setHeader(ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL, getEndpoint().getConfig().getConsistencyLevel());
+            configConsistencyLevel = true;
+        }
+
+        boolean configReplicationType = false;
+        String replicationType = message.getHeader(ElasticsearchConfiguration.PARAM_REPLICATION_TYPE, String.class);
+        if (replicationType == null) {
+            message.setHeader(ElasticsearchConfiguration.PARAM_REPLICATION_TYPE, getEndpoint().getConfig().getReplicationType());
+            configReplicationType = true;
+        }
+
         Client client = getEndpoint().getClient();
         if (ElasticsearchConfiguration.OPERATION_INDEX.equals(operation)) {
             IndexRequest indexRequest = message.getBody(IndexRequest.class);
@@ -155,5 +169,14 @@ public class ElasticsearchProducer extends DefaultProducer {
         if (configIndexType) {
             message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE);
         }
+
+        if (configConsistencyLevel) {
+            message.removeHeader(ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL);
+        }
+
+        if (configReplicationType) {
+            message.removeHeader(ElasticsearchConfiguration.PARAM_REPLICATION_TYPE);
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
index 934258f..a64f843 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
@@ -22,11 +22,13 @@ import java.util.Map;
 import org.apache.camel.Converter;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.elasticsearch.ElasticsearchConfiguration;
+import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.support.replication.ReplicationType;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
 @Converter
@@ -50,13 +52,15 @@ public final class ElasticsearchActionRequestConverter {
             return null;
         }
 
-        return indexRequest.index(
-                exchange.getIn().getHeader(
-                        ElasticsearchConfiguration.PARAM_INDEX_NAME,
-                        String.class)).type(
-                exchange.getIn().getHeader(
-                        ElasticsearchConfiguration.PARAM_INDEX_TYPE,
-                        String.class));
+        return indexRequest
+                .consistencyLevel(exchange.getIn().getHeader(
+                        ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL, WriteConsistencyLevel.class))
+                .replicationType(exchange.getIn().getHeader(
+                        ElasticsearchConfiguration.PARAM_REPLICATION_TYPE, ReplicationType.class))
+                .index(exchange.getIn().getHeader(
+                        ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class))
+                .type(exchange.getIn().getHeader(
+                        ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class));
     }
 
     @Converter

http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
index 318bdf5..27fa90f 100644
--- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
+++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
@@ -94,6 +94,20 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
     }
 
     @Test
+    public void testIndexWithReplication() throws Exception {
+        Map<String, String> map = createIndexedData();
+        String indexId = template.requestBody("direct:indexWithReplication", map, String.class);
+        assertNotNull("indexId should be set", indexId);
+    }
+
+    @Test
+    public void testIndexWithWriteConsistency() throws Exception {
+        Map<String, String> map = createIndexedData();
+        String indexId = template.requestBody("direct:indexWithWriteConsistency", map, String.class);
+        assertNotNull("indexId should be set", indexId);
+    }
+
+    @Test
     public void testBulkIndex() throws Exception {
         List<Map<String, String>> documents = new ArrayList<Map<String, String>>();
         Map<String, String> document1 = createIndexedData("1");
@@ -215,6 +229,33 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
     }
 
     @Test
+    @Ignore("need to setup the cluster with multiple nodes for this test")
+    public void indexWithTransportAddresses()  throws Exception {
+        Map<String, String> map = createIndexedData();
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
+        headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
+        headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
+
+        String indexId = template.requestBodyAndHeaders("direct:indexWithTransportAddresses", map, headers, String.class);
+        assertNotNull("indexId should be set", indexId);
+    }
+
+    @Test
+    @Ignore("need to setup the cluster with multiple nodes for this test")
+    public void indexWithIpAndTransportAddresses()  throws Exception {
+        Map<String, String> map = createIndexedData();
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX);
+        headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter");
+        headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet");
+
+        //should ignore transport addresses configuration
+        String indexId = template.requestBodyAndHeaders("direct:indexWithIpAndTransportAddresses", map, headers, String.class);
+        assertNotNull("indexId should be set", indexId);
+    }
+
+    @Test
     public void testGetWithHeaders() throws Exception {
         //first, INDEX a value
         Map<String, String> map = createIndexedData();
@@ -357,6 +398,8 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
             public void configure() {
                 from("direct:start").to("elasticsearch://local");
                 from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
+                from("direct:indexWithReplication").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&replicationType=SYNC");
+                from("direct:indexWithWriteConsistency").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&consistencyLevel=ONE");
                 from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet");
                 from("direct:delete").to("elasticsearch://local?operation=DELETE&indexName=twitter&indexType=tweet");
                 from("direct:search").to("elasticsearch://local?operation=SEARCH&indexName=twitter&indexType=tweet");
@@ -364,6 +407,8 @@ public class ElasticsearchComponentTest extends CamelTestSupport {
                 from("direct:bulk").to("elasticsearch://local?operation=BULK&indexName=twitter&indexType=tweet");
                 //from("direct:indexWithIp").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost");
                 //from("direct:indexWithIpAndPort").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost&port=9300");
+                //from("direct:indexWithTransportAddresses").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&transportAddresses=localhost:9300,localhost:9301");
+                //from("direct:indexWithIpAndTransportAddresses").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost&port=9300&transportAddresses=localhost:4444,localhost:5555");
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java
index f96c164..294cd32 100644
--- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java
+++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java
@@ -21,6 +21,8 @@ import java.util.Map;
 
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.apache.camel.util.URISupport;
+import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.support.replication.ReplicationType;
 
 import org.junit.Test;
 
@@ -93,4 +95,110 @@ public class ElasticsearchConfigurationTest extends CamelTestSupport {
         assertNull(conf.getClusterName());
     }
 
+    @Test
+    public void writeConsistencyLevelDefaultConfTest() throws Exception {
+        URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
+        Map<String, Object> parameters = URISupport.parseParameters(uri);
+        ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+        assertTrue(conf.isLocal());
+        assertEquals("INDEX", conf.getOperation());
+        assertEquals("twitter", conf.getIndexName());
+        assertEquals("tweet", conf.getIndexType());
+        assertEquals(WriteConsistencyLevel.DEFAULT, conf.getConsistencyLevel());
+        assertNull(conf.getClusterName());
+    }
+
+    @Test
+    public void writeConsistencyLevelConfTest() throws Exception {
+        URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&consistencyLevel=QUORUM");
+        Map<String, Object> parameters = URISupport.parseParameters(uri);
+        ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+        assertTrue(conf.isLocal());
+        assertEquals("INDEX", conf.getOperation());
+        assertEquals("twitter", conf.getIndexName());
+        assertEquals("tweet", conf.getIndexType());
+        assertEquals(WriteConsistencyLevel.QUORUM, conf.getConsistencyLevel());
+        assertNull(conf.getClusterName());
+    }
+
+    @Test
+    public void replicationTypeConfTest() throws Exception {
+        URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&replicationType=ASYNC");
+        Map<String, Object> parameters = URISupport.parseParameters(uri);
+        ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+        assertDefaultConfigurationParameters(conf);
+        assertEquals(ReplicationType.ASYNC, conf.getReplicationType());
+    }
+
+    @Test
+    public void replicationTypeDefaultConfTest() throws Exception {
+        URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
+        Map<String, Object> parameters = URISupport.parseParameters(uri);
+        ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+        assertDefaultConfigurationParameters(conf);
+        assertEquals(ReplicationType.DEFAULT, conf.getReplicationType());
+    }
+
+    @Test
+    public void transportAddressesSimpleHostnameTest() throws Exception {
+        URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" +
+                "indexType=tweet&transportAddresses=127.0.0.1");
+        Map<String, Object> parameters = URISupport.parseParameters(uri);
+        ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+        assertDefaultConfigurationParameters(conf);
+        assertEquals(1, conf.getTransportAddresses().size());
+        assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString());
+        assertEquals(9300, conf.getTransportAddresses().get(0).address().getPort());
+    }
+
+    @Test
+    public void transportAddressesMultipleHostnameTest() throws Exception {
+        URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" +
+                "indexType=tweet&transportAddresses=127.0.0.1,127.0.0.2");
+        Map<String, Object> parameters = URISupport.parseParameters(uri);
+        ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+        assertDefaultConfigurationParameters(conf);
+        assertEquals(2, conf.getTransportAddresses().size());
+        assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString());
+        assertEquals(9300, conf.getTransportAddresses().get(0).address().getPort());
+        assertEquals("127.0.0.2", conf.getTransportAddresses().get(1).address().getHostString());
+        assertEquals(9300, conf.getTransportAddresses().get(1).address().getPort());
+    }
+
+    @Test
+    public void transportAddressesSimpleHostnameAndPortTest() throws Exception {
+        URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" +
+                "indexType=tweet&transportAddresses=127.0.0.1:9305");
+        Map<String, Object> parameters = URISupport.parseParameters(uri);
+        ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+        assertDefaultConfigurationParameters(conf);
+        assertEquals(1, conf.getTransportAddresses().size());
+        assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString());
+        assertEquals(9305, conf.getTransportAddresses().get(0).address().getPort());
+    }
+
+    @Test
+    public void transportAddressesMultipleHostnameAndPortTest() throws Exception {
+        URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" +
+                "indexType=tweet&transportAddresses=127.0.0.1:9400,127.0.0.2,127.0.0.3:9401");
+        Map<String, Object> parameters = URISupport.parseParameters(uri);
+        ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters);
+        assertDefaultConfigurationParameters(conf);
+        assertEquals(3, conf.getTransportAddresses().size());
+        assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString());
+        assertEquals(9400, conf.getTransportAddresses().get(0).address().getPort());
+        assertEquals("127.0.0.2", conf.getTransportAddresses().get(1).address().getHostString());
+        assertEquals(9300, conf.getTransportAddresses().get(1).address().getPort());
+        assertEquals("127.0.0.3", conf.getTransportAddresses().get(2).address().getHostString());
+        assertEquals(9401, conf.getTransportAddresses().get(2).address().getPort());
+    }
+
+    private void assertDefaultConfigurationParameters(ElasticsearchConfiguration conf) {
+        assertTrue(conf.isLocal());
+        assertEquals("INDEX", conf.getOperation());
+        assertEquals("twitter", conf.getIndexName());
+        assertEquals("tweet", conf.getIndexType());
+        assertNull(conf.getClusterName());
+    }
+
 }