You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by rz...@apache.org on 2023/12/04 07:42:01 UTC

(storm) 01/02: STORM-4005 - ElasticSearch 7.17.13

This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git

commit c15a1563f0eaa1a3762ec5eb9b0922766cd643a2
Author: Richard Zowalla <ri...@hs-heilbronn.de>
AuthorDate: Wed Nov 29 14:50:14 2023 +0100

    STORM-4005 - ElasticSearch 7.17.13
---
 DEPENDENCY-LICENSES                                |  70 +++++++-----
 examples/storm-elasticsearch-examples/pom.xml      |   8 --
 .../storm/elasticsearch/bolt/EsIndexTopology.java  |   9 +-
 .../storm/elasticsearch/common/EsTestUtil.java     | 122 ---------------------
 .../elasticsearch/trident/TridentEsTopology.java   |  10 +-
 external/storm-elasticsearch/pom.xml               |  32 ++++--
 .../storm/elasticsearch/bolt/EsIndexBolt.java      |   8 +-
 .../storm/elasticsearch/bolt/EsLookupBolt.java     |   6 +-
 .../storm/elasticsearch/bolt/EsPercolateBolt.java  |  10 +-
 .../storm/elasticsearch/common/EsConfig.java       |  22 ++--
 .../common/StormElasticSearchClient.java           |  10 +-
 .../storm/elasticsearch/trident/EsState.java       |   7 +-
 .../bolt/AbstractEsBoltIntegrationTest.java        |  10 +-
 .../storm/elasticsearch/bolt/EsIndexBoltTest.java  |  41 +++++--
 .../bolt/EsLookupBoltIntegrationTest.java          |  13 ++-
 .../storm/elasticsearch/bolt/EsLookupBoltTest.java |   6 +-
 .../elasticsearch/bolt/EsPercolateBoltTest.java    |  20 +++-
 .../storm/elasticsearch/common/EsTestUtil.java     |  66 ++++-------
 .../storm/elasticsearch/trident/EsStateTest.java   |   3 +-
 pom.xml                                            |   2 +-
 20 files changed, 207 insertions(+), 268 deletions(-)

