You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/04/18 19:26:17 UTC

[2/4] storm git commit: STORM-2379: update for Elasticsearch 2.

STORM-2379: update for Elasticsearch 2.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2b4565c6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2b4565c6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2b4565c6

Branch: refs/heads/master
Commit: 2b4565c668c16c366e086214d11f3ec2e1d4769c
Parents: 1a17f74
Author: Heather McCartney <he...@problemchimp.org>
Authored: Mon Mar 13 13:16:27 2017 +0000
Committer: Heather McCartney <he...@problemchimp.org>
Committed: Thu Apr 13 21:30:18 2017 +0100

----------------------------------------------------------------------
 examples/storm-elasticsearch-examples/pom.xml   |  10 ++
 .../elasticsearch/bolt/EsIndexTopology.java     |   3 +-
 .../storm/elasticsearch/common/EsTestUtil.java  |  14 +-
 .../trident/TridentEsTopology.java              |   3 +-
 external/storm-elasticsearch/pom.xml            |  44 ++++++-
 .../DefaultEsLookupResultOutput.java            |  62 +++++++++
 .../elasticsearch/ElasticsearchGetRequest.java  |  36 -----
 .../elasticsearch/EsLookupResultOutput.java     |   5 +-
 .../elasticsearch/bolt/AbstractEsBolt.java      |  53 +++++---
 .../storm/elasticsearch/bolt/EsIndexBolt.java   |  31 +++--
 .../storm/elasticsearch/bolt/EsLookupBolt.java  |  49 ++++---
 .../elasticsearch/bolt/EsPercolateBolt.java     |  41 ++++--
 .../common/DefaultEsTupleMapper.java            |  20 +++
 .../storm/elasticsearch/common/EsConfig.java    | 130 +++++++++++++------
 .../elasticsearch/common/EsTupleMapper.java     |   9 ++
 .../common/StormElasticSearchClient.java        |  37 +++---
 .../common/TransportAddresses.java              |  72 ----------
 .../apache/storm/elasticsearch/doc/Index.java   |  69 ++++++++++
 .../storm/elasticsearch/doc/IndexDoc.java       |  43 ++++++
 .../storm/elasticsearch/doc/IndexItem.java      |  91 +++++++++++++
 .../storm/elasticsearch/doc/IndexItemDoc.java   |  42 ++++++
 .../apache/storm/elasticsearch/doc/Shards.java  |  63 +++++++++
 .../storm/elasticsearch/doc/SourceDoc.java      |  43 ++++++
 .../response/BulkIndexResponse.java             |  80 ++++++++++++
 .../elasticsearch/response/LookupResponse.java  |  63 +++++++++
 .../response/PercolateResponse.java             |  85 ++++++++++++
 .../storm/elasticsearch/trident/EsState.java    |  72 ++++++----
 .../elasticsearch/trident/EsStateFactory.java   |  15 +--
 .../bolt/AbstractEsBoltIntegrationTest.java     |  68 +++-------
 .../elasticsearch/bolt/AbstractEsBoltTest.java  |  15 ++-
 .../elasticsearch/bolt/EsIndexBoltTest.java     |  44 ++++---
 .../bolt/EsLookupBoltIntegrationTest.java       |  75 +++--------
 .../elasticsearch/bolt/EsLookupBoltTest.java    |  67 +++++-----
 .../elasticsearch/bolt/EsPercolateBoltTest.java |  62 ++++++---
 .../elasticsearch/common/EsConfigTest.java      |  60 ++++-----
 .../storm/elasticsearch/common/EsTestUtil.java  |  99 ++++++++++++--
 .../common/TransportAddressesTest.java          |  81 ------------
 .../trident/EsStateFactoryTest.java             |   2 +-
 .../elasticsearch/trident/EsStateTest.java      |  98 ++++++++++++++
 .../src/test/resources/log4j2.xml               |  33 +++++
 40 files changed, 1424 insertions(+), 565 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/examples/storm-elasticsearch-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/pom.xml b/examples/storm-elasticsearch-examples/pom.xml
index 64c8229..23a717d 100644
--- a/examples/storm-elasticsearch-examples/pom.xml
+++ b/examples/storm-elasticsearch-examples/pom.xml
@@ -26,6 +26,10 @@
     </parent>
 
     <artifactId>storm-elasticsearch-examples</artifactId>
+    <properties>
+        <elasticsearch.test.version>2.4.4</elasticsearch.test.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
@@ -38,6 +42,12 @@
             <artifactId>storm-elasticsearch</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${elasticsearch.test.version}</version>
+            <scope>${provided.scope}</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index 3cd2bc8..c7ec7d0 100644
--- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.elasticsearch.common.EsConfig;
-import org.apache.storm.elasticsearch.common.EsConstants;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
 import org.apache.storm.elasticsearch.common.EsTupleMapper;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -48,7 +47,7 @@ public class EsIndexTopology {
         UserDataSpout spout = new UserDataSpout();
         builder.setSpout(SPOUT_ID, spout, 1);
         EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
-        EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
+        EsConfig esConfig = new EsConfig("http://localhost:9300");
         builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1).shuffleGrouping(SPOUT_ID);
 
         EsTestUtil.startEsNode();

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
index cb1c745..189813a 100644
--- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
@@ -17,23 +17,22 @@
  */
 package org.apache.storm.elasticsearch.common;
 
+import java.util.HashMap;
+
 import org.apache.storm.Config;
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.ITuple;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.node.NodeBuilder;
 
