You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by wy...@apache.org on 2021/04/12 03:24:06 UTC

[incubator-doris] branch master updated: [Doris On ES][WIP] Support external ES table with `SSL` secured and configurable node sniffing (#5325)

This is an automated email from the ASF dual-hosted git repository.

wyf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 75db273  [Doris On ES][WIP] Support external ES  table with `SSL` secured and configurable node sniffing (#5325)
75db273 is described below

commit 75db273b9358d95ea5bc402a04a77424abf39754
Author: Stalary <45...@qq.com>
AuthorDate: Mon Apr 12 11:23:49 2021 +0800

    [Doris On ES][WIP] Support external ES  table with `SSL` secured and configurable node sniffing (#5325)
    
    Support external ES  table with `SSL` secured and configurable node sniffing
---
 be/src/exec/es/es_scan_reader.cpp                  | 12 ++++
 be/src/exec/es/es_scan_reader.h                    |  3 +
 be/src/http/http_client.h                          |  6 ++
 docs/en/extending-doris/doris-on-es.md             | 57 ++++++++++++++++++
 docs/zh-CN/extending-doris/doris-on-es.md          | 58 ++++++++++++++++++
 .../java/org/apache/doris/catalog/Catalog.java     |  2 +
 .../java/org/apache/doris/catalog/EsTable.java     | 65 +++++++++++++++------
 .../doris/external/elasticsearch/EsNodeInfo.java   | 27 ++++++++-
 .../doris/external/elasticsearch/EsRepository.java |  2 +-
 .../doris/external/elasticsearch/EsRestClient.java | 68 +++++++++++++++++++++-
 .../external/elasticsearch/EsTablePartitions.java  | 18 ------
 .../doris/external/elasticsearch/EsUtil.java       | 13 +++++
 .../external/elasticsearch/PartitionPhase.java     | 11 +++-
 .../external/elasticsearch/SearchContext.java      |  8 +++
 .../java/org/apache/doris/planner/EsScanNode.java  |  1 +
 .../external/elasticsearch/PartitionPhaseTest.java |  2 +-
 16 files changed, 310 insertions(+), 43 deletions(-)

diff --git a/be/src/exec/es/es_scan_reader.cpp b/be/src/exec/es/es_scan_reader.cpp
index 7dc3809..87131a7 100644
--- a/be/src/exec/es/es_scan_reader.cpp
+++ b/be/src/exec/es/es_scan_reader.cpp
@@ -60,6 +60,9 @@ ESScanReader::ESScanReader(const std::string& target,
     if (props.find(KEY_QUERY) != props.end()) {
         _query = props.at(KEY_QUERY);
     }
+    if (props.find(KEY_HTTP_SSL_ENABLED) != props.end()) {
+        std::istringstream(props.at(KEY_HTTP_SSL_ENABLED)) >> std::boolalpha >> _use_ssl_client;
+    }
 
     std::string batch_size_str = props.at(KEY_BATCH_SIZE);
     _batch_size = atoi(batch_size_str.c_str());
@@ -103,6 +106,9 @@ Status ESScanReader::open() {
     }
     _network_client.set_basic_auth(_user_name, _passwd);
     _network_client.set_content_type("application/json");
+    if (_use_ssl_client) {
+        _network_client.use_untrusted_ssl();
+    }
     // phase open, we cached the first response for `get_next` phase
     Status status = _network_client.execute_post_request(_query, &_cached_response);
     if (!status.ok() || _network_client.get_http_status() != 200) {
@@ -134,6 +140,9 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scr
         _network_client.set_basic_auth(_user_name, _passwd);
         _network_client.set_content_type("application/json");
         _network_client.set_timeout_ms(_http_timeout_ms);
+        if (_use_ssl_client) {
+            _network_client.use_untrusted_ssl();
+        }
         RETURN_IF_ERROR(_network_client.execute_post_request(
                 ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, _scroll_keep_alive),
                 &response));
@@ -188,6 +197,9 @@ Status ESScanReader::close() {
     _network_client.set_method(DELETE);
     _network_client.set_content_type("application/json");
     _network_client.set_timeout_ms(5 * 1000);
+    if (_use_ssl_client) {
+        _network_client.use_untrusted_ssl();
+    }
     std::string response;
     RETURN_IF_ERROR(_network_client.execute_delete_request(
             ESScrollQueryBuilder::build_clear_scroll_body(_scroll_id), &response));
diff --git a/be/src/exec/es/es_scan_reader.h b/be/src/exec/es/es_scan_reader.h
index 6a1f9d4..f162dc3 100644
--- a/be/src/exec/es/es_scan_reader.h
+++ b/be/src/exec/es/es_scan_reader.h
@@ -40,6 +40,7 @@ public:
     static constexpr const char* KEY_BATCH_SIZE = "batch_size";
     static constexpr const char* KEY_TERMINATE_AFTER = "limit";
     static constexpr const char* KEY_DOC_VALUES_MODE = "doc_values_mode";
+    static constexpr const char* KEY_HTTP_SSL_ENABLED = "http_ssl_enabled";
     ESScanReader(const std::string& target, const std::map<std::string, std::string>& props,
                  bool doc_value_mode);
     ~ESScanReader();
@@ -63,6 +64,8 @@ private:
     std::string _query;
     // Elasticsearch shards to fetch document
     std::string _shards;
+    // whether use ssl client
+    bool _use_ssl_client = false;
     // distinguish the first scroll phase and the following scroll
     bool _is_first;
 
diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h
index cdbe143..5fec678 100644
--- a/be/src/http/http_client.h
+++ b/be/src/http/http_client.h
@@ -66,6 +66,12 @@ public:
         curl_easy_setopt(_curl, CURLOPT_COPYPOSTFIELDS, post_body.c_str());
     }
 
+    // Currently, only fake SSL configurations are supported
+    void use_untrusted_ssl() {
+        curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYPEER, 0L);
+        curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYHOST, 0L);
+    }
+
     // TODO(zc): support set header
     // void set_header(const std::string& key, const std::string& value) {
     // _cntl.http_request().SetHeader(key, value);
diff --git a/docs/en/extending-doris/doris-on-es.md b/docs/en/extending-doris/doris-on-es.md
index 3a7209a..9be1bed 100644
--- a/docs/en/extending-doris/doris-on-es.md
+++ b/docs/en/extending-doris/doris-on-es.md
@@ -328,6 +328,63 @@ This term does not match any term in the dictionary, and will not return any res
 
 The type of `k4.keyword` is `keyword`, and writing data into ES is a complete term, so it can be matched
 
+### Enable node discovery mechanism, default is true(es\_nodes\_discovery=true)
+
+```
+CREATE EXTERNAL TABLE `test` (
+  `k1` bigint(20) COMMENT "",
+  `k2` datetime COMMENT "",
+  `k3` varchar(20) COMMENT "",
+  `k4` varchar(100) COMMENT "",
+  `k5` float COMMENT ""
+) ENGINE=ELASTICSEARCH
+PROPERTIES (
+"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
+"index" = "test”,
+"type" = "doc",
+"user" = "root",
+"password" = "root",
+
+"nodes_discovery" = "true"
+);
+```
+
+Parameter Description:
+
+Parameter | Description
+---|---
+**es\_nodes\_discovery** | Whether or not to enable ES node discovery. the default is true
+
+Doris would find all available related data nodes (shards allocated on)from ES when this is true.  Just set false if address of  ES data nodes are not accessed by Doris BE, eg. the ES cluster is deployed in the intranet which isolated from your public Internet, and users access through a proxy
+
+### Whether ES cluster enables https access mode, if enabled should set value with`true`, default is false(http\_ssl\_enable=true)
+
+```
+CREATE EXTERNAL TABLE `test` (
+  `k1` bigint(20) COMMENT "",
+  `k2` datetime COMMENT "",
+  `k3` varchar(20) COMMENT "",
+  `k4` varchar(100) COMMENT "",
+  `k5` float COMMENT ""
+) ENGINE=ELASTICSEARCH
+PROPERTIES (
+"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
+"index" = "test”,
+"type" = "doc",
+"user" = "root",
+"password" = "root",
+
+"http_ssl_enabled" = "true"
+);
+```
+
+Parameter Description:
+
+Parameter | Description
+---|---
+**http\_ssl\_enabled** | Whether ES cluster enables https access mode
+
+The current FE/BE implementation is to trust all, this is a temporary solution, and the real user configuration certificate will be used later
 
 ### Query usage
 
diff --git a/docs/zh-CN/extending-doris/doris-on-es.md b/docs/zh-CN/extending-doris/doris-on-es.md
index a1c3148..4a046f0 100644
--- a/docs/zh-CN/extending-doris/doris-on-es.md
+++ b/docs/zh-CN/extending-doris/doris-on-es.md
@@ -325,6 +325,64 @@ POST /_analyze
 
 `k4.keyword` 的类型是`keyword`,数据写入ES中是一个完整的term,所以可以匹配
 
+### 开启节点自动发现, 默认为true(es\_nodes\_discovery=true)
+
+```
+CREATE EXTERNAL TABLE `test` (
+  `k1` bigint(20) COMMENT "",
+  `k2` datetime COMMENT "",
+  `k3` varchar(20) COMMENT "",
+  `k4` varchar(100) COMMENT "",
+  `k5` float COMMENT ""
+) ENGINE=ELASTICSEARCH
+PROPERTIES (
+"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
+"index" = "test”,
+"type" = "doc",
+"user" = "root",
+"password" = "root",
+
+"nodes_discovery" = "true"
+);
+```
+
+参数说明:
+
+参数 | 说明
+---|---
+**es\_nodes\_discovery** | 是否开启es节点发现,默认为true
+
+当配置为true时,Doris将从ES找到所有可用的相关数据节点(在上面分配的分片)。如果ES数据节点的地址没有被Doris BE访问,则设置为false。ES集群部署在与公共Internet隔离的内网,用户通过代理访问
+
+### ES集群是否开启https访问模式,如果开启应设置为`true`,默认为false(http\_ssl\_enabled=true)
+
+```
+CREATE EXTERNAL TABLE `test` (
+  `k1` bigint(20) COMMENT "",
+  `k2` datetime COMMENT "",
+  `k3` varchar(20) COMMENT "",
+  `k4` varchar(100) COMMENT "",
+  `k5` float COMMENT ""
+) ENGINE=ELASTICSEARCH
+PROPERTIES (
+"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
+"index" = "test”,
+"type" = "doc",
+"user" = "root",
+"password" = "root",
+
+"http_ssl_enabled" = "true"
+);
+```
+
+参数说明:
+
+参数 | 说明
+---|---
+**http\_ssl\_enabled** | ES集群是否开启https访问模式
+
+目前会fe/be实现方式为信任所有,这是临时解决方案,后续会使用真实的用户配置证书
+
 ### 查询用法
 
 完成在Doris中建立ES外表后,除了无法使用Doris中的数据模型(rollup、预聚合、物化视图等)外并无区别
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 7d76648..c366762 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -4146,6 +4146,8 @@ public class Catalog {
             sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n");
             sb.append("\"max_docvalue_fields\" = \"").append(esTable.maxDocValueFields()).append("\",\n");
             sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n");
+            sb.append("\"nodes_discovery\" = \"").append(esTable.isNodesDiscovery()).append("\"\n");
+            sb.append("\"http_ssl_enabled\" = \"").append(esTable.isHttpSslEnabled()).append("\"\n");
             sb.append(")");
         } else if (table.getType() == TableType.HIVE) {
             HiveTable hiveTable = (HiveTable) table;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
index 8cae113..cbd2b77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
@@ -24,6 +24,7 @@ import org.apache.doris.external.elasticsearch.EsMajorVersion;
 import org.apache.doris.external.elasticsearch.EsMetaStateTracker;
 import org.apache.doris.external.elasticsearch.EsRestClient;
 import org.apache.doris.external.elasticsearch.EsTablePartitions;
+import org.apache.doris.external.elasticsearch.EsUtil;
 import org.apache.doris.thrift.TEsTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
@@ -63,6 +64,8 @@ public class EsTable extends Table {
     public static final String DOC_VALUE_SCAN = "enable_docvalue_scan";
     public static final String KEYWORD_SNIFF = "enable_keyword_sniff";
     public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields";
+    public static final String NODES_DISCOVERY = "nodes_discovery";
+    public static final String HTTP_SSL_ENABLED = "http_ssl_enabled";
 
     private String hosts;
     private String[] seeds;
@@ -87,6 +90,10 @@ public class EsTable extends Table {
     // would downgrade to extract value from `stored_fields`
     private int maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
 
+    private boolean nodesDiscovery = true;
+
+    private boolean httpSslEnabled = false;
+
     // Solr doc_values vs stored_fields performance-smackdown indicate:
     // It is possible to notice that retrieving an high number of fields leads
     // to a sensible worsening of performance if DocValues are used.
@@ -138,6 +145,13 @@ public class EsTable extends Table {
         return enableKeywordSniff;
     }
 
+    public boolean isNodesDiscovery() {
+        return nodesDiscovery;
+    }
+
+    public boolean isHttpSslEnabled() {
+        return httpSslEnabled;
+    }
 
     private void validate(Map<String, String> properties) throws DdlException {
         if (properties == null) {
@@ -185,36 +199,40 @@ public class EsTable extends Table {
 
         // enable doc value scan for Elasticsearch
         if (properties.containsKey(DOC_VALUE_SCAN)) {
-            try {
-                enableDocValueScan = Boolean.parseBoolean(properties.get(DOC_VALUE_SCAN).trim());
-            } catch (Exception e) {
-                throw new DdlException("fail to parse enable_docvalue_scan, enable_docvalue_scan= "
-                        + properties.get(VERSION).trim() + " ,`enable_docvalue_scan`"
-                        + " should be like 'true' or 'false', value should be double quotation marks");
-            }
+            enableDocValueScan = EsUtil.getBoolean(properties, DOC_VALUE_SCAN);
         }
 
         if (properties.containsKey(KEYWORD_SNIFF)) {
-            try {
-                enableKeywordSniff = Boolean.parseBoolean(properties.get(KEYWORD_SNIFF).trim());
-            } catch (Exception e) {
-                throw new DdlException("fail to parse enable_keyword_sniff, enable_keyword_sniff= "
-                        + properties.get(VERSION).trim() + " ,`enable_keyword_sniff`"
-                        + " should be like 'true' or 'false', value should be double quotation marks");
+            enableKeywordSniff = EsUtil.getBoolean(properties, KEYWORD_SNIFF);
+        }
+
+        if (properties.containsKey(NODES_DISCOVERY)) {
+            nodesDiscovery = EsUtil.getBoolean(properties, NODES_DISCOVERY);
+        }
+
+        if (properties.containsKey(HTTP_SSL_ENABLED)) {
+            httpSslEnabled = EsUtil.getBoolean(properties, HTTP_SSL_ENABLED);
+            // check protocol
+            for (String seed : seeds) {
+                if (httpSslEnabled && seed.startsWith("http://")) {
+                    throw new DdlException("if http_ssl_enabled is true, the https protocol must be used");
+                }
+                if (!httpSslEnabled && seed.startsWith("https://")) {
+                    throw new DdlException("if http_ssl_enabled is false, the http protocol must be used");
+                }
             }
-        } else {
-            enableKeywordSniff = true;
         }
 
         if (!Strings.isNullOrEmpty(properties.get(TYPE))
                 && !Strings.isNullOrEmpty(properties.get(TYPE).trim())) {
             mappingType = properties.get(TYPE).trim();
         }
+
         if (!Strings.isNullOrEmpty(properties.get(TRANSPORT))
                 && !Strings.isNullOrEmpty(properties.get(TRANSPORT).trim())) {
             transport = properties.get(TRANSPORT).trim();
             if (!(TRANSPORT_HTTP.equals(transport) || TRANSPORT_THRIFT.equals(transport))) {
-                throw new DdlException("transport of ES table must be http(recommend) or thrift(reserved inner usage),"
+                throw new DdlException("transport of ES table must be http/https(recommend) or thrift(reserved inner usage),"
                         + " but value is " + transport);
             }
         }
@@ -241,6 +259,8 @@ public class EsTable extends Table {
         tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan));
         tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff));
         tableContext.put("maxDocValueFields", String.valueOf(maxDocValueFields));
+        tableContext.put(NODES_DISCOVERY, String.valueOf(nodesDiscovery));
+        tableContext.put(HTTP_SSL_ENABLED, String.valueOf(httpSslEnabled));
     }
 
     public TTableDescriptor toThrift() {
@@ -323,7 +343,16 @@ public class EsTable extends Table {
                     maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
                 }
             }
-
+            if (tableContext.containsKey(NODES_DISCOVERY)) {
+                nodesDiscovery = Boolean.parseBoolean(tableContext.get(NODES_DISCOVERY));
+            } else {
+                nodesDiscovery = true;
+            }
+            if (tableContext.containsKey(HTTP_SSL_ENABLED)) {
+                httpSslEnabled = Boolean.parseBoolean(tableContext.get(HTTP_SSL_ENABLED));
+            } else {
+                httpSslEnabled = false;
+            }
             PartitionType partType = PartitionType.valueOf(Text.readString(in));
             if (partType == PartitionType.UNPARTITIONED) {
                 partitionInfo = SinglePartitionInfo.read(in);
@@ -357,6 +386,8 @@ public class EsTable extends Table {
             tableContext.put("transport", transport);
             tableContext.put("enableDocValueScan", "false");
             tableContext.put(KEYWORD_SNIFF, "true");
+            tableContext.put(NODES_DISCOVERY, "true");
+            tableContext.put(HTTP_SSL_ENABLED, "false");
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java
index 73d8daf..31bfc50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java
@@ -19,6 +19,9 @@ package org.apache.doris.external.elasticsearch;
 
 import org.apache.doris.thrift.TNetworkAddress;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.List;
 import java.util.Map;
 
@@ -38,7 +41,9 @@ public class EsNodeInfo {
     private boolean hasThrift;
     private TNetworkAddress thriftAddress;
 
-    public EsNodeInfo(String id, Map<String, Object> map) throws DorisEsException {
+    private static final Logger LOG = LogManager.getLogger(EsNodeInfo.class);
+
+    public EsNodeInfo(String id, Map<String, Object> map, boolean httpSslEnabled) {
         this.id = id;
         EsMajorVersion version = EsMajorVersion.parse((String) map.get("version"));
         this.name = (String) map.get("name");
@@ -66,7 +71,7 @@ public class EsNodeInfo {
             String address = (String) httpMap.get("publish_address");
             if (address != null) {
                 String[] scratch = address.split(":");
-                this.publishAddress = new TNetworkAddress(scratch[0], Integer.valueOf(scratch[1]));
+                this.publishAddress = new TNetworkAddress((httpSslEnabled ? "https://" : "") + scratch[0], Integer.parseInt(scratch[1]));
                 this.hasHttp = true;
             } else {
                 this.publishAddress = null;
@@ -96,6 +101,24 @@ public class EsNodeInfo {
         }
     }
 
+    public EsNodeInfo(String id, String seed) {
+        this.id = id;
+        String[] scratch = seed.split(":");
+        int port = 80;
+        if (scratch.length == 3) {
+            port = Integer.parseInt(scratch[2]);
+        }
+        String remoteHost = scratch[0] + ":" + scratch[1];
+        this.name = remoteHost;
+        this.host = remoteHost;
+        this.ip = remoteHost;
+        this.isClient = true;
+        this.isData = true;
+        this.isIngest = true;
+        this.publishAddress = new TNetworkAddress(remoteHost, port);
+        this.hasHttp = true;
+    }
+
     public boolean hasHttp() {
         return hasHttp;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java
index 98e895e..2d94ee8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java
@@ -59,7 +59,7 @@ public class EsRepository extends MasterDaemon {
         }
         esTables.put(esTable.getId(), esTable);
         esClients.put(esTable.getId(),
-                new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd()));
+                new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd(), esTable.isHttpSslEnabled()));
         LOG.info("register a new table [{}] to sync list", esTable);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
index f2868aa..7e1983a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
@@ -27,11 +27,20 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
 
 import java.io.IOException;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
 import okhttp3.Credentials;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
@@ -51,13 +60,16 @@ public class EsRestClient {
     private static OkHttpClient networkClient = new OkHttpClient.Builder()
             .readTimeout(10, TimeUnit.SECONDS)
             .build();
+    
+    private static OkHttpClient sslNetworkClient;
 
     private Request.Builder builder;
     private String[] nodes;
     private String currentNode;
     private int currentNodeIndex = 0;
+    private boolean httpSslEnable;
 
-    public EsRestClient(String[] nodes, String authUser, String authPassword) {
+    public EsRestClient(String[] nodes, String authUser, String authPassword, boolean httpSslEnable) {
         this.nodes = nodes;
         this.builder = new Request.Builder();
         if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) {
@@ -65,6 +77,7 @@ public class EsRestClient {
                     Credentials.basic(authUser, authPassword));
         }
         this.currentNode = nodes[currentNodeIndex];
+        this.httpSslEnable = httpSslEnable;
     }
 
     private void selectNextNode() {
@@ -83,7 +96,7 @@ public class EsRestClient {
         }
         Map<String, EsNodeInfo> nodesMap = new HashMap<>();
         for (Map.Entry<String, Map<String, Object>> entry : nodesData.entrySet()) {
-            EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue());
+            EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue(), httpSslEnable);
             if (node.hasHttp()) {
                 nodesMap.put(node.getId(), node);
             }
@@ -141,6 +154,20 @@ public class EsRestClient {
         }
         return EsShardPartitions.findShardPartitions(indexName, searchShards);
     }
+    
+    /**
+     * init ssl networkClient use lazy way
+     **/
+    private synchronized OkHttpClient getOrCreateSslNetworkClient() {
+        if (sslNetworkClient == null) {
+            sslNetworkClient = new OkHttpClient.Builder()
+                    .readTimeout(10, TimeUnit.SECONDS)
+                    .sslSocketFactory(createSSLSocketFactory(), new TrustAllCerts())
+                    .hostnameVerifier(new TrustAllHostnameVerifier())
+                    .build();
+        }
+        return sslNetworkClient;
+    }
 
     /**
      * execute request for specific path,it will try again nodes.length times if it fails
@@ -151,6 +178,12 @@ public class EsRestClient {
     private String execute(String path) throws DorisEsException {
         int retrySize = nodes.length;
         DorisEsException scratchExceptionForThrow = null;
+        OkHttpClient httpClient;
+        if (httpSslEnable) {
+            httpClient = getOrCreateSslNetworkClient();
+        } else {
+            httpClient = networkClient;
+        }
         for (int i = 0; i < retrySize; i++) {
             // maybe should add HTTP schema to the address
             // actually, at this time we can only process http protocol
@@ -170,7 +203,7 @@ public class EsRestClient {
                 LOG.trace("es rest client request URL: {}", currentNode + "/" + path);
             }
             try {
-                response = networkClient.newCall(request).execute();
+                response = httpClient.newCall(request).execute();
                 if (response.isSuccessful()) {
                     return response.body().string();
                 }
@@ -207,4 +240,33 @@ public class EsRestClient {
         }
         return (T) (key != null ? map.get(key) : map);
     }
+
+    /**
+     * support https
+     **/
+    private static class TrustAllCerts implements X509TrustManager {
+        public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {}
+
+        public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {}
+
+        public X509Certificate[] getAcceptedIssuers() {return new X509Certificate[0];}
+    }
+
+    private static class TrustAllHostnameVerifier implements HostnameVerifier {
+        public boolean verify(String hostname, SSLSession session) {
+            return true;
+        }
+    }
+
+    private static SSLSocketFactory createSSLSocketFactory() {
+        SSLSocketFactory ssfFactory;
+        try {
+            SSLContext sc = SSLContext.getInstance("TLS");
+            sc.init(null, new TrustManager[]{new TrustAllCerts()}, new SecureRandom());
+            ssfFactory = sc.getSocketFactory();
+        } catch (Exception e) {
+            throw new DorisEsException("Errors happens when create ssl socket");
+        }
+        return ssfFactory;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java
index 05c4fe0..d6bc5e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java
@@ -24,7 +24,6 @@ import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.catalog.RangePartitionInfo;
 import org.apache.doris.catalog.SinglePartitionInfo;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.thrift.TNetworkAddress;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -36,7 +35,6 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 /**
  * save the dynamic info parsed from es cluster state such as shard routing, partition info
@@ -109,22 +107,6 @@ public class EsTablePartitions {
         }
         return esTablePartitions;
     }
-
-    public void addHttpAddress(Map<String, EsNodeInfo> nodesInfo) {
-        for (EsShardPartitions indexState : partitionedIndexStates.values()) {
-            indexState.addHttpAddress(nodesInfo);
-        }
-        for (EsShardPartitions indexState : unPartitionedIndexStates.values()) {
-            indexState.addHttpAddress(nodesInfo);
-        }
-
-    }
-
-    public TNetworkAddress randomAddress(Map<String, EsNodeInfo> nodesInfo) {
-        int seed = new Random().nextInt() % nodesInfo.size();
-        EsNodeInfo[] nodeInfos = (EsNodeInfo[]) nodesInfo.values().toArray();
-        return nodeInfos[seed].getPublishAddress();
-    }
     
     public PartitionInfo getPartitionInfo() {
         return partitionInfo;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
index d25a01f..454fb5f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
@@ -21,8 +21,12 @@ import org.apache.doris.analysis.DistributionDesc;
 import org.apache.doris.analysis.PartitionDesc;
 import org.apache.doris.analysis.RangePartitionDesc;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+
 import org.json.JSONObject;
 
+import java.util.Map;
+
 public class EsUtil {
     
     public static void analyzePartitionAndDistributionDesc(PartitionDesc partitionDesc,
@@ -82,4 +86,13 @@ public class EsUtil {
             return null;
         }
     }
+    
+    public static boolean getBoolean(Map<String, String> properties, String name) throws DdlException {
+        String property = properties.get(name).trim();
+        try {
+            return Boolean.parseBoolean(property);
+        } catch (Exception e) {
+            throw new DdlException(String.format("fail to parse %s, %s = %s, `%s` should be like 'true' or 'false', value should be double quotation marks", name, name, property, name));
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java
index de1bb76..bb5d416 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java
@@ -19,6 +19,7 @@ package org.apache.doris.external.elasticsearch;
 
 import org.apache.doris.catalog.EsTable;
 
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -37,7 +38,15 @@ public class PartitionPhase implements SearchPhase {
     @Override
     public void execute(SearchContext context) throws DorisEsException {
         shardPartitions = client.searchShards(context.sourceIndex());
-        nodesInfo = client.getHttpNodes();
+        if (context.nodesDiscovery()) {
+            nodesInfo = client.getHttpNodes();
+        } else {
+            nodesInfo = new HashMap<>();
+            String[] seeds = context.esTable().getSeeds();
+            for (int i = 0; i < seeds.length; i++) {
+                nodesInfo.put(String.valueOf(i), new EsNodeInfo(String.valueOf(i), seeds[i]));
+            }
+        }
     }
 
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java
index 06b4c7d..3e9e03d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java
@@ -84,12 +84,16 @@ public class SearchContext {
     // the ES cluster version
     private EsMajorVersion version;
 
+    // whether the nodes needs to be discovered
+    private boolean nodesDiscovery;
+
 
     public SearchContext(EsTable table) {
         this.table = table;
         fullSchema = table.getFullSchema();
         sourceIndex = table.getIndexName();
         type = table.getMappingType();
+        nodesDiscovery = table.isNodesDiscovery();
     }
 
 
@@ -142,4 +146,8 @@ public class SearchContext {
     public EsTablePartitions tablePartitions() throws Exception {
         return EsTablePartitions.fromShardPartitions(table, shardPartitions);
     }
+
+    public boolean nodesDiscovery() {
+        return nodesDiscovery;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
index d075946..e662de0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
@@ -148,6 +148,7 @@ public class EsScanNode extends ScanNode {
         Map<String, String> properties = Maps.newHashMap();
         properties.put(EsTable.USER, table.getUserName());
         properties.put(EsTable.PASSWORD, table.getPasswd());
+        properties.put(EsTable.HTTP_SSL_ENABLED, String.valueOf(table.isHttpSslEnabled()));
         TEsScanNode esScanNode = new TEsScanNode(desc.getId().asInt());
         esScanNode.setProperties(properties);
         if (table.isDocValueScanEnable()) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java
index aa82dff..ae85801 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java
@@ -50,7 +50,7 @@ public class PartitionPhaseTest extends EsTestCase {
         Map<String, Map<String, Object>> nodesData = (Map<String, Map<String, Object>>) mapper.readValue(jsonParser, Map.class).get("nodes");
         Map<String, EsNodeInfo> nodesMap = new HashMap<>();
         for (Map.Entry<String, Map<String, Object>> entry : nodesData.entrySet()) {
-            EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue());
+            EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue(), false);
             if (node.hasHttp()) {
                 nodesMap.put(node.getId(), node);
             }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org