You are viewing a plain-text version of this content; the canonical link to it (a hyperlink in the original page) was not preserved here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/08/06 00:58:49 UTC

[04/11] storm git commit: STORM-845 Storm ElasticSearch connector

STORM-845 Storm ElasticSearch connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2002dbd7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2002dbd7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2002dbd7

Branch: refs/heads/master
Commit: 2002dbd7cf54c9615c7c826496275d35b7471c5b
Parents: c147a4e
Author: SEUNGJIN LEE <sw...@navercorp.com>
Authored: Mon Jun 22 19:33:40 2015 +0900
Committer: SEUNGJIN LEE <sw...@navercorp.com>
Committed: Thu Jul 23 14:32:17 2015 +0900

----------------------------------------------------------------------
 external/storm-elasticsearch/README.md          | 17 ++++------
 .../elasticsearch/bolt/AbstractEsBolt.java      |  9 ++++--
 .../storm/elasticsearch/common/EsConfig.java    | 24 +++++---------
 .../storm/elasticsearch/trident/EsState.java    | 33 ++++++++++++--------
 .../elasticsearch/bolt/AbstractEsBoltTest.java  | 25 ++++++++-------
 .../elasticsearch/bolt/EsIndexBoltTest.java     |  3 +-
 .../elasticsearch/bolt/EsIndexTopology.java     |  3 +-
 .../elasticsearch/bolt/EsPercolateBoltTest.java |  3 +-
 .../trident/TridentEsTopology.java              |  3 +-
 9 files changed, 57 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index 562fd6d..20caece 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -13,8 +13,7 @@ User should make sure that there are "index","type", and "source" fields declare
 ```java
 EsConfig esConfig = new EsConfig();
 esConfig.setClusterName(clusterName);
-esConfig.setHost(new String[]{"localhost"});
-esConfig.setPort(9300);
+esConfig.setNodes(new String[]{"localhost:9300"});
 EsIndexBolt indexBolt = new IndexBolt(esConfig);
 ```
 
@@ -28,8 +27,7 @@ User should make sure that there are "index","type", and "source" fields declare
 ```java
 EsConfig esConfig = new EsConfig();
 esConfig.setClusterName(clusterName);
-esConfig.setHost(new String[]{"localhost"});
-esConfig.setPort(9300);
+esConfig.setNodes(new String[]{"localhost:9300"});
 EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig);
 ```
 
@@ -40,8 +38,7 @@ Two bolts above takes in EsConfig as a constructor arg.
   ```java
    EsConfig esConfig = new EsConfig();
    esConfig.setClusterName(clusterName);
-   esConfig.setHost(new String[]{"localhost"});
-   esConfig.setPort(9300);
+   esConfig.setNodes(new String[]{"localhost:9300"});
   ```
 
 EsConfig params
@@ -49,8 +46,7 @@ EsConfig params
 |Arg  |Description | Type
 |---	|--- |---
 |clusterName | ElasticSearch cluster name | String (required) |
-|host | ElasticSearch host | String array (required) |
-|port | ElasticSearch port | int (required) |
+|nodes | ElasticSearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |
 
 
  
