You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/09/01 15:32:03 UTC

[01/10] storm git commit: Fix STORM-1013

Repository: storm
Updated Branches:
  refs/heads/master bf5c35ba4 -> 436878128


Fix STORM-1013


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6a54fdb2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6a54fdb2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6a54fdb2

Branch: refs/heads/master
Commit: 6a54fdb2e617723ca36a82258ba562fd7fadb29f
Parents: 8618eb3
Author: Alex Panov <al...@teradata.com>
Authored: Fri Aug 28 17:35:39 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Fri Aug 28 17:35:39 2015 +0200

----------------------------------------------------------------------
 external/storm-elasticsearch/pom.xml            |  6 ++
 .../elasticsearch/bolt/AbstractEsBolt.java      | 10 ++-
 .../elasticsearch/bolt/ElasticSearchClient.java | 57 --------------
 .../storm/elasticsearch/bolt/EsLookupBolt.java  | 10 +++
 .../elasticsearch/bolt/TransportAddresses.java  | 72 -----------------
 .../storm/elasticsearch/common/EsConfig.java    | 65 +++++++++++-----
 .../common/StormElasticSearchClient.java        | 48 ++++++++++++
 .../common/TransportAddresses.java              | 72 +++++++++++++++++
 .../storm/elasticsearch/trident/EsState.java    | 33 +++-----
 .../elasticsearch/trident/EsStateFactory.java   | 18 ++---
 .../storm/elasticsearch/trident/EsUpdater.java  |  2 +-
 .../elasticsearch/bolt/AbstractEsBoltTest.java  | 15 +++-
 .../elasticsearch/bolt/EsIndexBoltTest.java     |  5 ++
 .../elasticsearch/bolt/EsIndexTopology.java     |  4 +-
 .../bolt/EsLookupBoltIntegrationTest.java       |  5 ++
 .../elasticsearch/bolt/EsLookupBoltTest.java    |  5 ++
 .../elasticsearch/bolt/EsPercolateBoltTest.java |  5 ++
 .../bolt/TransportAddressesTest.java            | 81 --------------------
 .../elasticsearch/common/EsConfigTest.java      | 54 +++++++++++++
 .../elasticsearch/common/SniffSettings.java     |  5 ++
 .../common/TransportAddressesTest.java          | 81 ++++++++++++++++++++
 .../trident/TridentEsTopology.java              |  4 +-
 22 files changed, 381 insertions(+), 276 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml
