You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/08/06 00:58:49 UTC
[04/11] storm git commit: STORM-845 Storm ElasticSearch connector
STORM-845 Storm ElasticSearch connector
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2002dbd7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2002dbd7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2002dbd7
Branch: refs/heads/master
Commit: 2002dbd7cf54c9615c7c826496275d35b7471c5b
Parents: c147a4e
Author: SEUNGJIN LEE <sw...@navercorp.com>
Authored: Mon Jun 22 19:33:40 2015 +0900
Committer: SEUNGJIN LEE <sw...@navercorp.com>
Committed: Thu Jul 23 14:32:17 2015 +0900
----------------------------------------------------------------------
external/storm-elasticsearch/README.md | 17 ++++------
.../elasticsearch/bolt/AbstractEsBolt.java | 9 ++++--
.../storm/elasticsearch/common/EsConfig.java | 24 +++++---------
.../storm/elasticsearch/trident/EsState.java | 33 ++++++++++++--------
.../elasticsearch/bolt/AbstractEsBoltTest.java | 25 ++++++++-------
.../elasticsearch/bolt/EsIndexBoltTest.java | 3 +-
.../elasticsearch/bolt/EsIndexTopology.java | 3 +-
.../elasticsearch/bolt/EsPercolateBoltTest.java | 3 +-
.../trident/TridentEsTopology.java | 3 +-
9 files changed, 57 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index 562fd6d..20caece 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -13,8 +13,7 @@ User should make sure that there are "index","type", and "source" fields declare
```java
EsConfig esConfig = new EsConfig();
esConfig.setClusterName(clusterName);
-esConfig.setHost(new String[]{"localhost"});
-esConfig.setPort(9300);
+esConfig.setNodes(new String[]{"localhost:9300"});
EsIndexBolt indexBolt = new IndexBolt(esConfig);
```
@@ -28,8 +27,7 @@ User should make sure that there are "index","type", and "source" fields declare
```java
EsConfig esConfig = new EsConfig();
esConfig.setClusterName(clusterName);
-esConfig.setHost(new String[]{"localhost"});
-esConfig.setPort(9300);
+esConfig.setNodes(new String[]{"localhost:9300"});
EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig);
```
@@ -40,8 +38,7 @@ Two bolts above takes in EsConfig as a constructor arg.
```java
EsConfig esConfig = new EsConfig();
esConfig.setClusterName(clusterName);
- esConfig.setHost(new String[]{"localhost"});
- esConfig.setPort(9300);
+ esConfig.setNodes(new String[]{"localhost:9300"});
```
EsConfig params
@@ -49,8 +46,7 @@ EsConfig params
|Arg |Description | Type
|--- |--- |---
|clusterName | ElasticSearch cluster name | String (required) |
-|host | ElasticSearch host | String array (required) |
-|port | ElasticSearch port | int (required) |
+|nodes | ElasticSearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |
@@ -61,9 +57,8 @@ ElasticSearch Trident state also follows similar pattern to EsBolts. It takes in
```code
EsConfig esConfig = new EsConfig();
esConfig.setClusterName(clusterName);
- esConfig.setHost(new String[]{"localhost"});
- esConfig.setPort(9300);
-
+ esConfig.setNodes(new String[]{"localhost:9300"});
+
StateFactory factory = new EsStateFactory(esConfig);
TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
```
http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
index cd7fc81..1e2d1ed 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
@@ -47,7 +47,6 @@ public abstract class AbstractEsBolt extends BaseRichBolt {
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- System.out.println(this.getClass().getName());
try {
this.collector = outputCollector;
synchronized (AbstractEsBolt.class) {
@@ -56,8 +55,12 @@ public abstract class AbstractEsBolt extends BaseRichBolt {
ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
.put("client.transport.sniff", "false").build();
List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
- for (String host : esConfig.getHost()) {
- transportAddressList.add(new InetSocketTransportAddress(host, esConfig.getPort()));
+ for (String node : esConfig.getNodes()) {
+ String[] hostAndPort = node.split(":");
+ if(hostAndPort.length != 2){
+ throw new Exception("incorrect ElasticSearch node format, should follow {host}:{port} pattern");
+ }
+ transportAddressList.add(new InetSocketTransportAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
}
client = new TransportClient(settings)
.addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));
http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
index f2aa48f..c97d77f 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -21,16 +21,14 @@ import java.io.Serializable;
public class EsConfig implements Serializable{
private String clusterName;
- private String[] host;
- private int port;
+ private String[] nodes;
public EsConfig() {
}
- public EsConfig(String clusterName, String[] host, int port) {
+ public EsConfig(String clusterName, String[] nodes, int port) {
this.clusterName = clusterName;
- this.host = host;
- this.port = port;
+ this.nodes = nodes;
}
public String getClusterName() {
@@ -41,19 +39,11 @@ public class EsConfig implements Serializable{
this.clusterName = clusterName;
}
- public String[] getHost() {
- return host;
+ public String[] getNodes() {
+ return nodes;
}
- public void setHost(String[] host) {
- this.host = host;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
+ public void setNodes(String[] nodes) {
+ this.nodes = nodes;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index e753119..ee95355 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p/>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -57,20 +57,27 @@ public class EsState implements State {
}
public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- synchronized (EsState.class) {
- if (client == null) {
- Settings settings =
- ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
- .put("client.transport.sniff", "true").build();
- List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
- for (String host : esConfig.getHost()) {
- transportAddressList.add(new InetSocketTransportAddress(host, esConfig.getPort()));
+ try {
+ synchronized (EsState.class) {
+ if (client == null) {
+ Settings settings =
+ ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
+ .put("client.transport.sniff", "true").build();
+ List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
+ for (String node : esConfig.getNodes()) {
+ String[] hostAndPort = node.split(":");
+ if (hostAndPort.length != 2) {
+ throw new Exception("incorrect ElasticSearch node format, should follow {host}:{port} pattern");
+ }
+ transportAddressList.add(new InetSocketTransportAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
+ }
+ client = new TransportClient(settings)
+ .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));
}
- client = new TransportClient(settings)
- .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));
}
+ } catch (Exception e) {
+ LOG.warn("unable to initialize EsState ", e);
}
-
}
public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
index fdf7cd4..ae6b321 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@ -19,6 +19,7 @@ package org.apache.storm.elasticsearch.bolt;
import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
+import org.apache.commons.io.FileUtils;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Requests;
@@ -31,20 +32,23 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.io.File;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
public class AbstractEsBoltTest {
- protected Config config = new Config();
- protected OutputCollector collector = mock(OutputCollector.class);
- protected Node node;
+ protected static Config config = new Config();
+ protected static OutputCollector collector = mock(OutputCollector.class);
+ protected static Node node;
- @Before
- public void setup() throws Exception {
- System.out.println("setup");
+ @BeforeClass
+ public static void setup() throws Exception {
node = NodeBuilder.nodeBuilder().data(true).settings(
ImmutableSettings.builder()
.put(ClusterName.SETTING, "test-cluster")
@@ -59,18 +63,17 @@ public class AbstractEsBoltTest {
ensureEsGreen(node);
ClusterHealthResponse chr = node.client().admin().cluster()
.health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
- System.out.println(chr.getStatus());
Thread.sleep(1000);
}
- @After
- public void cleanup() throws Exception {
- System.out.println("cleanup");
+ @AfterClass
+ public static void cleanup() throws Exception {
node.stop();
node.close();
+ FileUtils.deleteDirectory(new File("./data"));
}
- private void ensureEsGreen(Node node) {
+ private static void ensureEsGreen(Node node) {
ClusterHealthResponse chr = node.client().admin().cluster()
.health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN));
http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index e66da19..28b8bf7 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -35,8 +35,7 @@ public class EsIndexBoltTest extends AbstractEsBoltTest{
throws Exception {
EsConfig esConfig = new EsConfig();
esConfig.setClusterName("test-cluster");
- esConfig.setHost(new String[]{"127.0.0.1"});
- esConfig.setPort(9300);
+ esConfig.setNodes(new String[]{"127.0.0.1:9300"});
bolt = new EsIndexBolt(esConfig);
bolt.prepare(config, null, collector);
String index = "index1";
http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index 4a82c63..f5e868a 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -48,8 +48,7 @@ public class EsIndexTopology {
builder.setSpout(SPOUT_ID, spout, 1);
EsConfig esConfig = new EsConfig();
esConfig.setClusterName(EsConstants.clusterName);
- esConfig.setHost(new String[]{"localhost"});
- esConfig.setPort(9300);
+ esConfig.setNodes(new String[]{"localhost:9300"});
builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig), 1).shuffleGrouping(SPOUT_ID);
EsTestUtil.startEsNode();
http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index 1bd338f..4520389 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -36,8 +36,7 @@ public class EsPercolateBoltTest extends AbstractEsBoltTest {
throws Exception {
EsConfig esConfig = new EsConfig();
esConfig.setClusterName("test-cluster");
- esConfig.setHost(new String[]{"127.0.0.1"});
- esConfig.setPort(9300);
+ esConfig.setNodes(new String[]{"localhost:9300"});
bolt = new EsPercolateBolt(esConfig);
bolt.prepare(config, null, collector);
String index = "index1";
http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index b1e62ff..aed06f6 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -50,8 +50,7 @@ public class TridentEsTopology {
Stream stream = topology.newStream("spout", spout);
EsConfig esConfig = new EsConfig();
esConfig.setClusterName(EsConstants.clusterName);
- esConfig.setHost(new String[]{"localhost"});
- esConfig.setPort(9300);
+ esConfig.setNodes(new String[]{"localhost:9300"});
Fields esFields = new Fields("index", "type", "source");
StateFactory factory = new EsStateFactory(esConfig);
TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());