@@ -61,9 +57,8 @@ ElasticSearch Trident state also follows similar pattern to EsBolts. It takes in
 ```code
    EsConfig esConfig = new EsConfig();
    esConfig.setClusterName(clusterName);
-   esConfig.setHost(new String[]{"localhost"});
-   esConfig.setPort(9300);
-                	     		
+   esConfig.setNodes(new String[]{"localhost:9300"});
+
    StateFactory factory = new EsStateFactory(esConfig);
    TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
  ```

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
index cd7fc81..1e2d1ed 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
@@ -47,7 +47,6 @@ public abstract class AbstractEsBolt extends BaseRichBolt {
 
     @Override
     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-        System.out.println(this.getClass().getName());
         try {
             this.collector = outputCollector;
             synchronized (AbstractEsBolt.class) {
@@ -56,8 +55,12 @@ public abstract class AbstractEsBolt extends BaseRichBolt {
                             ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
                                     .put("client.transport.sniff", "false").build();
                     List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
-                    for (String host : esConfig.getHost()) {
-                        transportAddressList.add(new InetSocketTransportAddress(host, esConfig.getPort()));
+                    for (String node : esConfig.getNodes()) {
+                        String[] hostAndPort = node.split(":");
+                        if(hostAndPort.length != 2){
+                            throw new Exception("incorrect ElasticSearch node format, should follow {host}:{port} pattern");
+                        }
+                        transportAddressList.add(new InetSocketTransportAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
                     }
                     client = new TransportClient(settings)
                             .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
index f2aa48f..c97d77f 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -21,16 +21,14 @@ import java.io.Serializable;
 
 public class EsConfig implements Serializable{
     private String clusterName;
-    private String[] host;
-    private int port;
+    private String[] nodes;
 
     public EsConfig() {
     }
 
-    public EsConfig(String clusterName, String[] host, int port) {
+    public EsConfig(String clusterName, String[] nodes, int port) {
         this.clusterName = clusterName;
-        this.host = host;
-        this.port = port;
+        this.nodes = nodes;
     }
 
     public String getClusterName() {
@@ -41,19 +39,11 @@ public class EsConfig implements Serializable{
         this.clusterName = clusterName;
     }
 
-    public String[] getHost() {
-        return host;
+    public String[] getNodes() {
+        return nodes;
     }
 
-    public void setHost(String[] host) {
-        this.host = host;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public void setPort(int port) {
-        this.port = port;
+    public void setNodes(String[] nodes) {
+        this.nodes = nodes;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index e753119..ee95355 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -57,20 +57,27 @@ public class EsState implements State {
     }
 
     public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-        synchronized (EsState.class) {
-            if (client == null) {
-                Settings settings =
-                        ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
-                                .put("client.transport.sniff", "true").build();
-                List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
-                for (String host : esConfig.getHost()) {
-                    transportAddressList.add(new InetSocketTransportAddress(host, esConfig.getPort()));
+        try {
+            synchronized (EsState.class) {
+                if (client == null) {
+                    Settings settings =
+                            ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
+                                    .put("client.transport.sniff", "true").build();
+                    List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
+                    for (String node : esConfig.getNodes()) {
+                        String[] hostAndPort = node.split(":");
+                        if (hostAndPort.length != 2) {
+                            throw new Exception("incorrect ElasticSearch node format, should follow {host}:{port} pattern");
+                        }
+                        transportAddressList.add(new InetSocketTransportAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
+                    }
+                    client = new TransportClient(settings)
+                            .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));
                 }
-                client = new TransportClient(settings)
-                        .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));
             }
+        } catch (Exception e) {
+            LOG.warn("unable to initialize EsState ", e);
         }
-
     }
 
     public void updateState(List<TridentTuple> tuples, TridentCollector collector) {

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
index fdf7cd4..ae6b321 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@ -19,6 +19,7 @@ package org.apache.storm.elasticsearch.bolt;
 
 import backtype.storm.Config;
 import backtype.storm.task.OutputCollector;
+import org.apache.commons.io.FileUtils;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.client.Requests;
@@ -31,20 +32,23 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.node.NodeBuilder;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.io.File;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.mock;
 
 public class AbstractEsBoltTest {
-    protected Config config = new Config();
-    protected OutputCollector collector = mock(OutputCollector.class);
-    protected Node node;
+    protected static Config config = new Config();
+    protected static OutputCollector collector = mock(OutputCollector.class);
+    protected static Node node;
 
-    @Before
-    public void setup() throws Exception {
-        System.out.println("setup");
+    @BeforeClass
+    public static void setup() throws Exception {
         node = NodeBuilder.nodeBuilder().data(true).settings(
                 ImmutableSettings.builder()
                         .put(ClusterName.SETTING, "test-cluster")
@@ -59,18 +63,17 @@ public class AbstractEsBoltTest {
         ensureEsGreen(node);
         ClusterHealthResponse chr = node.client().admin().cluster()
                 .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
-        System.out.println(chr.getStatus());
         Thread.sleep(1000);
     }
 
-    @After
-    public void cleanup() throws Exception {
-        System.out.println("cleanup");
+    @AfterClass
+    public static void cleanup() throws Exception {
         node.stop();
         node.close();
+        FileUtils.deleteDirectory(new File("./data"));
     }
 
-    private void ensureEsGreen(Node node) {
+    private static void ensureEsGreen(Node node) {
         ClusterHealthResponse chr = node.client().admin().cluster()
                 .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
         assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN));

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index e66da19..28b8bf7 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -35,8 +35,7 @@ public class EsIndexBoltTest extends AbstractEsBoltTest{
             throws Exception {
         EsConfig esConfig = new EsConfig();
         esConfig.setClusterName("test-cluster");
-        esConfig.setHost(new String[]{"127.0.0.1"});
-        esConfig.setPort(9300);
+        esConfig.setNodes(new String[]{"127.0.0.1:9300"});
         bolt = new EsIndexBolt(esConfig);
         bolt.prepare(config, null, collector);
         String index = "index1";

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index 4a82c63..f5e868a 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -48,8 +48,7 @@ public class EsIndexTopology {
         builder.setSpout(SPOUT_ID, spout, 1);
         EsConfig esConfig = new EsConfig();
         esConfig.setClusterName(EsConstants.clusterName);
-        esConfig.setHost(new String[]{"localhost"});
-        esConfig.setPort(9300);
+        esConfig.setNodes(new String[]{"localhost:9300"});
         builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig), 1).shuffleGrouping(SPOUT_ID);
 
         EsTestUtil.startEsNode();

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index 1bd338f..4520389 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -36,8 +36,7 @@ public class EsPercolateBoltTest extends AbstractEsBoltTest {
             throws Exception {
         EsConfig esConfig = new EsConfig();
         esConfig.setClusterName("test-cluster");
-        esConfig.setHost(new String[]{"127.0.0.1"});
-        esConfig.setPort(9300);
+        esConfig.setNodes(new String[]{"localhost:9300"});
         bolt = new EsPercolateBolt(esConfig);
         bolt.prepare(config, null, collector);
         String index = "index1";

http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index b1e62ff..aed06f6 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -50,8 +50,7 @@ public class TridentEsTopology {
         Stream stream = topology.newStream("spout", spout);
         EsConfig esConfig = new EsConfig();
         esConfig.setClusterName(EsConstants.clusterName);
-        esConfig.setHost(new String[]{"localhost"});
-        esConfig.setPort(9300);
+        esConfig.setNodes(new String[]{"localhost:9300"});
         Fields esFields = new Fields("index", "type", "source");
         StateFactory factory = new EsStateFactory(esConfig);
         TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());