index ecc1bd2..7777549 100644
--- a/external/storm-elasticsearch/pom.xml
+++ b/external/storm-elasticsearch/pom.xml
@@ -63,6 +63,12 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava-testlib</artifactId>
+            <version>${guava.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-all</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
index a19a660..fa7356a 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,7 @@ package org.apache.storm.elasticsearch.bolt;
 import java.util.Map;
 
 import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.StormElasticSearchClient;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -31,6 +32,8 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Tuple;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 public abstract class AbstractEsBolt extends BaseRichBolt {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractEsBolt.class);
@@ -41,6 +44,7 @@ public abstract class AbstractEsBolt extends BaseRichBolt {
     private EsConfig esConfig;
 
     public AbstractEsBolt(EsConfig esConfig) {
+        checkNotNull(esConfig);
         this.esConfig = esConfig;
     }
 
@@ -50,7 +54,7 @@ public abstract class AbstractEsBolt extends BaseRichBolt {
             this.collector = outputCollector;
             synchronized (AbstractEsBolt.class) {
                 if (client == null) {
-                    client = new ElasticSearchClient(esConfig).construct();
+                    client = new StormElasticSearchClient(esConfig).construct();
                 }
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/ElasticSearchClient.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/ElasticSearchClient.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/ElasticSearchClient.java
deleted file mode 100644
index 0a9f4ea..0000000
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/ElasticSearchClient.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.elasticsearch.bolt;
-
-import java.io.Serializable;
-
-import org.apache.storm.elasticsearch.common.EsConfig;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-
-final class ElasticSearchClient implements Serializable {
-
-    private final EsConfig esConfig;
-
-    ElasticSearchClient(EsConfig esConfig) {
-        this.esConfig = esConfig;
-    }
-
-    Client construct() {
-        Settings settings = createBasicSettings();
-        TransportClient transportClient = new TransportClient(settings);
-        addTransportAddresses(transportClient);
-        return transportClient;
-    }
-
-    private Settings createBasicSettings() {
-        return ImmutableSettings.settingsBuilder()
-                                .put("cluster.name", esConfig.getClusterName())
-                                .put("client.transport.sniff", "true")
-                                .build();
-    }
-
-    private void addTransportAddresses(TransportClient transportClient) {
-        Iterable<InetSocketTransportAddress> transportAddresses = new TransportAddresses(esConfig.getNodes());
-        for (InetSocketTransportAddress transportAddress : transportAddresses) {
-            transportClient.addTransportAddress(transportAddress);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
index d9982f2..bd7178b 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
@@ -29,13 +29,23 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * @since 0.11
+ */
 public class EsLookupBolt extends AbstractEsBolt {
 
     private final ElasticsearchGetRequest getRequest;
     private final EsLookupResultOutput output;
 
+    /**
+     * @throws NullPointerException if any of the parameters is null
+     */
     public EsLookupBolt(EsConfig esConfig, ElasticsearchGetRequest getRequest, EsLookupResultOutput output) {
         super(esConfig);
+        checkNotNull(getRequest);
+        checkNotNull(output);
         this.getRequest = getRequest;
         this.output = output;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/TransportAddresses.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/TransportAddresses.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/TransportAddresses.java
deleted file mode 100644
index f4479df..0000000
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/TransportAddresses.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.elasticsearch.bolt;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-
-final class TransportAddresses implements Iterable<InetSocketTransportAddress> {
-
-    static final String DELIMETER = ":";
-
-    private final String[] nodes;
-
-    TransportAddresses(String[] nodes) {
-        if (nodes == null) {
-            throw new IllegalArgumentException("Elasticsearch hosts cannot be null");
-        }
-        if (nodes.length == 0) {
-            throw new IllegalArgumentException("At least one Elasticsearch host must be specified");
-        }
-
-        this.nodes = nodes;
-    }
-
-    @Override
-    public Iterator<InetSocketTransportAddress> iterator() {
-        List<InetSocketTransportAddress> result = new LinkedList<>();
-
-        for (String node : nodes) {
-            InetSocketTransportAddress transportAddress = transformToInetAddress(node);
-            result.add(transportAddress);
-        }
-
-        return result.iterator();
-    }
-
-    private InetSocketTransportAddress transformToInetAddress(String node) {
-        String[] hostAndPort = node.split(DELIMETER);
-        if (hostAndPort.length != 2) {
-            throw new IllegalArgumentException(
-                    "Incorrect Elasticsearch node format, should follow {host}" + DELIMETER + "{port} pattern");
-        }
-        String hostname = hostname(hostAndPort[0]);
-        return new InetSocketTransportAddress(hostname, port(hostAndPort[1]));
-    }
-
-    private String hostname(String input) {
-        return input.trim();
-    }
-
-    private int port(String input) {
-        return Integer.parseInt(input.trim());
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
index 0b57788..5724519 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,37 +18,66 @@
 package org.apache.storm.elasticsearch.common;
 
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
 
-public class EsConfig implements Serializable{
-    private String clusterName;
-    private String[] nodes;
+import com.google.common.collect.ImmutableMap;
 
-    public EsConfig() {
-    }
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * @since 0.11
+ */
+public class EsConfig implements Serializable {
+
+    private final String clusterName;
+    private final String[] nodes;
+    private final Map<String, String> additionalConfiguration;
 
     /**
      * EsConfig Constructor to be used in EsIndexBolt, EsPercolateBolt and EsStateFactory
+     *
      * @param clusterName Elasticsearch cluster name
-     * @param nodes Elasticsearch addresses in host:port pattern string array
+     * @param nodes       Elasticsearch addresses in host:port pattern string array
+     * @throws IllegalArgumentException if {@code nodes} is empty
+     * @throws NullPointerException if any of the arguments is null
      */
     public EsConfig(String clusterName, String[] nodes) {
-        this.clusterName = clusterName;
-        this.nodes = nodes;
+        this(clusterName, nodes, Collections.<String, String>emptyMap());
     }
 
-    public String getClusterName() {
-        return clusterName;
-    }
+    /**
+     * EsConfig Constructor to be used in EsIndexBolt, EsPercolateBolt and EsStateFactory
+     *
+     * @param clusterName             Elasticsearch cluster name
+     * @param nodes                   Elasticsearch addresses in host:port pattern string array
+     * @param additionalConfiguration Additional Elasticsearch configuration
+     * @throws IllegalArgumentException if {@code nodes} is empty
+     * @throws NullPointerException if any of the arguments is null
+     */
+    public EsConfig(String clusterName, String[] nodes, Map<String, String> additionalConfiguration) {
+        checkNotNull(clusterName);
+        checkNotNull(nodes);
+        checkNotNull(additionalConfiguration);
 
-    public void setClusterName(String clusterName) {
+        checkArgument(nodes.length != 0, "Nodes cannot be empty");
         this.clusterName = clusterName;
+        this.nodes = nodes;
+        this.additionalConfiguration = ImmutableMap.copyOf(additionalConfiguration);
     }
 
-    public String[] getNodes() {
-        return nodes;
+    TransportAddresses getTransportAddresses() {
+        return new TransportAddresses(nodes);
     }
 
-    public void setNodes(String[] nodes) {
-        this.nodes = nodes;
+    Settings toBasicSettings() {
+        return ImmutableSettings.settingsBuilder()
+                                .put("cluster.name", clusterName)
+                                .put(additionalConfiguration)
+                                .build();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
new file mode 100644
index 0000000..3ebfe72
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+import java.io.Serializable;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+
+public final class StormElasticSearchClient implements Serializable {
+
+    private final EsConfig esConfig;
+
+    public StormElasticSearchClient(EsConfig esConfig) {
+        this.esConfig = esConfig;
+    }
+
+    public Client construct() {
+        Settings settings = esConfig.toBasicSettings();
+        TransportClient transportClient = new TransportClient(settings);
+        addTransportAddresses(transportClient);
+        return transportClient;
+    }
+
+    private void addTransportAddresses(TransportClient transportClient) {
+        Iterable<InetSocketTransportAddress> transportAddresses = esConfig.getTransportAddresses();
+        for (InetSocketTransportAddress transportAddress : transportAddresses) {
+            transportClient.addTransportAddress(transportAddress);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java
new file mode 100644
index 0000000..cd082a7
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+
+final class TransportAddresses implements Iterable<InetSocketTransportAddress> {
+
+    static final String DELIMETER = ":";
+
+    private final String[] nodes;
+
+    TransportAddresses(String[] nodes) {
+        if (nodes == null) {
+            throw new IllegalArgumentException("Elasticsearch hosts cannot be null");
+        }
+        if (nodes.length == 0) {
+            throw new IllegalArgumentException("At least one Elasticsearch host must be specified");
+        }
+
+        this.nodes = nodes;
+    }
+
+    @Override
+    public Iterator<InetSocketTransportAddress> iterator() {
+        List<InetSocketTransportAddress> result = new LinkedList<>();
+
+        for (String node : nodes) {
+            InetSocketTransportAddress transportAddress = transformToInetAddress(node);
+            result.add(transportAddress);
+        }
+
+        return result.iterator();
+    }
+
+    private InetSocketTransportAddress transformToInetAddress(String node) {
+        String[] hostAndPort = node.split(DELIMETER);
+        if (hostAndPort.length != 2) {
+            throw new IllegalArgumentException(
+                    "Incorrect Elasticsearch node format, should follow {host}" + DELIMETER + "{port} pattern");
+        }
+        String hostname = hostname(hostAndPort[0]);
+        return new InetSocketTransportAddress(hostname, port(hostAndPort[1]));
+    }
+
+    private String hostname(String input) {
+        return input.trim();
+    }
+
+    private int port(String input) {
+        return Integer.parseInt(input.trim());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index e804084..e60c003 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -17,27 +17,24 @@
  */
 package org.apache.storm.elasticsearch.trident;
 
-import backtype.storm.task.IMetricsContext;
 import backtype.storm.topology.FailedException;
+
+import org.apache.storm.elasticsearch.common.StormElasticSearchClient;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import storm.trident.operation.TridentCollector;
 import storm.trident.state.State;
 import storm.trident.tuple.TridentTuple;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
-public class EsState implements State {
+/**
+ * @since 0.11
+ */
+class EsState implements State {
     private static final Logger LOG = LoggerFactory.getLogger(EsState.class);
     private static Client client;
     private EsConfig esConfig;
@@ -74,23 +71,11 @@ public class EsState implements State {
 
     }
 
-    public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+    public void prepare() {
         try {
             synchronized (EsState.class) {
                 if (client == null) {
-                    Settings settings =
-                            ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
-                                    .put("client.transport.sniff", "true").build();
-                    List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
-                    for (String node : esConfig.getNodes()) {
-                        String[] hostAndPort = node.split(":");
-                        if (hostAndPort.length != 2) {
-                            throw new IllegalArgumentException("incorrect Elasticsearch node format, should follow {host}:{port} pattern");
-                        }
-                        transportAddressList.add(new InetSocketTransportAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
-                    }
-                    client = new TransportClient(settings)
-                            .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));
+                    client = new StormElasticSearchClient(esConfig).construct();
                 }
             }
         } catch (Exception e) {
@@ -98,7 +83,7 @@ public class EsState implements State {
         }
     }
 
-    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+    public void updateState(List<TridentTuple> tuples) {
         BulkRequestBuilder bulkRequest = client.prepareBulk();
         for (TridentTuple tuple : tuples) {
             String source = tuple.getStringByField("source");

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index c3a2e6c..4f4dd39 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -19,32 +19,30 @@ package org.apache.storm.elasticsearch.trident;
 
 import backtype.storm.task.IMetricsContext;
 import org.apache.storm.elasticsearch.common.EsConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 import storm.trident.state.State;
 import storm.trident.state.StateFactory;
 
 import java.util.Map;
 
-public class EsStateFactory implements StateFactory {
-    private EsConfig esConfig;
-
-    public EsStateFactory(){
-
-    }
+/**
+ * @since 0.11
+ */
+class EsStateFactory implements StateFactory {
+    private final EsConfig esConfig;
 
     /**
      * EsStateFactory constructor
      * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
      */
-    public EsStateFactory(EsConfig esConfig){
+    EsStateFactory(EsConfig esConfig){
         this.esConfig = esConfig;
     }
 
     @Override
     public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         EsState esState = new EsState(esConfig);
-        esState.prepare(conf, metrics, partitionIndex, numPartitions);
+        esState.prepare();
         return esState;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
index 6fa42f3..685ea8c 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
@@ -26,6 +26,6 @@ import java.util.List;
 public class EsUpdater extends BaseStateUpdater<EsState> {
     @Override
     public void updateState(EsState state, List<TridentTuple> tuples, TridentCollector collector) {
-        state.updateState(tuples, collector);
+        state.updateState(tuples);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
index d9c09d0..8a5fad2 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@ -17,9 +17,12 @@
  */
 package org.apache.storm.elasticsearch.bolt;
 
+import com.google.common.testing.NullPointerTester;
+
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
@@ -46,14 +49,18 @@ public abstract class AbstractEsBoltTest<Bolt extends AbstractEsBolt> {
     protected abstract Bolt createBolt(EsConfig esConfig);
 
     protected EsConfig esConfig() {
-        EsConfig esConfig = new EsConfig();
-        esConfig.setClusterName("test-cluster");
-        esConfig.setNodes(new String[] {"127.0.0.1:9300"});
-        return esConfig;
+        return new EsConfig("test-cluster", new String[] {"127.0.0.1:9300"});
     }
 
     @After
     public void cleanupBolt() throws Exception {
         bolt.cleanup();
     }
+
+    @Test
+    public void constructorsThrowOnNull() throws Exception {
+        new NullPointerTester().setDefault(EsConfig.class, esConfig()).testAllPublicConstructors(getBoltClass());
+    }
+
+    protected abstract Class<Bolt> getBoltClass();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index d2def5d..bcba6d4 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -62,4 +62,9 @@ public class EsIndexBoltTest extends AbstractEsBoltIntegrationTest<EsIndexBolt>
     protected EsIndexBolt createBolt(EsConfig esConfig) {
         return new EsIndexBolt(esConfig);
     }
+
+    @Override
+    protected Class<EsIndexBolt> getBoltClass() {
+        return EsIndexBolt.class;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index fc9c178..3c1e949 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -46,9 +46,7 @@ public class EsIndexTopology {
         TopologyBuilder builder = new TopologyBuilder();
         UserDataSpout spout = new UserDataSpout();
         builder.setSpout(SPOUT_ID, spout, 1);
-        EsConfig esConfig = new EsConfig();
-        esConfig.setClusterName(EsConstants.clusterName);
-        esConfig.setNodes(new String[]{"localhost:9300"});
+        EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
         builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig), 1).shuffleGrouping(SPOUT_ID);
 
         EsTestUtil.startEsNode();

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
index c391688..d100b84 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
@@ -129,4 +129,9 @@ public class EsLookupBoltIntegrationTest extends AbstractEsBoltIntegrationTest<E
             return new Fields("data");
         }
     }
+
+    @Override
+    protected Class<EsLookupBolt> getBoltClass() {
+        return EsLookupBolt.class;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
index 70ea140..175b2a4 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
@@ -117,4 +117,9 @@ public class EsLookupBoltTest extends AbstractEsBoltTest<EsLookupBolt> {
 
         assertThat(declaredFields.getValue(), is(fields));
     }
+
+    @Override
+    protected Class<EsLookupBolt> getBoltClass() {
+        return EsLookupBolt.class;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index d68cc89..c5ed91f 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -52,4 +52,9 @@ public class EsPercolateBoltTest extends AbstractEsBoltIntegrationTest<EsPercola
         verify(outputCollector).ack(tuple);
         verify(outputCollector).emit(new Values(source, any(PercolateResponse.Match.class)));
     }
+
+    @Override
+    protected Class<EsPercolateBolt> getBoltClass() {
+        return EsPercolateBolt.class;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/TransportAddressesTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/TransportAddressesTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/TransportAddressesTest.java
deleted file mode 100644
index 411f187..0000000
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/TransportAddressesTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.elasticsearch.bolt;
-
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.junit.Test;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-
-public class TransportAddressesTest {
-
-    @Test
-    public void readsMultipleHosts() throws Exception {
-        String[] hosts = new String[] {"h1:1000", "h2:10003"};
-        TransportAddresses addresses = new TransportAddresses(hosts);
-        assertThat(addresses, containsInAnyOrder(new InetSocketTransportAddress("h1", 1000),
-                                                 new InetSocketTransportAddress("h2", 10003)));
-    }
-
-    @Test
-    public void stripsSpaces() throws Exception {
-        String[] hosts = new String[] {"h1:1000", " h2:10003 "};
-        TransportAddresses addresses = new TransportAddresses(hosts);
-        assertThat(addresses, containsInAnyOrder(new InetSocketTransportAddress("h1", 1000),
-                                                 new InetSocketTransportAddress("h2", 10003)));
-    }
-
-    @Test
-    public void readsOneHost() throws Exception {
-        String[] hosts = new String[] {"h1:1000"};
-        TransportAddresses addresses = new TransportAddresses(hosts);
-        assertThat(addresses, containsInAnyOrder(new InetSocketTransportAddress("h1", 1000)));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void throwsOnNullHosts() throws Exception {
-        new TransportAddresses(null);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void throwsOnEmptyArray() throws Exception {
-        new TransportAddresses(new String[] {}).iterator();
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void throwsOnInvalidHostAndPortPair() throws Exception {
-        new TransportAddresses(new String[] {"h1:1000", "h2"}).iterator();
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void throwsOnInvalidPortValue() throws Exception {
-        new TransportAddresses(new String[] {"h1:-1000"}).iterator();
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void throwsOnPortNotANumber() throws Exception {
-        new TransportAddresses(new String[] {"h1:dummy"}).iterator();
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void throwsOnInvalidHostAndPortFormat() throws Exception {
-        new TransportAddresses(new String[] {"h1:dummy:231"}).iterator();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
new file mode 100644
index 0000000..de28940
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
@@ -0,0 +1,54 @@
+package org.apache.storm.elasticsearch.common;
+
+import java.util.Map;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.testing.NullPointerTester;
+
+import org.elasticsearch.common.settings.Settings;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class EsConfigTest {
+
+    private String clusterName = "name";
+    private String[] nodes = new String[] {"localhost:9300"};
+
+    @Test(expected = IllegalArgumentException.class)
+    public void nodesCannotBeEmpty() throws Exception {
+        new EsConfig(clusterName, new String[] {});
+    }
+
+    @Test
+    public void settingsContainClusterName() throws Exception {
+        EsConfig esConfig = new EsConfig(clusterName, nodes);
+        assertThat(esConfig.toBasicSettings().get("cluster.name"), is(clusterName));
+    }
+
+    @Test
+    public void usesAdditionalConfiguration() throws Exception {
+        Map<String, String> additionalSettings = additionalSettings();
+        EsConfig esConfig = new EsConfig(clusterName, nodes, additionalSettings);
+        Settings settings = esConfig.toBasicSettings();
+        assertSettingsContainAllAdditionalValues(settings, additionalSettings);
+    }
+
+    private Map<String, String> additionalSettings() {
+        return ImmutableMap.of("client.transport.sniff", "true", UUID.randomUUID().toString(),
+                               UUID.randomUUID().toString());
+    }
+
+    private void assertSettingsContainAllAdditionalValues(Settings settings, Map<String, String> additionalSettings) {
+        for (String key : additionalSettings.keySet()) {
+            assertThat(settings.get(key), is(additionalSettings.get(key)));
+        }
+    }
+
+    @Test
+    public void constructorThrowsOnNull() throws Exception {
+        new NullPointerTester().testAllPublicConstructors(EsConfig.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java
new file mode 100644
index 0000000..e2666ee
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java
@@ -0,0 +1,5 @@
+package org.apache.storm.elasticsearch.common;
+
+public final class SniffSettings {
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/TransportAddressesTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/TransportAddressesTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/TransportAddressesTest.java
new file mode 100644
index 0000000..f2e4936
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/TransportAddressesTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+public class TransportAddressesTest {
+
+    @Test
+    public void readsMultipleHosts() throws Exception {
+        String[] hosts = new String[] {"h1:1000", "h2:10003"};
+        TransportAddresses addresses = new TransportAddresses(hosts);
+        assertThat(addresses, containsInAnyOrder(new InetSocketTransportAddress("h1", 1000),
+                                                 new InetSocketTransportAddress("h2", 10003)));
+    }
+
+    @Test
+    public void stripsSpaces() throws Exception {
+        String[] hosts = new String[] {"h1:1000", " h2:10003 "};
+        TransportAddresses addresses = new TransportAddresses(hosts);
+        assertThat(addresses, containsInAnyOrder(new InetSocketTransportAddress("h1", 1000),
+                                                 new InetSocketTransportAddress("h2", 10003)));
+    }
+
+    @Test
+    public void readsOneHost() throws Exception {
+        String[] hosts = new String[] {"h1:1000"};
+        TransportAddresses addresses = new TransportAddresses(hosts);
+        assertThat(addresses, containsInAnyOrder(new InetSocketTransportAddress("h1", 1000)));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void throwsOnNullHosts() throws Exception {
+        new TransportAddresses(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void throwsOnEmptyArray() throws Exception {
+        new TransportAddresses(new String[] {}).iterator();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void throwsOnInvalidHostAndPortPair() throws Exception {
+        new TransportAddresses(new String[] {"h1:1000", "h2"}).iterator();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void throwsOnInvalidPortValue() throws Exception {
+        new TransportAddresses(new String[] {"h1:-1000"}).iterator();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void throwsOnPortNotANumber() throws Exception {
+        new TransportAddresses(new String[] {"h1:dummy"}).iterator();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void throwsOnInvalidHostAndPortFormat() throws Exception {
+        new TransportAddresses(new String[] {"h1:dummy:231"}).iterator();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index 2c951f8..1ae1df7 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -45,9 +45,7 @@ public class TridentEsTopology {
 
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout", spout);
-        EsConfig esConfig = new EsConfig();
-        esConfig.setClusterName(EsConstants.clusterName);
-        esConfig.setNodes(new String[]{"localhost:9300"});
+        EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
         Fields esFields = new Fields("index", "type", "source");
         StateFactory factory = new EsStateFactory(esConfig);
         TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());


[08/10] storm git commit: Remove guava leftovers

Posted by ka...@apache.org.
Remove guava leftovers


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fff82fd8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fff82fd8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fff82fd8

Branch: refs/heads/master
Commit: fff82fd8d4e72993af61c2ca6725dcd4a9af2d1f
Parents: 4f53fc4
Author: Alex Panov <al...@teradata.com>
Authored: Mon Aug 31 17:31:44 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Mon Aug 31 17:31:44 2015 +0200

----------------------------------------------------------------------
 .../org/apache/storm/elasticsearch/common/EsConfig.java     | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fff82fd8/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
index 6e9b128..6bbd81f 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -19,10 +19,9 @@ package org.apache.storm.elasticsearch.common;
 
 import java.io.Serializable;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
-import com.google.common.collect.ImmutableMap;
-
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 
@@ -44,7 +43,7 @@ public class EsConfig implements Serializable {
      * @param clusterName Elasticsearch cluster name
      * @param nodes       Elasticsearch addresses in host:port pattern string array
      * @throws IllegalArgumentException if nodes are empty
-     * @throws NullPointerException on any of the fields being null
+     * @throws NullPointerException     on any of the fields being null
      */
     public EsConfig(String clusterName, String[] nodes) {
         this(clusterName, nodes, Collections.<String, String>emptyMap());
@@ -57,7 +56,7 @@ public class EsConfig implements Serializable {
      * @param nodes                   Elasticsearch addresses in host:port pattern string array
      * @param additionalConfiguration Additional Elasticsearch configuration
      * @throws IllegalArgumentException if nodes are empty
-     * @throws NullPointerException on any of the fields being null
+     * @throws NullPointerException     on any of the fields being null
      */
     public EsConfig(String clusterName, String[] nodes, Map<String, String> additionalConfiguration) {
         checkNotNull(clusterName);
@@ -67,7 +66,7 @@ public class EsConfig implements Serializable {
         checkArgument(nodes.length != 0, "Nodes cannot be empty");
         this.clusterName = clusterName;
         this.nodes = nodes;
-        this.additionalConfiguration = ImmutableMap.copyOf(additionalConfiguration);
+        this.additionalConfiguration = new HashMap<>(additionalConfiguration);
     }
 
     TransportAddresses getTransportAddresses() {


[03/10] storm git commit: Switch to ES shaded Preconditions instead of guava

Posted by ka...@apache.org.
Switch to ES shaded Preconditions instead of guava


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/677d3c5d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/677d3c5d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/677d3c5d

Branch: refs/heads/master
Commit: 677d3c5d601ccd954982d8753fa0a67a21ca3694
Parents: ae0f814
Author: Alex Panov <al...@teradata.com>
Authored: Sun Aug 30 12:11:28 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Sun Aug 30 12:11:28 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java | 2 +-
 .../java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java    | 4 +++-
 .../java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java   | 2 +-
 .../org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java     | 4 +++-
 .../java/org/apache/storm/elasticsearch/common/EsConfig.java     | 4 ++--
 .../java/org/apache/storm/elasticsearch/trident/EsState.java     | 1 -
 .../org/apache/storm/elasticsearch/trident/EsStateFactory.java   | 3 +--
 7 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/677d3c5d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
index fa7356a..784a57f 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
@@ -32,7 +32,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Tuple;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static org.elasticsearch.common.base.Preconditions.checkNotNull;
 
 public abstract class AbstractEsBolt extends BaseRichBolt {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/677d3c5d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
index 36b9d28..1c5983e 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
@@ -26,6 +26,8 @@ import org.apache.storm.elasticsearch.common.EsTupleMapper;
 
 import java.util.Map;
 
+import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+
 /**
  * Basic bolt for storing tuple to ES document.
  */
@@ -39,7 +41,7 @@ public class EsIndexBolt extends AbstractEsBolt {
      */
     public EsIndexBolt(EsConfig esConfig, EsTupleMapper tupleMapper) {
         super(esConfig);
-        this.tupleMapper = tupleMapper;
+        this.tupleMapper = checkNotNull(tupleMapper);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/677d3c5d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
index bd7178b..1676a79 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
@@ -29,7 +29,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static org.elasticsearch.common.base.Preconditions.checkNotNull;
 
 /**
  * @since 0.11

http://git-wip-us.apache.org/repos/asf/storm/blob/677d3c5d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
index 27e5e00..a361464 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
@@ -30,6 +30,8 @@ import org.elasticsearch.action.percolate.PercolateSourceBuilder;
 
 import java.util.Map;
 
+import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+
 /**
  * Basic bolt for retrieve matched percolate queries.
  */
@@ -44,7 +46,7 @@ public class EsPercolateBolt extends AbstractEsBolt {
      */
     public EsPercolateBolt(EsConfig esConfig, EsTupleMapper tupleMapper) {
         super(esConfig);
-        this.tupleMapper = tupleMapper;
+        this.tupleMapper = checkNotNull(tupleMapper);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/677d3c5d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
index 5724519..6e9b128 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -26,8 +26,8 @@ import com.google.common.collect.ImmutableMap;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static org.elasticsearch.common.base.Preconditions.checkArgument;
+import static org.elasticsearch.common.base.Preconditions.checkNotNull;
 
 /**
  * @since 0.11

http://git-wip-us.apache.org/repos/asf/storm/blob/677d3c5d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index 1eb0d04..c066553 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -93,7 +93,6 @@ class EsState implements State {
      *
      * @param tuples list of tuples for storing to ES.
      *               Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
-     * @param collector
      */
     public void updateState(List<TridentTuple> tuples) {
         BulkRequestBuilder bulkRequest = client.prepareBulk();

http://git-wip-us.apache.org/repos/asf/storm/blob/677d3c5d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index e5733e7..6904298 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -32,8 +32,7 @@ import java.util.Map;
  */
 class EsStateFactory implements StateFactory {
     private final EsConfig esConfig;
-    private EsConfig esConfig;
-    private EsTupleMapper tupleMapper;
+    private final EsTupleMapper tupleMapper;
 
     /**
      * EsStateFactory constructor


[09/10] storm git commit: Merge branch 'master' of https://github.com/alexpanov/storm into STORM-1013

Posted by ka...@apache.org.
Merge branch 'master' of https://github.com/alexpanov/storm into STORM-1013


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/25aecf17
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/25aecf17
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/25aecf17

Branch: refs/heads/master
Commit: 25aecf17002b04c43ac258f3ef027f67a3de60df
Parents: bf5c35b fff82fd
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Sep 1 22:22:45 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 1 22:22:45 2015 +0900

----------------------------------------------------------------------
 external/storm-elasticsearch/README.md          | 37 ++++-----
 external/storm-elasticsearch/pom.xml            |  6 ++
 .../elasticsearch/bolt/AbstractEsBolt.java      | 10 ++-
 .../elasticsearch/bolt/ElasticSearchClient.java | 57 --------------
 .../storm/elasticsearch/bolt/EsIndexBolt.java   |  4 +-
 .../storm/elasticsearch/bolt/EsLookupBolt.java  | 10 +++
 .../elasticsearch/bolt/EsPercolateBolt.java     |  4 +-
 .../elasticsearch/bolt/TransportAddresses.java  | 72 -----------------
 .../storm/elasticsearch/common/EsConfig.java    | 64 +++++++++++-----
 .../common/StormElasticSearchClient.java        | 48 ++++++++++++
 .../common/TransportAddresses.java              | 72 +++++++++++++++++
 .../storm/elasticsearch/trident/EsState.java    | 32 ++------
 .../elasticsearch/trident/EsStateFactory.java   | 20 ++---
 .../storm/elasticsearch/trident/EsUpdater.java  |  2 +-
 .../elasticsearch/bolt/AbstractEsBoltTest.java  | 15 +++-
 .../elasticsearch/bolt/EsIndexBoltTest.java     |  5 ++
 .../elasticsearch/bolt/EsIndexTopology.java     |  4 +-
 .../bolt/EsLookupBoltIntegrationTest.java       |  5 ++
 .../elasticsearch/bolt/EsLookupBoltTest.java    |  5 ++
 .../elasticsearch/bolt/EsPercolateBoltTest.java |  5 ++
 .../bolt/TransportAddressesTest.java            | 81 --------------------
 .../elasticsearch/common/EsConfigTest.java      | 71 +++++++++++++++++
 .../common/TransportAddressesTest.java          | 81 ++++++++++++++++++++
 .../trident/EsStateFactoryTest.java             | 32 ++++++++
 .../trident/TridentEsTopology.java              |  6 +-
 25 files changed, 450 insertions(+), 298 deletions(-)
----------------------------------------------------------------------



[06/10] storm git commit: Apply suggested changes in the pull request: - Add Apache License to EsConfigTest; - Remove unused class; - Change README

Posted by ka...@apache.org.
Apply suggested changes in the pull request:
 - Add Apache License to EsConfigTest;
 - Remove unused class;
 - Change README


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1469c647
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1469c647
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1469c647

Branch: refs/heads/master
Commit: 1469c647799c2d9d644c40e078fc0d533f568699
Parents: 189ba57
Author: Alex Panov <al...@teradata.com>
Authored: Mon Aug 31 10:55:24 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Mon Aug 31 10:55:24 2015 +0200

----------------------------------------------------------------------
 external/storm-elasticsearch/README.md             |  2 +-
 .../storm/elasticsearch/common/EsConfigTest.java   | 17 +++++++++++++++++
 .../storm/elasticsearch/common/SniffSettings.java  |  5 -----
 3 files changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1469c647/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index 06a350c..2c52531 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -84,7 +84,7 @@ EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}, ad
 |---	|--- |---
 |clusterName | Elasticsearch cluster name | String (required) |
 |nodes | Elasticsearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |
-|additionalParameters | Additional Elasticsearch configuration parameters | Map<String, String> (optional) |
+|additionalParameters | Additional Elasticsearch Transport Client configuration parameters | Map<String, String> (optional) |
 
 ## EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper)
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1469c647/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
index de28940..d50337f 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.storm.elasticsearch.common;
 
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/storm/blob/1469c647/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java
deleted file mode 100644
index e2666ee..0000000
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.storm.elasticsearch.common;
-
-public final class SniffSettings {
-
-}


[07/10] storm git commit: Add Apache License to EsStateFactoryTest

Posted by ka...@apache.org.
Add Apache License to EsStateFactoryTest


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4f53fc4c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4f53fc4c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4f53fc4c

Branch: refs/heads/master
Commit: 4f53fc4c7171c832b53ce9c6606d0cc7da208037
Parents: 1469c64
Author: Alex Panov <al...@teradata.com>
Authored: Mon Aug 31 12:13:47 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Mon Aug 31 12:13:47 2015 +0200

----------------------------------------------------------------------
 .../elasticsearch/trident/EsStateFactoryTest.java  | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4f53fc4c/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
index ce9eaf3..e2eb13e 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.storm.elasticsearch.trident;
 
 import com.google.common.testing.NullPointerTester;


[05/10] storm git commit: Update storm-elasticsearch documentation

Posted by ka...@apache.org.
Update storm-elasticsearch documentation


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/189ba57e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/189ba57e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/189ba57e

Branch: refs/heads/master
Commit: 189ba57eb11854c1cd3db7b54ad8dd341c534b58
Parents: 499f891
Author: Alex Panov <al...@teradata.com>
Authored: Sun Aug 30 12:22:07 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Sun Aug 30 12:22:07 2015 +0200

----------------------------------------------------------------------
 external/storm-elasticsearch/README.md | 37 +++++++++++++++--------------
 1 file changed, 19 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/189ba57e/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index a4792c7..06a350c 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -11,9 +11,7 @@ Users should make sure that ```EsTupleMapper``` can extract "source", "index", "
 "source" is a document in JSON format string that will be indexed in Elasticsearch.
 
 ```java
-EsConfig esConfig = new EsConfig();
-esConfig.setClusterName(clusterName);
-esConfig.setNodes(new String[]{"localhost:9300"});
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
 EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
 EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
 ```
@@ -26,9 +24,7 @@ User should make sure ```EsTupleMapper``` can extract "source", "index", "type"
 "source" is a document in JSON format string that will be sent in percolate request to Elasticsearch.
 
 ```java
-EsConfig esConfig = new EsConfig();
-esConfig.setClusterName(clusterName);
-esConfig.setNodes(new String[]{"localhost:9300"});
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
 EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
 EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig, tupleMapper);
 ```
@@ -40,14 +36,12 @@ for each Percolate.Match in PercolateResponse.
 
 Elasticsearch Trident state also follows similar pattern to EsBolts. It takes in EsConfig and EsTupleMapper as an arg.
 
-```code
-   EsConfig esConfig = new EsConfig();
-   esConfig.setClusterName(clusterName);
-   esConfig.setNodes(new String[]{"localhost:9300"});
-   EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
 
-   StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
-   TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
+StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
+TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
  ```
 
 ## EsLookupBolt (org.apache.storm.elasticsearch.bolt.EsLookupBolt)
@@ -72,11 +66,17 @@ EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output);
   
 Provided components (Bolt, State) takes in EsConfig as a constructor arg.
 
-  ```java
-   EsConfig esConfig = new EsConfig();
-   esConfig.setClusterName(clusterName);
-   esConfig.setNodes(new String[]{"localhost:9300"});
-  ```
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+```
+
+or
+
+```java
+Map<String, String> additionalParameters = new HashMap<>();
+additionalParameters.put("client.transport.sniff", "true");
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}, additionalParameters);
+```
 
 ### EsConfig params
 
@@ -84,6 +84,7 @@ Provided components (Bolt, State) takes in EsConfig as a constructor arg.
 |---	|--- |---
 |clusterName | Elasticsearch cluster name | String (required) |
 |nodes | Elasticsearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |
+|additionalParameters | Additional Elasticsearch configuration parameters | Map<String, String> (optional) |
 
 ## EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper)
 


[04/10] storm git commit: Make EsStateFactory public, add tests

Posted by ka...@apache.org.
Make EsStateFactory public, add tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/499f8911
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/499f8911
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/499f8911

Branch: refs/heads/master
Commit: 499f891153c575b46f6b69a233f8e9e99e2183e4
Parents: 677d3c5
Author: Alex Panov <al...@teradata.com>
Authored: Sun Aug 30 12:14:10 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Sun Aug 30 12:14:10 2015 +0200

----------------------------------------------------------------------
 .../storm/elasticsearch/trident/EsStateFactory.java  | 10 ++++++----
 .../elasticsearch/trident/EsStateFactoryTest.java    | 15 +++++++++++++++
 2 files changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/499f8911/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index 6904298..e85cdc2 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -26,11 +26,13 @@ import storm.trident.state.StateFactory;
 
 import java.util.Map;
 
+import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+
 /**
  * StateFactory for providing EsState.
  * @since 0.11
  */
-class EsStateFactory implements StateFactory {
+public class EsStateFactory implements StateFactory {
     private final EsConfig esConfig;
     private final EsTupleMapper tupleMapper;
 
@@ -39,9 +41,9 @@ class EsStateFactory implements StateFactory {
      * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
      * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
      */
-    EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper){
-        this.esConfig = esConfig;
-        this.tupleMapper = tupleMapper;
+    public EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper) {
+        this.esConfig = checkNotNull(esConfig);
+        this.tupleMapper = checkNotNull(tupleMapper);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/499f8911/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
new file mode 100644
index 0000000..ce9eaf3
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
@@ -0,0 +1,15 @@
+package org.apache.storm.elasticsearch.trident;
+
+import com.google.common.testing.NullPointerTester;
+
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.junit.Test;
+
+public class EsStateFactoryTest {
+
+    @Test
+    public void constructorThrowsOnNull() throws Exception {
+        new NullPointerTester().setDefault(EsConfig.class, new EsConfig("cluster", new String[] {"localhost:9300"}))
+                               .testAllPublicConstructors(EsStateFactory.class);
+    }
+}


[10/10] storm git commit: add STORM-1013 to CHANGELOG.md

Posted by ka...@apache.org.
add STORM-1013 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/43687812
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/43687812
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/43687812

Branch: refs/heads/master
Commit: 436878128310afda4519e625c5db0339b889b811
Parents: 25aecf1
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Sep 1 22:31:30 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 1 22:31:30 2015 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/43687812/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a917e3a..dce6010 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1013: [storm-elasticsearch] Expose TransportClient configuration Map to EsConfig
  * STORM-1012: Shading jackson.
  * STORM-974: [storm-elasticsearch] Introduces Tuple -> ES document mapper to get rid of constant field mapping (source, index, type)
  * STORM-978: [storm-elasticsearch] Introduces Lookup(or Query)Bolt which emits matched documents from ES


[02/10] storm git commit: Merge remote-tracking branch 'upstream/master'

Posted by ka...@apache.org.
Merge remote-tracking branch 'upstream/master'

# Conflicts:
#	external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
#	external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
#	external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
#	external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
#	external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ae0f814d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ae0f814d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ae0f814d

Branch: refs/heads/master
Commit: ae0f814d589f718c4f989adffccea9b24bbfd3cf
Parents: 6a54fdb 07e0ff2
Author: Alex Panov <al...@teradata.com>
Authored: Sun Aug 30 11:48:37 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Sun Aug 30 11:48:37 2015 +0200

----------------------------------------------------------------------
 .gitignore                                      |   2 +-
 CHANGELOG.md                                    |  11 +-
 README.markdown                                 |   4 +
 STORM-UI-REST-API.md                            |  41 +-
 conf/defaults.yaml                              |   7 +-
 .../nimbus_ha_leader_election_and_failover.png  | Bin 0 -> 154316 bytes
 .../images/nimbus_ha_topology_submission.png    | Bin 0 -> 134180 bytes
 docs/documentation/nimbus-ha-design.md          | 217 +++++
 external/storm-elasticsearch/README.md          |  48 +-
 .../storm/elasticsearch/bolt/EsIndexBolt.java   |  23 +-
 .../elasticsearch/bolt/EsPercolateBolt.java     |  24 +-
 .../common/DefaultEsTupleMapper.java            |  42 +
 .../elasticsearch/common/EsTupleMapper.java     |  55 ++
 .../storm/elasticsearch/trident/EsState.java    |  22 +-
 .../elasticsearch/trident/EsStateFactory.java   |  10 +-
 .../storm/elasticsearch/trident/EsUpdater.java  |   4 +
 .../elasticsearch/bolt/AbstractEsBoltTest.java  |   5 +-
 .../elasticsearch/bolt/EsIndexBoltTest.java     |   7 +-
 .../elasticsearch/bolt/EsIndexTopology.java     |   4 +-
 .../elasticsearch/bolt/EsPercolateBoltTest.java |   4 +-
 .../storm/elasticsearch/common/EsTestUtil.java  |   5 +
 .../trident/TridentEsTopology.java              |   4 +-
 external/storm-hbase/pom.xml                    |   2 +-
 external/storm-hdfs/pom.xml                     |   4 +-
 .../ha/codedistributor/HDFSCodeDistributor.java | 101 +++
 external/storm-kafka/README.md                  |   3 +
 .../src/jvm/storm/kafka/KafkaSpout.java         |   6 +-
 .../src/jvm/storm/kafka/PartitionManager.java   |  10 +-
 .../src/jvm/storm/kafka/SpoutConfig.java        |   3 +
 external/storm-solr/README.md                   | 201 +++++
 external/storm-solr/pom.xml                     |  98 +++
 .../apache/storm/solr/bolt/SolrUpdateBolt.java  | 136 ++++
 .../storm/solr/config/CountBasedCommit.java     |  59 ++
 .../storm/solr/config/SolrCommitStrategy.java   |  30 +
 .../apache/storm/solr/config/SolrConfig.java    |  42 +
 .../storm/solr/mapper/SolrFieldsMapper.java     | 182 +++++
 .../storm/solr/mapper/SolrJsonMapper.java       | 160 ++++
 .../apache/storm/solr/mapper/SolrMapper.java    |  32 +
 .../storm/solr/mapper/SolrMapperException.java  |  24 +
 .../org/apache/storm/solr/schema/CopyField.java |  50 ++
 .../org/apache/storm/solr/schema/Field.java     |  50 ++
 .../org/apache/storm/solr/schema/FieldType.java |  63 ++
 .../org/apache/storm/solr/schema/Schema.java    | 116 +++
 .../storm/solr/schema/SolrFieldTypeFinder.java  | 182 +++++
 .../schema/builder/RestJsonSchemaBuilder.java   |  69 ++
 .../solr/schema/builder/SchemaBuilder.java      |  27 +
 .../apache/storm/solr/trident/SolrState.java    |  67 ++
 .../storm/solr/trident/SolrStateFactory.java    |  44 +
 .../apache/storm/solr/trident/SolrUpdater.java  |  33 +
 .../storm/solr/spout/SolrFieldsSpout.java       |  76 ++
 .../apache/storm/solr/spout/SolrJsonSpout.java  | 120 +++
 .../storm/solr/topology/SolrFieldsTopology.java |  56 ++
 .../storm/solr/topology/SolrJsonTopology.java   |  48 ++
 .../storm/solr/topology/SolrTopology.java       |  82 ++
 .../solr/trident/SolrFieldsTridentTopology.java |  45 ++
 .../solr/trident/SolrJsonTridentTopology.java   |  45 ++
 .../org/apache/storm/solr/util/TestUtil.java    |  30 +
 pom.xml                                         |  19 +-
 storm-core/pom.xml                              |  79 +-
 storm-core/src/clj/backtype/storm/cluster.clj   |  94 ++-
 .../backtype/storm/command/shell_submission.clj |   9 +-
 storm-core/src/clj/backtype/storm/config.clj    |  15 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    | 252 ++++--
 .../clj/backtype/storm/daemon/supervisor.clj    |  53 +-
 storm-core/src/clj/backtype/storm/thrift.clj    |  23 +-
 storm-core/src/clj/backtype/storm/timer.clj     |   7 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  95 ++-
 storm-core/src/clj/backtype/storm/zookeeper.clj | 108 ++-
 storm-core/src/jvm/backtype/storm/Config.java   |  47 +-
 .../storm/codedistributor/ICodeDistributor.java |  56 ++
 .../LocalFileSystemCodeDistributor.java         | 106 +++
 .../storm/generated/ClusterSummary.java         | 292 ++++---
 .../backtype/storm/generated/NimbusSummary.java | 796 +++++++++++++++++++
 .../backtype/storm/generated/TopologyInfo.java  | 221 +++--
 .../storm/generated/TopologySummary.java        | 107 ++-
 .../backtype/storm/nimbus/ILeaderElector.java   |  60 ++
 .../jvm/backtype/storm/nimbus/NimbusInfo.java   |  93 +++
 .../jvm/backtype/storm/utils/NimbusClient.java  |  78 +-
 .../jvm/backtype/storm/utils/RotatingMap.java   |  12 +-
 .../jvm/backtype/storm/utils/TimeCacheMap.java  |  60 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |   9 +
 storm-core/src/py/storm/ttypes.py               | 613 ++++++++------
 storm-core/src/storm.thrift                     |  12 +-
 storm-core/src/ui/public/index.html             |  21 +
 .../public/templates/index-page-template.html   |  59 +-
 .../templates/topology-page-template.html       |  18 +-
 storm-core/src/ui/public/topology.html          |  35 +-
 .../test/clj/backtype/storm/cluster_test.clj    |  23 +-
 .../storm/messaging/netty_unit_test.clj         |   2 +-
 .../test/clj/backtype/storm/nimbus_test.clj     | 210 +++--
 .../backtype/storm/security/auth/auth_test.clj  |   4 +-
 .../storm/security/auth/nimbus_auth_test.clj    |  14 +-
 .../test/clj/backtype/storm/supervisor_test.clj |   1 +
 .../test/clj/backtype/storm/utils_test.clj      |  12 -
 storm-dist/binary/src/main/assembly/binary.xml  |  14 +
 95 files changed, 5721 insertions(+), 843 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index e60c003,e3865e5..1eb0d04
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@@ -17,10 -17,10 +17,11 @@@
   */
  package org.apache.storm.elasticsearch.trident;
  
 -import backtype.storm.task.IMetricsContext;
  import backtype.storm.topology.FailedException;
 +
 +import org.apache.storm.elasticsearch.common.StormElasticSearchClient;
  import org.apache.storm.elasticsearch.common.EsConfig;
+ import org.apache.storm.elasticsearch.common.EsTupleMapper;
  import org.elasticsearch.action.bulk.BulkRequestBuilder;
  import org.elasticsearch.action.bulk.BulkResponse;
  import org.elasticsearch.client.Client;
@@@ -29,12 -33,15 +30,13 @@@ import org.slf4j.LoggerFactory
  import storm.trident.state.State;
  import storm.trident.tuple.TridentTuple;
  
 -import java.util.ArrayList;
  import java.util.List;
 -import java.util.Map;
  
  /**
+  * Trident State for storing tuple to ES document.
 + * @since 0.11
   */
 -public class EsState implements State {
 +class EsState implements State {
      private static final Logger LOG = LoggerFactory.getLogger(EsState.class);
      private static Client client;
      private EsConfig esConfig;
@@@ -83,13 -105,20 +88,20 @@@
          }
      }
  
+     /**
+      * Store current state to ElasticSearch.
+      *
+      * @param tuples list of tuples for storing to ES.
+      *               Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
+      * @param collector
+      */
 -    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
 +    public void updateState(List<TridentTuple> tuples) {
          BulkRequestBuilder bulkRequest = client.prepareBulk();
          for (TridentTuple tuple : tuples) {
-             String source = tuple.getStringByField("source");
-             String index = tuple.getStringByField("index");
-             String type = tuple.getStringByField("type");
-             String id = tuple.getStringByField("id");
+             String source = tupleMapper.getSource(tuple);
+             String index = tupleMapper.getIndex(tuple);
+             String type = tupleMapper.getType(tuple);
+             String id = tupleMapper.getId(tuple);
  
              bulkRequest.add(client.prepareIndex(index, type, id).setSource(source));
          }

http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index 4f4dd39,9fdf7c6..e5733e7
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@@ -19,30 -19,37 +19,36 @@@ package org.apache.storm.elasticsearch.
  
  import backtype.storm.task.IMetricsContext;
  import org.apache.storm.elasticsearch.common.EsConfig;
+ import org.apache.storm.elasticsearch.common.EsTupleMapper;
 +
  import storm.trident.state.State;
  import storm.trident.state.StateFactory;
  
  import java.util.Map;
  
  /**
+  * StateFactory for providing EsState.
 + * @since 0.11
   */
 -public class EsStateFactory implements StateFactory {
 +class EsStateFactory implements StateFactory {
 +    private final EsConfig esConfig;
+     private EsConfig esConfig;
+     private EsTupleMapper tupleMapper;
  
 -    public EsStateFactory(){
 -
 -    }
 -
      /**
       * EsStateFactory constructor
       * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+      * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
       */
-     EsStateFactory(EsConfig esConfig){
 -    public EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper){
++    EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper){
          this.esConfig = esConfig;
+         this.tupleMapper = tupleMapper;
      }
  
      @Override
      public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-         EsState esState = new EsState(esConfig);
+         EsState esState = new EsState(esConfig, tupleMapper);
 -        esState.prepare(conf, metrics, partitionIndex, numPartitions);
 +        esState.prepare();
          return esState;
      }
  }

http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
index 685ea8c,935c92e..1fb998b
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
@@@ -24,8 -24,12 +24,12 @@@ import storm.trident.tuple.TridentTuple
  import java.util.List;
  
  public class EsUpdater extends BaseStateUpdater<EsState> {
+     /**
+      * {@inheritDoc}
+      * Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
+      */
      @Override
      public void updateState(EsState state, List<TridentTuple> tuples, TridentCollector collector) {
 -        state.updateState(tuples, collector);
 +        state.updateState(tuples);
      }
  }

http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
index 8a5fad2,17ccdc9..07b7c43
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@@ -17,8 -17,8 +17,10 @@@
   */
  package org.apache.storm.elasticsearch.bolt;
  
 +import com.google.common.testing.NullPointerTester;
 +
+ import backtype.storm.Config;
+ import backtype.storm.task.OutputCollector;
  import org.apache.storm.elasticsearch.common.EsConfig;
  import org.junit.After;
  import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index bcba6d4,5860b3b..ba87616
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@@ -60,11 -58,7 +58,12 @@@ public class EsIndexBoltTest extends Ab
  
      @Override
      protected EsIndexBolt createBolt(EsConfig esConfig) {
-         return new EsIndexBolt(esConfig);
+         EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+         return new EsIndexBolt(esConfig, tupleMapper);
      }
 +
 +    @Override
 +    protected Class<EsIndexBolt> getBoltClass() {
 +        return EsIndexBolt.class;
 +    }
  }

http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index 3c1e949,1f0118b..70e0738
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@@ -46,8 -47,11 +47,9 @@@ public class EsIndexTopology 
          TopologyBuilder builder = new TopologyBuilder();
          UserDataSpout spout = new UserDataSpout();
          builder.setSpout(SPOUT_ID, spout, 1);
 -        EsConfig esConfig = new EsConfig();
 -        esConfig.setClusterName(EsConstants.clusterName);
 -        esConfig.setNodes(new String[]{"localhost:9300"});
+         EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
 +        EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
-         builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig), 1).shuffleGrouping(SPOUT_ID);
+         builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1).shuffleGrouping(SPOUT_ID);
  
          EsTestUtil.startEsNode();
          EsTestUtil.waitForSeconds(5);

http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index 1ae1df7,ee5e607..45d86f8
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@@ -45,9 -46,12 +46,10 @@@ public class TridentEsTopology 
  
          TridentTopology topology = new TridentTopology();
          Stream stream = topology.newStream("spout", spout);
 -        EsConfig esConfig = new EsConfig();
 -        esConfig.setClusterName(EsConstants.clusterName);
 -        esConfig.setNodes(new String[]{"localhost:9300"});
 -        Fields esFields = new Fields("index", "type", "source", "id");
 +        EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
 +        Fields esFields = new Fields("index", "type", "source");
-         StateFactory factory = new EsStateFactory(esConfig);
+         EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+         StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
          TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
  
          EsTestUtil.startEsNode();