You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by rz...@apache.org on 2023/12/04 07:42:01 UTC
(storm) 01/02: STORM-4005 - ElasticSearch 7.17.13
This is an automated email from the ASF dual-hosted git repository.
rzo1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
commit c15a1563f0eaa1a3762ec5eb9b0922766cd643a2
Author: Richard Zowalla <ri...@hs-heilbronn.de>
AuthorDate: Wed Nov 29 14:50:14 2023 +0100
STORM-4005 - ElasticSearch 7.17.13
---
DEPENDENCY-LICENSES | 70 +++++++-----
examples/storm-elasticsearch-examples/pom.xml | 8 --
.../storm/elasticsearch/bolt/EsIndexTopology.java | 9 +-
.../storm/elasticsearch/common/EsTestUtil.java | 122 ---------------------
.../elasticsearch/trident/TridentEsTopology.java | 10 +-
external/storm-elasticsearch/pom.xml | 32 ++++--
.../storm/elasticsearch/bolt/EsIndexBolt.java | 8 +-
.../storm/elasticsearch/bolt/EsLookupBolt.java | 6 +-
.../storm/elasticsearch/bolt/EsPercolateBolt.java | 10 +-
.../storm/elasticsearch/common/EsConfig.java | 22 ++--
.../common/StormElasticSearchClient.java | 10 +-
.../storm/elasticsearch/trident/EsState.java | 7 +-
.../bolt/AbstractEsBoltIntegrationTest.java | 10 +-
.../storm/elasticsearch/bolt/EsIndexBoltTest.java | 41 +++++--
.../bolt/EsLookupBoltIntegrationTest.java | 13 ++-
.../storm/elasticsearch/bolt/EsLookupBoltTest.java | 6 +-
.../elasticsearch/bolt/EsPercolateBoltTest.java | 20 +++-
.../storm/elasticsearch/common/EsTestUtil.java | 66 ++++-------
.../storm/elasticsearch/trident/EsStateTest.java | 3 +-
pom.xml | 2 +-
20 files changed, 207 insertions(+), 268 deletions(-)
diff --git a/DEPENDENCY-LICENSES b/DEPENDENCY-LICENSES
index 87892a602..57fb93e51 100644
--- a/DEPENDENCY-LICENSES
+++ b/DEPENDENCY-LICENSES
@@ -31,6 +31,10 @@ List of third-party dependencies grouped by their license type.
* Maven Plugin Tools Java Annotations (org.apache.maven.plugin-tools:maven-plugin-annotations:3.8.1 - https://maven.apache.org/plugin-tools/maven-plugin-annotations)
* snappy-java (org.xerial.snappy:snappy-java:1.1.10.4 - https://github.com/xerial/snappy-java)
+ Apache-2.0, LGPL-2.1-or-later
+
+ * Java Native Access (net.java.dev.jna:jna:5.10.0 - https://github.com/java-native-access/jna)
+
Apache 2.0 License
* Apache MINA Core (org.apache.mina:mina-core:2.2.2 - https://mina.apache.org/mina-core/)
@@ -125,7 +129,7 @@ List of third-party dependencies grouped by their license type.
* Apache HttpAsyncClient (org.apache.httpcomponents:httpasyncclient:4.1.5 - http://hc.apache.org/httpcomponents-asyncclient)
* Apache HttpClient (org.apache.httpcomponents:httpclient:4.5.14 - http://hc.apache.org/httpcomponents-client-ga)
* Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.16 - http://hc.apache.org/httpcomponents-core-ga)
- * Apache HttpCore NIO (org.apache.httpcomponents:httpcore-nio:4.4.5 - http://hc.apache.org/httpcomponents-core-ga)
+ * Apache HttpCore NIO (org.apache.httpcomponents:httpcore-nio:4.4.15 - http://hc.apache.org/httpcomponents-core-ga)
* Apache Ivy (org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/)
* Apache Kafka (org.apache.kafka:kafka-clients:0.11.0.3 - http://kafka.apache.org)
* Apache Maven Artifact Transfer (org.apache.maven.shared:maven-artifact-transfer:0.9.1 - https://maven.apache.org/shared/maven-artifact-transfer/)
@@ -176,7 +180,7 @@ List of third-party dependencies grouped by their license type.
* Commons Logging (commons-logging:commons-logging:1.1.3 - http://commons.apache.org/proper/commons-logging/)
* Commons Math (org.apache.commons:commons-math3:3.1.1 - http://commons.apache.org/math/)
* Commons Pool (commons-pool:commons-pool:1.5.4 - http://commons.apache.org/pool/)
- * Compress-LZF (com.ning:compress-lzf:1.0.2 - http://github.com/ning/compress)
+ * compiler (com.github.spullara.mustache.java:compiler:0.9.6 - http://github.com/spullara/mustache.java)
* Curator Client (org.apache.curator:curator-client:5.5.0 - https://curator.apache.org/curator-client)
* Curator Framework (org.apache.curator:curator-framework:5.5.0 - https://curator.apache.org/curator-framework)
* Curator Recipes (org.apache.curator:curator-recipes:5.5.0 - https://curator.apache.org/curator-recipes)
@@ -198,8 +202,6 @@ List of third-party dependencies grouped by their license type.
* Dropwizard Utility Classes (io.dropwizard:dropwizard-util:1.3.29 - http://www.dropwizard.io/1.3.29/dropwizard-util)
* Dropwizard Validation Support (io.dropwizard:dropwizard-validation:1.3.29 - http://www.dropwizard.io/1.3.29/dropwizard-validation)
* Ehcache (org.ehcache:ehcache:3.3.1 - http://ehcache.org)
- * Elasticsearch: Core (org.elasticsearch:elasticsearch:2.4.4 - http://nexus.sonatype.org/oss-repository-hosting.html/parent/elasticsearch)
- * Elasticsearch SecureSM (org.elasticsearch:securesm:1.0 - http://nexus.sonatype.org/oss-repository-hosting.html/securesm)
* error-prone annotations (com.google.errorprone:error_prone_annotations:2.21.1 - https://errorprone.info/error_prone_annotations)
* Esri Geometry API for Java (com.esri.geometry:esri-geometry-api:2.0.0 - https://github.com/Esri/geometry-api-java)
* fastutil (it.unimi.dsi:fastutil:6.5.6 - http://fasutil.dsi.unimi.it/)
@@ -250,8 +252,8 @@ List of third-party dependencies grouped by their license type.
* Hive Storage API (org.apache.hive:hive-storage-api:2.7.0 - https://www.apache.org/hive-storage-api/)
* Hive Upgrade Acid (org.apache.hive:hive-upgrade-acid:3.1.3 - https://www.apache.org/hive-upgrade-acid/)
* Hive Vector-Code-Gen Utilities (org.apache.hive:hive-vector-code-gen:3.1.3 - https://hive.apache.org/hive-vector-code-gen)
- * HPPC Collections (com.carrotsearch:hppc:0.7.1 - http://labs.carrotsearch.com/hppc.html/hppc)
* HPPC Collections (com.carrotsearch:hppc:0.7.2 - http://labs.carrotsearch.com/hppc.html/hppc)
+ * HPPC Collections (com.carrotsearch:hppc:0.8.1 - http://labs.carrotsearch.com/hppc.html/hppc)
* htrace-core (org.apache.htrace:htrace-core:3.2.0-incubating - http://incubator.apache.org/projects/htrace.html)
* j2html (com.j2html:j2html:1.6.0 - http://j2html.com)
* J2ObjC Annotations (com.google.j2objc:j2objc-annotations:2.8 - https://github.com/google/j2objc/)
@@ -306,21 +308,21 @@ List of third-party dependencies grouped by their license type.
* Kerby Util (org.apache.kerby:kerby-util:1.0.1 - http://directory.apache.org/kerby/kerby-common/kerby-util)
* Kerby XDR Project (org.apache.kerby:kerby-xdr:1.0.1 - http://directory.apache.org/kerby/kerby-common/kerby-xdr)
* Logging (commons-logging:commons-logging:1.0.3 - http://jakarta.apache.org/commons/logging/)
- * Lucene Common Analyzers (org.apache.lucene:lucene-analyzers-common:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-analyzers-common)
- * Lucene Core (org.apache.lucene:lucene-core:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-core)
- * Lucene Grouping (org.apache.lucene:lucene-grouping:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-grouping)
- * Lucene Highlighter (org.apache.lucene:lucene-highlighter:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-highlighter)
- * Lucene Join (org.apache.lucene:lucene-join:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-join)
- * Lucene Memory (org.apache.lucene:lucene-backward-codecs:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-backward-codecs)
- * Lucene Memory (org.apache.lucene:lucene-memory:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-memory)
- * Lucene Miscellaneous (org.apache.lucene:lucene-misc:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-misc)
- * Lucene Queries (org.apache.lucene:lucene-queries:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-queries)
- * Lucene QueryParsers (org.apache.lucene:lucene-queryparser:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-queryparser)
- * Lucene Sandbox (org.apache.lucene:lucene-sandbox:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-sandbox)
- * Lucene Spatial (org.apache.lucene:lucene-spatial:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-spatial)
- * Lucene Spatial 3D (org.apache.lucene:lucene-spatial3d:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-spatial3d)
- * Lucene Suggest (org.apache.lucene:lucene-suggest:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-suggest)
+ * Lucene Common Analyzers (org.apache.lucene:lucene-analyzers-common:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-analyzers-common)
+ * Lucene Core (org.apache.lucene:lucene-core:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-core)
+ * Lucene Grouping (org.apache.lucene:lucene-grouping:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-grouping)
+ * Lucene Highlighter (org.apache.lucene:lucene-highlighter:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-highlighter)
+ * Lucene Join (org.apache.lucene:lucene-join:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-join)
+ * Lucene Memory (org.apache.lucene:lucene-backward-codecs:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-backward-codecs)
+ * Lucene Memory (org.apache.lucene:lucene-memory:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-memory)
+ * Lucene Miscellaneous (org.apache.lucene:lucene-misc:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-misc)
+ * Lucene Queries (org.apache.lucene:lucene-queries:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-queries)
+ * Lucene QueryParsers (org.apache.lucene:lucene-queryparser:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-queryparser)
+ * Lucene Sandbox (org.apache.lucene:lucene-sandbox:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-sandbox)
+ * Lucene Spatial 3D (org.apache.lucene:lucene-spatial3d:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-spatial3d)
+ * Lucene Suggest (org.apache.lucene:lucene-suggest:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-suggest)
* LZ4 and xxHash (net.jpountz.lz4:lz4:1.3.0 - https://github.com/jpountz/lz4-java)
+ * LZ4 and xxHash (org.lz4:lz4-java:1.8.0 - https://github.com/lz4/lz4-java)
* Maven Aether Provider (org.apache.maven:maven-aether-provider:3.0 - http://maven.apache.org/maven-aether-provider/)
* Maven Artifact (org.apache.maven:maven-artifact:3.0 - http://maven.apache.org/maven-artifact/)
* Maven Artifact (org.apache.maven:maven-artifact:3.6.0 - https://maven.apache.org/ref/3.6.0/maven-artifact/)
@@ -419,13 +421,12 @@ List of third-party dependencies grouped by their license type.
* Plexus Interpolation API (org.codehaus.plexus:plexus-interpolation:1.25 - http://codehaus-plexus.github.io/plexus-interpolation/)
* Plexus Security Dispatcher Component (org.sonatype.plexus:plexus-sec-dispatcher:1.3 - http://spice.sonatype.org/plexus-sec-dispatcher)
* Plexus Security Dispatcher Component (org.sonatype.plexus:plexus-sec-dispatcher:1.4 - http://spice.sonatype.org/plexus-sec-dispatcher)
- * rest (org.elasticsearch.client:rest:5.2.2 - https://github.com/elastic/elasticsearch)
+ * rest (org.elasticsearch.client:elasticsearch-rest-client:7.17.13 - https://github.com/elastic/elasticsearch.git)
* sigar (org.fusesource:sigar:1.6.4 - http://fusesource.com/sigar/)
* Sisu - Guice (org.sonatype.sisu:sisu-guice:2.1.7 - http://forge.sonatype.com/sisu-guice/)
* Sisu - Inject (JSR330 bean support) (org.sonatype.sisu:sisu-inject-bean:1.4.2 - http://sisu.sonatype.org/sisu-inject/guice-bean/sisu-inject-bean/)
* Sisu - Inject (Plexus bean support) (org.sonatype.sisu:sisu-inject-plexus:1.4.2 - http://sisu.sonatype.org/sisu-inject/guice-bean/guice-plexus/sisu-inject-plexus/)
* SnakeYAML (org.yaml:snakeyaml:2.0 - https://bitbucket.org/snakeyaml/snakeyaml)
- * Spatial4J (com.spatial4j:spatial4j:0.5 - http://nexus.sonatype.org/oss-repository-hosting.html/spatial4j)
* Spring AOP (org.springframework:spring-aop:5.3.27 - https://github.com/spring-projects/spring-framework)
* Spring Beans (org.springframework:spring-beans:5.3.27 - https://github.com/spring-projects/spring-framework)
* Spring Commons Logging Bridge (org.springframework:spring-jcl:5.3.27 - https://github.com/spring-projects/spring-framework)
@@ -436,7 +437,7 @@ List of third-party dependencies grouped by their license type.
* Spring Messaging (org.springframework:spring-messaging:5.3.27 - https://github.com/spring-projects/spring-framework)
* Spring Transaction (org.springframework:spring-tx:5.3.27 - https://github.com/spring-projects/spring-framework)
* StAX API (stax:stax-api:1.0.1 - http://stax.codehaus.org/)
- * T-Digest (com.tdunning:t-digest:3.0 - https://github.com/tdunning/t-digest)
+ * T-Digest (com.tdunning:t-digest:3.2 - https://github.com/tdunning/t-digest)
* Tephra API (co.cask.tephra:tephra-api:0.6.0 - https://github.com/caskdata/tephra/tephra-api)
* Tephra Core (co.cask.tephra:tephra-core:0.6.0 - https://github.com/caskdata/tephra/tephra-core)
* Tephra HBase 1.0 Compatibility (co.cask.tephra:tephra-hbase-compat-1.0:0.6.0 - https://github.com/caskdata/tephra/tephra-hbase-compat-1.0)
@@ -547,10 +548,6 @@ List of third-party dependencies grouped by their license type.
* JLine Bundle (org.jline:jline:3.9.0 - http://nexus.sonatype.org/oss-repository-hosting.html/jline-parent/jline)
* Stax2 API (org.codehaus.woodstox:stax2-api:4.2.1 - http://github.com/FasterXML/stax2-api)
- CC0 1.0 Universal
-
- * JSR166e (com.twitter:jsr166e:1.1.0 - http://github.com/twitter/jsr166e)
-
CDDL/GPLv2+CE
* JavaBeans Activation Framework API jar (javax.activation:javax.activation-api:1.2.0 - http://java.net/all/javax.activation-api/)
@@ -637,6 +634,26 @@ List of third-party dependencies grouped by their license type.
* jms (jakarta.jms:jakarta.jms-api:2.0.2 - https://projects.eclipse.org/projects/ee4j.jms)
+ Elastic License 2.0
+
+ * rest-high-level (org.elasticsearch.client:elasticsearch-rest-high-level-client:7.17.13 - https://github.com/elastic/elasticsearch.git)
+
+ Elastic License 2.0, Server Side Public License, v 1
+
+ * aggs-matrix-stats (org.elasticsearch.plugin:aggs-matrix-stats-client:7.17.13 - https://github.com/elastic/elasticsearch.git)
+ * elasticsearch-cli (org.elasticsearch:elasticsearch-cli:7.17.13 - https://github.com/elastic/elasticsearch.git)
+ * elasticsearch-core (org.elasticsearch:elasticsearch-core:7.17.13 - https://github.com/elastic/elasticsearch.git)
+ * elasticsearch-geo (org.elasticsearch:elasticsearch-geo:7.17.13 - https://github.com/elastic/elasticsearch.git)
+ * elasticsearch-lz4 (org.elasticsearch:elasticsearch-lz4:7.17.13 - https://github.com/elastic/elasticsearch.git)
+ * elasticsearch-plugin-classloader (org.elasticsearch:elasticsearch-plugin-classloader:7.17.13 - https://github.com/elastic/elasticsearch.git)
+ * elasticsearch-secure-sm (org.elasticsearch:elasticsearch-secure-sm:7.17.13 - https://github.com/elastic/elasticsearch.git)
+ * elasticsearch-x-content (org.elasticsearch:elasticsearch-x-content:7.17.13 - https://github.com/elastic/elasticsearch.git)
+ * lang-mustache (org.elasticsearch.plugin:lang-mustache-client:7.17.13 - https://github.com/elastic/elasticsearch.git)
+ * mapper-extras (org.elasticsearch.plugin:mapper-extras-client:7.17.13 - https://github.com/elastic/elasticsearch.git)
+ * parent-join (org.elasticsearch.plugin:parent-join-client:7.17.13 - https://github.com/elastic/elasticsearch.git)
+ * rank-eval (org.elasticsearch.plugin:rank-eval-client:7.17.13 - https://github.com/elastic/elasticsearch.git)
+ * server (org.elasticsearch:elasticsearch:7.17.13 - https://github.com/elastic/elasticsearch.git)
+
MIT License
* argparse4j (net.sourceforge.argparse4j:argparse4j:0.8.1 - http://argparse4j.github.io)
@@ -644,6 +661,7 @@ List of third-party dependencies grouped by their license type.
* JCodings (org.jruby.jcodings:jcodings:1.0.55 - http://nexus.sonatype.org/oss-repository-hosting.html/jcodings)
* Jedis (redis.clients:jedis:2.9.0 - https://github.com/xetorthio/jedis)
* Joni (org.jruby.joni:joni:2.1.31 - http://nexus.sonatype.org/oss-repository-hosting.html/joni)
+ * JOpt Simple (net.sf.jopt-simple:jopt-simple:5.0.2 - http://pholser.github.io/jopt-simple)
* JUL to SLF4J bridge (org.slf4j:jul-to-slf4j:1.7.36 - http://www.slf4j.org)
* Microsoft JDBC Driver for SQL Server (com.microsoft.sqlserver:mssql-jdbc:6.2.1.jre7 - https://github.com/Microsoft/mssql-jdbc)
* SLF4J API Module (org.slf4j:slf4j-api:1.7.36 - http://www.slf4j.org)
diff --git a/examples/storm-elasticsearch-examples/pom.xml b/examples/storm-elasticsearch-examples/pom.xml
index ee864aa15..f16429bb3 100644
--- a/examples/storm-elasticsearch-examples/pom.xml
+++ b/examples/storm-elasticsearch-examples/pom.xml
@@ -26,9 +26,6 @@
</parent>
<artifactId>storm-elasticsearch-examples</artifactId>
- <properties>
- <elasticsearch.test.version>2.4.4</elasticsearch.test.version>
- </properties>
<dependencies>
<dependency>
@@ -42,11 +39,6 @@
<artifactId>storm-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>${elasticsearch.test.version}</version>
- </dependency>
</dependencies>
<build>
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index 1c9e09f1c..3c3e8565a 100644
--- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -24,10 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
-import org.apache.storm.elasticsearch.common.EsConfig;
-import org.apache.storm.elasticsearch.common.EsConstants;
-import org.apache.storm.elasticsearch.common.EsTestUtil;
-import org.apache.storm.elasticsearch.common.EsTupleMapper;
+import org.apache.storm.elasticsearch.common.*;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
@@ -76,13 +73,11 @@ public final class EsIndexTopology {
TopologyBuilder builder = new TopologyBuilder();
UserDataSpout spout = new UserDataSpout();
builder.setSpout(SPOUT_ID, spout, 1);
- EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+ EsTupleMapper tupleMapper =new DefaultEsTupleMapper();
EsConfig esConfig = new EsConfig("http://localhost:9300");
builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1)
.shuffleGrouping(SPOUT_ID);
- EsTestUtil.startEsNode();
- EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT_SECS);
StormSubmitter.submitTopology(TOPOLOGY_NAME,
config,
builder.createTopology());
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
deleted file mode 100644
index e96f6e897..000000000
--- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.elasticsearch.common;
-
-import java.util.HashMap;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.storm.Config;
-import org.apache.storm.task.GeneralTopologyContext;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.TupleImpl;
-import org.apache.storm.tuple.Values;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-
-/**
- * ElasticSearch example utilities.
- */
-public final class EsTestUtil {
-
- /**
- * Generates a test tuple.
- * @param source the source of the tuple
- * @param index the index of the tuple
- * @param type the type of the tuple
- * @param id the id of the tuple
- * @return the generated tuple
- */
- public static Tuple generateTestTuple(final String source,
- final String index,
- final String type,
- final String id) {
- TopologyBuilder builder = new TopologyBuilder();
- GeneralTopologyContext topologyContext = new GeneralTopologyContext(
- builder.createTopology(),
- new Config(),
- new HashMap<>(),
- new HashMap<>(),
- new HashMap<>(),
- "") {
- @Override
- public Fields getComponentOutputFields(final String componentId,
- final String streamId) {
- return new Fields("source", "index", "type", "id");
- }
- };
- return new TupleImpl(topologyContext,
- new Values(source, index, type, id),
- source,
- 1,
- "");
- }
-
- /**
- * Generates a new tuple mapper.
- * @return the generated mapper
- */
- public static EsTupleMapper generateDefaultTupleMapper() {
- return new DefaultEsTupleMapper();
- }
-
- /**
- * Starts an ElasticSearch node.
- * @return the started node.
- */
- public static Node startEsNode() {
- Node node = NodeBuilder.nodeBuilder().data(true).settings(
- Settings.settingsBuilder()
- .put(ClusterName.SETTING, EsConstants.CLUSTER_NAME)
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
- .put(EsExecutors.PROCESSORS, 1)
- .put("http.enabled", true)
- .put("index.percolator.map_unmapped_fields_as_string",
- true)
- .put("index.store.type", "mmapfs")
- .put("path.home", "./data")
- ).build();
- node.start();
- return node;
- }
-
- /**
- * Waits for specified seconds and ignores {@link InterruptedException}.
- * @param seconds the seconds to wait
- */
- public static void waitForSeconds(final int seconds) {
- try {
- Thread.sleep(TimeUnit.SECONDS.toMillis(5));
- } catch (InterruptedException ex) {
- //expected
- }
- }
-
- /**
- * Utility constructor.
- */
- private EsTestUtil() {
- }
-}
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index c9126656e..311c816b7 100644
--- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -26,10 +26,7 @@ import java.util.UUID;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
-import org.apache.storm.elasticsearch.common.EsConfig;
-import org.apache.storm.elasticsearch.common.EsConstants;
-import org.apache.storm.elasticsearch.common.EsTestUtil;
-import org.apache.storm.elasticsearch.common.EsTupleMapper;
+import org.apache.storm.elasticsearch.common.*;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
@@ -68,16 +65,13 @@ public final class TridentEsTopology {
Stream stream = topology.newStream("spout", spout);
EsConfig esConfig = new EsConfig("http://localhost:9300");
Fields esFields = new Fields("index", "type", "source");
- EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+ EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
TridentState state = stream.partitionPersist(factory,
esFields,
new EsUpdater(),
new Fields());
- EsTestUtil.startEsNode();
- EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT_SECS);
-
StormSubmitter.submitTopology(TOPOLOGY_NAME,
new Config(),
topology.build());
diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml
index 0a72b30fc..ba68ee7b8 100644
--- a/external/storm-elasticsearch/pom.xml
+++ b/external/storm-elasticsearch/pom.xml
@@ -58,7 +58,7 @@
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
- <artifactId>rest</artifactId>
+ <artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
@@ -91,7 +91,6 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
- <version>${jackson.databind.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
@@ -111,12 +110,6 @@
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
</dependency>
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>${elasticsearch.test.version}</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-client</artifactId>
@@ -129,6 +122,12 @@
<artifactId>guava</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>1.19.1</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -153,6 +152,23 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>default-test</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <systemPropertyVariables>
+ <elasticsearch-version>${elasticsearch.version}</elasticsearch-version>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
index 6d4686331..9d60b9786 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
@@ -31,6 +31,8 @@ import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
/**
* Basic bolt for storing tuple to ES document.
@@ -74,7 +76,11 @@ public class EsIndexBolt extends AbstractEsBolt {
String id = tupleMapper.getId(tuple);
Map<String, String> params = tupleMapper.getParams(tuple, new HashMap<>());
- client.performRequest("put", getEndpoint(index, type, id), params, new StringEntity(source));
+ final Request request = new Request("post", getEndpoint(index, type, id));
+ request.setEntity(new StringEntity(source));
+ request.addParameters(params);
+
+ client.performRequest(request);
collector.ack(tuple);
} catch (Exception e) {
collector.reportError(e);
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
index 5faa69fbd..f84b5b601 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
@@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import org.apache.http.entity.StringEntity;
import org.apache.storm.elasticsearch.DefaultEsLookupResultOutput;
import org.apache.storm.elasticsearch.EsLookupResultOutput;
import org.apache.storm.elasticsearch.common.DefaultEsTupleMapper;
@@ -33,6 +34,7 @@ import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
+import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
/**
@@ -83,7 +85,9 @@ public class EsLookupBolt extends AbstractEsBolt {
String id = tupleMapper.getId(tuple);
Map<String, String> params = tupleMapper.getParams(tuple, new HashMap<>());
- Response response = client.performRequest("get", getEndpoint(index, type, id), params);
+ final Request request = new Request("get", getEndpoint(index, type, id));
+ request.addParameters(params);
+ Response response = client.performRequest(request);
return output.toValues(response);
}
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
index 3f831f74a..2fac3e8f6 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
@@ -34,6 +34,7 @@ import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
+import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
/**
@@ -82,8 +83,13 @@ public class EsPercolateBolt extends AbstractEsBolt {
Map<String, String> indexParams = new HashMap<>();
indexParams.put(type, null);
String percolateDoc = "{\"doc\": " + source + "}";
- Response response = client.performRequest("get", getEndpoint(index, type, "_percolate"),
- new HashMap<>(), new StringEntity(percolateDoc));
+
+ final Request request = new Request("get", getEndpoint(index, type, "_percolate"));
+ request.setEntity(new StringEntity(percolateDoc));
+ request.addParameters(new HashMap<>());
+
+ Response response = client.performRequest(request);
+
PercolateResponse percolateResponse = objectMapper.readValue(response.getEntity().getContent(), PercolateResponse.class);
if (!percolateResponse.getMatches().isEmpty()) {
for (PercolateResponse.Match match : percolateResponse.getMatches()) {
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
index c665ae4f8..04ac73d4c 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -35,13 +35,14 @@ import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
public class EsConfig implements Serializable {
private final HttpHost[] httpHosts;
- private Integer maxRetryTimeoutMillis;
private Header[] defaultHeaders;
private RestClient.FailureListener failureListener;
private HttpClientConfigCallback httpClientConfigCallback;
private RequestConfigCallback requestConfigCallback;
private String pathPrefix;
+ private boolean compression;
+
/**
* EsConfig Constructor to be used in EsIndexBolt, EsPercolateBolt and EsStateFactory.
* Connects to Elasticsearch at http://localhost:9200.
@@ -75,11 +76,6 @@ public class EsConfig implements Serializable {
throw new IllegalArgumentException("Invalid url " + url);
}
}
-
- public EsConfig withMaxRetryTimeoutMillis(Integer maxRetryTimeoutMillis) {
- this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
- return this;
- }
public EsConfig withDefaultHeaders(Header[] defaultHeaders) {
this.defaultHeaders = defaultHeaders;
@@ -106,12 +102,13 @@ public class EsConfig implements Serializable {
return this;
}
- public HttpHost[] getHttpHosts() {
- return httpHosts;
+ public EsConfig withCompression(boolean compression) {
+ this.compression = compression;
+ return this;
}
- public Integer getMaxRetryTimeoutMillis() {
- return maxRetryTimeoutMillis;
+ public HttpHost[] getHttpHosts() {
+ return httpHosts;
}
public Header[] getDefaultHeaders() {
@@ -133,4 +130,9 @@ public class EsConfig implements Serializable {
public String getPathPrefix() {
return pathPrefix;
}
+
+ public boolean isCompression() {
+ return compression;
+ }
+
}
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
index 732aa084a..7fa57d7b9 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -41,9 +41,6 @@ public final class StormElasticSearchClient implements Serializable {
*/
public RestClient construct() {
RestClientBuilder builder = RestClient.builder(esConfig.getHttpHosts());
- if (esConfig.getMaxRetryTimeoutMillis() != null) {
- builder.setMaxRetryTimeoutMillis(esConfig.getMaxRetryTimeoutMillis());
- }
if (esConfig.getDefaultHeaders() != null) {
builder.setDefaultHeaders(esConfig.getDefaultHeaders());
}
@@ -59,6 +56,9 @@ public final class StormElasticSearchClient implements Serializable {
if (esConfig.getPathPrefix() != null) {
builder.setPathPrefix(esConfig.getPathPrefix());
}
+
+ builder.setCompressionEnabled(esConfig.isCompression());
+
return builder.build();
}
}
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index 3a8e202c2..ef82335f8 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
import org.apache.http.entity.StringEntity;
@@ -36,6 +35,7 @@ import org.apache.storm.elasticsearch.response.BulkIndexResponse;
import org.apache.storm.topology.FailedException;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.tuple.TridentTuple;
+import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.slf4j.Logger;
@@ -122,7 +122,10 @@ class EsState implements State {
public void updateState(List<TridentTuple> tuples) {
try {
String bulkRequest = buildRequest(tuples);
- Response response = client.performRequest("post", "_bulk", new HashMap<>(), new StringEntity(bulkRequest.toString()));
+
+ final Request request = new Request("post", "_bulk");
+ request.setEntity(new StringEntity(bulkRequest));
+ Response response = client.performRequest(request);
BulkIndexResponse bulkResponse = objectMapper.readValue(response.getEntity().getContent(), BulkIndexResponse.class);
if (bulkResponse.hasErrors()) {
LOG.warn("failed processing bulk index requests: " + bulkResponse.getFirstError() + ": " + bulkResponse.getFirstResult());
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
index afb75db1a..0819f8a8e 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
@@ -20,16 +20,20 @@ package org.apache.storm.elasticsearch.bolt;
import org.apache.storm.elasticsearch.common.EsTestUtil;
import org.apache.storm.testing.IntegrationTest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.node.Node;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.io.IOException;
@IntegrationTest
public abstract class AbstractEsBoltIntegrationTest<Bolt extends AbstractEsBolt> extends AbstractEsBoltTest<Bolt> {
- protected static Node node;
+ protected static ElasticsearchContainer node;
@BeforeAll
public static void startElasticSearchNode() throws Exception {
@@ -43,8 +47,8 @@ public abstract class AbstractEsBoltIntegrationTest<Bolt extends AbstractEsBolt>
}
@BeforeEach
- public void createIndex() {
- node.client().admin().indices().create(new CreateIndexRequest(index)).actionGet();
+ public void createIndex() throws IOException {
+ EsTestUtil.getRestHighLevelClient(node).indices().create(new CreateIndexRequest(index), RequestOptions.DEFAULT);
}
@AfterEach
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index 9e86865b3..a7ccee4bf 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -23,31 +23,45 @@ import static org.mockito.Mockito.verify;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTestUtil;
import org.apache.storm.tuple.Tuple;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
+
public class EsIndexBoltTest extends AbstractEsBoltIntegrationTest<EsIndexBolt> {
@Test
- public void testEsIndexBolt() {
+ public void testEsIndexBolt() throws IOException {
Tuple tuple = createTestTuple(index, type);
bolt.execute(tuple);
verify(outputCollector).ack(tuple);
- node.client().admin().indices().prepareRefresh(index).execute().actionGet();
- SearchResponse resp = node.client().prepareSearch(index)
- .setQuery(new TermQueryBuilder("_type", type))
- .setSize(0)
- .execute().actionGet();
+ RestHighLevelClient client = EsTestUtil.getRestHighLevelClient(node);
+ RefreshRequest request = new RefreshRequest(index);
+ client.indices().refresh(request, RequestOptions.DEFAULT);
+
+ SearchRequest searchRequest = new SearchRequest(index);
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(new TermQueryBuilder("_type", type));
+ searchSourceBuilder.size(0);
+ searchRequest.source(searchSourceBuilder);
+
+ SearchResponse resp = client.search(searchRequest, RequestOptions.DEFAULT);
assertEquals(1, resp.getHits().getTotalHits());
}
@Test
- public void indexMissing() {
+ public void indexMissing() throws IOException {
String index = "missing";
Tuple tuple = createTestTuple(index, type);
@@ -56,11 +70,14 @@ public class EsIndexBoltTest extends AbstractEsBoltIntegrationTest<EsIndexBolt>
verify(outputCollector).ack(tuple);
- node.client().admin().indices().prepareRefresh(index).execute().actionGet();
- SearchResponse resp = node.client().prepareSearch(index)
- .setQuery(new TermQueryBuilder("_type", type))
- .setSize(0)
- .execute().actionGet();
+ RestHighLevelClient client = EsTestUtil.getRestHighLevelClient(node);
+ SearchRequest searchRequest = new SearchRequest(index);
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(new TermQueryBuilder("_type", type));
+ searchSourceBuilder.size(0);
+ searchRequest.source(searchSourceBuilder);
+
+ SearchResponse resp = client.search(searchRequest, RequestOptions.DEFAULT);
assertEquals(1, resp.getHits().getTotalHits());
}
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
index de595c3f2..c31e6f310 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
@@ -27,7 +27,11 @@ import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTestUtil;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.xcontent.XContentType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -35,6 +39,8 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.junit.jupiter.MockitoExtension;
+import java.io.IOException;
+
@ExtendWith(MockitoExtension.class)
public class EsLookupBoltIntegrationTest extends AbstractEsBoltIntegrationTest<EsLookupBolt> {
@@ -52,8 +58,11 @@ public class EsLookupBoltIntegrationTest extends AbstractEsBoltIntegrationTest<E
}
@BeforeEach
- public void populateIndexWithTestData() {
- node.client().prepareIndex(index, type, documentId).setSource(source).execute().actionGet();
+ public void populateIndexWithTestData() throws IOException {
+ IndexRequest indexRequest = new IndexRequest(index, type, documentId)
+ .source(source, XContentType.JSON);
+ RestHighLevelClient client = EsTestUtil.getRestHighLevelClient(node);
+ client.index(indexRequest, RequestOptions.DEFAULT);
}
@Test
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
index 197ca6ec6..0067e465d 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
@@ -35,6 +35,8 @@ import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.AfterEach;
@@ -87,7 +89,9 @@ public class EsLookupBoltTest extends AbstractEsBoltTest<EsLookupBolt> {
}
private void makeRequestAndThrow(Exception exception) throws IOException {
- when(client.performRequest("get", AbstractEsBolt.getEndpoint(index, type, documentId), params)).thenThrow(exception);
+ final Request request = new Request("get", AbstractEsBolt.getEndpoint(index, type, documentId));
+ request.addParameters(params);
+ when(client.performRequest(request)).thenThrow(exception);
bolt.execute(tuple);
}
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index 4ac39e1ca..d50127db0 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -29,11 +29,18 @@ import org.apache.storm.elasticsearch.common.EsTestUtil;
import org.apache.storm.elasticsearch.response.PercolateResponse;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.xcontent.XContentType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
+import java.io.IOException;
+
public class EsPercolateBoltTest extends AbstractEsBoltIntegrationTest<EsPercolateBolt> {
private final String source = "{\"user\":\"user1\"}";
@@ -44,11 +51,14 @@ public class EsPercolateBoltTest extends AbstractEsBoltIntegrationTest<EsPercola
}
@BeforeEach
- public void populateIndexWithTestData() {
- node.client().prepareIndex(index, ".percolator", documentId)
- .setSource("{\"query\":{\"match\":" + source + "}}")
- .setRefresh(true)
- .execute().actionGet();
+ public void populateIndexWithTestData() throws IOException {
+ IndexRequest indexRequest = new IndexRequest(index, ".percolator", documentId)
+ .source(source, XContentType.JSON)
+ .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); // setRefresh(true) in Elasticsearch 6.x
+
+ RestHighLevelClient client = EsTestUtil.getRestHighLevelClient(node);
+
+ client.index(indexRequest, RequestOptions.DEFAULT);
}
@Test
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
index 6712768ba..303fda635 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
@@ -39,23 +39,11 @@ import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.Response;
-import org.elasticsearch.client.ResponseException;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.index.IndexNotFoundException;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
+import org.elasticsearch.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
public class EsTestUtil {
private static final Logger LOG = LoggerFactory.getLogger(EsTestUtil.class);
@@ -91,40 +79,32 @@ public class EsTestUtil {
return new ResponseException(response);
}
- public static Node startEsNode(){
- Node node = NodeBuilder.nodeBuilder().data(true).settings(
- Settings.settingsBuilder()
- .put(ClusterName.SETTING, EsConstants.clusterName)
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
- .put(EsExecutors.PROCESSORS, 1)
- .put("http.enabled", true)
- .put("index.percolator.map_unmapped_fields_as_string", true)
- .put("index.store.type", "mmapfs")
- .put("path.home", "./data")
- ).build();
- node.start();
- return node;
+ public static RestHighLevelClient getRestHighLevelClient(ElasticsearchContainer node) {
+ final EsConfig cfg = new EsConfig(node.getHttpHostAddress());
+ return new RestHighLevelClientBuilder(new StormElasticSearchClient(cfg).construct()).build();
}
- public static void ensureEsGreen(Node node) {
- ClusterHealthResponse chr = node.client()
- .admin()
- .cluster()
- .health(Requests.clusterHealthRequest()
- .timeout(TimeValue.timeValueSeconds(30))
- .waitForGreenStatus()
- .waitForEvents(Priority.LANGUID)
- .waitForRelocatingShards(0))
- .actionGet();
- assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN));
+ public static ElasticsearchContainer startEsNode() {
+ String version = System.getProperty("elasticsearch-version");
+ if (version == null) version = "7.17.13";
+ LOG.info("Starting docker instance of Elasticsearch {}...", version);
+
+ final ElasticsearchContainer container =
+ new ElasticsearchContainer(
+ "docker.elastic.co/elasticsearch/elasticsearch:" + version);
+ container.start();
+ return container;
+ }
+
+ public static void ensureEsGreen(ElasticsearchContainer node) {
+ assertThat("cluster status is green", node.isHealthy());
}
/**
* Stop the given Elasticsearch node and clear the data directory.
* @param node
*/
- public static void stopEsNode(Node node) {
+ public static void stopEsNode(ElasticsearchContainer node) {
node.close();
try {
FileUtils.deleteDirectory(new File("./data"));
@@ -138,10 +118,10 @@ public class EsTestUtil {
* @param node - the node to connect to
* @param index - the index to clear
*/
- public static void clearIndex(Node node, String index) {
+ public static void clearIndex(ElasticsearchContainer node, String index) {
try {
- node.client().admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
- } catch (IndexNotFoundException ignore) {
+ getRestHighLevelClient(node).indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT);
+ } catch (IOException ignore) {
}
}
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java
index bcabe9e60..8e86e7dad 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java
@@ -33,12 +33,13 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
@IntegrationTest
@ExtendWith(MockitoExtension.class)
public class EsStateTest {
- private static Node node;
+ private static ElasticsearchContainer node;
private final String[] documentId = {UUID.randomUUID().toString(), UUID.randomUUID().toString()};
private final String index = "index";
diff --git a/pom.xml b/pom.xml
index 58d6eeb71..c1716ee15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -126,7 +126,7 @@
<awaitility.version>3.1.0</awaitility.version>
<hdrhistogram.version>2.1.10</hdrhistogram.version>
<hamcrest.version>2.2</hamcrest.version>
- <elasticsearch.version>5.2.2</elasticsearch.version>
+ <elasticsearch.version>7.17.13</elasticsearch.version>
<calcite.version>1.16.0</calcite.version>
<jedis.version>2.9.0</jedis.version>
<activemq.version>5.18.3</activemq.version>