You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/04/18 19:26:17 UTC
[2/4] storm git commit: STORM-2379: update for Elasticsearch 2.
STORM-2379: update for Elasticsearch 2.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2b4565c6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2b4565c6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2b4565c6
Branch: refs/heads/master
Commit: 2b4565c668c16c366e086214d11f3ec2e1d4769c
Parents: 1a17f74
Author: Heather McCartney <he...@problemchimp.org>
Authored: Mon Mar 13 13:16:27 2017 +0000
Committer: Heather McCartney <he...@problemchimp.org>
Committed: Thu Apr 13 21:30:18 2017 +0100
----------------------------------------------------------------------
examples/storm-elasticsearch-examples/pom.xml | 10 ++
.../elasticsearch/bolt/EsIndexTopology.java | 3 +-
.../storm/elasticsearch/common/EsTestUtil.java | 14 +-
.../trident/TridentEsTopology.java | 3 +-
external/storm-elasticsearch/pom.xml | 44 ++++++-
.../DefaultEsLookupResultOutput.java | 62 +++++++++
.../elasticsearch/ElasticsearchGetRequest.java | 36 -----
.../elasticsearch/EsLookupResultOutput.java | 5 +-
.../elasticsearch/bolt/AbstractEsBolt.java | 53 +++++---
.../storm/elasticsearch/bolt/EsIndexBolt.java | 31 +++--
.../storm/elasticsearch/bolt/EsLookupBolt.java | 49 ++++---
.../elasticsearch/bolt/EsPercolateBolt.java | 41 ++++--
.../common/DefaultEsTupleMapper.java | 20 +++
.../storm/elasticsearch/common/EsConfig.java | 130 +++++++++++++------
.../elasticsearch/common/EsTupleMapper.java | 9 ++
.../common/StormElasticSearchClient.java | 37 +++---
.../common/TransportAddresses.java | 72 ----------
.../apache/storm/elasticsearch/doc/Index.java | 69 ++++++++++
.../storm/elasticsearch/doc/IndexDoc.java | 43 ++++++
.../storm/elasticsearch/doc/IndexItem.java | 91 +++++++++++++
.../storm/elasticsearch/doc/IndexItemDoc.java | 42 ++++++
.../apache/storm/elasticsearch/doc/Shards.java | 63 +++++++++
.../storm/elasticsearch/doc/SourceDoc.java | 43 ++++++
.../response/BulkIndexResponse.java | 80 ++++++++++++
.../elasticsearch/response/LookupResponse.java | 63 +++++++++
.../response/PercolateResponse.java | 85 ++++++++++++
.../storm/elasticsearch/trident/EsState.java | 72 ++++++----
.../elasticsearch/trident/EsStateFactory.java | 15 +--
.../bolt/AbstractEsBoltIntegrationTest.java | 68 +++-------
.../elasticsearch/bolt/AbstractEsBoltTest.java | 15 ++-
.../elasticsearch/bolt/EsIndexBoltTest.java | 44 ++++---
.../bolt/EsLookupBoltIntegrationTest.java | 75 +++--------
.../elasticsearch/bolt/EsLookupBoltTest.java | 67 +++++-----
.../elasticsearch/bolt/EsPercolateBoltTest.java | 62 ++++++---
.../elasticsearch/common/EsConfigTest.java | 60 ++++-----
.../storm/elasticsearch/common/EsTestUtil.java | 99 ++++++++++++--
.../common/TransportAddressesTest.java | 81 ------------
.../trident/EsStateFactoryTest.java | 2 +-
.../elasticsearch/trident/EsStateTest.java | 98 ++++++++++++++
.../src/test/resources/log4j2.xml | 33 +++++
40 files changed, 1424 insertions(+), 565 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/examples/storm-elasticsearch-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/pom.xml b/examples/storm-elasticsearch-examples/pom.xml
index 64c8229..23a717d 100644
--- a/examples/storm-elasticsearch-examples/pom.xml
+++ b/examples/storm-elasticsearch-examples/pom.xml
@@ -26,6 +26,10 @@
</parent>
<artifactId>storm-elasticsearch-examples</artifactId>
+ <properties>
+ <elasticsearch.test.version>2.4.4</elasticsearch.test.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
@@ -38,6 +42,12 @@
<artifactId>storm-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${elasticsearch.test.version}</version>
+ <scope>${provided.scope}</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index 3cd2bc8..c7ec7d0 100644
--- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.elasticsearch.common.EsConfig;
-import org.apache.storm.elasticsearch.common.EsConstants;
import org.apache.storm.elasticsearch.common.EsTestUtil;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.apache.storm.spout.SpoutOutputCollector;
@@ -48,7 +47,7 @@ public class EsIndexTopology {
UserDataSpout spout = new UserDataSpout();
builder.setSpout(SPOUT_ID, spout, 1);
EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
- EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
+ EsConfig esConfig = new EsConfig("http://localhost:9300");
builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1).shuffleGrouping(SPOUT_ID);
EsTestUtil.startEsNode();
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
index cb1c745..189813a 100644
--- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
@@ -17,23 +17,22 @@
*/
package org.apache.storm.elasticsearch.common;
+import java.util.HashMap;
+
import org.apache.storm.Config;
import org.apache.storm.task.GeneralTopologyContext;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
-import java.util.HashMap;
-
public class EsTestUtil {
public static Tuple generateTestTuple(String source, String index, String type, String id) {
TopologyBuilder builder = new TopologyBuilder();
@@ -53,14 +52,15 @@ public class EsTestUtil {
public static Node startEsNode(){
Node node = NodeBuilder.nodeBuilder().data(true).settings(
- ImmutableSettings.builder()
+ Settings.settingsBuilder()
.put(ClusterName.SETTING, EsConstants.clusterName)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(EsExecutors.PROCESSORS, 1)
- .put("http.enabled", false)
+ .put("http.enabled", true)
.put("index.percolator.map_unmapped_fields_as_string", true)
- .put("index.store.type", "memory")
+ .put("index.store.type", "mmapfs")
+ .put("path.home", "./data")
).build();
node.start();
return node;
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index 307a991..e7fb2ef 100644
--- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -26,7 +26,6 @@ import java.util.UUID;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.elasticsearch.common.EsConfig;
-import org.apache.storm.elasticsearch.common.EsConstants;
import org.apache.storm.elasticsearch.common.EsTestUtil;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.apache.storm.task.TopologyContext;
@@ -50,7 +49,7 @@ public class TridentEsTopology {
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout", spout);
- EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
+ EsConfig esConfig = new EsConfig("http://localhost:9300");
Fields esFields = new Fields("index", "type", "source");
EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml
index bab0426..17d65d1 100644
--- a/external/storm-elasticsearch/pom.xml
+++ b/external/storm-elasticsearch/pom.xml
@@ -34,10 +34,16 @@
<name>Adrian Seungjin Lee</name>
<email>sweetest.sj@navercorp.com</email>
</developer>
+ <developer>
+ <id>hmcc</id>
+ <name>Heather McCartney</name>
+ <email>heather@problemchimp.org</email>
+ </developer>
</developers>
<properties>
- <elasticsearch.version>1.6.0</elasticsearch.version>
+ <elasticsearch.version>5.2.2</elasticsearch.version>
+ <elasticsearch.test.version>2.4.4</elasticsearch.test.version>
</properties>
<dependencies>
@@ -53,10 +59,27 @@
<scope>${provided.scope}</scope>
</dependency>
<dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>rest</artifactId>
<version>${elasticsearch.version}</version>
- <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ <version>4.1.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
@@ -80,13 +103,24 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${elasticsearch.test.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>18.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
<executions>
<execution>
<id>cleanup</id>
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/DefaultEsLookupResultOutput.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/DefaultEsLookupResultOutput.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/DefaultEsLookupResultOutput.java
new file mode 100644
index 0000000..533dc65
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/DefaultEsLookupResultOutput.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.storm.elasticsearch.response.LookupResponse;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.elasticsearch.client.Response;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Default implementation of {@link EsLookupResultOutput}.
+ * Outputs the index, type, id and source as strings.
+ */
+public class DefaultEsLookupResultOutput implements EsLookupResultOutput {
+
+ private static final long serialVersionUID = 2932278450655703239L;
+
+ private ObjectMapper objectMapper;
+
+ public DefaultEsLookupResultOutput(ObjectMapper objectMapper) {
+ super();
+ this.objectMapper = objectMapper;
+ }
+
+ @Override
+ public Collection<Values> toValues(Response response) {
+ LookupResponse lookupResponse;
+ try {
+ lookupResponse = objectMapper.readValue(response.getEntity().getContent(), LookupResponse.class);
+ } catch (UnsupportedOperationException | IOException e) {
+ throw new IllegalArgumentException("Response " + response + " is invalid", e);
+ }
+ return Collections.singleton(new Values(lookupResponse.getIndex(), lookupResponse.getType(), lookupResponse.getId(), lookupResponse.getSource()));
+ }
+
+ @Override
+ public Fields fields() {
+ return new Fields("index", "type", "id", "source");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchGetRequest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchGetRequest.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchGetRequest.java
deleted file mode 100644
index b9a7885..0000000
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchGetRequest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.elasticsearch;
-
-import java.io.Serializable;
-
-import org.elasticsearch.action.get.GetRequest;
-
-import org.apache.storm.tuple.ITuple;
-
-/**
- * @since 0.11
- * The adapter to convert the incoming tuple to Elasticsearch GetRequest.
- */
-public interface ElasticsearchGetRequest extends Serializable {
-
- /**
- * @return GetRequest to perform against Elasticsearch.
- */
- GetRequest extractFrom(ITuple tuple);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsLookupResultOutput.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsLookupResultOutput.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsLookupResultOutput.java
index d00fd47..b057729 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsLookupResultOutput.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsLookupResultOutput.java
@@ -20,10 +20,9 @@ package org.apache.storm.elasticsearch;
import java.io.Serializable;
import java.util.Collection;
-import org.elasticsearch.action.get.GetResponse;
-
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
+import org.elasticsearch.client.Response;
/**
* @since 0.11
@@ -34,7 +33,7 @@ public interface EsLookupResultOutput extends Serializable {
/**
* @return collection of values to emit.
*/
- Collection<Values> toValues(GetResponse response);
+ Collection<Values> toValues(Response response);
/**
* @return output fields to declare.
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
index b53e183..42f20e6 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
@@ -17,37 +17,35 @@
*/
package org.apache.storm.elasticsearch.bolt;
+import static java.util.Objects.requireNonNull;
+import static org.apache.http.util.Args.notBlank;
+
import java.util.Map;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.StormElasticSearchClient;
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.utils.TupleUtils;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
+import org.elasticsearch.client.RestClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+import com.fasterxml.jackson.databind.ObjectMapper;
public abstract class AbstractEsBolt extends BaseTickTupleAwareRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(AbstractEsBolt.class);
- protected static Client client;
+ protected static RestClient client;
+ protected final static ObjectMapper objectMapper = new ObjectMapper();
protected OutputCollector collector;
private EsConfig esConfig;
public AbstractEsBolt(EsConfig esConfig) {
- checkNotNull(esConfig);
- this.esConfig = esConfig;
+ this.esConfig = requireNonNull(esConfig);
}
@Override
@@ -68,13 +66,34 @@ public abstract class AbstractEsBolt extends BaseTickTupleAwareRichBolt {
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
- @VisibleForTesting
- static Client getClient() {
+ /**
+ * Construct an Elasticsearch endpoint from the provided index, type and
+ * id.
+ * @param index - required; name of Elasticsearch index
+ * @param type - optional; name of Elasticsearch type
+ * @param id - optional; Elasticsearch document ID
+ * @return the index, type and id concatenated with '/'.
+ */
+ static String getEndpoint(String index, String type, String id) {
+ requireNonNull(index);
+ notBlank(index, "index");
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("/").append(index);
+ if (!(type == null || type.isEmpty())) {
+ sb.append("/").append(type);
+ }
+ if (!(id == null || id.isEmpty())) {
+ sb.append("/").append(id);
+ }
+ return sb.toString();
+ }
+
+ static RestClient getClient() {
return AbstractEsBolt.client;
}
- @VisibleForTesting
- static void replaceClient(Client client) {
+ static void replaceClient(RestClient client) {
AbstractEsBolt.client = client;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
index 5d66eb9..1f3122b 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
@@ -17,17 +17,19 @@
*/
package org.apache.storm.elasticsearch.bolt;
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.http.entity.StringEntity;
+import org.apache.storm.elasticsearch.common.DefaultEsTupleMapper;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
-import org.apache.storm.elasticsearch.common.EsConfig;
-import org.apache.storm.elasticsearch.common.EsTupleMapper;
-import org.apache.storm.utils.TupleUtils;
-
-import java.util.Map;
-
-import static org.elasticsearch.common.base.Preconditions.checkNotNull;
/**
* Basic bolt for storing tuple to ES document.
@@ -37,12 +39,20 @@ public class EsIndexBolt extends AbstractEsBolt {
/**
* EsIndexBolt constructor
- * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+ * @param esConfig Elasticsearch configuration containing node addresses {@link EsConfig}
+ */
+ public EsIndexBolt(EsConfig esConfig) {
+ this(esConfig, new DefaultEsTupleMapper());
+ }
+
+ /**
+ * EsIndexBolt constructor
+ * @param esConfig Elasticsearch configuration containing node addresses {@link EsConfig}
* @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
*/
public EsIndexBolt(EsConfig esConfig, EsTupleMapper tupleMapper) {
super(esConfig);
- this.tupleMapper = checkNotNull(tupleMapper);
+ this.tupleMapper = requireNonNull(tupleMapper);
}
@Override
@@ -61,8 +71,9 @@ public class EsIndexBolt extends AbstractEsBolt {
String index = tupleMapper.getIndex(tuple);
String type = tupleMapper.getType(tuple);
String id = tupleMapper.getId(tuple);
+ Map<String, String> params = tupleMapper.getParams(tuple, new HashMap<>());
- client.prepareIndex(index, type, id).setSource(source).execute().actionGet();
+ client.performRequest("put", getEndpoint(index, type, id), params, new StringEntity(source));
collector.ack(tuple);
} catch (Exception e) {
collector.reportError(e);
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
index 895e30b..1ff4686 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
@@ -17,38 +17,51 @@
*/
package org.apache.storm.elasticsearch.bolt;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
-import org.apache.storm.elasticsearch.ElasticsearchGetRequest;
+import org.apache.storm.elasticsearch.DefaultEsLookupResultOutput;
import org.apache.storm.elasticsearch.EsLookupResultOutput;
+import org.apache.storm.elasticsearch.common.DefaultEsTupleMapper;
import org.apache.storm.elasticsearch.common.EsConfig;
-import org.apache.storm.utils.TupleUtils;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-
-import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+import org.elasticsearch.client.Response;
/**
* @since 0.11
*/
public class EsLookupBolt extends AbstractEsBolt {
- private final ElasticsearchGetRequest getRequest;
+ private final EsTupleMapper tupleMapper;
private final EsLookupResultOutput output;
/**
+ * EsLookupBolt constructor.
+ * @param esConfig Elasticsearch configuration containing node addresses {@link EsConfig}
* @throws NullPointerException if any of the parameters is null
*/
- public EsLookupBolt(EsConfig esConfig, ElasticsearchGetRequest getRequest, EsLookupResultOutput output) {
+ public EsLookupBolt(EsConfig esConfig) {
+ this(esConfig, new DefaultEsTupleMapper(), new DefaultEsLookupResultOutput(objectMapper));
+ }
+
+ /**
+ * EsLookupBolt constructor.
+ * @param esConfig Elasticsearch configuration containing node addresses {@link EsConfig}
+ * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
+ * @param output ES response to Values mapper {@link EsLookupResultOutput}
+ * @throws NullPointerException if any of the parameters is null
+ */
+ public EsLookupBolt(EsConfig esConfig, EsTupleMapper tupleMapper, EsLookupResultOutput output) {
super(esConfig);
- checkNotNull(getRequest);
- checkNotNull(output);
- this.getRequest = getRequest;
- this.output = output;
+ this.tupleMapper = requireNonNull(tupleMapper);
+ this.output = requireNonNull(output);
}
@Override
@@ -62,9 +75,13 @@ public class EsLookupBolt extends AbstractEsBolt {
}
}
- private Collection<Values> lookupValuesInEs(Tuple tuple) {
- GetRequest request = getRequest.extractFrom(tuple);
- GetResponse response = client.get(request).actionGet();
+ private Collection<Values> lookupValuesInEs(Tuple tuple) throws IOException {
+ String index = tupleMapper.getIndex(tuple);
+ String type = tupleMapper.getType(tuple);
+ String id = tupleMapper.getId(tuple);
+ Map<String, String> params = tupleMapper.getParams(tuple, new HashMap<>());
+
+ Response response = client.performRequest("get", getEndpoint(index, type, id), params);
return output.toValues(response);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
index 0b96dfc..4d969d5 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
@@ -17,21 +17,23 @@
*/
package org.apache.storm.elasticsearch.bolt;
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.http.entity.StringEntity;
+import org.apache.storm.elasticsearch.common.DefaultEsTupleMapper;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
+import org.apache.storm.elasticsearch.response.PercolateResponse;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import org.apache.storm.elasticsearch.common.EsConfig;
-import org.apache.storm.elasticsearch.common.EsTupleMapper;
-import org.apache.storm.utils.TupleUtils;
-import org.elasticsearch.action.percolate.PercolateResponse;
-import org.elasticsearch.action.percolate.PercolateSourceBuilder;
-
-import java.util.Map;
-
-import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+import org.elasticsearch.client.Response;
/**
* Basic bolt for retrieve matched percolate queries.
@@ -42,12 +44,20 @@ public class EsPercolateBolt extends AbstractEsBolt {
/**
* EsPercolateBolt constructor
+ * @param esConfig Elasticsearch configuration containing node addresses {@link EsConfig}
+ */
+ public EsPercolateBolt(EsConfig esConfig) {
+ this(esConfig, new DefaultEsTupleMapper());
+ }
+
+ /**
+ * EsPercolateBolt constructor
* @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
* @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
*/
public EsPercolateBolt(EsConfig esConfig, EsTupleMapper tupleMapper) {
super(esConfig);
- this.tupleMapper = checkNotNull(tupleMapper);
+ this.tupleMapper = requireNonNull(tupleMapper);
}
@Override
@@ -68,10 +78,13 @@ public class EsPercolateBolt extends AbstractEsBolt {
String index = tupleMapper.getIndex(tuple);
String type = tupleMapper.getType(tuple);
- PercolateResponse response = client.preparePercolate().setIndices(index).setDocumentType(type)
- .setPercolateDoc(PercolateSourceBuilder.docBuilder().setDoc(source)).execute().actionGet();
- if (response.getCount() > 0) {
- for (PercolateResponse.Match match : response) {
+ Map<String, String> indexParams = new HashMap<>();
+ indexParams.put(type, null);
+ String percolateDoc = "{\"doc\": " + source + "}";
+ Response response = client.performRequest("get", getEndpoint(index, type, "_percolate"), new HashMap<>(), new StringEntity(percolateDoc));
+ PercolateResponse percolateResponse = objectMapper.readValue(response.getEntity().getContent(), PercolateResponse.class);
+ if (!percolateResponse.getMatches().isEmpty()) {
+ for (PercolateResponse.Match match : percolateResponse.getMatches()) {
collector.emit(new Values(source, match));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/DefaultEsTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/DefaultEsTupleMapper.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/DefaultEsTupleMapper.java
index 0a15922..c8d750d 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/DefaultEsTupleMapper.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/DefaultEsTupleMapper.java
@@ -17,6 +17,10 @@
*/
package org.apache.storm.elasticsearch.common;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
import org.apache.storm.tuple.ITuple;
public class DefaultEsTupleMapper implements EsTupleMapper {
@@ -39,4 +43,20 @@ public class DefaultEsTupleMapper implements EsTupleMapper {
public String getId(ITuple tuple) {
return tuple.getStringByField("id");
}
+
+ /**
+ * Reads the "params" field of the tuple as a map of strings.
+ * Keys and non-null values are converted with {@code toString()}; null values stay null.
+ *
+ * @param tuple source tuple
+ * @param defaultValue returned when the tuple has no "params" field or the field is not a {@link Map}
+ * @return a new map holding the converted params, or {@code defaultValue}
+ */
+ @Override
+ public Map<String, String> getParams(ITuple tuple, Map<String, String> defaultValue) {
+ Object raw = tuple.contains("params") ? tuple.getValueByField("params") : null;
+ if (!(raw instanceof Map)) {
+ return defaultValue;
+ }
+ Map<String, String> converted = new HashMap<>();
+ for (Map.Entry<?, ?> field : ((Map<?, ?>) raw).entrySet()) {
+ String name = field.getKey().toString();
+ Object value = field.getValue();
+ String text = null;
+ if (value != null) {
+ text = value.toString();
+ }
+ converted.put(name, text);
+ }
+ return converted;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
index 6bbd81f..02f045d 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -18,65 +18,117 @@
package org.apache.storm.elasticsearch.common;
import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.net.URI;
+import java.net.URISyntaxException;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-
-import static org.elasticsearch.common.base.Preconditions.checkArgument;
-import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
+import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
/**
* @since 0.11
*/
public class EsConfig implements Serializable {
- private final String clusterName;
- private final String[] nodes;
- private final Map<String, String> additionalConfiguration;
+ private final HttpHost[] httpHosts;
+ private Integer maxRetryTimeoutMillis;
+ private Header[] defaultHeaders;
+ private RestClient.FailureListener failureListener;
+ private HttpClientConfigCallback httpClientConfigCallback;
+ private RequestConfigCallback requestConfigCallback;
+ private String pathPrefix;
/**
- * EsConfig Constructor to be used in EsIndexBolt, EsPercolateBolt and EsStateFactory
- *
- * @param clusterName Elasticsearch cluster name
- * @param nodes Elasticsearch addresses in host:port pattern string array
- * @throws IllegalArgumentException if nodes are empty
- * @throws NullPointerException on any of the fields being null
- */
- public EsConfig(String clusterName, String[] nodes) {
- this(clusterName, nodes, Collections.<String, String>emptyMap());
+ * EsConfig Constructor to be used in EsIndexBolt, EsPercolateBolt and EsStateFactory.
+ * Connects to Elasticsearch at http://localhost:9200.
+ */
+ public EsConfig() {
+ this("http://localhost:9200");
}
/**
* EsConfig Constructor to be used in EsIndexBolt, EsPercolateBolt and EsStateFactory
*
- * @param clusterName Elasticsearch cluster name
- * @param nodes Elasticsearch addresses in host:port pattern string array
- * @param additionalConfiguration Additional Elasticsearch configuration
- * @throws IllegalArgumentException if nodes are empty
+ * @param urls Elasticsearch addresses in scheme://host:port pattern string array
+ * @throws IllegalArgumentException if urls are empty
* @throws NullPointerException on any of the fields being null
*/
- public EsConfig(String clusterName, String[] nodes, Map<String, String> additionalConfiguration) {
- checkNotNull(clusterName);
- checkNotNull(nodes);
- checkNotNull(additionalConfiguration);
+ public EsConfig(String... urls) {
+ if (urls.length == 0) {
+ throw new IllegalArgumentException("urls is required");
+ }
+ this.httpHosts = new HttpHost[urls.length];
+ for (int i = 0; i < urls.length; i++) {
+ URI uri = toURI(urls[i]);
+ // A string without a scheme, such as "localhost:9200", is a syntactically valid
+ // opaque URI whose host is null. Reject it here so the failure names the url.
+ if (uri.getHost() == null) {
+ throw new IllegalArgumentException("Invalid url " + urls[i] + ": expected scheme://host:port");
+ }
+ this.httpHosts[i] = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
+ }
+ }
+
+ /**
+ * Parses a url string into a {@link URI}.
+ *
+ * @param url the url to parse
+ * @return the parsed URI
+ * @throws IllegalArgumentException if {@code url} is not a syntactically valid URI;
+ * the underlying {@link URISyntaxException} is kept as the cause
+ */
+ static URI toURI(String url) throws IllegalArgumentException {
+ try {
+ return new URI(url);
+ } catch (URISyntaxException e) {
+ // Keep the parser's exception as the cause so the reason and error index are not lost.
+ throw new IllegalArgumentException("Invalid url " + url, e);
+ }
+ }
+
+ /**
+ * Stores the max retry timeout and returns this config so calls can be chained.
+ *
+ * @param maxRetryTimeoutMillis value to store; may be null
+ * @return this config
+ */
+ public EsConfig withMaxRetryTimeoutMillis(Integer maxRetryTimeoutMillis) {
+ this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
+ return this;
+ }
+
+ /**
+ * Stores the default headers and returns this config so calls can be chained.
+ * The array is stored as given, without copying.
+ *
+ * @param defaultHeaders headers to store; may be null
+ * @return this config
+ */
+ public EsConfig withDefaultHeaders(Header[] defaultHeaders) {
+ this.defaultHeaders = defaultHeaders;
+ return this;
+ }
+
+ /**
+ * Stores the failure listener and returns this config so calls can be chained.
+ *
+ * @param failureListener listener to store; may be null
+ * @return this config
+ */
+ public EsConfig withFailureListener(RestClient.FailureListener failureListener) {
+ this.failureListener = failureListener;
+ return this;
+ }
+
+ /**
+ * Stores the HTTP client config callback and returns this config so calls can be chained.
+ *
+ * @param httpClientConfigCallback callback to store; may be null
+ * @return this config
+ */
+ public EsConfig withHttpClientConfigCallback(HttpClientConfigCallback httpClientConfigCallback) {
+ this.httpClientConfigCallback = httpClientConfigCallback;
+ return this;
+ }
+
+ /**
+ * Stores the request config callback and returns this config so calls can be chained.
+ *
+ * @param requestConfigCallback callback to store; may be null
+ * @return this config
+ */
+ public EsConfig withRequestConfigCallback(RequestConfigCallback requestConfigCallback) {
+ this.requestConfigCallback = requestConfigCallback;
+ return this;
+ }
+
+ /**
+ * Stores the path prefix and returns this config so calls can be chained.
+ *
+ * @param pathPrefix prefix to store; may be null
+ * @return this config
+ */
+ public EsConfig withPathPrefix(String pathPrefix) {
+ this.pathPrefix = pathPrefix;
+ return this;
+ }
+
+ /**
+ * Returns the hosts built from the urls passed to the constructor.
+ * The internal array itself is returned, not a copy.
+ *
+ * @return the configured hosts
+ */
+ public HttpHost[] getHttpHosts() {
+ return httpHosts;
+ }
+
+ /**
+ * @return the stored max retry timeout, or null if none was set
+ */
+ public Integer getMaxRetryTimeoutMillis() {
+ return maxRetryTimeoutMillis;
+ }
+
+ /**
+ * @return the stored default headers, or null if none were set
+ */
+ public Header[] getDefaultHeaders() {
+ return defaultHeaders;
+ }
+
+ /**
+ * @return the stored failure listener, or null if none was set
+ */
+ public RestClient.FailureListener getFailureListener() {
+ return failureListener;
+ }
- checkArgument(nodes.length != 0, "Nodes cannot be empty");
- this.clusterName = clusterName;
- this.nodes = nodes;
- this.additionalConfiguration = new HashMap<>(additionalConfiguration);
+ public HttpClientConfigCallback getHttpClientConfigCallback() {
+ return httpClientConfigCallback;
}
- TransportAddresses getTransportAddresses() {
- return new TransportAddresses(nodes);
+ public RequestConfigCallback getRequestConfigCallback() {
+ return requestConfigCallback;
}
- Settings toBasicSettings() {
- return ImmutableSettings.settingsBuilder()
- .put("cluster.name", clusterName)
- .put(additionalConfiguration)
- .build();
+ public String getPathPrefix() {
+ return pathPrefix;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsTupleMapper.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsTupleMapper.java
index 5b6c425..285e112 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsTupleMapper.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsTupleMapper.java
@@ -20,6 +20,7 @@ package org.apache.storm.elasticsearch.common;
import org.apache.storm.tuple.ITuple;
import java.io.Serializable;
+import java.util.Map;
/**
* TupleMapper defines how to extract source, index, type, and id from tuple for ElasticSearch.
@@ -52,4 +53,12 @@ public interface EsTupleMapper extends Serializable {
* @return id
*/
String getId(ITuple tuple);
+
+ /**
+ * Extracts params from tuple if available.
+ *
+ * @param tuple source tuple
+ * @param defaultValue value to return if params are missing
+ * @return the params found in the tuple, or {@code defaultValue} if the tuple carries none
+ */
+ Map<String, String> getParams(ITuple tuple, Map<String, String> defaultValue);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
index 3ebfe72..a4aca59 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
@@ -19,10 +19,8 @@ package org.apache.storm.elasticsearch.common;
import java.io.Serializable;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
public final class StormElasticSearchClient implements Serializable {
@@ -32,17 +30,26 @@ public final class StormElasticSearchClient implements Serializable {
this.esConfig = esConfig;
}
- public Client construct() {
- Settings settings = esConfig.toBasicSettings();
- TransportClient transportClient = new TransportClient(settings);
- addTransportAddresses(transportClient);
- return transportClient;
- }
-
- private void addTransportAddresses(TransportClient transportClient) {
- Iterable<InetSocketTransportAddress> transportAddresses = esConfig.getTransportAddresses();
- for (InetSocketTransportAddress transportAddress : transportAddresses) {
- transportClient.addTransportAddress(transportAddress);
+ /**
+ * Builds a {@link RestClient} for the hosts held by the configuration.
+ * Each optional setting (max retry timeout, default headers, failure listener,
+ * HTTP client config callback, request config callback, path prefix) is applied
+ * to the builder only when the configuration returns a non-null value for it.
+ *
+ * @return the client produced by the builder
+ */
+ public RestClient construct() {
+ RestClientBuilder builder = RestClient.builder(esConfig.getHttpHosts());
+ if (esConfig.getMaxRetryTimeoutMillis() != null) {
+ builder.setMaxRetryTimeoutMillis(esConfig.getMaxRetryTimeoutMillis());
+ }
+ if (esConfig.getDefaultHeaders() != null) {
+ builder.setDefaultHeaders(esConfig.getDefaultHeaders());
+ }
+ if (esConfig.getFailureListener() != null) {
+ builder.setFailureListener(esConfig.getFailureListener());
+ }
+ if (esConfig.getHttpClientConfigCallback() != null) {
+ builder.setHttpClientConfigCallback(esConfig.getHttpClientConfigCallback());
+ }
+ if (esConfig.getRequestConfigCallback() != null) {
+ builder.setRequestConfigCallback(esConfig.getRequestConfigCallback());
+ }
+ if (esConfig.getPathPrefix() != null) {
+ builder.setPathPrefix(esConfig.getPathPrefix());
}
+ return builder.build();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java
deleted file mode 100644
index cd082a7..0000000
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.elasticsearch.common;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-
-final class TransportAddresses implements Iterable<InetSocketTransportAddress> {
-
- static final String DELIMETER = ":";
-
- private final String[] nodes;
-
- TransportAddresses(String[] nodes) {
- if (nodes == null) {
- throw new IllegalArgumentException("Elasticsearch hosts cannot be null");
- }
- if (nodes.length == 0) {
- throw new IllegalArgumentException("At least one Elasticsearch host must be specified");
- }
-
- this.nodes = nodes;
- }
-
- @Override
- public Iterator<InetSocketTransportAddress> iterator() {
- List<InetSocketTransportAddress> result = new LinkedList<>();
-
- for (String node : nodes) {
- InetSocketTransportAddress transportAddress = transformToInetAddress(node);
- result.add(transportAddress);
- }
-
- return result.iterator();
- }
-
- private InetSocketTransportAddress transformToInetAddress(String node) {
- String[] hostAndPort = node.split(DELIMETER);
- if (hostAndPort.length != 2) {
- throw new IllegalArgumentException(
- "Incorrect Elasticsearch node format, should follow {host}" + DELIMETER + "{port} pattern");
- }
- String hostname = hostname(hostAndPort[0]);
- return new InetSocketTransportAddress(hostname, port(hostAndPort[1]));
- }
-
- private String hostname(String input) {
- return input.trim();
- }
-
- private int port(String input) {
- return Integer.parseInt(input.trim());
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Index.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Index.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Index.java
new file mode 100644
index 0000000..d010abe
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Index.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.doc;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Elasticsearch document fragment with "_index", "_type" and "_id" fields.
+ */
+public class Index {
+
+ @JsonProperty("_index")
+ private String index;
+
+ @JsonProperty("_type")
+ private String type;
+
+ @JsonProperty("_id")
+ private String id;
+
+ /**
+ * Creates a fragment with all fields left null.
+ */
+ public Index() {
+
+ }
+
+ /**
+ * Creates a fragment with the given field values.
+ *
+ * @param index value for the "_index" property
+ * @param type value for the "_type" property
+ * @param id value for the "_id" property
+ */
+ public Index(String index, String type, String id) {
+ this.index = index;
+ this.type = type;
+ this.id = id;
+ }
+
+ /**
+ * @return the "_index" value
+ */
+ public String getIndex() {
+ return this.index;
+ }
+
+ /**
+ * @param index the "_index" value
+ */
+ public void setIndex(String index) {
+ this.index = index;
+ }
+
+ /**
+ * @return the "_type" value
+ */
+ public String getType() {
+ return this.type;
+ }
+
+ /**
+ * @param type the "_type" value
+ */
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ /**
+ * @return the "_id" value
+ */
+ public String getId() {
+ return this.id;
+ }
+
+ /**
+ * @param id the "_id" value
+ */
+ public void setId(String id) {
+ this.id = id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexDoc.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexDoc.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexDoc.java
new file mode 100644
index 0000000..c695638
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexDoc.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.doc;
+
+/**
+ * Elasticsearch document fragment with a single "index" field, used in bulk
+ * requests.
+ */
+public class IndexDoc {
+
+ private Index index;
+
+ /**
+ * Creates a document whose "index" field is null.
+ */
+ public IndexDoc() {
+
+ }
+
+ /**
+ * Creates a document wrapping the given fragment.
+ *
+ * @param index value for the "index" field
+ */
+ public IndexDoc(Index index) {
+ this.index = index;
+ }
+
+ /**
+ * @return the wrapped fragment, or null if none was set
+ */
+ public Index getIndex() {
+ return this.index;
+ }
+
+ /**
+ * @param index the fragment to wrap
+ */
+ public void setIndex(Index index) {
+ this.index = index;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItem.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItem.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItem.java
new file mode 100644
index 0000000..e04c48e
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItem.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.doc;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonRawValue;
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Elasticsearch document fragment containing extended index information, used
+ * in bulk index responses.
+ */
+public class IndexItem extends Index {
+
+ @JsonProperty("_version")
+ private long version;
+
+ private String result;
+ private boolean created;
+ private int status;
+
+ @JsonRawValue
+ private Object error;
+
+ @JsonProperty("_shards")
+ private Shards shards;
+
+ /**
+ * @return the "_version" value
+ */
+ public long getVersion() {
+ return this.version;
+ }
+
+ /**
+ * @param version the "_version" value
+ */
+ public void setVersion(long version) {
+ this.version = version;
+ }
+
+ /**
+ * @return the "result" value, or null if none was set
+ */
+ public String getResult() {
+ return this.result;
+ }
+
+ /**
+ * @param result the "result" value
+ */
+ public void setResult(String result) {
+ this.result = result;
+ }
+
+ /**
+ * @return the "created" flag
+ */
+ public boolean isCreated() {
+ return this.created;
+ }
+
+ /**
+ * @param created the "created" flag
+ */
+ public void setCreated(boolean created) {
+ this.created = created;
+ }
+
+ /**
+ * @return the "status" value
+ */
+ public int getStatus() {
+ return this.status;
+ }
+
+ /**
+ * @param status the "status" value
+ */
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ /**
+ * Returns the stored error rendered with {@code toString()}.
+ *
+ * @return the error as a string, or null if no error was set
+ */
+ public String getError() {
+ if (this.error == null) {
+ return null;
+ }
+ return this.error.toString();
+ }
+
+ /**
+ * @param error the "error" node to store; may be null
+ */
+ public void setError(JsonNode error) {
+ this.error = error;
+ }
+
+ /**
+ * @return the "_shards" fragment, or null if none was set
+ */
+ public Shards getShards() {
+ return this.shards;
+ }
+
+ /**
+ * @param shards the "_shards" fragment
+ */
+ public void setShards(Shards shards) {
+ this.shards = shards;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItemDoc.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItemDoc.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItemDoc.java
new file mode 100644
index 0000000..60a330e
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/IndexItemDoc.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.doc;
+
+/**
+ * Elasticsearch document with a single "index" field, used in bulk responses.
+ */
+public class IndexItemDoc {
+
+ private IndexItem index;
+
+ /**
+ * Creates a document whose "index" field is null.
+ */
+ public IndexItemDoc() {
+
+ }
+
+ /**
+ * Creates a document wrapping the given item.
+ *
+ * @param index value for the "index" field
+ */
+ public IndexItemDoc(IndexItem index) {
+ this.index = index;
+ }
+
+ /**
+ * @return the wrapped item, or null if none was set
+ */
+ public IndexItem getIndex() {
+ return this.index;
+ }
+
+ /**
+ * @param index the item to wrap
+ */
+ public void setIndex(IndexItem index) {
+ this.index = index;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Shards.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Shards.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Shards.java
new file mode 100644
index 0000000..87c7ad5
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Shards.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.doc;
+
+import java.util.List;
+
+/**
+ * Elasticsearch document fragment containing shard success information,
+ * used in percolate and bulk index responses.
+ */
+public class Shards {
+ private int total;
+ private int successful;
+ private int failed;
+ private List<Object> failures;
+
+ /**
+ * @return the "total" count
+ */
+ public int getTotal() {
+ return this.total;
+ }
+
+ /**
+ * @param total the "total" count
+ */
+ public void setTotal(int total) {
+ this.total = total;
+ }
+
+ /**
+ * @return the "successful" count
+ */
+ public int getSuccessful() {
+ return this.successful;
+ }
+
+ /**
+ * @param successful the "successful" count
+ */
+ public void setSuccessful(int successful) {
+ this.successful = successful;
+ }
+
+ /**
+ * @return the "failed" count
+ */
+ public int getFailed() {
+ return this.failed;
+ }
+
+ /**
+ * @param failed the "failed" count
+ */
+ public void setFailed(int failed) {
+ this.failed = failed;
+ }
+
+ /**
+ * @return the stored "failures" list, or null if none was set
+ */
+ public List<Object> getFailures() {
+ return this.failures;
+ }
+
+ /**
+ * @param failures the "failures" list; stored as given, without copying
+ */
+ public void setFailures(List<Object> failures) {
+ this.failures = failures;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/SourceDoc.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/SourceDoc.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/SourceDoc.java
new file mode 100644
index 0000000..ea30e65
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/SourceDoc.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.doc;
+
+/**
+ * Elasticsearch document fragment with a single field, "source", used in bulk
+ * requests.
+ */
+public class SourceDoc {
+
+ private String source;
+
+ /**
+ * Creates a document whose "source" field is null.
+ */
+ public SourceDoc() {
+
+ }
+
+ /**
+ * Creates a document holding the given source.
+ *
+ * @param source value for the "source" field
+ */
+ public SourceDoc(String source) {
+ this.source = source;
+ }
+
+ /**
+ * @return the "source" value, or null if none was set
+ */
+ public String getSource() {
+ return this.source;
+ }
+
+ /**
+ * @param source the "source" value
+ */
+ public void setSource(String source) {
+ this.source = source;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/BulkIndexResponse.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/BulkIndexResponse.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/BulkIndexResponse.java
new file mode 100644
index 0000000..7131e63
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/BulkIndexResponse.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.response;
+
+import java.util.List;
+
+import org.apache.storm.elasticsearch.doc.IndexItemDoc;
+
+/**
+ * Mapped response for bulk index.
+ */
+public class BulkIndexResponse {
+
+ private boolean errors;
+ private long took;
+ private List<IndexItemDoc> items;
+
+ /**
+ * Creates a response with no items.
+ */
+ public BulkIndexResponse() {
+
+ }
+
+ /**
+ * @return the "errors" flag
+ */
+ public boolean hasErrors() {
+ return errors;
+ }
+
+ /**
+ * @param errors the "errors" flag
+ */
+ public void setErrors(boolean errors) {
+ this.errors = errors;
+ }
+
+ /**
+ * @return the "took" value
+ */
+ public long getTook() {
+ return took;
+ }
+
+ /**
+ * @param took the "took" value
+ */
+ public void setTook(long took) {
+ this.took = took;
+ }
+
+ /**
+ * @return the stored items, or null if none were set
+ */
+ public List<IndexItemDoc> getItems() {
+ return items;
+ }
+
+ /**
+ * @param items the items; stored as given, without copying
+ */
+ public void setItems(List<IndexItemDoc> items) {
+ this.items = items;
+ }
+
+ /**
+ * Finds the first item whose status is in the range 400 to 599 inclusive.
+ * Items that are null or have no "index" section are skipped.
+ *
+ * @return the status of the first such item, or null if there is none
+ */
+ public Integer getFirstError() {
+ if (items == null) {
+ return null;
+ }
+ for (IndexItemDoc item : items) {
+ // An item without an "index" section has no status to inspect; skip it rather than fail.
+ if (item == null || item.getIndex() == null) {
+ continue;
+ }
+ int status = item.getIndex().getStatus();
+ if (400 <= status && status <= 599) {
+ return status;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns the "result" of the first item.
+ *
+ * @return the first item's result, or null if there are no items or the
+ * first item is null or has no "index" section
+ */
+ public String getFirstResult() {
+ if (items == null || items.isEmpty()) {
+ return null;
+ }
+ IndexItemDoc first = items.get(0);
+ if (first == null || first.getIndex() == null) {
+ return null;
+ }
+ return first.getIndex().getResult();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/LookupResponse.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/LookupResponse.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/LookupResponse.java
new file mode 100644
index 0000000..8b6b24d
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/LookupResponse.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.response;
+
+import org.apache.storm.elasticsearch.doc.Index;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonRawValue;
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Mapped response for document lookup.
+ */
+public class LookupResponse extends Index {
+
+ @JsonProperty("_version")
+ private long version;
+
+ private boolean found;
+
+ @JsonProperty("_source")
+ @JsonRawValue
+ private Object source;
+
+ /**
+ * @return the "_version" value
+ */
+ public long getVersion() {
+ return this.version;
+ }
+
+ /**
+ * @param version the "_version" value
+ */
+ public void setVersion(long version) {
+ this.version = version;
+ }
+
+ /**
+ * @return the "found" flag
+ */
+ public boolean isFound() {
+ return this.found;
+ }
+
+ /**
+ * @param found the "found" flag
+ */
+ public void setFound(boolean found) {
+ this.found = found;
+ }
+
+ /**
+ * Returns the stored source rendered with {@code toString()}.
+ *
+ * @return the source as a string, or null if no source was set
+ */
+ public String getSource() {
+ if (this.source == null) {
+ return null;
+ }
+ return this.source.toString();
+ }
+
+ /**
+ * @param source the "_source" node to store; may be null
+ */
+ public void setSource(JsonNode source) {
+ this.source = source;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/PercolateResponse.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/PercolateResponse.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/PercolateResponse.java
new file mode 100644
index 0000000..87bc004
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/response/PercolateResponse.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.response;
+
+import java.util.List;
+
+import org.apache.storm.elasticsearch.doc.Shards;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Mapped response for percolate.
+ */
+public class PercolateResponse {
+ // Top-level response fields; each of these three has a getter/setter pair further down.
+ private long took;
+ private long total;
+ private List<Match> matches;
+ /** Value of the "_shards" JSON property. NOTE(review): this class declares no getter or setter for it. */
+ @JsonProperty("_shards")
+ private Shards shards;
+ /** Element type of the matches list; carries the "_index" and "_id" JSON properties. */
+ public static class Match {
+ @JsonProperty("_index")
+ private String index;
+
+ @JsonProperty("_id")
+ private String id;
+
+ public String getIndex() {
+ return index;
+ }
+
+ public void setIndex(String index) {
+ this.index = index;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+ }
+ // Plain accessors for the top-level fields (took, total, matches).
+ public long getTook() {
+ return took;
+ }
+
+ public void setTook(long took) {
+ this.took = took;
+ }
+
+ public long getTotal() {
+ return total;
+ }
+
+ public void setTotal(long total) {
+ this.total = total;
+ }
+
+ public List<Match> getMatches() {
+ return matches;
+ }
+
+ public void setMatches(List<Match> matches) {
+ this.matches = matches;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index 2241f4b..fb34407 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -17,20 +17,29 @@
*/
package org.apache.storm.elasticsearch.trident;
-import org.apache.storm.topology.FailedException;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.List;
-import org.apache.storm.elasticsearch.common.StormElasticSearchClient;
+import org.apache.http.entity.StringEntity;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.storm.elasticsearch.common.StormElasticSearchClient;
+import org.apache.storm.elasticsearch.doc.Index;
+import org.apache.storm.elasticsearch.doc.IndexDoc;
+import org.apache.storm.elasticsearch.doc.SourceDoc;
+import org.apache.storm.elasticsearch.response.BulkIndexResponse;
+import org.apache.storm.topology.FailedException;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.tuple.TridentTuple;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.List;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
/**
* Trident State for storing tuple to ES document.
@@ -38,8 +47,9 @@ import java.util.List;
*/
class EsState implements State {
private static final Logger LOG = LoggerFactory.getLogger(EsState.class);
- private static Client client;
+ private static RestClient client;
private EsConfig esConfig;
+ private final ObjectMapper objectMapper;
private EsTupleMapper tupleMapper;
/**
@@ -49,6 +59,7 @@ class EsState implements State {
*/
public EsState(EsConfig esConfig, EsTupleMapper tupleMapper) {
this.esConfig = esConfig;
+ this.objectMapper = new ObjectMapper(); // used by this class to write bulk request lines and to read the bulk response
this.tupleMapper = tupleMapper;
}
@@ -88,26 +99,43 @@ class EsState implements State {
}
}
- /**
- * Store current state to ElasticSearch.
- *
- * @param tuples list of tuples for storing to ES.
- * Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
- */
- public void updateState(List<TridentTuple> tuples) {
- BulkRequestBuilder bulkRequest = client.prepareBulk();
+ private String buildRequest(List<TridentTuple> tuples) throws JsonProcessingException {
+ StringBuilder bulkRequest = new StringBuilder();
for (TridentTuple tuple : tuples) {
String source = tupleMapper.getSource(tuple);
String index = tupleMapper.getIndex(tuple);
String type = tupleMapper.getType(tuple);
String id = tupleMapper.getId(tuple);
- bulkRequest.add(client.prepareIndex(index, type, id).setSource(source));
+ IndexDoc indexDoc = new IndexDoc(new Index(index, type, id));
+ SourceDoc sourceDoc = new SourceDoc(source);
+ bulkRequest.append(objectMapper.writeValueAsString(indexDoc)).append('\n');
+ bulkRequest.append(objectMapper.writeValueAsString(sourceDoc)).append('\n');
+
}
- BulkResponse bulkResponse = bulkRequest.execute().actionGet();
- if (bulkResponse.hasFailures()) {
- LOG.warn("failed processing bulk index requests " + bulkResponse.buildFailureMessage());
- throw new FailedException();
+ return bulkRequest.toString();
+ }
+
+ /**
+ * Store current state to ElasticSearch by sending every tuple in one bulk request, encoded as UTF-8 JSON.
+ * @param tuples list of tuples for storing to ES.
+ * Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
+ * @throws FailedException if the request cannot be built, sent or read (the IOException is kept as the cause),
+ * or if the bulk response reports errors
+ */
+ public void updateState(List<TridentTuple> tuples) {
+ try {
+ // StringEntity(String) would encode the body as ISO-8859-1 text/plain and mangle any other characters.
+ StringEntity body = new StringEntity(buildRequest(tuples), org.apache.http.entity.ContentType.APPLICATION_JSON);
+ Response response = client.performRequest("post", "_bulk", new HashMap<>(), body);
+ BulkIndexResponse bulkResponse = objectMapper.readValue(response.getEntity().getContent(), BulkIndexResponse.class);
+ if (bulkResponse.hasErrors()) {
+ LOG.warn("failed processing bulk index requests: {}: {}", bulkResponse.getFirstError(), bulkResponse.getFirstResult());
+ throw new FailedException();
+ }
+ } catch (IOException e) {
+ LOG.warn("failed processing bulk index requests: {}", e.toString(), e);
+ throw new FailedException(e);
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index 5ae174f..84e3cea 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -17,17 +17,16 @@
*/
package org.apache.storm.elasticsearch.trident;
-import org.apache.storm.task.IMetricsContext;
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
-
+import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
-import java.util.Map;
-
-import static org.elasticsearch.common.base.Preconditions.checkNotNull;
-
/**
* StateFactory for providing EsState.
* @since 0.11
@@ -42,8 +41,8 @@ public class EsStateFactory implements StateFactory {
* @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
*/
public EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper) {
- this.esConfig = checkNotNull(esConfig);
- this.tupleMapper = checkNotNull(tupleMapper);
+ this.esConfig = requireNonNull(esConfig);
+ this.tupleMapper = requireNonNull(tupleMapper);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
index 87ffefa..559dedf 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
@@ -17,28 +17,16 @@
*/
package org.apache.storm.elasticsearch.bolt;
+import org.apache.storm.elasticsearch.common.EsTestUtil;
import org.apache.storm.testing.IntegrationTest;
-import org.apache.commons.io.FileUtils;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
-import java.io.File;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
@Category(IntegrationTest.class)
public abstract class AbstractEsBoltIntegrationTest<Bolt extends AbstractEsBolt> extends AbstractEsBoltTest<Bolt> {
@@ -46,49 +34,23 @@ public abstract class AbstractEsBoltIntegrationTest<Bolt extends AbstractEsBolt>
@BeforeClass
public static void startElasticSearchNode() throws Exception {
- node = NodeBuilder.nodeBuilder().data(true).settings(createSettings()).build();
- node.start();
- ensureEsGreen(node);
- ClusterHealthResponse clusterHealth = node.client()
- .admin()
- .cluster()
- .health(Requests.clusterHealthRequest()
- .timeout(TimeValue.timeValueSeconds(30))
- .waitForGreenStatus()
- .waitForRelocatingShards(0))
- .actionGet();
- Thread.sleep(1000);
- }
-
- private static ImmutableSettings.Builder createSettings() {
- return ImmutableSettings.builder()
- .put(ClusterName.SETTING, "test-cluster")
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
- .put(EsExecutors.PROCESSORS, 1)
- .put("http.enabled", false)
- .put("index.percolator.map_unmapped_fields_as_string", true)
- .put("index.store.type", "memory");
+ node = EsTestUtil.startEsNode();
+ EsTestUtil.ensureEsGreen(node);
}
@AfterClass
public static void closeElasticSearchNode() throws Exception {
- node.stop();
- node.close();
- FileUtils.deleteDirectory(new File("./data"));
+ EsTestUtil.stopEsNode(node);
}
- private static void ensureEsGreen(Node node) {
- ClusterHealthResponse chr = node.client()
- .admin()
- .cluster()
- .health(Requests.clusterHealthRequest()
- .timeout(TimeValue.timeValueSeconds(30))
- .waitForGreenStatus()
- .waitForEvents(Priority.LANGUID)
- .waitForRelocatingShards(0))
- .actionGet();
- assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN));
+ @Before
+ public void createIndex() {
+ node.client().admin().indices().create(new CreateIndexRequest(index)).actionGet(); // runs before each test; actionGet() waits for the create call to finish
}
+ @After
+ public void clearIndex() throws Exception {
+ EsTestUtil.clearIndex(node, index); // runs after each test, undoing createIndex()
+ EsTestUtil.clearIndex(node, "missing"); // NOTE(review): presumably an index some tests cause to be created implicitly - confirm
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b4565c6/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
index fb9739c..d6ef979 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@ -19,6 +19,9 @@ package org.apache.storm.elasticsearch.bolt;
import com.google.common.testing.NullPointerTester;
+import java.lang.reflect.Method;
+import java.util.UUID;
+
import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.elasticsearch.common.EsConfig;
@@ -33,6 +36,10 @@ import org.mockito.runners.MockitoJUnitRunner;
public abstract class AbstractEsBoltTest<Bolt extends AbstractEsBolt> {
protected static Config config = new Config();
+ protected static final String documentId = UUID.randomUUID().toString();
+ protected static final String index = "index";
+ protected static final String source = "{\"user\":\"user1\"}";
+ protected static final String type = "type";
@Mock
protected OutputCollector outputCollector;
@@ -48,7 +55,7 @@ public abstract class AbstractEsBoltTest<Bolt extends AbstractEsBolt> {
protected abstract Bolt createBolt(EsConfig esConfig);
protected EsConfig esConfig() {
- return new EsConfig("test-cluster", new String[] {"127.0.0.1:9300"});
+ return new EsConfig();
}
@After
@@ -61,5 +68,11 @@ public abstract class AbstractEsBoltTest<Bolt extends AbstractEsBolt> {
new NullPointerTester().setDefault(EsConfig.class, esConfig()).testAllPublicConstructors(getBoltClass());
}
+ @Test
+ public void getEndpointThrowsOnNull() throws Exception {
+ Method getEndpointMethod = AbstractEsBolt.class.getDeclaredMethod("getEndpoint", String.class, String.class, String.class);
+ new NullPointerTester().setDefault(String.class, "test").testMethodParameter(null, getEndpointMethod, 0); // null instance (so getEndpoint is presumably static); only parameter 0 is null-checked, the other Strings default to "test"
+ }
+
protected abstract Class<Bolt> getBoltClass();
}