You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/03/16 05:46:39 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2][ES] Support dsl filter (#4130)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 79ca87833 [Feature][Connector-V2][ES] Support dsl filter (#4130)
79ca87833 is described below
commit 79ca878338499ba10e6f72fad0fadded53d17127
Author: 王一川 <wj...@outlook.com>
AuthorDate: Thu Mar 16 13:46:33 2023 +0800
[Feature][Connector-V2][ES] Support dsl filter (#4130)
* [Feature][Connector-V2][ES]Support DSL
---
docs/en/connector-v2/source/Elasticsearch.md | 43 +++++++++++++---------
release-note.md | 1 +
.../elasticsearch/client/EsRestClient.java | 8 ++--
.../elasticsearch/config/SourceConfig.java | 11 ++++++
.../elasticsearch/dto/source/SourceIndexInfo.java | 2 +
.../source/ElasticsearchSourceFactory.java | 2 +
.../source/ElasticsearchSourceReader.java | 1 +
.../source/ElasticsearchSourceSplitEnumerator.java | 10 ++++-
.../connector/elasticsearch/ElasticsearchIT.java | 31 ++++++++++++++--
.../elasticsearch_source_and_sink.conf | 1 +
10 files changed, 86 insertions(+), 24 deletions(-)
diff --git a/docs/en/connector-v2/source/Elasticsearch.md b/docs/en/connector-v2/source/Elasticsearch.md
index 28a211e92..dfb876a85 100644
--- a/docs/en/connector-v2/source/Elasticsearch.md
+++ b/docs/en/connector-v2/source/Elasticsearch.md
@@ -19,23 +19,24 @@ support version >= 2.x and < 8.x.
## Options
-| name | type | required | default value |
-|-------------------------|---------|----------|---------------|
-| hosts | array | yes | - |
-| username | string | no | - |
-| password | string | no | - |
-| index | string | yes | - |
-| source | array | no | - |
-| scroll_time | string | no | 1m |
-| scroll_size | int | no | 100 |
-| schema | | no | - |
-| tls_verify_certificate | boolean | no | true |
-| tls_verify_hostnames | boolean | no | true |
-| tls_keystore_path | string | no | - |
-| tls_keystore_password | string | no | - |
-| tls_truststore_path | string | no | - |
-| tls_truststore_password | string | no | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|-------------------------|---------|----------|-------------------|
+| hosts | array | yes | - |
+| username | string | no | - |
+| password | string | no | - |
+| index | string | yes | - |
+| source | array | no | - |
+| query | json | no | {"match_all": {}} |
+| scroll_time | string | no | 1m |
+| scroll_size | int | no | 100 |
+| schema | | no | - |
+| tls_verify_certificate | boolean | no | true |
+| tls_verify_hostnames | boolean | no | true |
+| tls_keystore_path | string | no | - |
+| tls_keystore_password | string | no | - |
+| tls_truststore_path | string | no | - |
+| tls_truststore_password | string | no | - |
+| common-options | | no | - |
### hosts [array]
@@ -59,6 +60,11 @@ The fields of index.
You can get the document id by specifying the field `_id`.If sink _id to other index,you need specify an alias for _id due to the Elasticsearch limit.
If you don't config source, you must config `schema`.
+### query [json]
+
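+Elasticsearch query DSL, given as a JSON object.
+Use it to restrict which documents are read from the index; when omitted, `{"match_all": {}}` is used and every document is read.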
+
### scroll_time [String]
Amount of time Elasticsearch will keep the search context alive for scroll requests.
@@ -109,6 +115,7 @@ Elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-*"
source = ["_id","name","age"]
+ query = {"range":{"firstPacket":{"gte":1669225429990,"lte":1669225429990}}}
}
```
@@ -136,6 +143,7 @@ Elasticsearch {
c_timestamp = timestamp
}
}
+ query = {"range":{"firstPacket":{"gte":1669225429990,"lte":1669225429990}}}
}
```
@@ -188,4 +196,5 @@ source {
- Add Elasticsearch Source Connector
- [Feature] Support https protocol & compatible with opensearch ([3997](https://github.com/apache/incubator-seatunnel/pull/3997))
+- [Feature] Support DSL
diff --git a/release-note.md b/release-note.md
index c4b9c2fb4..c08662b32 100644
--- a/release-note.md
+++ b/release-note.md
@@ -14,6 +14,7 @@
- [ALL]Add SQL Transform #4148
### Connectors
- [Elasticsearch] Support https protocol & compatible with opensearch
+- [Elasticsearch] Support DSL
- [Hbase] Add hbase sink connector #4049
- [Github] Add Github source connector #4155
- [CDC] Support export debezium-json format to kafka #4339
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index ae78d6d15..ed5680323 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -301,10 +301,12 @@ public class EsRestClient {
* @param scrollSize fetch documents count in one request
*/
public ScrollResult searchByScroll(
- String index, List<String> source, String scrollTime, int scrollSize) {
+ String index,
+ List<String> source,
+ Map<String, Object> query,
+ String scrollTime,
+ int scrollSize) {
Map<String, Object> param = new HashMap<>();
- Map<String, Object> query = new HashMap<>();
- query.put("match_all", new HashMap<String, String>());
param.put("query", query);
param.put("_source", source);
param.put("sort", new String[] {"_doc"});
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
index 075da68fe..f5c885ed3 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
@@ -20,7 +20,10 @@ package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class SourceConfig {
@@ -51,4 +54,12 @@ public class SourceConfig {
.defaultValue(100)
.withDescription(
"Maximum number of hits to be returned with each Elasticsearch scroll request");
+
+ public static final Option<Map> QUERY =
+ Options.key("query")
+ .objectType(Map.class)
+ .defaultValue(
+ Collections.singletonMap("match_all", new HashMap<String, String>()))
+ .withDescription(
+ "Elasticsearch query language. You can control the range of data read");
}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java
index 257d94ea8..6c0a5667d 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java
@@ -22,12 +22,14 @@ import lombok.Data;
import java.io.Serializable;
import java.util.List;
+import java.util.Map;
@Data
@AllArgsConstructor
public class SourceIndexInfo implements Serializable {
private String index;
private List<String> source;
+ private Map<String, Object> query;
private String scrollTime;
private int scrollSize;
}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
index 8d21aa011..5c4d71288 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
@@ -35,6 +35,7 @@ import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsC
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.INDEX;
+import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.QUERY;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SCROLL_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SCROLL_TIME;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SOURCE;
@@ -55,6 +56,7 @@ public class ElasticsearchSourceFactory implements TableSourceFactory {
PASSWORD,
SCROLL_TIME,
SCROLL_SIZE,
+ QUERY,
TLS_VERIFY_CERTIFICATE,
TLS_VERIFY_HOSTNAME,
TLS_KEY_STORE_PATH,
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
index 9e0eb0853..5c3268894 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
@@ -83,6 +83,7 @@ public class ElasticsearchSourceReader
esRestClient.searchByScroll(
sourceIndexInfo.getIndex(),
sourceIndexInfo.getSource(),
+ sourceIndexInfo.getQuery(),
sourceIndexInfo.getScrollTime(),
sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
index ced936d87..2c6657765 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
@@ -149,6 +149,10 @@ public class ElasticsearchSourceSplitEnumerator
if (pluginConfig.hasPath(SourceConfig.SCROLL_SIZE.key())) {
scrollSize = pluginConfig.getInt(SourceConfig.SCROLL_SIZE.key());
}
+ Map query = SourceConfig.QUERY.defaultValue();
+ if (pluginConfig.hasPath(SourceConfig.QUERY.key())) {
+ query = (Map) pluginConfig.getAnyRef(SourceConfig.QUERY.key());
+ }
List<IndexDocsCount> indexDocsCounts =
esRestClient.getIndexDocsCount(pluginConfig.getString(SourceConfig.INDEX.key()));
@@ -162,7 +166,11 @@ public class ElasticsearchSourceSplitEnumerator
new ElasticsearchSourceSplit(
String.valueOf(indexDocsCount.getIndex().hashCode()),
new SourceIndexInfo(
- indexDocsCount.getIndex(), source, scrollTime, scrollSize)));
+ indexDocsCount.getIndex(),
+ source,
+ query,
+ scrollTime,
+ scrollSize)));
}
return splits;
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 2f010f0d5..ec9e77847 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.e2e.connector.elasticsearch;
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.common.utils.JsonUtils;
@@ -125,8 +126,9 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
Container.ExecResult execResult =
container.executeJob("/elasticsearch/elasticsearch_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
- List<String> sinData = readSinkData();
- Assertions.assertIterableEquals(testDataset, sinData);
+ List<String> sinkData = readSinkData();
+ // The job config filters with DSL {"range":{"c_int":{"gte":10,"lte":20}}}, so only matching rows are expected
+ Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}
private List<String> generateTestDataSet()
@@ -197,7 +199,15 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
"c_bytes",
"c_date",
"c_timestamp");
- ScrollResult scrollResult = esRestClient.searchByScroll("st_index2", source, "1m", 1000);
+ HashMap<String, Object> rangeParam = new HashMap<>();
+ rangeParam.put("gte", 10);
+ rangeParam.put("lte", 20);
+ HashMap<String, Object> range = new HashMap<>();
+ range.put("c_int", rangeParam);
+ Map<String, Object> query = new HashMap<>();
+ query.put("range", range);
+ ScrollResult scrollResult =
+ esRestClient.searchByScroll("st_index2", source, query, "1m", 1000);
scrollResult
.getDocs()
.forEach(
@@ -216,6 +226,21 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
return docs;
}
+ private List<String> mapTestDatasetForDSL() {
+ return testDataset.stream()
+ .map(JsonUtils::parseObject)
+ .filter(
+ node -> {
+ if (node.hasNonNull("c_int")) {
+ int cInt = node.get("c_int").asInt();
+ return cInt >= 10 && cInt <= 20;
+ }
+ return false;
+ })
+ .map(JsonNode::toString)
+ .collect(Collectors.toList());
+ }
+
@AfterEach
@Override
public void tearDown() {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
index e7643dc07..cfca96073 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
@@ -36,6 +36,7 @@ source {
tls_verify_hostname = false
index = "st_index"
+ query = {"range":{"c_int":{"gte":10,"lte":20}}}
schema = {
fields {
c_map = "map<string, tinyint>"