diff --git a/DEPENDENCY-LICENSES b/DEPENDENCY-LICENSES
index 87892a602..57fb93e51 100644
--- a/DEPENDENCY-LICENSES
+++ b/DEPENDENCY-LICENSES
@@ -31,6 +31,10 @@ List of third-party dependencies grouped by their license type.
         * Maven Plugin Tools Java Annotations (org.apache.maven.plugin-tools:maven-plugin-annotations:3.8.1 - https://maven.apache.org/plugin-tools/maven-plugin-annotations)
         * snappy-java (org.xerial.snappy:snappy-java:1.1.10.4 - https://github.com/xerial/snappy-java)
 
+    Apache-2.0, LGPL-2.1-or-later
+
+        * Java Native Access (net.java.dev.jna:jna:5.10.0 - https://github.com/java-native-access/jna)
+
     Apache 2.0 License
 
         * Apache MINA Core (org.apache.mina:mina-core:2.2.2 - https://mina.apache.org/mina-core/)
@@ -125,7 +129,7 @@ List of third-party dependencies grouped by their license type.
         * Apache HttpAsyncClient (org.apache.httpcomponents:httpasyncclient:4.1.5 - http://hc.apache.org/httpcomponents-asyncclient)
         * Apache HttpClient (org.apache.httpcomponents:httpclient:4.5.14 - http://hc.apache.org/httpcomponents-client-ga)
         * Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.16 - http://hc.apache.org/httpcomponents-core-ga)
-        * Apache HttpCore NIO (org.apache.httpcomponents:httpcore-nio:4.4.5 - http://hc.apache.org/httpcomponents-core-ga)
+        * Apache HttpCore NIO (org.apache.httpcomponents:httpcore-nio:4.4.15 - http://hc.apache.org/httpcomponents-core-ga)
         * Apache Ivy (org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/)
         * Apache Kafka (org.apache.kafka:kafka-clients:0.11.0.3 - http://kafka.apache.org)
         * Apache Maven Artifact Transfer (org.apache.maven.shared:maven-artifact-transfer:0.9.1 - https://maven.apache.org/shared/maven-artifact-transfer/)
@@ -176,7 +180,7 @@ List of third-party dependencies grouped by their license type.
         * Commons Logging (commons-logging:commons-logging:1.1.3 - http://commons.apache.org/proper/commons-logging/)
         * Commons Math (org.apache.commons:commons-math3:3.1.1 - http://commons.apache.org/math/)
         * Commons Pool (commons-pool:commons-pool:1.5.4 - http://commons.apache.org/pool/)
-        * Compress-LZF (com.ning:compress-lzf:1.0.2 - http://github.com/ning/compress)
+        * compiler (com.github.spullara.mustache.java:compiler:0.9.6 - http://github.com/spullara/mustache.java)
         * Curator Client (org.apache.curator:curator-client:5.5.0 - https://curator.apache.org/curator-client)
         * Curator Framework (org.apache.curator:curator-framework:5.5.0 - https://curator.apache.org/curator-framework)
         * Curator Recipes (org.apache.curator:curator-recipes:5.5.0 - https://curator.apache.org/curator-recipes)
@@ -198,8 +202,6 @@ List of third-party dependencies grouped by their license type.
         * Dropwizard Utility Classes (io.dropwizard:dropwizard-util:1.3.29 - http://www.dropwizard.io/1.3.29/dropwizard-util)
         * Dropwizard Validation Support (io.dropwizard:dropwizard-validation:1.3.29 - http://www.dropwizard.io/1.3.29/dropwizard-validation)
         * Ehcache (org.ehcache:ehcache:3.3.1 - http://ehcache.org)
-        * Elasticsearch: Core (org.elasticsearch:elasticsearch:2.4.4 - http://nexus.sonatype.org/oss-repository-hosting.html/parent/elasticsearch)
-        * Elasticsearch SecureSM (org.elasticsearch:securesm:1.0 - http://nexus.sonatype.org/oss-repository-hosting.html/securesm)
         * error-prone annotations (com.google.errorprone:error_prone_annotations:2.21.1 - https://errorprone.info/error_prone_annotations)
         * Esri Geometry API for Java (com.esri.geometry:esri-geometry-api:2.0.0 - https://github.com/Esri/geometry-api-java)
         * fastutil (it.unimi.dsi:fastutil:6.5.6 - http://fasutil.dsi.unimi.it/)
@@ -250,8 +252,8 @@ List of third-party dependencies grouped by their license type.
         * Hive Storage API (org.apache.hive:hive-storage-api:2.7.0 - https://www.apache.org/hive-storage-api/)
         * Hive Upgrade Acid (org.apache.hive:hive-upgrade-acid:3.1.3 - https://www.apache.org/hive-upgrade-acid/)
         * Hive Vector-Code-Gen Utilities (org.apache.hive:hive-vector-code-gen:3.1.3 - https://hive.apache.org/hive-vector-code-gen)
-        * HPPC Collections (com.carrotsearch:hppc:0.7.1 - http://labs.carrotsearch.com/hppc.html/hppc)
         * HPPC Collections (com.carrotsearch:hppc:0.7.2 - http://labs.carrotsearch.com/hppc.html/hppc)
+        * HPPC Collections (com.carrotsearch:hppc:0.8.1 - http://labs.carrotsearch.com/hppc.html/hppc)
         * htrace-core (org.apache.htrace:htrace-core:3.2.0-incubating - http://incubator.apache.org/projects/htrace.html)
         * j2html (com.j2html:j2html:1.6.0 - http://j2html.com)
         * J2ObjC Annotations (com.google.j2objc:j2objc-annotations:2.8 - https://github.com/google/j2objc/)
@@ -306,21 +308,21 @@ List of third-party dependencies grouped by their license type.
         * Kerby Util (org.apache.kerby:kerby-util:1.0.1 - http://directory.apache.org/kerby/kerby-common/kerby-util)
         * Kerby XDR Project (org.apache.kerby:kerby-xdr:1.0.1 - http://directory.apache.org/kerby/kerby-common/kerby-xdr)
         * Logging (commons-logging:commons-logging:1.0.3 - http://jakarta.apache.org/commons/logging/)
-        * Lucene Common Analyzers (org.apache.lucene:lucene-analyzers-common:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-analyzers-common)
-        * Lucene Core (org.apache.lucene:lucene-core:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-core)
-        * Lucene Grouping (org.apache.lucene:lucene-grouping:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-grouping)
-        * Lucene Highlighter (org.apache.lucene:lucene-highlighter:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-highlighter)
-        * Lucene Join (org.apache.lucene:lucene-join:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-join)
-        * Lucene Memory (org.apache.lucene:lucene-backward-codecs:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-backward-codecs)
-        * Lucene Memory (org.apache.lucene:lucene-memory:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-memory)
-        * Lucene Miscellaneous (org.apache.lucene:lucene-misc:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-misc)
-        * Lucene Queries (org.apache.lucene:lucene-queries:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-queries)
-        * Lucene QueryParsers (org.apache.lucene:lucene-queryparser:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-queryparser)
-        * Lucene Sandbox (org.apache.lucene:lucene-sandbox:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-sandbox)
-        * Lucene Spatial (org.apache.lucene:lucene-spatial:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-spatial)
-        * Lucene Spatial 3D (org.apache.lucene:lucene-spatial3d:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-spatial3d)
-        * Lucene Suggest (org.apache.lucene:lucene-suggest:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-suggest)
+        * Lucene Common Analyzers (org.apache.lucene:lucene-analyzers-common:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-analyzers-common)
+        * Lucene Core (org.apache.lucene:lucene-core:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-core)
+        * Lucene Grouping (org.apache.lucene:lucene-grouping:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-grouping)
+        * Lucene Highlighter (org.apache.lucene:lucene-highlighter:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-highlighter)
+        * Lucene Join (org.apache.lucene:lucene-join:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-join)
+        * Lucene Memory (org.apache.lucene:lucene-backward-codecs:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-backward-codecs)
+        * Lucene Memory (org.apache.lucene:lucene-memory:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-memory)
+        * Lucene Miscellaneous (org.apache.lucene:lucene-misc:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-misc)
+        * Lucene Queries (org.apache.lucene:lucene-queries:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-queries)
+        * Lucene QueryParsers (org.apache.lucene:lucene-queryparser:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-queryparser)
+        * Lucene Sandbox (org.apache.lucene:lucene-sandbox:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-sandbox)
+        * Lucene Spatial 3D (org.apache.lucene:lucene-spatial3d:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-spatial3d)
+        * Lucene Suggest (org.apache.lucene:lucene-suggest:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-suggest)
         * LZ4 and xxHash (net.jpountz.lz4:lz4:1.3.0 - https://github.com/jpountz/lz4-java)
+        * LZ4 and xxHash (org.lz4:lz4-java:1.8.0 - https://github.com/lz4/lz4-java)
         * Maven Aether Provider (org.apache.maven:maven-aether-provider:3.0 - http://maven.apache.org/maven-aether-provider/)
         * Maven Artifact (org.apache.maven:maven-artifact:3.0 - http://maven.apache.org/maven-artifact/)
         * Maven Artifact (org.apache.maven:maven-artifact:3.6.0 - https://maven.apache.org/ref/3.6.0/maven-artifact/)
@@ -419,13 +421,12 @@ List of third-party dependencies grouped by their license type.
         * Plexus Interpolation API (org.codehaus.plexus:plexus-interpolation:1.25 - http://codehaus-plexus.github.io/plexus-interpolation/)
         * Plexus Security Dispatcher Component (org.sonatype.plexus:plexus-sec-dispatcher:1.3 - http://spice.sonatype.org/plexus-sec-dispatcher)
         * Plexus Security Dispatcher Component (org.sonatype.plexus:plexus-sec-dispatcher:1.4 - http://spice.sonatype.org/plexus-sec-dispatcher)
-        * rest (org.elasticsearch.client:rest:5.2.2 - https://github.com/elastic/elasticsearch)
+        * rest (org.elasticsearch.client:elasticsearch-rest-client:7.17.13 - https://github.com/elastic/elasticsearch.git)
         * sigar (org.fusesource:sigar:1.6.4 - http://fusesource.com/sigar/)
         * Sisu - Guice (org.sonatype.sisu:sisu-guice:2.1.7 - http://forge.sonatype.com/sisu-guice/)
         * Sisu - Inject (JSR330 bean support) (org.sonatype.sisu:sisu-inject-bean:1.4.2 - http://sisu.sonatype.org/sisu-inject/guice-bean/sisu-inject-bean/)
         * Sisu - Inject (Plexus bean support) (org.sonatype.sisu:sisu-inject-plexus:1.4.2 - http://sisu.sonatype.org/sisu-inject/guice-bean/guice-plexus/sisu-inject-plexus/)
         * SnakeYAML (org.yaml:snakeyaml:2.0 - https://bitbucket.org/snakeyaml/snakeyaml)
-        * Spatial4J (com.spatial4j:spatial4j:0.5 - http://nexus.sonatype.org/oss-repository-hosting.html/spatial4j)
         * Spring AOP (org.springframework:spring-aop:5.3.27 - https://github.com/spring-projects/spring-framework)
         * Spring Beans (org.springframework:spring-beans:5.3.27 - https://github.com/spring-projects/spring-framework)
         * Spring Commons Logging Bridge (org.springframework:spring-jcl:5.3.27 - https://github.com/spring-projects/spring-framework)
@@ -436,7 +437,7 @@ List of third-party dependencies grouped by their license type.
         * Spring Messaging (org.springframework:spring-messaging:5.3.27 - https://github.com/spring-projects/spring-framework)
         * Spring Transaction (org.springframework:spring-tx:5.3.27 - https://github.com/spring-projects/spring-framework)
         * StAX API (stax:stax-api:1.0.1 - http://stax.codehaus.org/)
-        * T-Digest (com.tdunning:t-digest:3.0 - https://github.com/tdunning/t-digest)
+        * T-Digest (com.tdunning:t-digest:3.2 - https://github.com/tdunning/t-digest)
         * Tephra API (co.cask.tephra:tephra-api:0.6.0 - https://github.com/caskdata/tephra/tephra-api)
         * Tephra Core (co.cask.tephra:tephra-core:0.6.0 - https://github.com/caskdata/tephra/tephra-core)
         * Tephra HBase 1.0 Compatibility (co.cask.tephra:tephra-hbase-compat-1.0:0.6.0 - https://github.com/caskdata/tephra/tephra-hbase-compat-1.0)
@@ -547,10 +548,6 @@ List of third-party dependencies grouped by their license type.
         * JLine Bundle (org.jline:jline:3.9.0 - http://nexus.sonatype.org/oss-repository-hosting.html/jline-parent/jline)
         * Stax2 API (org.codehaus.woodstox:stax2-api:4.2.1 - http://github.com/FasterXML/stax2-api)
 
-    CC0 1.0 Universal
-
-        * JSR166e (com.twitter:jsr166e:1.1.0 - http://github.com/twitter/jsr166e)
-
     CDDL/GPLv2+CE
 
         * JavaBeans Activation Framework API jar (javax.activation:javax.activation-api:1.2.0 - http://java.net/all/javax.activation-api/)
@@ -637,6 +634,26 @@ List of third-party dependencies grouped by their license type.
 
         * jms (jakarta.jms:jakarta.jms-api:2.0.2 - https://projects.eclipse.org/projects/ee4j.jms)
 
+    Elastic License 2.0
+
+        * rest-high-level (org.elasticsearch.client:elasticsearch-rest-high-level-client:7.17.13 - https://github.com/elastic/elasticsearch.git)
+
+    Elastic License 2.0, Server Side Public License, v 1
+
+        * aggs-matrix-stats (org.elasticsearch.plugin:aggs-matrix-stats-client:7.17.13 - https://github.com/elastic/elasticsearch.git)
+        * elasticsearch-cli (org.elasticsearch:elasticsearch-cli:7.17.13 - https://github.com/elastic/elasticsearch.git)
+        * elasticsearch-core (org.elasticsearch:elasticsearch-core:7.17.13 - https://github.com/elastic/elasticsearch.git)
+        * elasticsearch-geo (org.elasticsearch:elasticsearch-geo:7.17.13 - https://github.com/elastic/elasticsearch.git)
+        * elasticsearch-lz4 (org.elasticsearch:elasticsearch-lz4:7.17.13 - https://github.com/elastic/elasticsearch.git)
+        * elasticsearch-plugin-classloader (org.elasticsearch:elasticsearch-plugin-classloader:7.17.13 - https://github.com/elastic/elasticsearch.git)
+        * elasticsearch-secure-sm (org.elasticsearch:elasticsearch-secure-sm:7.17.13 - https://github.com/elastic/elasticsearch.git)
+        * elasticsearch-x-content (org.elasticsearch:elasticsearch-x-content:7.17.13 - https://github.com/elastic/elasticsearch.git)
+        * lang-mustache (org.elasticsearch.plugin:lang-mustache-client:7.17.13 - https://github.com/elastic/elasticsearch.git)
+        * mapper-extras (org.elasticsearch.plugin:mapper-extras-client:7.17.13 - https://github.com/elastic/elasticsearch.git)
+        * parent-join (org.elasticsearch.plugin:parent-join-client:7.17.13 - https://github.com/elastic/elasticsearch.git)
+        * rank-eval (org.elasticsearch.plugin:rank-eval-client:7.17.13 - https://github.com/elastic/elasticsearch.git)
+        * server (org.elasticsearch:elasticsearch:7.17.13 - https://github.com/elastic/elasticsearch.git)
+
     MIT License
 
         * argparse4j (net.sourceforge.argparse4j:argparse4j:0.8.1 - http://argparse4j.github.io)
@@ -644,6 +661,7 @@ List of third-party dependencies grouped by their license type.
         * JCodings (org.jruby.jcodings:jcodings:1.0.55 - http://nexus.sonatype.org/oss-repository-hosting.html/jcodings)
         * Jedis (redis.clients:jedis:2.9.0 - https://github.com/xetorthio/jedis)
         * Joni (org.jruby.joni:joni:2.1.31 - http://nexus.sonatype.org/oss-repository-hosting.html/joni)
+        * JOpt Simple (net.sf.jopt-simple:jopt-simple:5.0.2 - http://pholser.github.io/jopt-simple)
         * JUL to SLF4J bridge (org.slf4j:jul-to-slf4j:1.7.36 - http://www.slf4j.org)
         * Microsoft JDBC Driver for SQL Server (com.microsoft.sqlserver:mssql-jdbc:6.2.1.jre7 - https://github.com/Microsoft/mssql-jdbc)
         * SLF4J API Module (org.slf4j:slf4j-api:1.7.36 - http://www.slf4j.org)
diff --git a/examples/storm-elasticsearch-examples/pom.xml b/examples/storm-elasticsearch-examples/pom.xml
index ee864aa15..f16429bb3 100644
--- a/examples/storm-elasticsearch-examples/pom.xml
+++ b/examples/storm-elasticsearch-examples/pom.xml
@@ -26,9 +26,6 @@
     </parent>
 
     <artifactId>storm-elasticsearch-examples</artifactId>
-    <properties>
-        <elasticsearch.test.version>2.4.4</elasticsearch.test.version>
-    </properties>
 
     <dependencies>
         <dependency>
@@ -42,11 +39,6 @@
             <artifactId>storm-elasticsearch</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.elasticsearch</groupId>
-            <artifactId>elasticsearch</artifactId>
-            <version>${elasticsearch.test.version}</version>
-        </dependency>
     </dependencies>
 
     <build>
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index 1c9e09f1c..3c3e8565a 100644
--- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -24,10 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
-import org.apache.storm.elasticsearch.common.EsConfig;
-import org.apache.storm.elasticsearch.common.EsConstants;
-import org.apache.storm.elasticsearch.common.EsTestUtil;
-import org.apache.storm.elasticsearch.common.EsTupleMapper;
+import org.apache.storm.elasticsearch.common.*;
 import org.apache.storm.generated.AlreadyAliveException;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.InvalidTopologyException;
@@ -76,13 +73,11 @@ public final class EsIndexTopology {
         TopologyBuilder builder = new TopologyBuilder();
         UserDataSpout spout = new UserDataSpout();
         builder.setSpout(SPOUT_ID, spout, 1);
-        EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+        EsTupleMapper tupleMapper =new DefaultEsTupleMapper();
         EsConfig esConfig = new EsConfig("http://localhost:9300");
         builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1)
                 .shuffleGrouping(SPOUT_ID);
 
-        EsTestUtil.startEsNode();
-        EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT_SECS);
         StormSubmitter.submitTopology(TOPOLOGY_NAME,
                 config,
                 builder.createTopology());
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
deleted file mode 100644
index e96f6e897..000000000
--- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.elasticsearch.common;
-
-import java.util.HashMap;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.storm.Config;
-import org.apache.storm.task.GeneralTopologyContext;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.TupleImpl;
-import org.apache.storm.tuple.Values;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-
-/**
- * ElasticSearch example utilities.
- */
-public final class EsTestUtil {
-
-    /**
-     * Generates a test tuple.
-     * @param source the source of the tuple
-     * @param index the index of the tuple
-     * @param type the type of the tuple
-     * @param id the id of the tuple
-     * @return the generated tuple
-     */
-    public static Tuple generateTestTuple(final String source,
-            final String index,
-            final String type,
-            final String id) {
-        TopologyBuilder builder = new TopologyBuilder();
-        GeneralTopologyContext topologyContext = new GeneralTopologyContext(
-                builder.createTopology(),
-                new Config(),
-                new HashMap<>(),
-                new HashMap<>(),
-                new HashMap<>(),
-                "") {
-            @Override
-            public Fields getComponentOutputFields(final String componentId,
-                    final String streamId) {
-                return new Fields("source", "index", "type", "id");
-            }
-        };
-        return new TupleImpl(topologyContext,
-                new Values(source, index, type, id),
-                source,
-                1,
-                "");
-    }
-
-    /**
-     * Generates a new tuple mapper.
-     * @return the generated mapper
-     */
-    public static EsTupleMapper generateDefaultTupleMapper() {
-        return new DefaultEsTupleMapper();
-    }
-
-    /**
-     * Starts an ElasticSearch node.
-     * @return the started node.
-     */
-    public static Node startEsNode() {
-        Node node = NodeBuilder.nodeBuilder().data(true).settings(
-                Settings.settingsBuilder()
-                        .put(ClusterName.SETTING, EsConstants.CLUSTER_NAME)
-                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
-                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                        .put(EsExecutors.PROCESSORS, 1)
-                        .put("http.enabled", true)
-                        .put("index.percolator.map_unmapped_fields_as_string",
-                                true)
-                        .put("index.store.type", "mmapfs")
-                        .put("path.home", "./data")
-        ).build();
-        node.start();
-        return node;
-    }
-
-    /**
-     * Waits for specified seconds and ignores {@link InterruptedException}.
-     * @param seconds the seconds to wait
-     */
-    public static void waitForSeconds(final int seconds) {
-        try {
-            Thread.sleep(TimeUnit.SECONDS.toMillis(5));
-        } catch (InterruptedException ex) {
-            //expected
-        }
-    }
-
-    /**
-     * Utility constructor.
-     */
-    private EsTestUtil() {
-    }
-}
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index c9126656e..311c816b7 100644
--- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -26,10 +26,7 @@ import java.util.UUID;
 
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
-import org.apache.storm.elasticsearch.common.EsConfig;
-import org.apache.storm.elasticsearch.common.EsConstants;
-import org.apache.storm.elasticsearch.common.EsTestUtil;
-import org.apache.storm.elasticsearch.common.EsTupleMapper;
+import org.apache.storm.elasticsearch.common.*;
 import org.apache.storm.generated.AlreadyAliveException;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.InvalidTopologyException;
@@ -68,16 +65,13 @@ public final class TridentEsTopology {
         Stream stream = topology.newStream("spout", spout);
         EsConfig esConfig = new EsConfig("http://localhost:9300");
         Fields esFields = new Fields("index", "type", "source");
-        EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+        EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
         StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
         TridentState state = stream.partitionPersist(factory,
                 esFields,
                 new EsUpdater(),
                 new Fields());
 
-        EsTestUtil.startEsNode();
-        EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT_SECS);
-
         StormSubmitter.submitTopology(TOPOLOGY_NAME,
                 new Config(),
                 topology.build());
diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml
index 0a72b30fc..ba68ee7b8 100644
--- a/external/storm-elasticsearch/pom.xml
+++ b/external/storm-elasticsearch/pom.xml
@@ -58,7 +58,7 @@
         </dependency>
         <dependency>
             <groupId>org.elasticsearch.client</groupId>
-            <artifactId>rest</artifactId>
+            <artifactId>elasticsearch-rest-high-level-client</artifactId>
             <version>${elasticsearch.version}</version>
         </dependency>
         <dependency>
@@ -91,7 +91,6 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
-            <version>${jackson.databind.version}</version>
         </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
@@ -111,12 +110,6 @@
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.elasticsearch</groupId>
-            <artifactId>elasticsearch</artifactId>
-            <version>${elasticsearch.test.version}</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-client</artifactId>
@@ -129,6 +122,12 @@
             <artifactId>guava</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>1.19.1</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -153,6 +152,23 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>default-test</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <systemPropertyVariables>
+                        <elasticsearch-version>${elasticsearch.version}</elasticsearch-version>
+                    </systemPropertyVariables>
+                </configuration>
+            </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-failsafe-plugin</artifactId>
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
index 6d4686331..9d60b9786 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
@@ -31,6 +31,8 @@ import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
 
 /**
  * Basic bolt for storing tuple to ES document.
@@ -74,7 +76,11 @@ public class EsIndexBolt extends AbstractEsBolt {
             String id = tupleMapper.getId(tuple);
             Map<String, String> params = tupleMapper.getParams(tuple, new HashMap<>());
 
-            client.performRequest("put", getEndpoint(index, type, id), params, new StringEntity(source));
+            final Request request = new Request("post", getEndpoint(index, type, id));
+            request.setEntity(new StringEntity(source));
+            request.addParameters(params);
+
+            client.performRequest(request);
             collector.ack(tuple);
         } catch (Exception e) {
             collector.reportError(e);
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
index 5faa69fbd..f84b5b601 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.http.entity.StringEntity;
 import org.apache.storm.elasticsearch.DefaultEsLookupResultOutput;
 import org.apache.storm.elasticsearch.EsLookupResultOutput;
 import org.apache.storm.elasticsearch.common.DefaultEsTupleMapper;
@@ -33,6 +34,7 @@ import org.apache.storm.elasticsearch.common.EsTupleMapper;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
+import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 
 /**
@@ -83,7 +85,9 @@ public class EsLookupBolt extends AbstractEsBolt {
         String id = tupleMapper.getId(tuple);
         Map<String, String> params = tupleMapper.getParams(tuple, new HashMap<>());
 
-        Response response = client.performRequest("get", getEndpoint(index, type, id), params);
+        final Request request = new Request("get", getEndpoint(index, type, id));
+        request.addParameters(params);
+        Response response = client.performRequest(request);
         return output.toValues(response);
     }
 
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
index 3f831f74a..2fac3e8f6 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
@@ -34,6 +34,7 @@ import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
+import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 
 /**
@@ -82,8 +83,13 @@ public class EsPercolateBolt extends AbstractEsBolt {
             Map<String, String> indexParams = new HashMap<>();
             indexParams.put(type, null);
             String percolateDoc = "{\"doc\": " + source + "}";
-            Response response = client.performRequest("get", getEndpoint(index, type, "_percolate"),
-                    new HashMap<>(), new StringEntity(percolateDoc));
+
+            final Request request = new Request("get",  getEndpoint(index, type, "_percolate"));
+            request.setEntity(new StringEntity(percolateDoc));
+            request.addParameters(new HashMap<>());
+
+            Response response = client.performRequest(request);
+
             PercolateResponse percolateResponse = objectMapper.readValue(response.getEntity().getContent(), PercolateResponse.class);
             if (!percolateResponse.getMatches().isEmpty()) {
                 for (PercolateResponse.Match match : percolateResponse.getMatches()) {
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
index c665ae4f8..04ac73d4c 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -35,13 +35,14 @@ import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
 public class EsConfig implements Serializable {
 
     private final HttpHost[] httpHosts;
-    private Integer maxRetryTimeoutMillis;
     private Header[] defaultHeaders;
     private RestClient.FailureListener failureListener;
     private HttpClientConfigCallback httpClientConfigCallback;
     private RequestConfigCallback requestConfigCallback;
     private String pathPrefix;
 
+    private boolean compression;
+
     /**
     * EsConfig Constructor to be used in EsIndexBolt, EsPercolateBolt and EsStateFactory.
     * Connects to Elasticsearch at http://localhost:9200.
@@ -75,11 +76,6 @@ public class EsConfig implements Serializable {
             throw new IllegalArgumentException("Invalid url " + url);
         }
     }
-    
-    public EsConfig withMaxRetryTimeoutMillis(Integer maxRetryTimeoutMillis) {
-        this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
-        return this;
-    }
 
     public EsConfig withDefaultHeaders(Header[] defaultHeaders) {
         this.defaultHeaders = defaultHeaders;
@@ -106,12 +102,13 @@ public class EsConfig implements Serializable {
         return this;
     }
 
-    public HttpHost[] getHttpHosts() {
-        return httpHosts;
+    public EsConfig withCompression(boolean compression) {
+        this.compression = compression;
+        return this;
     }
 
-    public Integer getMaxRetryTimeoutMillis() {
-        return maxRetryTimeoutMillis;
+    public HttpHost[] getHttpHosts() {
+        return httpHosts;
     }
 
     public Header[] getDefaultHeaders() {
@@ -133,4 +130,9 @@ public class EsConfig implements Serializable {
     public String getPathPrefix() {
         return pathPrefix;
     }
+
+    public boolean isCompression() {
+        return compression;
+    }
+
 }
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
index 732aa084a..7fa57d7b9 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -41,9 +41,6 @@ public final class StormElasticSearchClient implements Serializable {
      */
     public RestClient construct() {
         RestClientBuilder builder = RestClient.builder(esConfig.getHttpHosts());
-        if (esConfig.getMaxRetryTimeoutMillis() != null) {
-            builder.setMaxRetryTimeoutMillis(esConfig.getMaxRetryTimeoutMillis());
-        }
         if (esConfig.getDefaultHeaders() != null) {
             builder.setDefaultHeaders(esConfig.getDefaultHeaders());
         }
@@ -59,6 +56,9 @@ public final class StormElasticSearchClient implements Serializable {
         if (esConfig.getPathPrefix() != null) {
             builder.setPathPrefix(esConfig.getPathPrefix());
         }
+
+        builder.setCompressionEnabled(esConfig.isCompression());
+
         return builder.build();
     }
 }
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index 3a8e202c2..ef82335f8 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 
 import org.apache.http.entity.StringEntity;
@@ -36,6 +35,7 @@ import org.apache.storm.elasticsearch.response.BulkIndexResponse;
 import org.apache.storm.topology.FailedException;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.tuple.TridentTuple;
+import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
 import org.slf4j.Logger;
@@ -122,7 +122,10 @@ class EsState implements State {
     public void updateState(List<TridentTuple> tuples) {
         try {
             String bulkRequest = buildRequest(tuples);
-            Response response = client.performRequest("post", "_bulk", new HashMap<>(), new StringEntity(bulkRequest.toString()));
+
+            final Request request = new Request("post", "_bulk");
+            request.setEntity(new StringEntity(bulkRequest));
+            Response response = client.performRequest(request);
             BulkIndexResponse bulkResponse = objectMapper.readValue(response.getEntity().getContent(), BulkIndexResponse.class);
             if (bulkResponse.hasErrors()) {
                 LOG.warn("failed processing bulk index requests: " + bulkResponse.getFirstError() + ": " + bulkResponse.getFirstResult());
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
index afb75db1a..0819f8a8e 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
@@ -20,16 +20,20 @@ package org.apache.storm.elasticsearch.bolt;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
 import org.apache.storm.testing.IntegrationTest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.node.Node;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.io.IOException;
 
 @IntegrationTest
 public abstract class AbstractEsBoltIntegrationTest<Bolt extends AbstractEsBolt> extends AbstractEsBoltTest<Bolt> {
 
-    protected static Node node;
+    protected static ElasticsearchContainer node;
 
     @BeforeAll
     public static void startElasticSearchNode() throws Exception {
@@ -43,8 +47,8 @@ public abstract class AbstractEsBoltIntegrationTest<Bolt extends AbstractEsBolt>
     }
 
     @BeforeEach
-    public void createIndex() {
-        node.client().admin().indices().create(new CreateIndexRequest(index)).actionGet();
+    public void createIndex() throws IOException {
+        EsTestUtil.getRestHighLevelClient(node).indices().create(new CreateIndexRequest(index), RequestOptions.DEFAULT);
     }
 
     @AfterEach
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index 9e86865b3..a7ccee4bf 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -23,31 +23,45 @@ import static org.mockito.Mockito.verify;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
 import org.apache.storm.tuple.Tuple;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
+
 public class EsIndexBoltTest extends AbstractEsBoltIntegrationTest<EsIndexBolt> {
 
     @Test
-    public void testEsIndexBolt() {
+    public void testEsIndexBolt() throws IOException {
         Tuple tuple = createTestTuple(index, type);
 
         bolt.execute(tuple);
 
         verify(outputCollector).ack(tuple);
 
-        node.client().admin().indices().prepareRefresh(index).execute().actionGet();
-        SearchResponse resp = node.client().prepareSearch(index)
-                .setQuery(new TermQueryBuilder("_type", type))
-                .setSize(0)
-                .execute().actionGet();
+        RestHighLevelClient client =  EsTestUtil.getRestHighLevelClient(node);
+        RefreshRequest request = new RefreshRequest(index);
+        client.indices().refresh(request, RequestOptions.DEFAULT);
+
+        SearchRequest searchRequest = new SearchRequest(index);
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.query(new TermQueryBuilder("_type", type));
+        searchSourceBuilder.size(0);
+        searchRequest.source(searchSourceBuilder);
+
+        SearchResponse resp = client.search(searchRequest, RequestOptions.DEFAULT);
 
         assertEquals(1, resp.getHits().getTotalHits());
     }
 
     @Test
-    public void indexMissing() {
+    public void indexMissing() throws IOException {
         String index = "missing";
 
         Tuple tuple = createTestTuple(index, type);
@@ -56,11 +70,14 @@ public class EsIndexBoltTest extends AbstractEsBoltIntegrationTest<EsIndexBolt>
 
         verify(outputCollector).ack(tuple);
 
-        node.client().admin().indices().prepareRefresh(index).execute().actionGet();
-        SearchResponse resp = node.client().prepareSearch(index)
-                .setQuery(new TermQueryBuilder("_type", type))
-                .setSize(0)
-                .execute().actionGet();
+        RestHighLevelClient client =  EsTestUtil.getRestHighLevelClient(node);
+        SearchRequest searchRequest = new SearchRequest(index);
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.query(new TermQueryBuilder("_type", type));
+        searchSourceBuilder.size(0);
+        searchRequest.source(searchSourceBuilder);
+
+        SearchResponse resp = client.search(searchRequest, RequestOptions.DEFAULT);
 
         assertEquals(1, resp.getHits().getTotalHits());
     }
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
index de595c3f2..c31e6f310 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
@@ -27,7 +27,11 @@ import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.xcontent.XContentType;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -35,6 +39,8 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import java.io.IOException;
+
 @ExtendWith(MockitoExtension.class)
 public class EsLookupBoltIntegrationTest extends AbstractEsBoltIntegrationTest<EsLookupBolt> {
 
@@ -52,8 +58,11 @@ public class EsLookupBoltIntegrationTest extends AbstractEsBoltIntegrationTest<E
     }
 
     @BeforeEach
-    public void populateIndexWithTestData() {
-        node.client().prepareIndex(index, type, documentId).setSource(source).execute().actionGet();
+    public void populateIndexWithTestData() throws IOException {
+        IndexRequest indexRequest = new IndexRequest(index, type, documentId)
+                .source(source, XContentType.JSON);
+        RestHighLevelClient client =  EsTestUtil.getRestHighLevelClient(node);
+        client.index(indexRequest, RequestOptions.DEFAULT);
     }
 
     @Test
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
index 197ca6ec6..0067e465d 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
@@ -35,6 +35,8 @@ import org.apache.storm.elasticsearch.common.EsTupleMapper;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
 import org.junit.jupiter.api.AfterEach;
@@ -87,7 +89,9 @@ public class EsLookupBoltTest extends AbstractEsBoltTest<EsLookupBolt> {
     }
 
     private void makeRequestAndThrow(Exception exception) throws IOException {
-        when(client.performRequest("get", AbstractEsBolt.getEndpoint(index, type, documentId), params)).thenThrow(exception);
+        final Request request = new Request("get", AbstractEsBolt.getEndpoint(index, type, documentId));
+        request.addParameters(params);
+        when(client.performRequest(request)).thenThrow(exception);
         bolt.execute(tuple);
     }
 
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index 4ac39e1ca..d50127db0 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -29,11 +29,18 @@ import org.apache.storm.elasticsearch.common.EsTestUtil;
 import org.apache.storm.elasticsearch.response.PercolateResponse;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.xcontent.XContentType;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 
+import java.io.IOException;
+
 public class EsPercolateBoltTest extends AbstractEsBoltIntegrationTest<EsPercolateBolt> {
 
     private final String source = "{\"user\":\"user1\"}";
@@ -44,11 +51,14 @@ public class EsPercolateBoltTest extends AbstractEsBoltIntegrationTest<EsPercola
     }
 
     @BeforeEach
-    public void populateIndexWithTestData() {
-        node.client().prepareIndex(index, ".percolator", documentId)
-            .setSource("{\"query\":{\"match\":" + source + "}}")
-            .setRefresh(true)
-            .execute().actionGet();
+    public void populateIndexWithTestData() throws IOException {
+        IndexRequest indexRequest = new IndexRequest(index, ".percolator", documentId)
+                .source(source, XContentType.JSON)
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); // setRefresh(true) in Elasticsearch 6.x
+
+        RestHighLevelClient client =  EsTestUtil.getRestHighLevelClient(node);
+
+        client.index(indexRequest, RequestOptions.DEFAULT);
     }
 
     @Test
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
index 6712768ba..303fda635 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
@@ -39,23 +39,11 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.Response;
-import org.elasticsearch.client.ResponseException;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.index.IndexNotFoundException;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
+import org.elasticsearch.client.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
 
 public class EsTestUtil {
     private static final Logger LOG = LoggerFactory.getLogger(EsTestUtil.class);
@@ -91,40 +79,32 @@ public class EsTestUtil {
         return new ResponseException(response);
     }
 
-    public static Node startEsNode(){
-        Node node = NodeBuilder.nodeBuilder().data(true).settings(
-                Settings.settingsBuilder()
-                        .put(ClusterName.SETTING, EsConstants.clusterName)
-                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
-                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                        .put(EsExecutors.PROCESSORS, 1)
-                        .put("http.enabled", true)
-                        .put("index.percolator.map_unmapped_fields_as_string", true)
-                        .put("index.store.type", "mmapfs")
-                        .put("path.home", "./data")
-        ).build();
-        node.start();
-        return node;
+    public static RestHighLevelClient getRestHighLevelClient(ElasticsearchContainer node) {
+        final EsConfig cfg = new EsConfig(node.getHttpHostAddress());
+        return new RestHighLevelClientBuilder(new StormElasticSearchClient(cfg).construct()).build();
     }
 
-    public static void ensureEsGreen(Node node) {
-        ClusterHealthResponse chr = node.client()
-                                        .admin()
-                                        .cluster()
-                                        .health(Requests.clusterHealthRequest()
-                                                        .timeout(TimeValue.timeValueSeconds(30))
-                                                        .waitForGreenStatus()
-                                                        .waitForEvents(Priority.LANGUID)
-                                                        .waitForRelocatingShards(0))
-                                        .actionGet();
-        assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN));
+    public static ElasticsearchContainer startEsNode() {
+        String version = System.getProperty("elasticsearch-version");
+        if (version == null) version = "7.17.13";
+        LOG.info("Starting docker instance of Elasticsearch {}...", version);
+
+        final ElasticsearchContainer container =
+                new ElasticsearchContainer(
+                        "docker.elastic.co/elasticsearch/elasticsearch:" + version);
+        container.start();
+        return container;
+    }
+
+    public static void ensureEsGreen(ElasticsearchContainer node) {
+        assertThat("cluster status is green", node.isHealthy());
     }
 
     /**
      * Stop the given Elasticsearch node and clear the data directory.
      * @param node
      */
-    public static void stopEsNode(Node node) {
+    public static void stopEsNode(ElasticsearchContainer node) {
         node.close();
         try {
             FileUtils.deleteDirectory(new File("./data"));
@@ -138,10 +118,10 @@ public class EsTestUtil {
      * @param node - the node to connect to
      * @param index - the index to clear
      */
-    public static void clearIndex(Node node, String index) {
+    public static void clearIndex(ElasticsearchContainer node, String index) {
         try {
-            node.client().admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
-        } catch (IndexNotFoundException ignore) {
+            getRestHighLevelClient(node).indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT);
+        } catch (IOException ignore) {
 
         }
     }
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java
index bcabe9e60..8e86e7dad 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java
@@ -33,12 +33,13 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
 
 @IntegrationTest
 @ExtendWith(MockitoExtension.class)
 public class EsStateTest {
     
-    private static Node node;
+    private static ElasticsearchContainer node;
 
     private final String[] documentId = {UUID.randomUUID().toString(), UUID.randomUUID().toString()};
     private final String index = "index";
diff --git a/pom.xml b/pom.xml
index 58d6eeb71..c1716ee15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -126,7 +126,7 @@
         <awaitility.version>3.1.0</awaitility.version>
         <hdrhistogram.version>2.1.10</hdrhistogram.version>
         <hamcrest.version>2.2</hamcrest.version>
-        <elasticsearch.version>5.2.2</elasticsearch.version>
+        <elasticsearch.version>7.17.13</elasticsearch.version>
         <calcite.version>1.16.0</calcite.version>
         <jedis.version>2.9.0</jedis.version>
         <activemq.version>5.18.3</activemq.version>