You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/08/06 00:58:46 UTC

[01/11] storm git commit: STORM-845 Storm ElasticSearch connector

Repository: storm
Updated Branches:
  refs/heads/master 827ddbd8c -> 124e8468a


STORM-845 Storm ElasticSearch connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/acaab9c3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/acaab9c3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/acaab9c3

Branch: refs/heads/master
Commit: acaab9c3d6c13952a722480bc64393b5b2dc61bc
Parents: 0c00544
Author: SEUNGJIN LEE <sw...@navercorp.com>
Authored: Thu Jul 23 14:20:14 2015 +0900
Committer: SEUNGJIN LEE <sw...@navercorp.com>
Committed: Thu Jul 23 14:32:17 2015 +0900

----------------------------------------------------------------------
 external/storm-elasticsearch/README.md          |  4 ++--
 .../storm/elasticsearch/bolt/EsIndexBolt.java   | 14 ++++++++++--
 .../elasticsearch/bolt/EsPercolateBolt.java     | 11 ++++++++-
 .../storm/elasticsearch/common/EsConfig.java    |  7 +++++-
 .../storm/elasticsearch/trident/EsState.java    | 24 ++++++++++++++++++--
 .../elasticsearch/trident/EsStateFactory.java   |  4 ++++
 .../elasticsearch/bolt/EsIndexBoltTest.java     | 23 +++++++++++++++++--
 .../elasticsearch/bolt/EsIndexTopology.java     |  4 ++--
 .../elasticsearch/bolt/EsPercolateBoltTest.java | 12 ++++++++--
 .../storm/elasticsearch/common/EsTestUtil.java  |  6 ++---
 .../trident/TridentEsTopology.java              | 15 +++++-------
 11 files changed, 98 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index 3fa1592..9e1bbb8 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -6,7 +6,7 @@
 ## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)
 
 EsIndexBolt streams tuples directly into Elasticsearch. Tuples are indexed in specified index & type combination. 
-User should make sure that there are "index","type", and "source" fields declared in preceding bolts or spout.
+User should make sure that there are "source", "index", "type", and "id" fields declared in preceding bolts or spout.
 "index" and "type" fields are used for identifying target index and type.
 "source" is a document in JSON format string that will be indexed in elastic search.
 
@@ -20,7 +20,7 @@ EsIndexBolt indexBolt = new IndexBolt(esConfig);
 ## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)
 
 EsPercolateBolt streams tuples directly into Elasticsearch. Tuples are used to send percolate request to specified index & type combination. 
