You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/03/16 05:46:39 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2][ES] Support dsl filter (#4130)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 79ca87833 [Feature][Connector-V2][ES] Support dsl filter (#4130)
79ca87833 is described below

commit 79ca878338499ba10e6f72fad0fadded53d17127
Author: 王一川 <wj...@outlook.com>
AuthorDate: Thu Mar 16 13:46:33 2023 +0800

    [Feature][Connector-V2][ES] Support dsl filter (#4130)
    
    * [Feature][Connector-V2][ES]Support DSL
---
 docs/en/connector-v2/source/Elasticsearch.md       | 43 +++++++++++++---------
 release-note.md                                    |  1 +
 .../elasticsearch/client/EsRestClient.java         |  8 ++--
 .../elasticsearch/config/SourceConfig.java         | 11 ++++++
 .../elasticsearch/dto/source/SourceIndexInfo.java  |  2 +
 .../source/ElasticsearchSourceFactory.java         |  2 +
 .../source/ElasticsearchSourceReader.java          |  1 +
 .../source/ElasticsearchSourceSplitEnumerator.java | 10 ++++-
 .../connector/elasticsearch/ElasticsearchIT.java   | 31 ++++++++++++++--
 .../elasticsearch_source_and_sink.conf             |  1 +
 10 files changed, 86 insertions(+), 24 deletions(-)

diff --git a/docs/en/connector-v2/source/Elasticsearch.md b/docs/en/connector-v2/source/Elasticsearch.md
index 28a211e92..dfb876a85 100644
--- a/docs/en/connector-v2/source/Elasticsearch.md
+++ b/docs/en/connector-v2/source/Elasticsearch.md
@@ -19,23 +19,24 @@ support version >= 2.x and < 8.x.
 
 ## Options
 
-|          name           |  type   | required | default value |
-|-------------------------|---------|----------|---------------|
-| hosts                   | array   | yes      | -             |
-| username                | string  | no       | -             |
-| password                | string  | no       | -             |
-| index                   | string  | yes      | -             |
-| source                  | array   | no       | -             |
-| scroll_time             | string  | no       | 1m            |
-| scroll_size             | int     | no       | 100           |
-| schema                  |         | no       | -             |
-| tls_verify_certificate  | boolean | no       | true          |
-| tls_verify_hostnames    | boolean | no       | true          |
-| tls_keystore_path       | string  | no       | -             |
-| tls_keystore_password   | string  | no       | -             |
-| tls_truststore_path     | string  | no       | -             |
-| tls_truststore_password | string  | no       | -             |
-| common-options          |         | no       | -             |
+|          name           |  type   | required |   default value   |
+|-------------------------|---------|----------|-------------------|
+| hosts                   | array   | yes      | -                 |
+| username                | string  | no       | -                 |
+| password                | string  | no       | -                 |
+| index                   | string  | yes      | -                 |
+| source                  | array   | no       | -                 |
+| query                   | json    | no       | {"match_all": {}} |
+| scroll_time             | string  | no       | 1m                |
+| scroll_size             | int     | no       | 100               |
+| schema                  |         | no       | -                 |
+| tls_verify_certificate  | boolean | no       | true              |
+| tls_verify_hostnames    | boolean | no       | true              |
+| tls_keystore_path       | string  | no       | -                 |
+| tls_keystore_password   | string  | no       | -                 |
+| tls_truststore_path     | string  | no       | -                 |
+| tls_truststore_password | string  | no       | -                 |
+| common-options          |         | no       | -                 |
 
 ### hosts [array]
 
@@ -59,6 +60,11 @@ The fields of index.
 You can get the document id by specifying the field `_id`.If sink _id to other index,you need specify an alias for _id due to the Elasticsearch limit.
 If you don't config source, you must config `schema`.
 
+### query [json]
+
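+Elasticsearch query DSL used to filter the documents to read. Defaults to `{"match_all": {}}`.
+Use it to control the range of data read, for example with a `range` query.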
+
 ### scroll_time [String]
 
 Amount of time Elasticsearch will keep the search context alive for scroll requests.
@@ -109,6 +115,7 @@ Elasticsearch {
     hosts = ["localhost:9200"]
     index = "seatunnel-*"
     source = ["_id","name","age"]
+    query = {"range":{"firstPacket":{"gte":1669225429990,"lte":1669225429990}}}
 }
 ```
 
@@ -136,6 +143,7 @@ Elasticsearch {
             c_timestamp = timestamp
         }
     }
+    query = {"range":{"firstPacket":{"gte":1669225429990,"lte":1669225429990}}}
 }
 ```
 
@@ -188,4 +196,5 @@ source {
 
 - Add Elasticsearch Source Connector
 - [Feature] Support https protocol & compatible with opensearch ([3997](https://github.com/apache/incubator-seatunnel/pull/3997))
+- [Feature] Support DSL ([4130](https://github.com/apache/incubator-seatunnel/pull/4130))
 
diff --git a/release-note.md b/release-note.md
index c4b9c2fb4..c08662b32 100644
--- a/release-note.md
+++ b/release-note.md
@@ -14,6 +14,7 @@
 - [ALL]Add SQL Transform #4148 
 ### Connectors
 - [Elasticsearch] Support https protocol & compatible with opensearch
+- [Elasticsearch] Support DSL #4130
 - [Hbase] Add hbase sink connector #4049
 - [Github] Add Github source connector #4155
 - [CDC] Support export debezium-json format to kafka #4339
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index ae78d6d15..ed5680323 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -301,10 +301,12 @@ public class EsRestClient {
      * @param scrollSize fetch documents count in one request
      */
     public ScrollResult searchByScroll(
-            String index, List<String> source, String scrollTime, int scrollSize) {
+            String index,
+            List<String> source,
+            Map<String, Object> query,
+            String scrollTime,
+            int scrollSize) {
         Map<String, Object> param = new HashMap<>();
-        Map<String, Object> query = new HashMap<>();
-        query.put("match_all", new HashMap<String, String>());
         param.put("query", query);
         param.put("_source", source);
         param.put("sort", new String[] {"_doc"});
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
index 075da68fe..f5c885ed3 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
@@ -20,7 +20,10 @@ package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class SourceConfig {
 
@@ -51,4 +54,12 @@ public class SourceConfig {
                     .defaultValue(100)
                     .withDescription(
                             "Maximum number of hits to be returned with each Elasticsearch scroll request");
+
+    public static final Option<Map> QUERY =
+            Options.key("query")
+                    .objectType(Map.class)
+                    .defaultValue(
+                            Collections.singletonMap("match_all", new HashMap<String, String>()))
+                    .withDescription(
+                            "Elasticsearch query language. You can control the range of data read");
 }
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java
index 257d94ea8..6c0a5667d 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java
@@ -22,12 +22,14 @@ import lombok.Data;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Map;
 
 @Data
 @AllArgsConstructor
 public class SourceIndexInfo implements Serializable {
     private String index;
     private List<String> source;
+    private Map<String, Object> query;
     private String scrollTime;
     private int scrollSize;
 }
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
index 8d21aa011..5c4d71288 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
@@ -35,6 +35,7 @@ import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsC
 import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME;
 import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.USERNAME;
 import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.INDEX;
+import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.QUERY;
 import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SCROLL_SIZE;
 import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SCROLL_TIME;
 import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SOURCE;
@@ -55,6 +56,7 @@ public class ElasticsearchSourceFactory implements TableSourceFactory {
                         PASSWORD,
                         SCROLL_TIME,
                         SCROLL_SIZE,
+                        QUERY,
                         TLS_VERIFY_CERTIFICATE,
                         TLS_VERIFY_HOSTNAME,
                         TLS_KEY_STORE_PATH,
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
index 9e0eb0853..5c3268894 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
@@ -83,6 +83,7 @@ public class ElasticsearchSourceReader
                         esRestClient.searchByScroll(
                                 sourceIndexInfo.getIndex(),
                                 sourceIndexInfo.getSource(),
+                                sourceIndexInfo.getQuery(),
                                 sourceIndexInfo.getScrollTime(),
                                 sourceIndexInfo.getScrollSize());
                 outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
index ced936d87..2c6657765 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
@@ -149,6 +149,10 @@ public class ElasticsearchSourceSplitEnumerator
         if (pluginConfig.hasPath(SourceConfig.SCROLL_SIZE.key())) {
             scrollSize = pluginConfig.getInt(SourceConfig.SCROLL_SIZE.key());
         }
+        Map query = SourceConfig.QUERY.defaultValue();
+        if (pluginConfig.hasPath(SourceConfig.QUERY.key())) {
+            query = (Map) pluginConfig.getAnyRef(SourceConfig.QUERY.key());
+        }
 
         List<IndexDocsCount> indexDocsCounts =
                 esRestClient.getIndexDocsCount(pluginConfig.getString(SourceConfig.INDEX.key()));
@@ -162,7 +166,11 @@ public class ElasticsearchSourceSplitEnumerator
                     new ElasticsearchSourceSplit(
                             String.valueOf(indexDocsCount.getIndex().hashCode()),
                             new SourceIndexInfo(
-                                    indexDocsCount.getIndex(), source, scrollTime, scrollSize)));
+                                    indexDocsCount.getIndex(),
+                                    source,
+                                    query,
+                                    scrollTime,
+                                    scrollSize)));
         }
         return splits;
     }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 2f010f0d5..ec9e77847 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.e2e.connector.elasticsearch;
 
 import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.seatunnel.common.utils.JsonUtils;
@@ -125,8 +126,9 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
         Container.ExecResult execResult =
                 container.executeJob("/elasticsearch/elasticsearch_source_and_sink.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
-        List<String> sinData = readSinkData();
-        Assertions.assertIterableEquals(testDataset, sinData);
+        List<String> sinkData = readSinkData();
+        // for DSL is: {"range":{"c_int":{"gte":10,"lte":20}}}
+        Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
     }
 
     private List<String> generateTestDataSet()
@@ -197,7 +199,15 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
                         "c_bytes",
                         "c_date",
                         "c_timestamp");
-        ScrollResult scrollResult = esRestClient.searchByScroll("st_index2", source, "1m", 1000);
+        HashMap<String, Object> rangeParam = new HashMap<>();
+        rangeParam.put("gte", 10);
+        rangeParam.put("lte", 20);
+        HashMap<String, Object> range = new HashMap<>();
+        range.put("c_int", rangeParam);
+        Map<String, Object> query = new HashMap<>();
+        query.put("range", range);
+        ScrollResult scrollResult =
+                esRestClient.searchByScroll("st_index2", source, query, "1m", 1000);
         scrollResult
                 .getDocs()
                 .forEach(
@@ -216,6 +226,21 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
         return docs;
     }
 
+    private List<String> mapTestDatasetForDSL() {
+        return testDataset.stream()
+                .map(JsonUtils::parseObject)
+                .filter(
+                        node -> {
+                            if (node.hasNonNull("c_int")) {
+                                int cInt = node.get("c_int").asInt();
+                                return cInt >= 10 && cInt <= 20;
+                            }
+                            return false;
+                        })
+                .map(JsonNode::toString)
+                .collect(Collectors.toList());
+    }
+
     @AfterEach
     @Override
     public void tearDown() {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
index e7643dc07..cfca96073 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
@@ -36,6 +36,7 @@ source {
         tls_verify_hostname = false
 
         index = "st_index"
+        query = {"range":{"c_int":{"gte":10,"lte":20}}}
         schema = {
           fields {
             c_map = "map<string, tinyint>"