-import java.util.HashMap;
-
 public class EsTestUtil {
     public static Tuple generateTestTuple(String source, String index, String type, String id) {
         TopologyBuilder builder = new TopologyBuilder();
@@ -53,14 +52,15 @@ public class EsTestUtil {
 
     public static Node startEsNode(){
         Node node = NodeBuilder.nodeBuilder().data(true).settings(
-                ImmutableSettings.builder()
+                Settings.settingsBuilder()
                         .put(ClusterName.SETTING, EsConstants.clusterName)
                         .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
                         .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
                         .put(EsExecutors.PROCESSORS, 1)
-                        .put("http.enabled", false)
+                        .put("http.enabled", true)
                         .put("index.percolator.map_unmapped_fields_as_string", true)
-                        .put("index.store.type", "memory")
+                        .put("index.store.type", "mmapfs")
+                        .put("path.home", "./data")
         ).build();
         node.start();
         return node;

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index 307a991..e7fb2ef 100644
--- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -26,7 +26,6 @@ import java.util.UUID;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.elasticsearch.common.EsConfig;
-import org.apache.storm.elasticsearch.common.EsConstants;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
 import org.apache.storm.elasticsearch.common.EsTupleMapper;
 import org.apache.storm.task.TopologyContext;
@@ -50,7 +49,7 @@ public class TridentEsTopology {
 
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout", spout);
-        EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
+        EsConfig esConfig = new EsConfig("http://localhost:9300");
         Fields esFields = new Fields("index", "type", "source");
         EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
         StateFactory factory = new EsStateFactory(esConfig, tupleMapper);

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml
index bab0426..17d65d1 100644
--- a/external/storm-elasticsearch/pom.xml
+++ b/external/storm-elasticsearch/pom.xml
@@ -34,10 +34,16 @@
             <name>Adrian Seungjin Lee</name>
             <email>sweetest.sj@navercorp.com</email>
         </developer>
+        <developer>
+            <id>hmcc</id>
+            <name>Heather McCartney</name>
+            <email>heather@problemchimp.org</email>
+        </developer>
     </developers>
 
     <properties>
-        <elasticsearch.version>1.6.0</elasticsearch.version>
+        <elasticsearch.version>5.2.2</elasticsearch.version>
+        <elasticsearch.test.version>2.4.4</elasticsearch.test.version>
     </properties>
 
     <dependencies>
@@ -53,10 +59,27 @@
             <scope>${provided.scope}</scope>
         </dependency>
         <dependency>
-            <groupId>org.elasticsearch</groupId>
-            <artifactId>elasticsearch</artifactId>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>rest</artifactId>
             <version>${elasticsearch.version}</version>
-            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpasyncclient</artifactId>
+            <version>4.1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.2</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
         </dependency>
         <dependency>
             <groupId>junit</groupId>
@@ -80,13 +103,24 @@
             <artifactId>mockito-all</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${elasticsearch.test.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>18.0</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
             <plugin>
                 <artifactId>maven-clean-plugin</artifactId>
-                <version>2.5</version>
                 <executions>
                     <execution>
                         <id>cleanup</id>

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/DefaultEsLookupResultOutput.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/DefaultEsLookupResultOutput.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/DefaultEsLookupResultOutput.java
new file mode 100644
index 0000000..533dc65
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/DefaultEsLookupResultOutput.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.storm.elasticsearch.response.LookupResponse;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.elasticsearch.client.Response;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Default implementation of {@link EsLookupResultOutput}.
+ * Outputs the index, type, id and source as strings.
+ */
+public class DefaultEsLookupResultOutput implements EsLookupResultOutput {
+    
+    private static final long serialVersionUID = 2932278450655703239L;
+    
+    private ObjectMapper objectMapper;
+
+    public DefaultEsLookupResultOutput(ObjectMapper objectMapper) {
+        super();
+        this.objectMapper = objectMapper;
+    }
+
+    @Override
+    public Collection<Values> toValues(Response response) {
+        LookupResponse lookupResponse;
+        try {
+            lookupResponse = objectMapper.readValue(response.getEntity().getContent(), LookupResponse.class);
+        } catch (UnsupportedOperationException | IOException e) {
+            throw new IllegalArgumentException("Response " + response + " is invalid", e);
+        }
+        return Collections.singleton(new Values(lookupResponse.getIndex(), lookupResponse.getType(), lookupResponse.getId(), lookupResponse.getSource()));
+    }
+
+    @Override
+    public Fields fields() {
+        return new Fields("index", "type", "id", "source");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchGetRequest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchGetRequest.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchGetRequest.java
deleted file mode 100644
index b9a7885..0000000
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchGetRequest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.elasticsearch;
-
-import java.io.Serializable;
-
-import org.elasticsearch.action.get.GetRequest;
-
-import org.apache.storm.tuple.ITuple;
-
-/**
- * @since 0.11
- * The adapter to convert the incoming tuple to Elasticsearch GetRequest.
- */
-public interface ElasticsearchGetRequest extends Serializable {
-
-    /**
-     * @return GetRequest to perform against Elasticsearch.
-     */
-    GetRequest extractFrom(ITuple tuple);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsLookupResultOutput.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsLookupResultOutput.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsLookupResultOutput.java
index d00fd47..b057729 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsLookupResultOutput.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsLookupResultOutput.java
@@ -20,10 +20,9 @@ package org.apache.storm.elasticsearch;
 import java.io.Serializable;
 import java.util.Collection;
 
-import org.elasticsearch.action.get.GetResponse;
-
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
+import org.elasticsearch.client.Response;
 
 /**
  * @since 0.11
@@ -34,7 +33,7 @@ public interface EsLookupResultOutput extends Serializable {
     /**
      * @return collection of values to emit.
      */
-    Collection<Values> toValues(GetResponse response);
+    Collection<Values> toValues(Response response);
 
     /**
      * @return output fields to declare.

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
index b53e183..42f20e6 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
@@ -17,37 +17,35 @@
  */
 package org.apache.storm.elasticsearch.bolt;
 
+import static java.util.Objects.requireNonNull;
+import static org.apache.http.util.Args.notBlank;
+
 import java.util.Map;
 
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.StormElasticSearchClient;
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.utils.TupleUtils;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
+import org.elasticsearch.client.RestClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 public abstract class AbstractEsBolt extends BaseTickTupleAwareRichBolt {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractEsBolt.class);
 
-    protected static Client client;
+    protected static RestClient client;
+    protected final static ObjectMapper objectMapper = new ObjectMapper();
 
     protected OutputCollector collector;
     private EsConfig esConfig;
 
     public AbstractEsBolt(EsConfig esConfig) {
-        checkNotNull(esConfig);
-        this.esConfig = esConfig;
+        this.esConfig = requireNonNull(esConfig);
     }
 
     @Override
@@ -68,13 +66,34 @@ public abstract class AbstractEsBolt extends BaseTickTupleAwareRichBolt {
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
     }
 
-    @VisibleForTesting
-    static Client getClient() {
+    /**
+     * Construct an Elasticsearch endpoint from the provided index, type and
+     * id.
+     * @param index - required; name of Elasticsearch index
+     * @param type - optional; name of Elasticsearch type
+     * @param id - optional; Elasticsearch document ID
+     * @return the index, type and id concatenated with '/'.
+     */
+    static String getEndpoint(String index, String type, String id) {
+        requireNonNull(index);
+        notBlank(index, "index");
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("/").append(index);
+        if (!(type == null || type.isEmpty())) {
+            sb.append("/").append(type);
+        }
+        if (!(id == null || id.isEmpty())) {
+            sb.append("/").append(id);
+        }
+        return sb.toString();
+    }
+
+    static RestClient getClient() {
         return AbstractEsBolt.client;
     }
 
-    @VisibleForTesting
-    static void replaceClient(Client client) {
+    static void replaceClient(RestClient client) {
         AbstractEsBolt.client = client;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
index 5d66eb9..1f3122b 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
@@ -17,17 +17,19 @@
  */
 package org.apache.storm.elasticsearch.bolt;
 
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.http.entity.StringEntity;
+import org.apache.storm.elasticsearch.common.DefaultEsTupleMapper;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
-import org.apache.storm.elasticsearch.common.EsConfig;
-import org.apache.storm.elasticsearch.common.EsTupleMapper;
-import org.apache.storm.utils.TupleUtils;
-
-import java.util.Map;
-
-import static org.elasticsearch.common.base.Preconditions.checkNotNull;
 
 /**
  * Basic bolt for storing tuple to ES document.
@@ -37,12 +39,20 @@ public class EsIndexBolt extends AbstractEsBolt {
 
     /**
      * EsIndexBolt constructor
-     * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+     * @param esConfig Elasticsearch configuration containing node addresses {@link EsConfig}
+     */
+    public EsIndexBolt(EsConfig esConfig) {
+        this(esConfig, new DefaultEsTupleMapper());
+    }
+
+    /**
+     * EsIndexBolt constructor
+     * @param esConfig Elasticsearch configuration containing node addresses {@link EsConfig}
      * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
      */
     public EsIndexBolt(EsConfig esConfig, EsTupleMapper tupleMapper) {
         super(esConfig);
-        this.tupleMapper = checkNotNull(tupleMapper);
+        this.tupleMapper = requireNonNull(tupleMapper);
     }
 
     @Override
@@ -61,8 +71,9 @@ public class EsIndexBolt extends AbstractEsBolt {
             String index = tupleMapper.getIndex(tuple);
             String type = tupleMapper.getType(tuple);
             String id = tupleMapper.getId(tuple);
+            Map<String, String> params = tupleMapper.getParams(tuple, new HashMap<>());
 
-            client.prepareIndex(index, type, id).setSource(source).execute().actionGet();
+            client.performRequest("put", getEndpoint(index, type, id), params, new StringEntity(source));
             collector.ack(tuple);
         } catch (Exception e) {
             collector.reportError(e);

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
index 895e30b..1ff4686 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
@@ -17,38 +17,51 @@
  */
 package org.apache.storm.elasticsearch.bolt;
 
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
-import org.apache.storm.elasticsearch.ElasticsearchGetRequest;
+import org.apache.storm.elasticsearch.DefaultEsLookupResultOutput;
 import org.apache.storm.elasticsearch.EsLookupResultOutput;
+import org.apache.storm.elasticsearch.common.DefaultEsTupleMapper;
 import org.apache.storm.elasticsearch.common.EsConfig;
-import org.apache.storm.utils.TupleUtils;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-
-import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+import org.elasticsearch.client.Response;
 
 /**
  * @since 0.11
  */
 public class EsLookupBolt extends AbstractEsBolt {
 
-    private final ElasticsearchGetRequest getRequest;
+    private final EsTupleMapper tupleMapper;
     private final EsLookupResultOutput output;
 
     /**
+     * EsLookupBolt constructor.
+     * @param esConfig Elasticsearch configuration containing node addresses {@link EsConfig}
      * @throws NullPointerException if any of the parameters is null
      */
-    public EsLookupBolt(EsConfig esConfig, ElasticsearchGetRequest getRequest, EsLookupResultOutput output) {
+    public EsLookupBolt(EsConfig esConfig) {
+        this(esConfig, new DefaultEsTupleMapper(), new DefaultEsLookupResultOutput(objectMapper));
+    }
+
+    /**
+     * EsLookupBolt constructor.
+     * @param esConfig Elasticsearch configuration containing node addresses {@link EsConfig}
+     * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
+     * @param output ES response to Values mapper {@link EsLookupResultOutput}
+     * @throws NullPointerException if any of the parameters is null
+     */
+    public EsLookupBolt(EsConfig esConfig, EsTupleMapper tupleMapper, EsLookupResultOutput output) {
         super(esConfig);
-        checkNotNull(getRequest);
-        checkNotNull(output);
-        this.getRequest = getRequest;
-        this.output = output;
+        this.tupleMapper = requireNonNull(tupleMapper);
+        this.output = requireNonNull(output);
     }
 
     @Override
@@ -62,9 +75,13 @@ public class EsLookupBolt extends AbstractEsBolt {
         }
     }
 
-    private Collection<Values> lookupValuesInEs(Tuple tuple) {
-        GetRequest request = getRequest.extractFrom(tuple);
-        GetResponse response = client.get(request).actionGet();
+    private Collection<Values> lookupValuesInEs(Tuple tuple) throws IOException {
+    	String index = tupleMapper.getIndex(tuple);
+    	String type = tupleMapper.getType(tuple);
+    	String id = tupleMapper.getId(tuple);
+    	Map<String, String> params = tupleMapper.getParams(tuple, new HashMap<>());
+
+    	Response response = client.performRequest("get", getEndpoint(index, type, id), params);
         return output.toValues(response);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
index 0b96dfc..4d969d5 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
@@ -17,21 +17,23 @@
  */
 package org.apache.storm.elasticsearch.bolt;
 
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.http.entity.StringEntity;
+import org.apache.storm.elasticsearch.common.DefaultEsTupleMapper;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
+import org.apache.storm.elasticsearch.response.PercolateResponse;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.elasticsearch.common.EsConfig;
-import org.apache.storm.elasticsearch.common.EsTupleMapper;
-import org.apache.storm.utils.TupleUtils;
-import org.elasticsearch.action.percolate.PercolateResponse;
-import org.elasticsearch.action.percolate.PercolateSourceBuilder;
-
-import java.util.Map;
-
-import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+import org.elasticsearch.client.Response;
 
 /**
  * Basic bolt for retrieve matched percolate queries.
@@ -42,12 +44,20 @@ public class EsPercolateBolt extends AbstractEsBolt {
 
     /**
      * EsPercolateBolt constructor
+     * @param esConfig Elasticsearch configuration containing node addresses {@link EsConfig}
+     */
+    public EsPercolateBolt(EsConfig esConfig) {
+        this(esConfig, new DefaultEsTupleMapper());
+    }
+
+    /**
+     * EsPercolateBolt constructor
      * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
      * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
      */
     public EsPercolateBolt(EsConfig esConfig, EsTupleMapper tupleMapper) {
         super(esConfig);
-        this.tupleMapper = checkNotNull(tupleMapper);
+        this.tupleMapper = requireNonNull(tupleMapper);
     }
 
     @Override
@@ -68,10 +78,13 @@ public class EsPercolateBolt extends AbstractEsBolt {
             String index = tupleMapper.getIndex(tuple);
             String type = tupleMapper.getType(tuple);
 
-            PercolateResponse response = client.preparePercolate().setIndices(index).setDocumentType(type)
-                    .setPercolateDoc(PercolateSourceBuilder.docBuilder().setDoc(source)).execute().actionGet();
-            if (response.getCount() > 0) {
-                for (PercolateResponse.Match match : response) {
+            Map<String, String> indexParams = new HashMap<>();
+            indexParams.put(type, null);
+            String percolateDoc = "{\"doc\": " + source + "}";
+            Response response = client.performRequest("get", getEndpoint(index, type, "_percolate"), new HashMap<>(), new StringEntity(percolateDoc));
+            PercolateResponse percolateResponse = objectMapper.readValue(response.getEntity().getContent(), PercolateResponse.class);
+            if (!percolateResponse.getMatches().isEmpty()) {
+                for (PercolateResponse.Match match : percolateResponse.getMatches()) {
                     collector.emit(new Values(source, match));
                 }
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/DefaultEsTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/DefaultEsTupleMapper.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/DefaultEsTupleMapper.java
index 0a15922..c8d750d 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/DefaultEsTupleMapper.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/DefaultEsTupleMapper.java
@@ -17,6 +17,10 @@
  */
 package org.apache.storm.elasticsearch.common;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 import org.apache.storm.tuple.ITuple;
 
 public class DefaultEsTupleMapper implements EsTupleMapper {
@@ -39,4 +43,20 @@ public class DefaultEsTupleMapper implements EsTupleMapper {
     public String getId(ITuple tuple) {
         return tuple.getStringByField("id");
     }
+
+    @Override
+    public Map<String, String> getParams(ITuple tuple, Map<String, String> defaultValue) {
+        if (!tuple.contains("params")) {
+            return defaultValue;
+        }
+        Object o = tuple.getValueByField("params");
+        if (o instanceof Map) {
+            Map<String, String> params = new HashMap<>();
+            for (Map.Entry<?, ?> entry : ((Map<?, ?>) o).entrySet()) {
+                params.put(entry.getKey().toString(), entry.getValue() == null ? null : entry.getValue().toString());
+            }
+            return params;
+        }
+        return defaultValue;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
index 6bbd81f..02f045d 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -18,65 +18,117 @@
 package org.apache.storm.elasticsearch.common;
 
 import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.net.URI;
+import java.net.URISyntaxException;
 
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-
-import static org.elasticsearch.common.base.Preconditions.checkArgument;
-import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
+import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
 
 /**
  * @since 0.11
  */
 public class EsConfig implements Serializable {
 
-    private final String clusterName;
-    private final String[] nodes;
-    private final Map<String, String> additionalConfiguration;
+    private final HttpHost[] httpHosts;
+    private Integer maxRetryTimeoutMillis;
+    private Header[] defaultHeaders;
+    private RestClient.FailureListener failureListener;
+    private HttpClientConfigCallback httpClientConfigCallback;
+    private RequestConfigCallback requestConfigCallback;
+    private String pathPrefix;
 
     /**
-     * EsConfig Constructor to be used in EsIndexBolt, EsPercolateBolt and EsStateFactory
-     *
-     * @param clusterName Elasticsearch cluster name
-     * @param nodes       Elasticsearch addresses in host:port pattern string array
-     * @throws IllegalArgumentException if nodes are empty
-     * @throws NullPointerException     on any of the fields being null
-     */
-    public EsConfig(String clusterName, String[] nodes) {
-        this(clusterName, nodes, Collections.<String, String>emptyMap());
+     * EsConfig Constructor to be used in EsIndexBolt, EsPercolateBolt and EsStateFactory.
+     * Connects to Elasticsearch at http://localhost:9200.
+     */
+    public EsConfig() {
+        this("http://localhost:9200");
     }
 
     /**
      * EsConfig Constructor to be used in EsIndexBolt, EsPercolateBolt and EsStateFactory
      *
-     * @param clusterName             Elasticsearch cluster name
-     * @param nodes                   Elasticsearch addresses in host:port pattern string array
-     * @param additionalConfiguration Additional Elasticsearch configuration
-     * @throws IllegalArgumentException if nodes are empty
+     * @param urls Elasticsearch addresses in scheme://host:port pattern string array
+     * @throws IllegalArgumentException if urls are empty
      * @throws NullPointerException     on any of the fields being null
      */
-    public EsConfig(String clusterName, String[] nodes, Map<String, String> additionalConfiguration) {
-        checkNotNull(clusterName);
-        checkNotNull(nodes);
-        checkNotNull(additionalConfiguration);
+    public EsConfig(String... urls) {
+        if (urls.length == 0) {
+            throw new IllegalArgumentException("urls is required");
+        }
+        this.httpHosts = new HttpHost[urls.length];
+        for (int i = 0; i < urls.length; i++) {
+            URI uri = toURI(urls[i]);
+            this.httpHosts[i] = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
+        }
+    }
+
+    static URI toURI(String url) throws IllegalArgumentException {
+        try {
+            return new URI(url);
+        } catch (URISyntaxException e) { // chain e so the parse reason and index are not lost
+            throw new IllegalArgumentException("Invalid url " + url, e);
+        }
+    }
+    
+    public EsConfig withMaxRetryTimeoutMillis(Integer maxRetryTimeoutMillis) {
+        this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
+        return this;
+    }
+
+    public EsConfig withDefaultHeaders(Header[] defaultHeaders) {
+        this.defaultHeaders = defaultHeaders;
+        return this;
+    }
+
+    public EsConfig withFailureListener(RestClient.FailureListener failureListener) {
+        this.failureListener = failureListener;
+        return this;
+    }
+
+    public EsConfig withHttpClientConfigCallback(HttpClientConfigCallback httpClientConfigCallback) {
+        this.httpClientConfigCallback = httpClientConfigCallback;
+        return this;
+    }
+
+    public EsConfig withRequestConfigCallback(RequestConfigCallback requestConfigCallback) {
+        this.requestConfigCallback = requestConfigCallback;
+        return this;
+    }
+
+    public EsConfig withPathPrefix(String pathPrefix) {
+        this.pathPrefix = pathPrefix;
+        return this;
+    }
+
+    public HttpHost[] getHttpHosts() {
+        return httpHosts;
+    }
+
+    public Integer getMaxRetryTimeoutMillis() {
+        return maxRetryTimeoutMillis;
+    }
+
+    public Header[] getDefaultHeaders() {
+        return defaultHeaders;
+    }
+
+    public RestClient.FailureListener getFailureListener() {
+        return failureListener;
+    }
 
-        checkArgument(nodes.length != 0, "Nodes cannot be empty");
-        this.clusterName = clusterName;
-        this.nodes = nodes;
-        this.additionalConfiguration = new HashMap<>(additionalConfiguration);
+    public HttpClientConfigCallback getHttpClientConfigCallback() {
+        return httpClientConfigCallback;
     }
 
-    TransportAddresses getTransportAddresses() {
-        return new TransportAddresses(nodes);
+    public RequestConfigCallback getRequestConfigCallback() {
+        return requestConfigCallback;
     }
 
-    Settings toBasicSettings() {
-        return ImmutableSettings.settingsBuilder()
-                                .put("cluster.name", clusterName)
-                                .put(additionalConfiguration)
-                                .build();
+    public String getPathPrefix() {
+        return pathPrefix;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsTupleMapper.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsTupleMapper.java
index 5b6c425..285e112 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsTupleMapper.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsTupleMapper.java
@@ -20,6 +20,7 @@ package org.apache.storm.elasticsearch.common;
 import org.apache.storm.tuple.ITuple;
 
 import java.io.Serializable;
+import java.util.Map;
 
 /**
  * TupleMapper defines how to extract source, index, type, and id from tuple for ElasticSearch.
@@ -52,4 +53,12 @@ public interface EsTupleMapper extends Serializable {
      * @return id
      */
     String getId(ITuple tuple);
+
+    /**
+     * Extracts params from tuple if available.
+     * @param tuple source tuple
+     * @param defaultValue value to return if params are missing
+     * @return params
+     */
+    Map<String, String> getParams(ITuple tuple, Map<String, String> defaultValue);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
index 3ebfe72..a4aca59 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
@@ -19,10 +19,8 @@ package org.apache.storm.elasticsearch.common;
 
 import java.io.Serializable;
 
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
 
 public final class StormElasticSearchClient implements Serializable {
 
@@ -32,17 +30,26 @@ public final class StormElasticSearchClient implements Serializable {
         this.esConfig = esConfig;
     }
 
-    public Client construct() {
-        Settings settings = esConfig.toBasicSettings();
-        TransportClient transportClient = new TransportClient(settings);
-        addTransportAddresses(transportClient);
-        return transportClient;
-    }
-
-    private void addTransportAddresses(TransportClient transportClient) {
-        Iterable<InetSocketTransportAddress> transportAddresses = esConfig.getTransportAddresses();
-        for (InetSocketTransportAddress transportAddress : transportAddresses) {
-            transportClient.addTransportAddress(transportAddress);
+    public RestClient construct() {
+        RestClientBuilder builder = RestClient.builder(esConfig.getHttpHosts());
+        if (esConfig.getMaxRetryTimeoutMillis() != null) {
+            builder.setMaxRetryTimeoutMillis(esConfig.getMaxRetryTimeoutMillis());
+        }
+        if (esConfig.getDefaultHeaders() != null) {
+            builder.setDefaultHeaders(esConfig.getDefaultHeaders());
+        }
+        if (esConfig.getFailureListener() != null) {
+            builder.setFailureListener(esConfig.getFailureListener());
+        }
+        if (esConfig.getHttpClientConfigCallback() != null) {
+            builder.setHttpClientConfigCallback(esConfig.getHttpClientConfigCallback());
+        }
+        if (esConfig.getRequestConfigCallback() != null) {
+            builder.setRequestConfigCallback(esConfig.getRequestConfigCallback());
+        }
+        if (esConfig.getPathPrefix() != null) {
+            builder.setPathPrefix(esConfig.getPathPrefix());
         }
+        return builder.build();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java
deleted file mode 100644
index cd082a7..0000000
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.elasticsearch.common;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-
-final class TransportAddresses implements Iterable<InetSocketTransportAddress> {
-
-    static final String DELIMETER = ":";
-
-    private final String[] nodes;
-
-    TransportAddresses(String[] nodes) {
-        if (nodes == null) {
-            throw new IllegalArgumentException("Elasticsearch hosts cannot be null");
-        }
-        if (nodes.length == 0) {
-            throw new IllegalArgumentException("At least one Elasticsearch host must be specified");
-        }
-
-        this.nodes = nodes;
-    }
-
-    @Override
-    public Iterator<InetSocketTransportAddress> iterator() {
-        List<InetSocketTransportAddress> result = new LinkedList<>();
-
-        for (String node : nodes) {
-            InetSocketTransportAddress transportAddress = transformToInetAddress(node);
-            result.add(transportAddress);
-        }
-
-        return result.iterator();
-    }
-
-    private InetSocketTransportAddress transformToInetAddress(String node) {
-        String[] hostAndPort = node.split(DELIMETER);
-        if (hostAndPort.length != 2) {
-            throw new IllegalArgumentException(
-                    "Incorrect Elasticsearch node format, should follow {host}" + DELIMETER + "{port} pattern");
-        }
-        String hostname = hostname(hostAndPort[0]);
-        return new InetSocketTransportAddress(hostname, port(hostAndPort[1]));
-    }
-
-    private String hostname(String input) {
-        return input.trim();
-    }
-
-    private int port(String input) {
-        return Integer.parseInt(input.trim());
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Index.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Index.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Index.java
new file mode 100644
index 0000000..d010abe
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Index.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.doc;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Elasticsearch document fragment with "_index", "_type" and "_id" fields.
+ */
+public class Index {
+
+    public Index() {
+
+    }
+
+    public Index(String index, String type, String id) {
+        this.index = index;
+        this.type = type;
+        this.id = id;
+    }
+
+    @JsonProperty("_index")
+    private String index;
+
+    @JsonProperty("_type")
+    private String type;
+
+    @JsonProperty("_id")
+    private String id;
+
+    public String getIndex() {
+        return index;
+    }
+
+    public void setIndex(String index) {
+        this.index = index;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexDoc.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexDoc.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexDoc.java
new file mode 100644
index 0000000..c695638
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexDoc.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.doc;
+
+/**
+ * Elasticsearch document fragment with a single "index" field, used in bulk
+ * requests.
+ */
+public class IndexDoc {
+
+    public IndexDoc() {
+
+    }
+
+    public IndexDoc(Index index) {
+        this.index = index;
+    }
+
+    private Index index;
+
+    public Index getIndex() {
+        return index;
+    }
+
+    public void setIndex(Index index) {
+        this.index = index;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItem.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItem.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItem.java
new file mode 100644
index 0000000..e04c48e
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItem.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.doc;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonRawValue;
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Elasticsearch document fragment containing extended index information, used
+ * in bulk index responses.
+ */
+public class IndexItem extends Index {
+
+    @JsonProperty("_version")
+    private long version;
+    
+    private String result;
+    private boolean created;
+    private int status;
+    
+    @JsonRawValue
+    private Object error;
+    
+    
+    @JsonProperty("_shards")
+    private Shards shards;
+
+    public long getVersion() {
+        return version;
+    }
+
+    public void setVersion(long version) {
+        this.version = version;
+    }
+
+    public String getResult() {
+        return result;
+    }
+
+    public void setResult(String result) {
+        this.result = result;
+    }
+
+    public boolean isCreated() {
+        return created;
+    }
+
+    public void setCreated(boolean created) {
+        this.created = created;
+    }
+
+    public int getStatus() {
+        return status;
+    }
+
+    public void setStatus(int status) {
+        this.status = status;
+    }
+
+    public String getError() {
+        return error == null ? null : error.toString();
+    }
+
+    public void setError(JsonNode error) {
+        this.error = error;
+    }
+
+    public Shards getShards() {
+        return shards;
+    }
+
+    public void setShards(Shards shards) {
+        this.shards = shards;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItemDoc.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItemDoc.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItemDoc.java
new file mode 100644
index 0000000..60a330e
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItemDoc.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.doc;
+
+/**
+ * Elasticsearch document with a single "index" field, used in bulk responses.
+ */
+public class IndexItemDoc {
+
+    public IndexItemDoc() {
+
+    }
+
+    public IndexItemDoc(IndexItem index) {
+        this.index = index;
+    }
+
+    private IndexItem index;
+
+    public IndexItem getIndex() {
+        return index;
+    }
+
+    public void setIndex(IndexItem index) {
+        this.index = index;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Shards.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Shards.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Shards.java
new file mode 100644
index 0000000..87c7ad5
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Shards.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.doc;
+
+import java.util.List;
+
+/**
+ * Elasticsearch document fragment containing shard success information,
+ * used in percolate and bulk index responses.
+ */
+public class Shards {
+    private int total;
+    private int successful;
+    private int failed;
+    private List<Object> failures;
+    
+    public int getTotal() {
+        return total;
+    }
+    
+    public void setTotal(int total) {
+        this.total = total;
+    }
+    
+    public int getSuccessful() {
+        return successful;
+    }
+    
+    public void setSuccessful(int successful) {
+        this.successful = successful;
+    }
+
+    public int getFailed() {
+        return failed;
+    }
+
+    public void setFailed(int failed) {
+        this.failed = failed;
+    }
+
+    public List<Object> getFailures() {
+        return failures;
+    }
+
+    public void setFailures(List<Object> failures) {
+        this.failures = failures;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/SourceDoc.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/SourceDoc.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/SourceDoc.java
new file mode 100644
index 0000000..ea30e65
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/SourceDoc.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.doc;
+
+/**
+ * Elasticsearch document fragment that holds exactly one field, "source".
+ * It is used when assembling bulk requests.
+ */
+public class SourceDoc {
+
+    private String source;
+
+    /** Creates a fragment whose source is unset ({@code null}). */
+    public SourceDoc() {
+        this(null);
+    }
+
+    /**
+     * Creates a fragment wrapping the given source.
+     *
+     * @param source value to store in the "source" field; may be {@code null}
+     */
+    public SourceDoc(String source) {
+        setSource(source);
+    }
+
+    /** Returns the stored source, or {@code null} when none was set. */
+    public String getSource() {
+        return this.source;
+    }
+
+    /** Replaces the stored source with the given value. */
+    public final void setSource(String source) {
+        this.source = source;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/BulkIndexResponse.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/BulkIndexResponse.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/BulkIndexResponse.java
new file mode 100644
index 0000000..7131e63
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/BulkIndexResponse.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.response;
+
+import java.util.List;
+
+import org.apache.storm.elasticsearch.doc.IndexItemDoc;
+
+/**
+ * Mapped response for bulk index.
+ *
+ * <p>Holds the top-level {@code errors} flag, the {@code took} value and the
+ * list of per-item results of a bulk request.
+ */
+public class BulkIndexResponse {
+
+    private boolean errors;
+    private long took;
+    private List<IndexItemDoc> items;
+
+    public BulkIndexResponse() {
+
+    }
+
+    /** Returns the value of the response's {@code errors} flag. */
+    public boolean hasErrors() {
+        return errors;
+    }
+
+    public void setErrors(boolean errors) {
+        this.errors = errors;
+    }
+
+    public long getTook() {
+        return took;
+    }
+
+    public void setTook(long took) {
+        this.took = took;
+    }
+
+    public List<IndexItemDoc> getItems() {
+        return items;
+    }
+
+    public void setItems(List<IndexItemDoc> items) {
+        this.items = items;
+    }
+
+    /**
+     * Finds the first item whose status is in the 400-599 range.
+     *
+     * @return that status, or {@code null} when there are no items or no item
+     *         carries such a status
+     */
+    public Integer getFirstError() {
+        if (items == null) {
+            return null;
+        }
+        for (IndexItemDoc item : items) {
+            // Skip entries that carry no "index" result: this method is called while
+            // reporting a failure and must not replace it with a NullPointerException.
+            if (item == null || item.getIndex() == null) {
+                continue;
+            }
+            int status = item.getIndex().getStatus();
+            if (400 <= status && status <= 599) {
+                return status;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Returns the result of the first item in the response, regardless of
+     * whether that item succeeded.
+     *
+     * @return the first item's result, or {@code null} when there are no items
+     *         or the first item carries no "index" result
+     */
+    public String getFirstResult() {
+        if (items == null || items.isEmpty()) {
+            return null;
+        }
+        IndexItemDoc first = items.get(0);
+        if (first == null || first.getIndex() == null) {
+            return null;
+        }
+        return first.getIndex().getResult();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/LookupResponse.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/LookupResponse.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/LookupResponse.java
new file mode 100644
index 0000000..8b6b24d
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/LookupResponse.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.response;
+
+import org.apache.storm.elasticsearch.doc.Index;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonRawValue;
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Mapped response for document lookup: the {@link Index} coordinates plus the
+ * {@code _version}, {@code found} and {@code _source} fields.
+ */
+public class LookupResponse extends Index {
+
+    @JsonProperty("_version")
+    private long version;
+
+    private boolean found;
+
+    @JsonProperty("_source")
+    @JsonRawValue
+    private Object source;
+
+    /** Returns the value mapped from {@code _version}. */
+    public long getVersion() {
+        return this.version;
+    }
+
+    public void setVersion(long version) {
+        this.version = version;
+    }
+
+    /** Returns the value of the {@code found} flag. */
+    public boolean isFound() {
+        return this.found;
+    }
+
+    public void setFound(boolean found) {
+        this.found = found;
+    }
+
+    /**
+     * Returns the stored {@code _source} rendered through {@code toString()}.
+     *
+     * @return the string form of the source, or {@code null} when no source is set
+     */
+    public String getSource() {
+        if (this.source == null) {
+            return null;
+        }
+        return this.source.toString();
+    }
+
+    /** Stores the given JSON node as the document source. */
+    public void setSource(JsonNode source) {
+        this.source = source;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/PercolateResponse.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/PercolateResponse.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/PercolateResponse.java
new file mode 100644
index 0000000..87bc004
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/PercolateResponse.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.response;
+
+import java.util.List;
+
+import org.apache.storm.elasticsearch.doc.Shards;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Mapped response for percolate.
+ *
+ * <p>Carries the {@code took} and {@code total} values, the list of matches
+ * and the {@code _shards} fragment of the response.
+ */
+public class PercolateResponse {
+
+    private long took;
+    private long total;
+    private List<Match> matches;
+
+    @JsonProperty("_shards")
+    private Shards shards;
+
+    /** A single percolate match, identified by its {@code _index} and {@code _id}. */
+    public static class Match {
+        @JsonProperty("_index")
+        private String index;
+
+        @JsonProperty("_id")
+        private String id;
+
+        public String getIndex() {
+            return index;
+        }
+
+        public void setIndex(String index) {
+            this.index = index;
+        }
+
+        public String getId() {
+            return id;
+        }
+
+        public void setId(String id) {
+            this.id = id;
+        }
+    }
+
+    public long getTook() {
+        return took;
+    }
+
+    public void setTook(long took) {
+        this.took = took;
+    }
+
+    public long getTotal() {
+        return total;
+    }
+
+    public void setTotal(long total) {
+        this.total = total;
+    }
+
+    public List<Match> getMatches() {
+        return matches;
+    }
+
+    public void setMatches(List<Match> matches) {
+        this.matches = matches;
+    }
+
+    /**
+     * Returns the shard information mapped from the {@code _shards} field.
+     * Without this accessor the mapped value could not be read by callers.
+     *
+     * @return the shard fragment, or {@code null} when none was set
+     */
+    public Shards getShards() {
+        return shards;
+    }
+
+    /** Sets the shard information for this response. */
+    public void setShards(Shards shards) {
+        this.shards = shards;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index 2241f4b..fb34407 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -17,20 +17,29 @@
  */
 package org.apache.storm.elasticsearch.trident;
 
-import org.apache.storm.topology.FailedException;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.List;
 
-import org.apache.storm.elasticsearch.common.StormElasticSearchClient;
+import org.apache.http.entity.StringEntity;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTupleMapper;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.storm.elasticsearch.common.StormElasticSearchClient;
+import org.apache.storm.elasticsearch.doc.Index;
+import org.apache.storm.elasticsearch.doc.IndexDoc;
+import org.apache.storm.elasticsearch.doc.SourceDoc;
+import org.apache.storm.elasticsearch.response.BulkIndexResponse;
+import org.apache.storm.topology.FailedException;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.tuple.TridentTuple;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.List;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 /**
  * Trident State for storing tuple to ES document.
@@ -38,8 +47,9 @@ import java.util.List;
  */
 class EsState implements State {
     private static final Logger LOG = LoggerFactory.getLogger(EsState.class);
-    private static Client client;
+    private static RestClient client;
     private EsConfig esConfig;
+    private final ObjectMapper objectMapper;
     private EsTupleMapper tupleMapper;
 
     /**
@@ -49,6 +59,7 @@ class EsState implements State {
      */
     public EsState(EsConfig esConfig, EsTupleMapper tupleMapper) {
         this.esConfig = esConfig;
+        // One Jackson mapper per instance; buildRequest and updateState use it
+        // to serialize request lines and to parse the bulk response.
+        this.objectMapper = new ObjectMapper();
         this.tupleMapper = tupleMapper;
     }
 
@@ -88,26 +99,43 @@ class EsState implements State {
         }
     }
 
-    /**
-     * Store current state to ElasticSearch.
-     *
-     * @param tuples list of tuples for storing to ES.
-     *               Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
-     */
-    public void updateState(List<TridentTuple> tuples) {
-        BulkRequestBuilder bulkRequest = client.prepareBulk();
+    /**
+     * Serializes the tuples into one newline-delimited bulk request body.
+     * Each tuple contributes two lines: the serialized {@link IndexDoc}
+     * (built from the tuple's index, type and id) followed by the serialized
+     * {@link SourceDoc} (built from the tuple's source).
+     *
+     * @param tuples tuples to map through {@code tupleMapper}
+     * @return the request body; empty when {@code tuples} is empty
+     * @throws JsonProcessingException if {@code objectMapper} cannot serialize a document
+     */
+    private String buildRequest(List<TridentTuple> tuples) throws JsonProcessingException {
+        StringBuilder bulkRequest = new StringBuilder();
         for (TridentTuple tuple : tuples) {
             String source = tupleMapper.getSource(tuple);
             String index = tupleMapper.getIndex(tuple);
             String type = tupleMapper.getType(tuple);
             String id = tupleMapper.getId(tuple);
 
-            bulkRequest.add(client.prepareIndex(index, type, id).setSource(source));
+            IndexDoc indexDoc = new IndexDoc(new Index(index, type, id));
+            SourceDoc sourceDoc = new SourceDoc(source);
+            // Every serialized document is terminated by '\n', including the last one.
+            bulkRequest.append(objectMapper.writeValueAsString(indexDoc)).append('\n');
+            bulkRequest.append(objectMapper.writeValueAsString(sourceDoc)).append('\n');
+
         }
-        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
-        if (bulkResponse.hasFailures()) {
-            LOG.warn("failed processing bulk index requests " + bulkResponse.buildFailureMessage());
-            throw new FailedException();
+        return bulkRequest.toString();
+    }
+
+    /**
+     * Store current state to ElasticSearch.
+     *
+     * <p>The tuples are serialized into a single bulk request body, posted to the
+     * {@code _bulk} endpoint, and the response is parsed into a {@link BulkIndexResponse}.
+     *
+     * @param tuples list of tuples for storing to ES.
+     *               Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
+     * @throws FailedException if the request or response handling fails with an
+     *                         {@link IOException}, or the bulk response reports errors
+     */
+    public void updateState(List<TridentTuple> tuples) {
+        try {
+            String bulkRequest = buildRequest(tuples);
+            // Encode the body explicitly as UTF-8: StringEntity(String) defaults to
+            // ISO-8859-1, which corrupts any character outside Latin-1 in the documents.
+            StringEntity entity = new StringEntity(bulkRequest, java.nio.charset.StandardCharsets.UTF_8);
+            Response response = client.performRequest("post", "_bulk", new HashMap<>(), entity);
+            BulkIndexResponse bulkResponse = objectMapper.readValue(response.getEntity().getContent(), BulkIndexResponse.class);
+            if (bulkResponse.hasErrors()) {
+                LOG.warn("failed processing bulk index requests: " + bulkResponse.getFirstError() + ": " + bulkResponse.getFirstResult());
+                throw new FailedException();
+            }
+        } catch (IOException e) {
+            LOG.warn("failed processing bulk index requests: " + e.toString());
+            throw new FailedException(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index 5ae174f..84e3cea 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -17,17 +17,16 @@
  */
 package org.apache.storm.elasticsearch.trident;
 
-import org.apache.storm.task.IMetricsContext;
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTupleMapper;
-
+import org.apache.storm.task.IMetricsContext;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.state.StateFactory;
 
-import java.util.Map;
-
-import static org.elasticsearch.common.base.Preconditions.checkNotNull;
-
 /**
  * StateFactory for providing EsState.
  * @since 0.11
@@ -42,8 +41,8 @@ public class EsStateFactory implements StateFactory {
      * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
      */
     public EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper) {
-        this.esConfig = checkNotNull(esConfig);
-        this.tupleMapper = checkNotNull(tupleMapper);
+        // Reject null arguments up front; requireNonNull is the static import
+        // from java.util.Objects that replaces the removed checkNotNull import.
+        this.esConfig = requireNonNull(esConfig);
+        this.tupleMapper = requireNonNull(tupleMapper);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
index 87ffefa..559dedf 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
@@ -17,28 +17,16 @@
  */
 package org.apache.storm.elasticsearch.bolt;
 
+import org.apache.storm.elasticsearch.common.EsTestUtil;
 import org.apache.storm.testing.IntegrationTest;
-import org.apache.commons.io.FileUtils;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
-import java.io.File;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
 @Category(IntegrationTest.class)
 public abstract class AbstractEsBoltIntegrationTest<Bolt extends AbstractEsBolt> extends AbstractEsBoltTest<Bolt> {
 
@@ -46,49 +34,23 @@ public abstract class AbstractEsBoltIntegrationTest<Bolt extends AbstractEsBolt>
 
     @BeforeClass
     public static void startElasticSearchNode() throws Exception {
-        node = NodeBuilder.nodeBuilder().data(true).settings(createSettings()).build();
-        node.start();
-        ensureEsGreen(node);
-        ClusterHealthResponse clusterHealth = node.client()
-                                                  .admin()
-                                                  .cluster()
-                                                  .health(Requests.clusterHealthRequest()
-                                                                  .timeout(TimeValue.timeValueSeconds(30))
-                                                                  .waitForGreenStatus()
-                                                                  .waitForRelocatingShards(0))
-                                                  .actionGet();
-        Thread.sleep(1000);
-    }
-
-    private static ImmutableSettings.Builder createSettings() {
-        return ImmutableSettings.builder()
-                                .put(ClusterName.SETTING, "test-cluster")
-                                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
-                                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                                .put(EsExecutors.PROCESSORS, 1)
-                                .put("http.enabled", false)
-                                .put("index.percolator.map_unmapped_fields_as_string", true)
-                                .put("index.store.type", "memory");
+        // Node construction and the health wait are delegated to the shared
+        // EsTestUtil helpers instead of being built inline in this class.
+        node = EsTestUtil.startEsNode();
+        EsTestUtil.ensureEsGreen(node);
     }
 
     @AfterClass
     public static void closeElasticSearchNode() throws Exception {
-        node.stop();
-        node.close();
-        FileUtils.deleteDirectory(new File("./data"));
+        // Shutdown is delegated to EsTestUtil.
+        // NOTE(review): the inline removal of ./data is gone from this method;
+        // presumably stopEsNode takes care of it - confirm.
+        EsTestUtil.stopEsNode(node);
     }
 
-    private static void ensureEsGreen(Node node) {
-        ClusterHealthResponse chr = node.client()
-                                        .admin()
-                                        .cluster()
-                                        .health(Requests.clusterHealthRequest()
-                                                        .timeout(TimeValue.timeValueSeconds(30))
-                                                        .waitForGreenStatus()
-                                                        .waitForEvents(Priority.LANGUID)
-                                                        .waitForRelocatingShards(0))
-                                        .actionGet();
-        assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN));
+    /**
+     * Creates the index named by the inherited {@code index} constant on the
+     * test node before each test, blocking until the create request returns.
+     */
+    @Before
+    public void createIndex() {
+        node.client().admin().indices().create(new CreateIndexRequest(index)).actionGet();
     }
 
+    /**
+     * Clears, after each test, the index named by the inherited {@code index}
+     * constant and then the index named "missing", via {@code EsTestUtil.clearIndex}.
+     */
+    @After
+    public void clearIndex() throws Exception {
+        for (String name : new String[] {index, "missing"}) {
+            EsTestUtil.clearIndex(node, name);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
index fb9739c..d6ef979 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@ -19,6 +19,9 @@ package org.apache.storm.elasticsearch.bolt;
 
 import com.google.common.testing.NullPointerTester;
 
+import java.lang.reflect.Method;
+import java.util.UUID;
+
 import org.apache.storm.Config;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.elasticsearch.common.EsConfig;
@@ -33,6 +36,10 @@ import org.mockito.runners.MockitoJUnitRunner;
 public abstract class AbstractEsBoltTest<Bolt extends AbstractEsBolt> {
 
     protected static Config config = new Config();
+    protected static final String documentId = UUID.randomUUID().toString();
+    protected static final String index = "index";
+    protected static final String source = "{\"user\":\"user1\"}";
+    protected static final String type = "type";
 
     @Mock
     protected OutputCollector outputCollector;
@@ -48,7 +55,7 @@ public abstract class AbstractEsBoltTest<Bolt extends AbstractEsBolt> {
     protected abstract Bolt createBolt(EsConfig esConfig);
 
     protected EsConfig esConfig() {
-        return new EsConfig("test-cluster", new String[] {"127.0.0.1:9300"});
+        // Uses EsConfig's no-argument constructor; the explicit cluster name
+        // and transport address are no longer passed in.
+        return new EsConfig();
     }
 
     @After
@@ -61,5 +68,11 @@ public abstract class AbstractEsBoltTest<Bolt extends AbstractEsBolt> {
         new NullPointerTester().setDefault(EsConfig.class, esConfig()).testAllPublicConstructors(getBoltClass());
     }
 
+    /**
+     * Uses Guava's {@link NullPointerTester} to check how
+     * {@code AbstractEsBolt.getEndpoint(String, String, String)} treats a
+     * {@code null} first argument; the other arguments default to "test".
+     */
+    @Test
+    public void getEndpointThrowsOnNull() throws Exception {
+        Method target = AbstractEsBolt.class.getDeclaredMethod("getEndpoint", String.class, String.class, String.class);
+        NullPointerTester tester = new NullPointerTester().setDefault(String.class, "test");
+        // A null receiver is passed, so getEndpoint is presumably static - confirm.
+        tester.testMethodParameter(null, target, 0);
+    }
+
     protected abstract Class<Bolt> getBoltClass();
 }