-User should make sure that there are "index","type", and "source" fields declared in preceding bolts or spout.
+User should make sure that there are "source", "index", and "type" fields declared in preceding bolts or spout.
 "index" and "type" fields are used for identifying target index and type.
 "source" is a document in JSON format string that will be sent in percolate request to elastic search.
 

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
index 9d7522c..c1d7daa 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
@@ -30,6 +30,10 @@ import java.util.Map;
 public class EsIndexBolt extends AbstractEsBolt {
     private static final Logger LOG = LoggerFactory.getLogger(EsIndexBolt.class);
 
+    /**
+     * EsIndexBolt constructor
+     * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+     */
     public EsIndexBolt(EsConfig esConfig) {
         super(esConfig);
     }
@@ -39,13 +43,19 @@ public class EsIndexBolt extends AbstractEsBolt {
         super.prepare(map, topologyContext, outputCollector);
     }
 
+    /**
+     * Executes index request for given tuple.
+     * @param tuple should contain string values of 4 declared fields: "source", "index", "type", "id"
+     */
     @Override
     public void execute(Tuple tuple) {
         try {
+            String source = tuple.getStringByField("source");
             String index = tuple.getStringByField("index");
             String type = tuple.getStringByField("type");
-            String source = tuple.getStringByField("source");
-            client.prepareIndex(index, type).setSource(source).execute().actionGet();
+            String id = tuple.getStringByField("id");
+
+            client.prepareIndex(index, type, id).setSource(source).execute().actionGet();
             collector.ack(tuple);
         } catch (Exception e) {
             e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
index 3142fc1..7ee6835 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
@@ -34,6 +34,10 @@ import java.util.Map;
 public class EsPercolateBolt extends AbstractEsBolt {
     private static final Logger LOG = LoggerFactory.getLogger(EsPercolateBolt.class);
 
+    /**
+     * EsPercolateBolt constructor
+     * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+     */
     public EsPercolateBolt(EsConfig esConfig) {
         super(esConfig);
     }
@@ -43,12 +47,17 @@ public class EsPercolateBolt extends AbstractEsBolt {
         super.prepare(map, topologyContext, outputCollector);
     }
 
+    /**
+     * Executes percolate request for given tuple.
+     * @param tuple should contain string values of 3 declared fields: "source", "index", "type"
+     */
     @Override
     public void execute(Tuple tuple) {
         try {
+            String source = tuple.getStringByField("source");
             String index = tuple.getStringByField("index");
             String type = tuple.getStringByField("type");
-            String source = tuple.getStringByField("source");
+
             PercolateResponse response = client.preparePercolate().setIndices(index).setDocumentType(type)
                     .setPercolateDoc(PercolateSourceBuilder.docBuilder().setDoc(source)).execute().actionGet();
             if (response.getCount() > 0) {

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
index c97d77f..0b57788 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -26,7 +26,12 @@ public class EsConfig implements Serializable{
     public EsConfig() {
     }
 
-    public EsConfig(String clusterName, String[] nodes, int port) {
+    /**
+     * EsConfig Constructor to be used in EsIndexBolt, EsPercolateBolt and EsStateFactory
+     * @param clusterName Elasticsearch cluster name
+     * @param nodes Elasticsearch addresses in host:port pattern string array
+     */
+    public EsConfig(String clusterName, String[] nodes) {
         this.clusterName = clusterName;
         this.nodes = nodes;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index 58de6cc..e804084 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -42,15 +42,33 @@ public class EsState implements State {
     private static Client client;
     private EsConfig esConfig;
 
+    /**
+     * EsState constructor
+     * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+     */
     public EsState(EsConfig esConfig) {
         this.esConfig = esConfig;
     }
 
+    /**
+     * Elasticsearch index requests with the same id result in an update operation,
+     * which means that if the same tuple is replayed, only one record is stored in
+     * Elasticsearch for that document, without any control based on txid.
+     *
+     * @param txid transaction id
+     */
     @Override
     public void beginCommit(Long txid) {
 
     }
 
+    /**
+     * Elasticsearch index requests with the same id result in an update operation,
+     * which means that if the same tuple is replayed, only one record is stored in
+     * Elasticsearch for that document, without any control based on txid.
+     *
+     * @param txid transaction id
+     */
     @Override
     public void commit(Long txid) {
 
@@ -83,10 +101,12 @@ public class EsState implements State {
     public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
         BulkRequestBuilder bulkRequest = client.prepareBulk();
         for (TridentTuple tuple : tuples) {
+            String source = tuple.getStringByField("source");
             String index = tuple.getStringByField("index");
             String type = tuple.getStringByField("type");
-            String source = tuple.getStringByField("source");
-            bulkRequest.add(client.prepareIndex(index, type).setSource(source));
+            String id = tuple.getStringByField("id");
+
+            bulkRequest.add(client.prepareIndex(index, type, id).setSource(source));
         }
         BulkResponse bulkResponse = bulkRequest.execute().actionGet();
         if (bulkResponse.hasFailures()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index d7f4330..b1eaa04 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -34,6 +34,10 @@ public class EsStateFactory implements StateFactory {
 
     }
 
+    /**
+     * EsStateFactory constructor
+     * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+     */
     public EsStateFactory(EsConfig esConfig){
         this.esConfig = esConfig;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index 28b8bf7..dd4b088 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -20,6 +20,12 @@ package org.apache.storm.elasticsearch.bolt;
 import backtype.storm.tuple.Tuple;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.elasticsearch.action.count.CountRequest;
+import org.elasticsearch.action.count.CountRequestBuilder;
+import org.elasticsearch.action.count.CountResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,14 +42,27 @@ public class EsIndexBoltTest extends AbstractEsBoltTest{
         EsConfig esConfig = new EsConfig();
         esConfig.setClusterName("test-cluster");
         esConfig.setNodes(new String[]{"127.0.0.1:9300"});
+
         bolt = new EsIndexBolt(esConfig);
         bolt.prepare(config, null, collector);
+
+        String source = "{\"user\":\"user1\"}";
         String index = "index1";
         String type = "type1";
-        String source = "{\"user\":\"user1\"}";
-        Tuple tuple = EsTestUtil.generateTestTuple(index, type, source);
+        String id = "docId";
+        Tuple tuple = EsTestUtil.generateTestTuple(source, index, type, id);
+
         bolt.execute(tuple);
+
         verify(collector).ack(tuple);
+
+        node.client().admin().indices().prepareRefresh(index).execute().actionGet();
+        CountResponse resp = node.client().prepareCount(index)
+                .setQuery(new TermQueryBuilder("_type", type))
+                .execute().actionGet();
+
+        Assert.assertEquals(1, resp.getCount());
+
         bolt.cleanup();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index f5e868a..fc9c178 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -80,7 +80,7 @@ public class EsIndexTopology {
         private String typeName = "type1";
 
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("index", "type", "source"));
+            declarer.declare(new Fields("source", "index", "type", "id"));
         }
 
         public void open(Map config, TopologyContext context,
@@ -91,8 +91,8 @@ public class EsIndexTopology {
 
         public void nextTuple() {
             String source = sources[index];
-            Values values = new Values(indexName, typeName, source);
             UUID msgId = UUID.randomUUID();
+            Values values = new Values(source, indexName, typeName, msgId);
             this.pending.put(msgId, values);
             this.collector.emit(values, msgId);
             index++;

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index 4520389..ea0504b 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -21,6 +21,9 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.elasticsearch.action.count.CountResponse;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,17 +42,22 @@ public class EsPercolateBoltTest extends AbstractEsBoltTest {
         esConfig.setNodes(new String[]{"localhost:9300"});
         bolt = new EsPercolateBolt(esConfig);
         bolt.prepare(config, null, collector);
+
+        String source = "{\"user\":\"user1\"}";
         String index = "index1";
         String type = ".percolator";
-        String source = "{\"user\":\"user1\"}";
+
         node.client().prepareIndex("index1",".percolator")
                 .setId("1")
                 .setSource("{\"query\":{\"match\":{\"user\":\"user1\"}}}").
                 execute().actionGet();
-        Tuple tuple = EsTestUtil.generateTestTuple(index, type, source);
+        Tuple tuple = EsTestUtil.generateTestTuple(source, index, type, null);
+
         bolt.execute(tuple);
+
         verify(collector).ack(tuple);
         verify(collector).emit(new Values("1"));
+
         bolt.cleanup();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
index 2c0026d..3b20383 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
@@ -34,16 +34,16 @@ import org.elasticsearch.node.NodeBuilder;
 import java.util.HashMap;
 
 public class EsTestUtil {
-    public static Tuple generateTestTuple(String index, String type, String source) {
+    public static Tuple generateTestTuple(String source, String index, String type, String id) {
         TopologyBuilder builder = new TopologyBuilder();
         GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
                 new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
             @Override
             public Fields getComponentOutputFields(String componentId, String streamId) {
-                return new Fields("index", "type", "source");
+                return new Fields("source", "index", "type", "id");
             }
         };
-        return new TupleImpl(topologyContext, new Values(index, type, source), 1, "");
+        return new TupleImpl(topologyContext, new Values(source, index, type, id), 1, "");
     }
 
     public static Node startEsNode(){

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index aed06f6..2c951f8 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -32,10 +32,7 @@ import storm.trident.operation.TridentCollector;
 import storm.trident.spout.IBatchSpout;
 import storm.trident.state.StateFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class TridentEsTopology {
 
@@ -72,10 +69,10 @@ public class TridentEsTopology {
         int maxBatchSize;
         HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
         private Values[] outputs = {
-                new Values("index1", "type1", "{\"user\":\"user1\"}"),
-                new Values("index1", "type2", "{\"user\":\"user2\"}"),
-                new Values("index2", "type1", "{\"user\":\"user3\"}"),
-                new Values("index2", "type2", "{\"user\":\"user4\"}")
+                new Values("{\"user\":\"user1\"}", "index1", "type1", UUID.randomUUID().toString()),
+                new Values("{\"user\":\"user2\"}", "index1", "type2", UUID.randomUUID().toString()),
+                new Values("{\"user\":\"user3\"}", "index2", "type1", UUID.randomUUID().toString()),
+                new Values("{\"user\":\"user4\"}", "index2", "type2", UUID.randomUUID().toString())
         };
         private int index = 0;
         boolean cycle = false;
@@ -90,7 +87,7 @@ public class TridentEsTopology {
 
         @Override
         public Fields getOutputFields() {
-            return new Fields("index", "type", "source");
+            return new Fields("source", "index", "type", "id");
         }
 
         @Override


[09/11] storm git commit: STORM-845 Storm ElasticSearch connector

Posted by ka...@apache.org.
STORM-845 Storm ElasticSearch connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aab2c17b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aab2c17b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aab2c17b

Branch: refs/heads/master
Commit: aab2c17bb5541ae75493b27b77f99b4e14dc2c0a
Parents: 2ad6dd6
Author: SEUNGJIN LEE <sw...@navercorp.com>
Authored: Mon Jul 27 17:31:41 2015 +0900
Committer: SEUNGJIN LEE <sw...@navercorp.com>
Committed: Mon Jul 27 17:31:41 2015 +0900

----------------------------------------------------------------------
 external/storm-elasticsearch/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/aab2c17b/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index a54b62c..4fa1098 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -68,5 +68,5 @@ Elasticsearch Trident state also follows similar pattern to EsBolts. It takes in
   
 ## Committer Sponsors
 
- * Robert Evans ([@revans2](https://github.com/revans2))
+ * Sriharsha Chintalapani ([@harshach](https://github.com/harshach))
  * Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR))
\ No newline at end of file


[02/11] storm git commit: STORM-845 Storm ElasticSearch connector

Posted by ka...@apache.org.
STORM-845 Storm ElasticSearch connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0c00544b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0c00544b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0c00544b

Branch: refs/heads/master
Commit: 0c00544ba72339c18d60dffd5e27e5f982ddb649
Parents: 2002dbd
Author: SEUNGJIN LEE <sw...@navercorp.com>
Authored: Tue Jun 23 18:35:44 2015 +0900
Committer: SEUNGJIN LEE <sw...@navercorp.com>
Committed: Thu Jul 23 14:32:17 2015 +0900

----------------------------------------------------------------------
 external/storm-elasticsearch/README.md                | 14 +++++++-------
 external/storm-elasticsearch/pom.xml                  |  2 +-
 .../storm/elasticsearch/bolt/AbstractEsBolt.java      |  6 +++---
 .../apache/storm/elasticsearch/trident/EsState.java   |  6 +++---
 4 files changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0c00544b/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index 20caece..3fa1592 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -1,11 +1,11 @@
-# Storm ElasticSearch Bolt & Trident State
+# Storm Elasticsearch Bolt & Trident State
 
-  EsIndexBolt, EsPercolateBolt and EsState allows users to stream data from storm into ElasticSearch directly.
+  EsIndexBolt, EsPercolateBolt and EsState allows users to stream data from storm into Elasticsearch directly.
   For detailed description, please refer to the following.   
 
 ## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)
 
-EsIndexBolt streams tuples directly into ElasticSearch. Tuples are indexed in specified index & type combination. 
+EsIndexBolt streams tuples directly into Elasticsearch. Tuples are indexed in specified index & type combination. 
 User should make sure that there are "index","type", and "source" fields declared in preceding bolts or spout.
 "index" and "type" fields are used for identifying target index and type.
 "source" is a document in JSON format string that will be indexed in elastic search.
@@ -19,7 +19,7 @@ EsIndexBolt indexBolt = new IndexBolt(esConfig);
 
 ## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)
 
-EsPercolateBolt streams tuples directly into ElasticSearch. Tuples are used to send percolate request to specified index & type combination. 
+EsPercolateBolt streams tuples directly into Elasticsearch. Tuples are used to send percolate request to specified index & type combination. 
 User should make sure that there are "index","type", and "source" fields declared in preceding bolts or spout.
 "index" and "type" fields are used for identifying target index and type.
 "source" is a document in JSON format string that will be sent in percolate request to elastic search.
@@ -45,14 +45,14 @@ EsConfig params
 
 |Arg  |Description | Type
 |---	|--- |---
-|clusterName | ElasticSearch cluster name | String (required) |
-|nodes | ElasticSearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |
+|clusterName | Elasticsearch cluster name | String (required) |
+|nodes | Elasticsearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |
 
 
  
 ## EsState (org.apache.storm.elasticsearch.trident.EsState)
 
-ElasticSearch Trident state also follows similar pattern to EsBolts. It takes in EsConfig as an arg.
+Elasticsearch Trident state also follows similar pattern to EsBolts. It takes in EsConfig as an arg.
 
 ```code
    EsConfig esConfig = new EsConfig();

http://git-wip-us.apache.org/repos/asf/storm/blob/0c00544b/external/storm-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml
index b1b069b..88ea312 100644
--- a/external/storm-elasticsearch/pom.xml
+++ b/external/storm-elasticsearch/pom.xml
@@ -36,7 +36,7 @@
     </developers>
 
     <properties>
-        <elasticsearch.version>1.5.2</elasticsearch.version>
+        <elasticsearch.version>1.6.0</elasticsearch.version>
     </properties>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/0c00544b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
index 1e2d1ed..ff1b543 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
@@ -37,8 +37,8 @@ import java.util.Map;
 
 public abstract class AbstractEsBolt extends BaseRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractEsBolt.class);
-    protected OutputCollector collector;
     protected static Client client;
+    protected OutputCollector collector;
     private EsConfig esConfig;
 
     public AbstractEsBolt(EsConfig esConfig) {
@@ -53,12 +53,12 @@ public abstract class AbstractEsBolt extends BaseRichBolt {
                 if (client == null) {
                     Settings settings =
                             ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
-                                    .put("client.transport.sniff", "false").build();
+                                    .put("client.transport.sniff", "true").build();
                     List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
                     for (String node : esConfig.getNodes()) {
                         String[] hostAndPort = node.split(":");
                         if(hostAndPort.length != 2){
-                            throw new Exception("incorrect ElasticSearch node format, should follow {host}:{port} pattern");
+                            throw new IllegalArgumentException("incorrect Elasticsearch node format, should follow {host}:{port} pattern");
                         }
                         transportAddressList.add(new InetSocketTransportAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
                     }

http://git-wip-us.apache.org/repos/asf/storm/blob/0c00544b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index ee95355..58de6cc 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -38,9 +38,9 @@ import java.util.List;
 import java.util.Map;
 
 public class EsState implements State {
-    private EsConfig esConfig;
-    private static Client client;
     private static final Logger LOG = LoggerFactory.getLogger(EsState.class);
+    private static Client client;
+    private EsConfig esConfig;
 
     public EsState(EsConfig esConfig) {
         this.esConfig = esConfig;
@@ -67,7 +67,7 @@ public class EsState implements State {
                     for (String node : esConfig.getNodes()) {
                         String[] hostAndPort = node.split(":");
                         if (hostAndPort.length != 2) {
-                            throw new Exception("incorrect ElasticSearch node format, should follow {host}:{port} pattern");
+                            throw new IllegalArgumentException("incorrect Elasticsearch node format, should follow {host}:{port} pattern");
                         }
                         transportAddressList.add(new InetSocketTransportAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
                     }


[03/11] storm git commit: STORM-845 Storm ElasticSearch connector

Posted by ka...@apache.org.
STORM-845 Storm ElasticSearch connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6a446cea
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6a446cea
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6a446cea

Branch: refs/heads/master
Commit: 6a446ceae184c0fe13cc993c7966ee73bb63394f
Parents: 9b2fd72
Author: SEUNGJIN LEE <sw...@navercorp.com>
Authored: Wed Jun 3 17:33:57 2015 +0900
Committer: SEUNGJIN LEE <sw...@navercorp.com>
Committed: Thu Jul 23 14:32:17 2015 +0900

----------------------------------------------------------------------
 README.markdown                                 |   1 +
 external/storm-elasticsearch/README.md          |  71 ++++++++++
 external/storm-elasticsearch/pom.xml            |  95 +++++++++++++
 .../elasticsearch/bolt/AbstractEsBolt.java      |  78 +++++++++++
 .../storm/elasticsearch/bolt/EsIndexBolt.java   |  61 ++++++++
 .../elasticsearch/bolt/EsPercolateBolt.java     |  72 ++++++++++
 .../storm/elasticsearch/common/EsConfig.java    |  59 ++++++++
 .../storm/elasticsearch/trident/EsState.java    |  90 ++++++++++++
 .../elasticsearch/trident/EsStateFactory.java   |  47 +++++++
 .../storm/elasticsearch/trident/EsUpdater.java  |  31 +++++
 .../elasticsearch/bolt/AbstractEsBoltTest.java  |  78 +++++++++++
 .../elasticsearch/bolt/EsIndexBoltTest.java     |  50 +++++++
 .../elasticsearch/bolt/EsIndexTopology.java     | 121 ++++++++++++++++
 .../elasticsearch/bolt/EsPercolateBoltTest.java |  56 ++++++++
 .../storm/elasticsearch/common/EsConstants.java |  22 +++
 .../storm/elasticsearch/common/EsTestUtil.java  |  70 ++++++++++
 .../trident/TridentEsTopology.java              | 139 +++++++++++++++++++
 pom.xml                                         |   1 +
 storm-dist/binary/src/main/assembly/binary.xml  |  14 ++
 19 files changed, 1156 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/README.markdown
----------------------------------------------------------------------
diff --git a/README.markdown b/README.markdown
index 502e89f..4d758c7 100644
--- a/README.markdown
+++ b/README.markdown
@@ -210,6 +210,7 @@ under the License.
 * Charles Chan ([@charleswhchan](https://github.com/charleswhchan))
 * Chuanlei Ni ([@chuanlei](https://github.com/chuanlei))
 * Xingyu Su ([@errordaiwa](https://github.com/errordaiwa))
+* Adrian Seungjin Lee ([@sweetest](https://github.com/sweetest))
 
 ## Acknowledgements
 

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
new file mode 100644
index 0000000..562fd6d
--- /dev/null
+++ b/external/storm-elasticsearch/README.md
@@ -0,0 +1,71 @@
+# Storm ElasticSearch Bolt & Trident State
+
+  EsIndexBolt, EsPercolateBolt and EsState allow users to stream data from Storm into ElasticSearch directly.
+  For a detailed description, please refer to the following sections.
+
+## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)
+
+EsIndexBolt streams tuples directly into ElasticSearch. Tuples are indexed into the specified index & type combination.
+Users should make sure that the "index", "type", and "source" fields are declared in the preceding bolt or spout.
+The "index" and "type" fields are used to identify the target index and type.
+"source" is a JSON-formatted document string that will be indexed in ElasticSearch.
+
+```java
+EsConfig esConfig = new EsConfig();
+esConfig.setClusterName(clusterName);
+esConfig.setHost(new String[]{"localhost"});
+esConfig.setPort(9300);
+EsIndexBolt indexBolt = new EsIndexBolt(esConfig);
+```
+
+## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)
+
+EsPercolateBolt streams tuples directly into ElasticSearch. Tuples are used to send percolate requests to the specified index & type combination.
+Users should make sure that the "index", "type", and "source" fields are declared in the preceding bolt or spout.
+The "index" and "type" fields are used to identify the target index and type.
+"source" is a JSON-formatted document string that will be sent to ElasticSearch in the percolate request.
+
+```java
+EsConfig esConfig = new EsConfig();
+esConfig.setClusterName(clusterName);
+esConfig.setHost(new String[]{"localhost"});
+esConfig.setPort(9300);
+EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig);
+```
+
+### EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
+  
+The two bolts above take an EsConfig as a constructor argument.
+
+  ```java
+   EsConfig esConfig = new EsConfig();
+   esConfig.setClusterName(clusterName);
+   esConfig.setHost(new String[]{"localhost"});
+   esConfig.setPort(9300);
+  ```
+
+EsConfig params
+
+|Arg  |Description | Type |
+|--- |--- |--- |
+|clusterName | ElasticSearch cluster name | String (required) |
+|host | ElasticSearch host | String array (required) |
+|port | ElasticSearch port | int (required) |
+
+
+ 
+## EsState (org.apache.storm.elasticsearch.trident.EsState)
+
+The ElasticSearch Trident state follows a pattern similar to the Es bolts: it takes an EsConfig as an argument.
+
+```code
+   EsConfig esConfig = new EsConfig();
+   esConfig.setClusterName(clusterName);
+   esConfig.setHost(new String[]{"localhost"});
+   esConfig.setPort(9300);
+                	     		
+   StateFactory factory = new EsStateFactory(esConfig);
+   TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
+ ```
+  
+## Committer Sponsors

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml
new file mode 100644
index 0000000..8e5db5d
--- /dev/null
+++ b/external/storm-elasticsearch/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.10.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-elasticsearch</artifactId>
+
+    <developers>
+        <developer>
+            <id>sweetest</id>
+            <name>Adrian Seungjin Lee</name>
+            <email>sweetest.sj@navercorp.com</email>
+        </developer>
+    </developers>
+
+    <properties>
+        <elasticsearch.version>1.5.2</elasticsearch.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>0.9.4</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>1.5.0</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.9.0</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-clean-plugin</artifactId>
+                <version>2.5</version>
+                <executions>
+                    <execution>
+                        <id>cleanup</id>
+                        <phase>test-compile</phase>
+                        <goals>
+                            <goal>clean</goal>
+                        </goals>
+                        <configuration>
+                            <excludeDefaultDirectories>true</excludeDefaultDirectories>
+                            <filesets>
+                                <fileset>
+                                    <directory>./data/</directory>
+                                </fileset>
+                            </filesets>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
new file mode 100644
index 0000000..cd7fc81
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractEsBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractEsBolt.class);
+    protected OutputCollector collector;
+    protected static Client client;
+    private EsConfig esConfig;
+
+    public AbstractEsBolt(EsConfig esConfig) {
+        this.esConfig = esConfig;
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        System.out.println(this.getClass().getName());
+        try {
+            this.collector = outputCollector;
+            synchronized (AbstractEsBolt.class) {
+                if (client == null) {
+                    Settings settings =
+                            ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
+                                    .put("client.transport.sniff", "false").build();
+                    List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
+                    for (String host : esConfig.getHost()) {
+                        transportAddressList.add(new InetSocketTransportAddress(host, esConfig.getPort()));
+                    }
+                    client = new TransportClient(settings)
+                            .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("unable to initialize EsBolt ", e);
+        }
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
new file mode 100644
index 0000000..9d7522c
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class EsIndexBolt extends AbstractEsBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(EsIndexBolt.class);
+
+    public EsIndexBolt(EsConfig esConfig) {
+        super(esConfig);
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        super.prepare(map, topologyContext, outputCollector);
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            String index = tuple.getStringByField("index");
+            String type = tuple.getStringByField("type");
+            String source = tuple.getStringByField("source");
+            client.prepareIndex(index, type).setSource(source).execute().actionGet();
+            collector.ack(tuple);
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.out.println(e);
+            collector.reportError(e);
+            collector.fail(tuple);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
new file mode 100644
index 0000000..3142fc1
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.elasticsearch.action.percolate.PercolateResponse;
+import org.elasticsearch.action.percolate.PercolateSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class EsPercolateBolt extends AbstractEsBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(EsPercolateBolt.class);
+
+    public EsPercolateBolt(EsConfig esConfig) {
+        super(esConfig);
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        super.prepare(map, topologyContext, outputCollector);
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            String index = tuple.getStringByField("index");
+            String type = tuple.getStringByField("type");
+            String source = tuple.getStringByField("source");
+            PercolateResponse response = client.preparePercolate().setIndices(index).setDocumentType(type)
+                    .setPercolateDoc(PercolateSourceBuilder.docBuilder().setDoc(source)).execute().actionGet();
+            if (response.getCount() > 0) {
+                for (PercolateResponse.Match match : response) {
+                    String id = match.getId().toString();
+                    collector.emit(new Values(id));
+                }
+            }
+            collector.ack(tuple);
+        } catch (Exception e) {
+            e.printStackTrace();
+            collector.reportError(e);
+            collector.fail(tuple);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields("id"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
new file mode 100644
index 0000000..f2aa48f
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+import java.io.Serializable;
+
+public class EsConfig implements Serializable{
+    private String clusterName;
+    private String[] host;
+    private int port;
+
+    public EsConfig() {
+    }
+
+    public EsConfig(String clusterName, String[] host, int port) {
+        this.clusterName = clusterName;
+        this.host = host;
+        this.port = port;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String[] getHost() {
+        return host;
+    }
+
+    public void setHost(String[] host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
new file mode 100644
index 0000000..e753119
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.trident;
+
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.topology.FailedException;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class EsState implements State {
+    private EsConfig esConfig;
+    private static Client client;
+    private static final Logger LOG = LoggerFactory.getLogger(EsState.class);
+
+    public EsState(EsConfig esConfig) {
+        this.esConfig = esConfig;
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+
+    }
+
+    @Override
+    public void commit(Long txid) {
+
+    }
+
+    public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        synchronized (EsState.class) {
+            if (client == null) {
+                Settings settings =
+                        ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
+                                .put("client.transport.sniff", "true").build();
+                List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
+                for (String host : esConfig.getHost()) {
+                    transportAddressList.add(new InetSocketTransportAddress(host, esConfig.getPort()));
+                }
+                client = new TransportClient(settings)
+                        .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));
+            }
+        }
+
+    }
+
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        BulkRequestBuilder bulkRequest = client.prepareBulk();
+        for (TridentTuple tuple : tuples) {
+            String index = tuple.getStringByField("index");
+            String type = tuple.getStringByField("type");
+            String source = tuple.getStringByField("source");
+            bulkRequest.add(client.prepareIndex(index, type).setSource(source));
+        }
+        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
+        if (bulkResponse.hasFailures()) {
+            LOG.warn("failed processing bulk index requests " + bulkResponse.buildFailureMessage());
+            throw new FailedException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
new file mode 100644
index 0000000..d7f4330
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.trident;
+
+import backtype.storm.task.IMetricsContext;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+public class EsStateFactory implements StateFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(EsStateFactory.class);
+    private EsConfig esConfig;
+
+    public EsStateFactory(){
+
+    }
+
+    public EsStateFactory(EsConfig esConfig){
+        this.esConfig = esConfig;
+    }
+
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        EsState esState = new EsState(esConfig);
+        esState.prepare(conf, metrics, partitionIndex, numPartitions);
+        return esState;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
new file mode 100644
index 0000000..6fa42f3
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.trident;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public class EsUpdater extends BaseStateUpdater<EsState> {
+    @Override
+    public void updateState(EsState state, List<TridentTuple> tuples, TridentCollector collector) {
+        state.updateState(tuples, collector);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
new file mode 100644
index 0000000..fdf7cd4
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.junit.After;
+import org.junit.Before;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Base class for the Elasticsearch bolt tests. Builds and starts an Elasticsearch
+ * node for the cluster "test-cluster" before each test and shuts it down afterwards.
+ * Subclasses get a fresh {@link Config}, a mocked {@link OutputCollector} and the node.
+ */
+public class AbstractEsBoltTest {
+    protected Config config = new Config();
+    protected OutputCollector collector = mock(OutputCollector.class);
+    protected Node node;
+
+    /**
+     * Starts the test node and blocks until the cluster reports green.
+     *
+     * @throws Exception if the node cannot be started or the pause is interrupted
+     */
+    @Before
+    public void setup() throws Exception {
+        System.out.println("setup");
+        node = NodeBuilder.nodeBuilder().data(true).settings(
+                ImmutableSettings.builder()
+                        .put(ClusterName.SETTING, "test-cluster")
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                        .put(EsExecutors.PROCESSORS, 1)
+                        .put("http.enabled", false)
+                        .put("index.percolator.map_unmapped_fields_as_string", true)
+                        .put("index.store.type", "memory")
+        ).build();
+        node.start();
+        // ensureEsGreen waits for green status and asserts it, so one health request suffices.
+        ensureEsGreen(node);
+        // NOTE(review): fixed pause; presumably gives the node time to settle before a
+        // bolt connects - confirm whether it is still required.
+        Thread.sleep(1000);
+    }
+
+    /**
+     * Stops and closes the test node, if one was created.
+     *
+     * @throws Exception if stopping or closing the node fails
+     */
+    @After
+    public void cleanup() throws Exception {
+        System.out.println("cleanup");
+        // JUnit runs @After even when @Before failed, so the node may never have been assigned.
+        if (node == null) {
+            return;
+        }
+        try {
+            node.stop();
+        } finally {
+            // Always close, even when stop() throws, so the node's resources are released.
+            node.close();
+            node = null;
+        }
+    }
+
+    /**
+     * Waits up to 30 seconds for green cluster health with no relocating shards and
+     * fails the test if the reported status is not green.
+     *
+     * @param node node whose client is used for the health request
+     */
+    private void ensureEsGreen(Node node) {
+        ClusterHealthResponse chr = node.client().admin().cluster()
+                .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
+        assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
new file mode 100644
index 0000000..e66da19
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.mockito.Mockito.verify;
+
+/**
+ * Checks that {@link EsIndexBolt} acks a tuple after executing it against the
+ * test node started by {@link AbstractEsBoltTest}.
+ */
+public class EsIndexBoltTest extends AbstractEsBoltTest {
+    private static final Logger LOG = LoggerFactory.getLogger(EsIndexBoltTest.class);
+    private EsIndexBolt bolt;
+
+    /**
+     * Executes one tuple (index1/type1) and verifies the collector received an ack for it.
+     *
+     * @throws Exception if preparing or executing the bolt fails
+     */
+    @Test
+    public void testEsIndexBolt()
+            throws Exception {
+        EsConfig esConfig = new EsConfig();
+        esConfig.setClusterName("test-cluster");
+        esConfig.setHost(new String[]{"127.0.0.1"});
+        esConfig.setPort(9300);
+        bolt = new EsIndexBolt(esConfig);
+        bolt.prepare(config, null, collector);
+        try {
+            String index = "index1";
+            String type = "type1";
+            String source = "{\"user\":\"user1\"}";
+            Tuple tuple = EsTestUtil.generateTestTuple(index, type, source);
+            bolt.execute(tuple);
+            verify(collector).ack(tuple);
+        } finally {
+            // Run cleanup even when execute() or the verification fails.
+            bolt.cleanup();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
new file mode 100644
index 0000000..4a82c63
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsConstants;
+import org.apache.storm.elasticsearch.common.EsTestUtil;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Sample topology: a spout that cycles through four JSON user documents feeds an
+ * {@link EsIndexBolt}. The topology runs on a {@link LocalCluster} for about
+ * 20 seconds against a node started via {@link EsTestUtil#startEsNode()}.
+ */
+public class EsIndexTopology {
+
+    static final String SPOUT_ID = "spout";
+    static final String BOLT_ID = "bolt";
+    static final String TOPOLOGY_NAME = "elasticsearch-test-topology1";
+
+    /**
+     * Builds the topology, starts the Elasticsearch node, runs the topology locally,
+     * then kills it and exits the JVM.
+     *
+     * @param args unused
+     * @throws Exception if building or submitting the topology fails
+     */
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+        config.setNumWorkers(1);
+        TopologyBuilder builder = new TopologyBuilder();
+        UserDataSpout spout = new UserDataSpout();
+        builder.setSpout(SPOUT_ID, spout, 1);
+        EsConfig esConfig = new EsConfig();
+        esConfig.setClusterName(EsConstants.clusterName);
+        esConfig.setHost(new String[]{"localhost"});
+        esConfig.setPort(9300);
+        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig), 1).shuffleGrouping(SPOUT_ID);
+
+        EsTestUtil.startEsNode();
+        EsTestUtil.waitForSeconds(5);
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+        EsTestUtil.waitForSeconds(20);
+        cluster.killTopology(TOPOLOGY_NAME);
+        System.out.println("cluster begin to shutdown");
+        cluster.shutdown();
+        System.out.println("cluster shutdown");
+        System.exit(0);
+    }
+
+    /**
+     * Spout that emits (index, type, source) tuples, cycling through a fixed set of
+     * sources. Each emitted tuple is kept in {@code pending} under its message id
+     * until it is acked, so a failed tuple can be emitted again.
+     */
+    public static class UserDataSpout extends BaseRichSpout {
+        private ConcurrentHashMap<UUID, Values> pending;
+        private SpoutOutputCollector collector;
+        private String[] sources = {
+                "{\"user\":\"user1\"}",
+                "{\"user\":\"user2\"}",
+                "{\"user\":\"user3\"}",
+                "{\"user\":\"user4\"}"
+        };
+        private int index = 0;
+        private int count = 0;
+        private long total = 0L;
+        private String indexName = "index1";
+        private String typeName = "type1";
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("index", "type", "source"));
+        }
+
+        @Override
+        public void open(Map config, TopologyContext context,
+                         SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.pending = new ConcurrentHashMap<UUID, Values>();
+        }
+
+        /** Emits the next source document under a fresh random message id. */
+        @Override
+        public void nextTuple() {
+            String source = sources[index];
+            Values values = new Values(indexName, typeName, source);
+            UUID msgId = UUID.randomUUID();
+            this.pending.put(msgId, values);
+            this.collector.emit(values, msgId);
+            index++;
+            if (index >= sources.length) {
+                index = 0;
+            }
+            count++;
+            total++;
+            // Report progress each time the counter passes 1000, then restart the counter.
+            if (count > 1000) {
+                count = 0;
+                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+            }
+            Thread.yield();
+        }
+
+        @Override
+        public void ack(Object msgId) {
+            this.pending.remove(msgId);
+        }
+
+        /** Emits the pending tuple for {@code msgId} again, if one is still recorded. */
+        @Override
+        public void fail(Object msgId) {
+            Values values = this.pending.get(msgId);
+            if (values == null) {
+                // No tuple recorded for this id (for example it was already acked);
+                // re-emitting would pass a null tuple to the collector.
+                return;
+            }
+            System.out.println("**** RESENDING FAILED TUPLE");
+            this.collector.emit(values, msgId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
new file mode 100644
index 0000000..1bd338f
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.mockito.Mockito.verify;
+
+/**
+ * Checks that {@link EsPercolateBolt} acks a tuple and emits the id of the
+ * percolator query that matches it.
+ */
+public class EsPercolateBoltTest extends AbstractEsBoltTest {
+    // Logger is bound to this class (it previously named EsIndexBoltTest).
+    private static final Logger LOG = LoggerFactory.getLogger(EsPercolateBoltTest.class);
+    private EsPercolateBolt bolt;
+
+    /**
+     * Registers a percolator query with id "1" matching user1, executes a user1
+     * document through the bolt and verifies the ack and the emitted id.
+     *
+     * @throws Exception if preparing or executing the bolt fails
+     */
+    @Test
+    public void testEsPercolateBolt()
+            throws Exception {
+        EsConfig esConfig = new EsConfig();
+        esConfig.setClusterName("test-cluster");
+        esConfig.setHost(new String[]{"127.0.0.1"});
+        esConfig.setPort(9300);
+        bolt = new EsPercolateBolt(esConfig);
+        bolt.prepare(config, null, collector);
+        try {
+            String index = "index1";
+            String type = ".percolator";
+            String source = "{\"user\":\"user1\"}";
+            node.client().prepareIndex("index1",".percolator")
+                    .setId("1")
+                    .setSource("{\"query\":{\"match\":{\"user\":\"user1\"}}}").
+                    execute().actionGet();
+            Tuple tuple = EsTestUtil.generateTestTuple(index, type, source);
+            bolt.execute(tuple);
+            verify(collector).ack(tuple);
+            verify(collector).emit(new Values("1"));
+        } finally {
+            // Run cleanup even when execute() or a verification fails.
+            bolt.cleanup();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConstants.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConstants.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConstants.java
new file mode 100644
index 0000000..98bb71d
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConstants.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+/**
+ * Constants shared by the Elasticsearch tests and sample topologies.
+ */
+public class EsConstants {
+    /** Cluster name used by the test node and by the clients that connect to it. */
+    public static final String clusterName = "test-cluster";
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
new file mode 100644
index 0000000..2c0026d
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+import backtype.storm.Config;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import backtype.storm.tuple.Values;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+
+import java.util.HashMap;
+
+/**
+ * Helpers shared by the Elasticsearch tests and sample topologies.
+ */
+public class EsTestUtil {
+    /**
+     * Builds a tuple whose fields are "index", "type" and "source".
+     *
+     * @param index  value for the "index" field
+     * @param type   value for the "type" field
+     * @param source value for the "source" field
+     * @return a tuple carrying the three values
+     */
+    public static Tuple generateTestTuple(String index, String type, String source) {
+        TopologyBuilder builder = new TopologyBuilder();
+        // The context only has to answer the output-fields lookup made for the tuple.
+        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
+                new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+            @Override
+            public Fields getComponentOutputFields(String componentId, String streamId) {
+                return new Fields("index", "type", "source");
+            }
+        };
+        return new TupleImpl(topologyContext, new Values(index, type, source), 1, "");
+    }
+
+    /**
+     * Builds and starts an Elasticsearch node for the cluster named by
+     * {@link EsConstants#clusterName}.
+     *
+     * @return the started node; this method does not stop or close it
+     */
+    public static Node startEsNode(){
+        Node node = NodeBuilder.nodeBuilder().data(true).settings(
+                ImmutableSettings.builder()
+                        .put(ClusterName.SETTING, EsConstants.clusterName)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                        .put(EsExecutors.PROCESSORS, 1)
+                        .put("http.enabled", false)
+                        .put("index.percolator.map_unmapped_fields_as_string", true)
+                        .put("index.store.type", "memory")
+        ).build();
+        node.start();
+        return node;
+    }
+
+    /**
+     * Sleeps for the given number of seconds. Returns early if the thread is
+     * interrupted, leaving the thread's interrupt flag set.
+     *
+     * @param seconds number of seconds to sleep
+     */
+    public static void waitForSeconds(int seconds) {
+        try {
+            // Multiply as long so a large argument cannot overflow int arithmetic.
+            Thread.sleep(seconds * 1000L);
+        } catch (InterruptedException e) {
+            // Still best-effort (no exception escapes), but restore the flag so
+            // callers can observe the interruption.
+            Thread.currentThread().interrupt();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
new file mode 100644
index 0000000..b1e62ff
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.trident;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsConstants;
+import org.apache.storm.elasticsearch.common.EsTestUtil;
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IBatchSpout;
+import storm.trident.state.StateFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Sample Trident topology: a batch spout emitting (index, type, source) tuples is
+ * persisted through {@link EsStateFactory} and {@link EsUpdater}. The topology runs
+ * on a {@link LocalCluster} for about 20 seconds against a node started via
+ * {@link EsTestUtil#startEsNode()}.
+ */
+public class TridentEsTopology {
+
+    static final String TOPOLOGY_NAME = "elasticsearch-test-topology2";
+
+    /**
+     * Builds the topology, starts the Elasticsearch node, runs the topology locally,
+     * then kills it and exits the JVM.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        int batchSize = 100;
+        FixedBatchSpout spout = new FixedBatchSpout(batchSize);
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout", spout);
+        EsConfig esConfig = new EsConfig();
+        esConfig.setClusterName(EsConstants.clusterName);
+        esConfig.setHost(new String[]{"localhost"});
+        esConfig.setPort(9300);
+        Fields esFields = new Fields("index", "type", "source");
+        StateFactory factory = new EsStateFactory(esConfig);
+        // The returned state handle is not used afterwards, so it is not kept.
+        stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
+
+        EsTestUtil.startEsNode();
+        EsTestUtil.waitForSeconds(5);
+
+        LocalCluster cluster = new LocalCluster();
+        // Submit with an empty Config instead of null.
+        cluster.submitTopology(TOPOLOGY_NAME, new Config(), topology.build());
+        EsTestUtil.waitForSeconds(20);
+        cluster.killTopology(TOPOLOGY_NAME);
+        System.out.println("cluster begin to shutdown");
+        cluster.shutdown();
+        System.out.println("cluster shutdown");
+        System.exit(0);
+    }
+
+    /**
+     * Batch spout that fills each batch with {@code maxBatchSize} tuples taken
+     * round-robin from a fixed set of outputs. A batch is remembered by id until it
+     * is acked, so asking for the same batch id again emits the same tuples.
+     */
+    public static class FixedBatchSpout implements IBatchSpout {
+        int maxBatchSize;
+        HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
+        private Values[] outputs = {
+                new Values("index1", "type1", "{\"user\":\"user1\"}"),
+                new Values("index1", "type2", "{\"user\":\"user2\"}"),
+                new Values("index2", "type1", "{\"user\":\"user3\"}"),
+                new Values("index2", "type2", "{\"user\":\"user4\"}")
+        };
+        private int index = 0;
+        boolean cycle = false;
+
+        /**
+         * @param maxBatchSize number of tuples placed in every new batch
+         */
+        public FixedBatchSpout(int maxBatchSize) {
+            this.maxBatchSize = maxBatchSize;
+        }
+
+        public void setCycle(boolean cycle) {
+            this.cycle = cycle;
+        }
+
+        @Override
+        public Fields getOutputFields() {
+            return new Fields("index", "type", "source");
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context) {
+            index = 0;
+        }
+
+        /**
+         * Emits the batch for {@code batchId}, building and remembering it first if
+         * this id has not been seen (or has already been acked).
+         */
+        @Override
+        public void emitBatch(long batchId, TridentCollector collector) {
+            List<List<Object>> batch = this.batches.get(batchId);
+            if (batch == null) {
+                batch = new ArrayList<List<Object>>();
+                // NOTE(review): the fill loop below wraps index unconditionally, so the
+                // cycle flag does not appear to change what is emitted - confirm intent.
+                if (index >= outputs.length && cycle) {
+                    index = 0;
+                }
+                for (int i = 0; i < maxBatchSize; index++, i++) {
+                    if (index == outputs.length) {
+                        index = 0;
+                    }
+                    batch.add(outputs[index]);
+                }
+                this.batches.put(batchId, batch);
+            }
+            for (List<Object> list : batch) {
+                collector.emit(list);
+            }
+        }
+
+        @Override
+        public void ack(long batchId) {
+            this.batches.remove(batchId);
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public Map getComponentConfiguration() {
+            Config conf = new Config();
+            conf.setMaxTaskParallelism(1);
+            return conf;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bd65f04..cb00783 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,6 +170,7 @@
         <module>external/storm-redis</module>
         <module>external/storm-eventhubs</module>
         <module>external/flux</module>
+        <module>external/storm-elasticsearch</module>
     </modules>
 
     <scm>

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 21d802a..bb77a0a 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -190,6 +190,20 @@
             </includes>
             <fileMode>0644</fileMode>
         </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-elasticsearch/target</directory>
+            <outputDirectory>external/storm-elasticsearch</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-elasticsearch</directory>
+            <outputDirectory>external/storm-elasticsearch</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
 
         <fileSet>
             <directory>${project.basedir}/../../external/flux/flux-core/target</directory>


[08/11] storm git commit: STORM-845 Storm ElasticSearch connector

Posted by ka...@apache.org.
STORM-845 Storm ElasticSearch connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2ad6dd61
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2ad6dd61
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2ad6dd61

Branch: refs/heads/master
Commit: 2ad6dd61d3d781c3b0d6b8235b2e5a058e8c0acd
Parents: 77986c6
Author: SEUNGJIN LEE <sw...@navercorp.com>
Authored: Mon Jul 27 17:11:40 2015 +0900
Committer: SEUNGJIN LEE <sw...@navercorp.com>
Committed: Mon Jul 27 17:11:40 2015 +0900

----------------------------------------------------------------------
 external/storm-elasticsearch/README.md | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2ad6dd61/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index c0b9b57..a54b62c 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -8,13 +8,13 @@
 EsIndexBolt streams tuples directly into Elasticsearch. Tuples are indexed in specified index & type combination. 
 User should make sure that there are "source", "index","type", and "id" fields declared in preceding bolts or spout.
 "index" and "type" fields are used for identifying target index and type.
-"source" is a document in JSON format string that will be indexed in elastic search.
+"source" is a document in JSON format string that will be indexed in Elasticsearch.
 
 ```java
 EsConfig esConfig = new EsConfig();
 esConfig.setClusterName(clusterName);
 esConfig.setNodes(new String[]{"localhost:9300"});
-EsIndexBolt indexBolt = new IndexBolt(esConfig);
+EsIndexBolt indexBolt = new EsIndexBolt(esConfig);
 ```
 
 ## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)
@@ -22,7 +22,7 @@ EsIndexBolt indexBolt = new IndexBolt(esConfig);
 EsPercolateBolt streams tuples directly into Elasticsearch. Tuples are used to send percolate request to specified index & type combination. 
 User should make sure that there are "source", "index", and "type" fields declared in preceding bolts or spout.
 "index" and "type" fields are used for identifying target index and type.
-"source" is a document in JSON format string that will be sent in percolate request to elastic search.
+"source" is a document in JSON format string that will be sent in percolate request to Elasticsearch.
 
 ```java
 EsConfig esConfig = new EsConfig();
@@ -67,3 +67,6 @@ Elasticsearch Trident state also follows similar pattern to EsBolts. It takes in
  ```
   
 ## Committer Sponsors
+
+ * Robert Evans ([@revans2](https://github.com/revans2))
+ * Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR))
\ No newline at end of file


[11/11] storm git commit: add STORM-845 to CHANGELOG.md

Posted by ka...@apache.org.
add STORM-845 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/124e8468
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/124e8468
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/124e8468

Branch: refs/heads/master
Commit: 124e8468a797769eb58e5f49ed2a93d34b19cf90
Parents: a13c0f2
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Aug 6 07:58:25 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Aug 6 07:58:25 2015 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/124e8468/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c21d8dc..d2f6ec3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -22,6 +22,7 @@
  * STORM-859: Add regression test of STORM-856
  * STORM-913: Use Curator's delete().deletingChildrenIfNeeded() instead of zk/delete-recursive
  * STORM-968: Adding support to generate the id based on names in Trident
+ * STORM-845: Storm ElasticSearch connector
 
 ## 0.10.0-beta2
  * STORM-843: [storm-redis] Add Javadoc to storm-redis


[05/11] storm git commit: STORM-845 Storm ElasticSearch connector

Posted by ka...@apache.org.
STORM-845 Storm ElasticSearch connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b2416e1d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b2416e1d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b2416e1d

Branch: refs/heads/master
Commit: b2416e1de16227ba2e280b7d7efb41f65f5ff418
Parents: 6a446ce
Author: SEUNGJIN LEE <sw...@navercorp.com>
Authored: Wed Jun 3 17:58:29 2015 +0900
Committer: SEUNGJIN LEE <sw...@navercorp.com>
Committed: Thu Jul 23 14:32:17 2015 +0900

----------------------------------------------------------------------
 external/storm-elasticsearch/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b2416e1d/external/storm-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml
index 8e5db5d..5ecd2ba 100644
--- a/external/storm-elasticsearch/pom.xml
+++ b/external/storm-elasticsearch/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.10.0-SNAPSHOT</version>
+        <version>0.11.0-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 


[06/11] storm git commit: STORM-845 Storm ElasticSearch connector

Posted by ka...@apache.org.
STORM-845 Storm ElasticSearch connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c147a4eb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c147a4eb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c147a4eb

Branch: refs/heads/master
Commit: c147a4eb49faa2a7efc05e12d240aa58a9ccee2b
Parents: b2416e1
Author: SEUNGJIN LEE <sw...@navercorp.com>
Authored: Wed Jun 3 18:33:48 2015 +0900
Committer: SEUNGJIN LEE <sw...@navercorp.com>
Committed: Thu Jul 23 14:32:17 2015 +0900

----------------------------------------------------------------------
 external/storm-elasticsearch/pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c147a4eb/external/storm-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml
index 5ecd2ba..b1b069b 100644
--- a/external/storm-elasticsearch/pom.xml
+++ b/external/storm-elasticsearch/pom.xml
@@ -43,13 +43,13 @@
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
-            <version>0.9.4</version>
+            <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
-            <version>1.5.0</version>
+            <version>${elasticsearch.version}</version>
             <scope>compile</scope>
         </dependency>
         <dependency>


[07/11] storm git commit: STORM-845 Storm ElasticSearch connector

Posted by ka...@apache.org.
STORM-845 Storm ElasticSearch connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/77986c62
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/77986c62
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/77986c62

Branch: refs/heads/master
Commit: 77986c62870d72cf712fc835e1f19c6844a89ed8
Parents: acaab9c
Author: SEUNGJIN LEE <sw...@navercorp.com>
Authored: Mon Jul 27 14:23:10 2015 +0900
Committer: SEUNGJIN LEE <sw...@navercorp.com>
Committed: Mon Jul 27 14:23:10 2015 +0900

----------------------------------------------------------------------
 external/storm-elasticsearch/README.md                      | 9 ++++++---
 .../org/apache/storm/elasticsearch/bolt/EsIndexBolt.java    | 3 ---
 .../apache/storm/elasticsearch/bolt/EsPercolateBolt.java    | 7 ++-----
 .../apache/storm/elasticsearch/trident/EsStateFactory.java  | 1 -
 .../storm/elasticsearch/bolt/EsPercolateBoltTest.java       | 4 +++-
 5 files changed, 11 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/77986c62/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index 9e1bbb8..c0b9b57 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -1,7 +1,7 @@
 # Storm Elasticsearch Bolt & Trident State
 
   EsIndexBolt, EsPercolateBolt and EsState allows users to stream data from storm into Elasticsearch directly.
-  For detailed description, please refer to the following.   
+  For detailed description, please refer to the following.
 
 ## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)
 
@@ -31,7 +31,10 @@ esConfig.setNodes(new String[]{"localhost:9300"});
 EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig);
 ```
 
-### EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
+If the percolate response is non-empty, EsPercolateBolt emits one tuple containing the original source and the
+Percolate.Match for each Percolate.Match in the PercolateResponse.
+
+## EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
   
 Two bolts above takes in EsConfig as a constructor arg.
 
@@ -41,7 +44,7 @@ Two bolts above takes in EsConfig as a constructor arg.
    esConfig.setNodes(new String[]{"localhost:9300"});
   ```
 
-EsConfig params
+### EsConfig params
 
 |Arg  |Description | Type
 |---	|--- |---

http://git-wip-us.apache.org/repos/asf/storm/blob/77986c62/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
index c1d7daa..0d5cff8 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 
 public class EsIndexBolt extends AbstractEsBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(EsIndexBolt.class);
 
     /**
      * EsIndexBolt constructor
@@ -58,8 +57,6 @@ public class EsIndexBolt extends AbstractEsBolt {
             client.prepareIndex(index, type, id).setSource(source).execute().actionGet();
             collector.ack(tuple);
         } catch (Exception e) {
-            e.printStackTrace();
-            System.out.println(e);
             collector.reportError(e);
             collector.fail(tuple);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/77986c62/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
index 7ee6835..394462e 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 
 public class EsPercolateBolt extends AbstractEsBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(EsPercolateBolt.class);
 
     /**
      * EsPercolateBolt constructor
@@ -62,13 +61,11 @@ public class EsPercolateBolt extends AbstractEsBolt {
                     .setPercolateDoc(PercolateSourceBuilder.docBuilder().setDoc(source)).execute().actionGet();
             if (response.getCount() > 0) {
                 for (PercolateResponse.Match match : response) {
-                    String id = match.getId().toString();
-                    collector.emit(new Values(id));
+                    collector.emit(new Values(source, match));
                 }
             }
             collector.ack(tuple);
         } catch (Exception e) {
-            e.printStackTrace();
             collector.reportError(e);
             collector.fail(tuple);
         }
@@ -76,6 +73,6 @@ public class EsPercolateBolt extends AbstractEsBolt {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-        outputFieldsDeclarer.declare(new Fields("id"));
+        outputFieldsDeclarer.declare(new Fields("source", "match"));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/77986c62/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index b1eaa04..c3a2e6c 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -27,7 +27,6 @@ import storm.trident.state.StateFactory;
 import java.util.Map;
 
 public class EsStateFactory implements StateFactory {
-    private static final Logger LOG = LoggerFactory.getLogger(EsStateFactory.class);
     private EsConfig esConfig;
 
     public EsStateFactory(){

http://git-wip-us.apache.org/repos/asf/storm/blob/77986c62/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index ea0504b..fd4fa4f 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -22,6 +22,7 @@ import backtype.storm.tuple.Values;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
 import org.elasticsearch.action.count.CountResponse;
+import org.elasticsearch.action.percolate.PercolateResponse;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.junit.Assert;
 import org.junit.Test;
@@ -29,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.mockito.Mockito.verify;
+import static org.mockito.Matchers.any;
 
 public class EsPercolateBoltTest extends AbstractEsBoltTest {
     private static final Logger LOG = LoggerFactory.getLogger(EsIndexBoltTest.class);
@@ -56,7 +58,7 @@ public class EsPercolateBoltTest extends AbstractEsBoltTest {
         bolt.execute(tuple);
 
         verify(collector).ack(tuple);
-        verify(collector).emit(new Values("1"));
+        verify(collector).emit(new Values(source, any(PercolateResponse.Match.class)));
 
         bolt.cleanup();
     }


[10/11] storm git commit: Merge branch 'STORM-845' of https://github.com/sweetest/storm

Posted by ka...@apache.org.
Merge branch 'STORM-845' of https://github.com/sweetest/storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a13c0f21
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a13c0f21
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a13c0f21

Branch: refs/heads/master
Commit: a13c0f2122362dd96c2e524a23814cc58e6cadbc
Parents: 827ddbd aab2c17
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Aug 6 07:40:15 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Aug 6 07:40:15 2015 +0900

----------------------------------------------------------------------
 README.markdown                                 |   1 +
 external/storm-elasticsearch/README.md          |  72 ++++++++++
 external/storm-elasticsearch/pom.xml            |  95 +++++++++++++
 .../elasticsearch/bolt/AbstractEsBolt.java      |  81 +++++++++++
 .../storm/elasticsearch/bolt/EsIndexBolt.java   |  68 ++++++++++
 .../elasticsearch/bolt/EsPercolateBolt.java     |  78 +++++++++++
 .../storm/elasticsearch/common/EsConfig.java    |  54 ++++++++
 .../storm/elasticsearch/trident/EsState.java    | 117 ++++++++++++++++
 .../elasticsearch/trident/EsStateFactory.java   |  50 +++++++
 .../storm/elasticsearch/trident/EsUpdater.java  |  31 +++++
 .../elasticsearch/bolt/AbstractEsBoltTest.java  |  81 +++++++++++
 .../elasticsearch/bolt/EsIndexBoltTest.java     |  68 ++++++++++
 .../elasticsearch/bolt/EsIndexTopology.java     | 120 +++++++++++++++++
 .../elasticsearch/bolt/EsPercolateBoltTest.java |  65 +++++++++
 .../storm/elasticsearch/common/EsConstants.java |  22 +++
 .../storm/elasticsearch/common/EsTestUtil.java  |  70 ++++++++++
 .../trident/TridentEsTopology.java              | 135 +++++++++++++++++++
 pom.xml                                         |   1 +
 storm-dist/binary/src/main/assembly/binary.xml  |  14 ++
 19 files changed, 1223 insertions(+)
----------------------------------------------------------------------



[04/11] storm git commit: STORM-845 Storm ElasticSearch connector

Posted by ka...@apache.org.
STORM-845 Storm ElasticSearch connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2002dbd7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2002dbd7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2002dbd7

Branch: refs/heads/master
Commit: 2002dbd7cf54c9615c7c826496275d35b7471c5b
Parents: c147a4e
Author: SEUNGJIN LEE <sw...@navercorp.com>
Authored: Mon Jun 22 19:33:40 2015 +0900
Committer: SEUNGJIN LEE <sw...@navercorp.com>
Committed: Thu Jul 23 14:32:17 2015 +0900

----------------------------------------------------------------------
 external/storm-elasticsearch/README.md          | 17 ++++------
 .../elasticsearch/bolt/AbstractEsBolt.java      |  9 ++++--
 .../storm/elasticsearch/common/EsConfig.java    | 24 +++++---------
 .../storm/elasticsearch/trident/EsState.java    | 33 ++++++++++++--------
 .../elasticsearch/bolt/AbstractEsBoltTest.java  | 25 ++++++++-------
 .../elasticsearch/bolt/EsIndexBoltTest.java     |  3 +-
 .../elasticsearch/bolt/EsIndexTopology.java     |  3 +-
 .../elasticsearch/bolt/EsPercolateBoltTest.java |  3 +-
 .../trident/TridentEsTopology.java              |  3 +-
 9 files changed, 57 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index 562fd6d..20caece 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -13,8 +13,7 @@ User should make sure that there are "index","type", and "source" fields declare
 ```java
 EsConfig esConfig = new EsConfig();
 esConfig.setClusterName(clusterName);
-esConfig.setHost(new String[]{"localhost"});
-esConfig.setPort(9300);
+esConfig.setNodes(new String[]{"localhost:9300"});
 EsIndexBolt indexBolt = new IndexBolt(esConfig);
 ```
 
@@ -28,8 +27,7 @@ User should make sure that there are "index","type", and "source" fields declare
 ```java
 EsConfig esConfig = new EsConfig();
 esConfig.setClusterName(clusterName);
-esConfig.setHost(new String[]{"localhost"});
-esConfig.setPort(9300);
+esConfig.setNodes(new String[]{"localhost:9300"});
 EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig);
 ```
 
@@ -40,8 +38,7 @@ Two bolts above takes in EsConfig as a constructor arg.
   ```java
    EsConfig esConfig = new EsConfig();
    esConfig.setClusterName(clusterName);
-   esConfig.setHost(new String[]{"localhost"});
-   esConfig.setPort(9300);
+   esConfig.setNodes(new String[]{"localhost:9300"});
   ```
 
 EsConfig params
@@ -49,8 +46,7 @@ EsConfig params
 |Arg  |Description | Type
 |---	|--- |---
 |clusterName | ElasticSearch cluster name | String (required) |
-|host | ElasticSearch host | String array (required) |
-|port | ElasticSearch port | int (required) |
+|nodes | ElasticSearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |
 
 
  
@@ -61,9 +57,8 @@ ElasticSearch Trident state also follows similar pattern to EsBolts. It takes in
 ```code
    EsConfig esConfig = new EsConfig();
    esConfig.setClusterName(clusterName);
-   esConfig.setHost(new String[]{"localhost"});
-   esConfig.setPort(9300);
-                	     		
+   esConfig.setNodes(new String[]{"localhost:9300"});
+
    StateFactory factory = new EsStateFactory(esConfig);
    TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
  ```

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
index cd7fc81..1e2d1ed 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
@@ -47,7 +47,6 @@ public abstract class AbstractEsBolt extends BaseRichBolt {
 
     @Override
     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-        System.out.println(this.getClass().getName());
         try {
             this.collector = outputCollector;
             synchronized (AbstractEsBolt.class) {
@@ -56,8 +55,12 @@ public abstract class AbstractEsBolt extends BaseRichBolt {
                             ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
                                     .put("client.transport.sniff", "false").build();
                     List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
-                    for (String host : esConfig.getHost()) {
-                        transportAddressList.add(new InetSocketTransportAddress(host, esConfig.getPort()));
+                    for (String node : esConfig.getNodes()) {
+                        String[] hostAndPort = node.split(":");
+                        if(hostAndPort.length != 2){
+                            throw new Exception("incorrect ElasticSearch node format, should follow {host}:{port} pattern");
+                        }
+                        transportAddressList.add(new InetSocketTransportAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
                     }
                     client = new TransportClient(settings)
                             .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
index f2aa48f..c97d77f 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -21,16 +21,14 @@ import java.io.Serializable;
 
 public class EsConfig implements Serializable{
     private String clusterName;
-    private String[] host;
-    private int port;
+    private String[] nodes;
 
     public EsConfig() {
     }
 
-    public EsConfig(String clusterName, String[] host, int port) {
+    public EsConfig(String clusterName, String[] nodes, int port) {
         this.clusterName = clusterName;
-        this.host = host;
-        this.port = port;
+        this.nodes = nodes;
     }
 
     public String getClusterName() {
@@ -41,19 +39,11 @@ public class EsConfig implements Serializable{
         this.clusterName = clusterName;
     }
 
-    public String[] getHost() {
-        return host;
+    public String[] getNodes() {
+        return nodes;
     }
 
-    public void setHost(String[] host) {
-        this.host = host;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public void setPort(int port) {
-        this.port = port;
+    public void setNodes(String[] nodes) {
+        this.nodes = nodes;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index e753119..ee95355 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -57,20 +57,27 @@ public class EsState implements State {
     }
 
     public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-        synchronized (EsState.class) {
-            if (client == null) {
-                Settings settings =
-                        ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
-                                .put("client.transport.sniff", "true").build();
-                List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
-                for (String host : esConfig.getHost()) {
-                    transportAddressList.add(new InetSocketTransportAddress(host, esConfig.getPort()));
+        try {
+            synchronized (EsState.class) {
+                if (client == null) {
+                    Settings settings =
+                            ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
+                                    .put("client.transport.sniff", "true").build();
+                    List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
+                    for (String node : esConfig.getNodes()) {
+                        String[] hostAndPort = node.split(":");
+                        if (hostAndPort.length != 2) {
+                            throw new Exception("incorrect ElasticSearch node format, should follow {host}:{port} pattern");
+                        }
+                        transportAddressList.add(new InetSocketTransportAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
+                    }
+                    client = new TransportClient(settings)
+                            .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));
                 }
-                client = new TransportClient(settings)
-                        .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));
             }
+        } catch (Exception e) {
+            LOG.warn("unable to initialize EsState ", e);
         }
-
     }
 
     public void updateState(List<TridentTuple> tuples, TridentCollector collector) {

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
index fdf7cd4..ae6b321 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@ -19,6 +19,7 @@ package org.apache.storm.elasticsearch.bolt;
 
 import backtype.storm.Config;
 import backtype.storm.task.OutputCollector;
+import org.apache.commons.io.FileUtils;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.client.Requests;
@@ -31,20 +32,23 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.node.NodeBuilder;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.io.File;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.mock;
 
 public class AbstractEsBoltTest {
-    protected Config config = new Config();
-    protected OutputCollector collector = mock(OutputCollector.class);
-    protected Node node;
+    protected static Config config = new Config();
+    protected static OutputCollector collector = mock(OutputCollector.class);
+    protected static Node node;
 
-    @Before
-    public void setup() throws Exception {
-        System.out.println("setup");
+    @BeforeClass
+    public static void setup() throws Exception {
         node = NodeBuilder.nodeBuilder().data(true).settings(
                 ImmutableSettings.builder()
                         .put(ClusterName.SETTING, "test-cluster")
@@ -59,18 +63,17 @@ public class AbstractEsBoltTest {
         ensureEsGreen(node);
         ClusterHealthResponse chr = node.client().admin().cluster()
                 .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
-        System.out.println(chr.getStatus());
         Thread.sleep(1000);
     }
 
-    @After
-    public void cleanup() throws Exception {
-        System.out.println("cleanup");
+    @AfterClass
+    public static void cleanup() throws Exception {
         node.stop();
         node.close();
+        FileUtils.deleteDirectory(new File("./data"));
     }
 
-    private void ensureEsGreen(Node node) {
+    private static void ensureEsGreen(Node node) {
         ClusterHealthResponse chr = node.client().admin().cluster()
                 .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
         assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN));

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index e66da19..28b8bf7 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -35,8 +35,7 @@ public class EsIndexBoltTest extends AbstractEsBoltTest{
             throws Exception {
         EsConfig esConfig = new EsConfig();
         esConfig.setClusterName("test-cluster");
-        esConfig.setHost(new String[]{"127.0.0.1"});
-        esConfig.setPort(9300);
+        esConfig.setNodes(new String[]{"127.0.0.1:9300"});
         bolt = new EsIndexBolt(esConfig);
         bolt.prepare(config, null, collector);
         String index = "index1";

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index 4a82c63..f5e868a 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -48,8 +48,7 @@ public class EsIndexTopology {
         builder.setSpout(SPOUT_ID, spout, 1);
         EsConfig esConfig = new EsConfig();
         esConfig.setClusterName(EsConstants.clusterName);
-        esConfig.setHost(new String[]{"localhost"});
-        esConfig.setPort(9300);
+        esConfig.setNodes(new String[]{"localhost:9300"});
         builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig), 1).shuffleGrouping(SPOUT_ID);
 
         EsTestUtil.startEsNode();

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index 1bd338f..4520389 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -36,8 +36,7 @@ public class EsPercolateBoltTest extends AbstractEsBoltTest {
             throws Exception {
         EsConfig esConfig = new EsConfig();
         esConfig.setClusterName("test-cluster");
-        esConfig.setHost(new String[]{"127.0.0.1"});
-        esConfig.setPort(9300);
+        esConfig.setNodes(new String[]{"localhost:9300"});
         bolt = new EsPercolateBolt(esConfig);
         bolt.prepare(config, null, collector);
         String index = "index1";

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index b1e62ff..aed06f6 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -50,8 +50,7 @@ public class TridentEsTopology {
         Stream stream = topology.newStream("spout", spout);
         EsConfig esConfig = new EsConfig();
         esConfig.setClusterName(EsConstants.clusterName);
-        esConfig.setHost(new String[]{"localhost"});
-        esConfig.setPort(9300);
+        esConfig.setNodes(new String[]{"localhost:9300"});
         Fields esFields = new Fields("index", "type", "source");
         StateFactory factory = new EsStateFactory(esConfig);
         TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());