You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/27 15:16:23 UTC

[doris] branch master updated: [feature](multi-catalog) Support es datasource (#10565)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 87b1f4c071 [feature](multi-catalog) Support es datasource (#10565)
87b1f4c071 is described below

commit 87b1f4c0719468e56f8f06641cba4b1a7a43578c
Author: Stalary <st...@163.com>
AuthorDate: Wed Jul 27 23:16:17 2022 +0800

    [feature](multi-catalog) Support es datasource (#10565)
---
 fe/fe-core/pom.xml                                 |   1 +
 .../main/java/org/apache/doris/catalog/Env.java    |  15 ++-
 .../java/org/apache/doris/catalog/EsTable.java     | 144 ++------------------
 .../java/org/apache/doris/catalog/TableIf.java     |   5 +-
 .../doris/catalog/external/EsExternalDatabase.java |  91 +++++++++++++
 .../doris/catalog/external/EsExternalTable.java    | 141 +++++++++++++++++++
 .../org/apache/doris/common/util/JsonUtil.java     |  61 +++++++++
 .../apache/doris/datasource/CatalogFactory.java    |   6 +-
 .../org/apache/doris/datasource/DataSourceMgr.java |   5 +-
 .../doris/datasource/EsExternalDataSource.java     | 150 +++++++++++++++++++--
 .../external/elasticsearch/EsMetaStateTracker.java |   6 +-
 .../doris/external/elasticsearch/EsRestClient.java | 148 +++++++++++---------
 .../doris/external/elasticsearch/EsUtil.java       |  48 ++++++-
 .../doris/external/elasticsearch/VersionPhase.java |  55 --------
 .../java/org/apache/doris/planner/EsScanNode.java  |  23 +++-
 .../apache/doris/planner/SingleNodePlanner.java    |   3 +
 .../external/elasticsearch/VersionPhaseTest.java   |  55 --------
 17 files changed, 623 insertions(+), 334 deletions(-)

diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 44eabcce13..8e7d4552b6 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -624,6 +624,7 @@ under the License.
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-core</artifactId>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index d2033b70ff..835cfbce2c 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -90,6 +90,7 @@ import org.apache.doris.clone.TabletScheduler;
 import org.apache.doris.clone.TabletSchedulerStat;
 import org.apache.doris.cluster.BaseParam;
 import org.apache.doris.cluster.Cluster;
+import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
@@ -118,6 +119,7 @@ import org.apache.doris.common.util.Util;
 import org.apache.doris.consistency.ConsistencyChecker;
 import org.apache.doris.datasource.DataSourceIf;
 import org.apache.doris.datasource.DataSourceMgr;
+import org.apache.doris.datasource.EsExternalDataSource;
 import org.apache.doris.datasource.InternalDataSource;
 import org.apache.doris.deploy.DeployManager;
 import org.apache.doris.deploy.impl.AmbariDeployManager;
@@ -2983,9 +2985,9 @@ public class Env {
             if (esTable.getMappingType() != null) {
                 sb.append("\"type\" = \"").append(esTable.getMappingType()).append("\",\n");
             }
-            sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n");
-            sb.append("\"max_docvalue_fields\" = \"").append(esTable.maxDocValueFields()).append("\",\n");
-            sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\",\n");
+            sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isEnableDocValueScan()).append("\",\n");
+            sb.append("\"max_docvalue_fields\" = \"").append(esTable.getMaxDocValueFields()).append("\",\n");
+            sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isEnableKeywordSniff()).append("\",\n");
             sb.append("\"nodes_discovery\" = \"").append(esTable.isNodesDiscovery()).append("\",\n");
             sb.append("\"http_ssl_enabled\" = \"").append(esTable.isHttpSslEnabled()).append("\"\n");
             sb.append(")");
@@ -4184,11 +4186,16 @@ public class Env {
 
     // Switch catalog of this sesseion.
     public void changeCatalog(ConnectContext ctx, String catalogName) throws DdlException {
-        if (dataSourceMgr.getCatalogNullable(catalogName) == null) {
+        DataSourceIf dataSourceIf = dataSourceMgr.getCatalogNullable(catalogName);
+        if (dataSourceIf == null) {
             throw new DdlException(ErrorCode.ERR_UNKNOWN_CATALOG.formatErrorMsg(catalogName),
                     ErrorCode.ERR_UNKNOWN_CATALOG);
         }
         ctx.changeDefaultCatalog(catalogName);
+        if (dataSourceIf instanceof EsExternalDataSource) {
+            ctx.setDatabase(SystemInfoService.DEFAULT_CLUSTER + ClusterNamespace.CLUSTER_DELIMITER
+                    + EsExternalDataSource.DEFAULT_DB);
+        }
     }
 
     // Change current database of this session.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
index ba39f2d4d2..93dd2bc2bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
@@ -19,7 +19,6 @@ package org.apache.doris.catalog;
 
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.io.Text;
-import org.apache.doris.external.elasticsearch.EsMajorVersion;
 import org.apache.doris.external.elasticsearch.EsMetaStateTracker;
 import org.apache.doris.external.elasticsearch.EsRestClient;
 import org.apache.doris.external.elasticsearch.EsTablePartitions;
@@ -28,16 +27,16 @@ import org.apache.doris.thrift.TEsTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.json.simple.JSONObject;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -48,6 +47,8 @@ import java.util.Set;
 /**
  * Elasticsearch table.
  **/
+@Getter
+@Setter
 public class EsTable extends Table {
 
     public static final Set<String> DEFAULT_DOCVALUE_DISABLED_FIELDS = new HashSet<>(Arrays.asList("text"));
@@ -84,10 +85,6 @@ public class EsTable extends Table {
     // we also provide configurable parameters for expert-using
     // @see `MAX_DOCVALUE_FIELDS`
     private static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20;
-
-    // version would be used to be compatible with different ES Cluster
-    public EsMajorVersion majorVersion = null;
-
     private String hosts;
     private String[] seeds;
     private String userName = "";
@@ -143,14 +140,18 @@ public class EsTable extends Table {
     /**
      * Create table for test.
      **/
-    public EsTable(long id, String name, List<Column> schema,
-            Map<String, String> properties, PartitionInfo partitionInfo) throws DdlException {
+    public EsTable(long id, String name, List<Column> schema, Map<String, String> properties,
+            PartitionInfo partitionInfo) throws DdlException {
         super(id, name, TableType.ELASTICSEARCH, schema);
         this.partitionInfo = partitionInfo;
         validate(properties);
         this.client = new EsRestClient(seeds, userName, passwd, httpSslEnabled);
     }
 
+    public EsTable(long id, String name, List<Column> schema, TableType tableType) {
+        super(id, name, tableType, schema);
+    }
+
     public Map<String, String> fieldsContext() {
         return esMetaStateTracker.searchContext().fetchFieldsContext();
     }
@@ -159,26 +160,6 @@ public class EsTable extends Table {
         return esMetaStateTracker.searchContext().docValueFieldsContext();
     }
 
-    public int maxDocValueFields() {
-        return maxDocValueFields;
-    }
-
-    public boolean isDocValueScanEnable() {
-        return enableDocValueScan;
-    }
-
-    public boolean isKeywordSniffEnable() {
-        return enableKeywordSniff;
-    }
-
-    public boolean isNodesDiscovery() {
-        return nodesDiscovery;
-    }
-
-    public boolean isHttpSslEnabled() {
-        return httpSslEnabled;
-    }
-
     private void validate(Map<String, String> properties) throws DdlException {
         if (properties == null) {
             throw new DdlException(
@@ -199,24 +180,11 @@ public class EsTable extends Table {
         }
 
         if (StringUtils.isBlank(properties.get(INDEX))) {
-            throw new DdlException("Index of ES table is null. "
-                    + "Please add properties('index'='xxxx') when create table");
+            throw new DdlException(
+                    "Index of ES table is null. " + "Please add properties('index'='xxxx') when create table");
         }
         indexName = properties.get(INDEX).trim();
 
-        // Explicit setting for cluster version to avoid detecting version failure
-        if (properties.containsKey(VERSION)) {
-            try {
-                majorVersion = EsMajorVersion.parse(properties.get(VERSION).trim());
-                if (majorVersion.before(EsMajorVersion.V_5_X)) {
-                    throw new DdlException("Unsupported/Unknown ES Cluster version [" + properties.get(VERSION) + "] ");
-                }
-            } catch (Exception e) {
-                throw new DdlException("fail to parse ES major version, version= " + properties.get(VERSION).trim()
-                        + ", should be like '6.5.3' ");
-            }
-        }
-
         // enable doc value scan for Elasticsearch
         if (properties.containsKey(DOC_VALUE_SCAN)) {
             enableDocValueScan = EsUtil.getBoolean(properties, DOC_VALUE_SCAN);
@@ -264,9 +232,6 @@ public class EsTable extends Table {
         if (mappingType != null) {
             tableContext.put("mappingType", mappingType);
         }
-        if (majorVersion != null) {
-            tableContext.put("majorVersion", majorVersion.toString());
-        }
         tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan));
         tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff));
         tableContext.put("maxDocValueFields", String.valueOf(maxDocValueFields));
@@ -334,13 +299,6 @@ public class EsTable extends Table {
         passwd = tableContext.get("passwd");
         indexName = tableContext.get("indexName");
         mappingType = tableContext.get("mappingType");
-        if (tableContext.containsKey("majorVersion")) {
-            try {
-                majorVersion = EsMajorVersion.parse(tableContext.get("majorVersion"));
-            } catch (Exception e) {
-                majorVersion = EsMajorVersion.V_5_X;
-            }
-        }
 
         enableDocValueScan = Boolean.parseBoolean(tableContext.get("enableDocValueScan"));
         if (tableContext.containsKey("enableKeywordSniff")) {
@@ -376,58 +334,6 @@ public class EsTable extends Table {
         client = new EsRestClient(seeds, userName, passwd, httpSslEnabled);
     }
 
-    public String getHosts() {
-        return hosts;
-    }
-
-    public String[] getSeeds() {
-        return seeds;
-    }
-
-    public String getUserName() {
-        return userName;
-    }
-
-    public String getPasswd() {
-        return passwd;
-    }
-
-    public String getIndexName() {
-        return indexName;
-    }
-
-    public String getMappingType() {
-        return mappingType;
-    }
-
-    public PartitionInfo getPartitionInfo() {
-        return partitionInfo;
-    }
-
-    public EsTablePartitions getEsTablePartitions() {
-        return esTablePartitions;
-    }
-
-    public void setEsTablePartitions(EsTablePartitions esTablePartitions) {
-        this.esTablePartitions = esTablePartitions;
-    }
-
-    public EsMajorVersion esVersion() {
-        return majorVersion;
-    }
-
-    public Throwable getLastMetaDataSyncException() {
-        return lastMetaDataSyncException;
-    }
-
-    public void setLastMetaDataSyncException(Throwable lastMetaDataSyncException) {
-        this.lastMetaDataSyncException = lastMetaDataSyncException;
-    }
-
-    public void setPartitionInfo(PartitionInfo partitionInfo) {
-        this.partitionInfo = partitionInfo;
-    }
-
     /**
      * Sync es index meta from remote ES Cluster.
      */
@@ -440,35 +346,13 @@ public class EsTable extends Table {
             this.esTablePartitions = esMetaStateTracker.searchContext().tablePartitions();
         } catch (Throwable e) {
             LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster."
-                    + "table id: {}, err: {}", this.name, this.id, e.getMessage());
+                    + "table id: {}, err: ", this.name, this.id, e);
             this.esTablePartitions = null;
             this.lastMetaDataSyncException = e;
         }
     }
 
-    /**
-     * Generate columns from ES Cluster.
-     **/
     public List<Column> genColumnsFromEs() {
-        String mapping = client.getMapping(indexName);
-        JSONObject mappingProps = EsUtil.getMappingProps(indexName, mapping, mappingType);
-        Set<String> keys = (Set<String>) mappingProps.keySet();
-        List<Column> columns = new ArrayList<>();
-        for (String key : keys) {
-            JSONObject field = (JSONObject) mappingProps.get(key);
-            // Complex types are not currently supported.
-            if (field.containsKey("type")) {
-                Type type = EsUtil.toDorisType(field.get("type").toString());
-                if (!type.isInvalid()) {
-                    Column column = new Column();
-                    column.setName(key);
-                    column.setType(type);
-                    column.setIsKey(true);
-                    column.setIsAllowNull(true);
-                    columns.add(column);
-                }
-            }
-        }
-        return columns;
+        return EsUtil.genColumnsFromEs(client, indexName, mappingType);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index b0066b28aa..9b7b1bb32b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -100,7 +100,7 @@ public interface TableIf {
      */
     public enum TableType {
         MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI,
-        TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE;
+        TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE;
 
         public String toEngineName() {
             switch (this) {
@@ -128,6 +128,8 @@ public interface TableIf {
                     return "Table_Valued_Function";
                 case HMS_EXTERNAL_TABLE:
                     return "hms";
+                case ES_EXTERNAL_TABLE:
+                    return "es";
                 default:
                     return null;
             }
@@ -150,6 +152,7 @@ public interface TableIf {
                 case HUDI:
                 case TABLE_VALUED_FUNCTION:
                 case HMS_EXTERNAL_TABLE:
+                case ES_EXTERNAL_TABLE:
                     return "EXTERNAL TABLE";
                 default:
                     return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
new file mode 100644
index 0000000000..dd90844b6e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.datasource.EsExternalDataSource;
+import org.apache.doris.datasource.ExternalDataSource;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Elasticsearch metastore external database.
+ */
+public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> {
+
+    private static final Logger LOG = LogManager.getLogger(EsExternalDatabase.class);
+
+    // Cache of table name to table id.
+    private Map<String, Long> tableNameToId = Maps.newConcurrentMap();
+    private Map<Long, EsExternalTable> idToTbl = Maps.newHashMap();
+
+    /**
+     * Create Elasticsearch external database.
+     *
+     * @param extDataSource External data source this database belongs to.
+     * @param id database id.
+     * @param name database name.
+     */
+    public EsExternalDatabase(ExternalDataSource extDataSource, long id, String name) {
+        super(extDataSource, id, name);
+        init();
+    }
+
+    private void init() {
+        List<String> tableNames = extDataSource.listTableNames(null, name);
+        if (tableNames != null) {
+            for (String tableName : tableNames) {
+                long tblId = Env.getCurrentEnv().getNextId();
+                tableNameToId.put(tableName, tblId);
+                idToTbl.put(tblId, new EsExternalTable(tblId, tableName, name, (EsExternalDataSource) extDataSource));
+            }
+        }
+    }
+
+    @Override
+    public Set<String> getTableNamesWithLock() {
+        // Doesn't need to lock because everytime we call the hive metastore api to get table names.
+        return new HashSet<>(extDataSource.listTableNames(null, name));
+    }
+
+    @Override
+    public List<EsExternalTable> getTables() {
+        return new ArrayList<>(idToTbl.values());
+    }
+
+    @Override
+    public EsExternalTable getTableNullable(String tableName) {
+        if (!tableNameToId.containsKey(tableName)) {
+            return null;
+        }
+        return idToTbl.get(tableNameToId.get(tableName));
+    }
+
+    @Override
+    public EsExternalTable getTableNullable(long tableId) {
+        return idToTbl.get(tableId);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
new file mode 100644
index 0000000000..8852dac22a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.EsTable;
+import org.apache.doris.datasource.EsExternalDataSource;
+import org.apache.doris.external.elasticsearch.EsUtil;
+import org.apache.doris.thrift.TEsTable;
+import org.apache.doris.thrift.TTableDescriptor;
+import org.apache.doris.thrift.TTableType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * Elasticsearch external table.
+ */
+public class EsExternalTable extends ExternalTable {
+
+    private static final Logger LOG = LogManager.getLogger(EsExternalTable.class);
+
+    private final EsExternalDataSource ds;
+    private final String dbName;
+    private boolean initialized = false;
+    private EsTable esTable;
+
+    /**
+     * Create elasticsearch external table.
+     *
+     * @param id Table id.
+     * @param name Table name.
+     * @param dbName Database name.
+     * @param ds HMSExternalDataSource.
+     */
+    public EsExternalTable(long id, String name, String dbName, EsExternalDataSource ds) {
+        super(id, name);
+        this.dbName = dbName;
+        this.ds = ds;
+        this.type = TableType.ES_EXTERNAL_TABLE;
+    }
+
+
+    private synchronized void makeSureInitialized() {
+        if (!initialized) {
+            init();
+            initialized = true;
+        }
+    }
+
+    private void init() {
+        fullSchema = EsUtil.genColumnsFromEs(ds.getEsRestClient(), name, null);
+        esTable = toEsTable();
+    }
+
+    @Override
+    public List<Column> getFullSchema() {
+        makeSureInitialized();
+        return fullSchema;
+    }
+
+    @Override
+    public List<Column> getBaseSchema() {
+        return getFullSchema();
+    }
+
+    @Override
+    public List<Column> getBaseSchema(boolean full) {
+        return getFullSchema();
+    }
+
+    @Override
+    public Column getColumn(String name) {
+        makeSureInitialized();
+        for (Column column : fullSchema) {
+            if (name.equals(column.getName())) {
+                return column;
+            }
+        }
+        return null;
+    }
+
+    public EsTable getEsTable() {
+        makeSureInitialized();
+        return esTable;
+    }
+
+    @Override
+    public String getMysqlType() {
+        return type.name();
+    }
+
+    /**
+     * get database name of hms table.
+     */
+    public String getDbName() {
+        return dbName;
+    }
+
+    @Override
+    public TTableDescriptor toThrift() {
+        TEsTable tEsTable = new TEsTable();
+        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ES_TABLE, fullSchema.size(), 0,
+                getName(), "");
+        tTableDescriptor.setEsTable(tEsTable);
+        return tTableDescriptor;
+    }
+
+    private EsTable toEsTable() {
+        EsTable esTable = new EsTable(this.id, this.name, this.fullSchema, TableType.ES_EXTERNAL_TABLE);
+        esTable.setIndexName(name);
+        esTable.setClient(ds.getEsRestClient());
+        esTable.setUserName(ds.getUsername());
+        esTable.setPasswd(ds.getPassword());
+        esTable.setEnableDocValueScan(ds.isEnableDocValueScan());
+        esTable.setEnableKeywordSniff(ds.isEnableKeywordSniff());
+        esTable.setNodesDiscovery(ds.isEnableNodesDiscovery());
+        esTable.setHttpSslEnabled(ds.isEnableSsl());
+        esTable.setSeeds(ds.getNodes());
+        esTable.setHosts(String.join(",", ds.getNodes()));
+        esTable.syncTableMetaData();
+        return esTable;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/JsonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/JsonUtil.java
new file mode 100644
index 0000000000..76d913260f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/JsonUtil.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.text.SimpleDateFormat;
+import java.util.TimeZone;
+
+/**
+ * Util for json use jackson.
+ **/
+public class JsonUtil {
+
+    private static final Logger LOG = LogManager.getLogger(JsonUtil.class);
+
+    private static final ObjectMapper objectMapper = new ObjectMapper().configure(
+                    DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(MapperFeature.REQUIRE_SETTERS_FOR_GETTERS, true).setTimeZone(TimeZone.getDefault())
+            .setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
+
+    public static JsonNode toJsonNode(Object obj) {
+        return objectMapper.valueToTree(obj);
+    }
+
+    public static ArrayNode parseArray(String text) {
+        try {
+            return (ArrayNode) objectMapper.readTree(text);
+        } catch (Exception e) {
+            throw new RuntimeException("Json deserialization exception.", e);
+        }
+    }
+
+    public static <T> T readValue(String text, Class<T> clazz) throws JsonProcessingException {
+        return objectMapper.readValue(text, clazz);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
index 5cae998bb3..f3669d325b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.AlterCatalogPropertyStmt;
 import org.apache.doris.analysis.CreateCatalogStmt;
 import org.apache.doris.analysis.DropCatalogStmt;
 import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.common.DdlException;
 
 import java.util.Map;
 
@@ -56,11 +57,12 @@ public class CatalogFactory {
     /**
      * create the datasource instance from data source log.
      */
-    public static DataSourceIf constructorFromLog(CatalogLog log) {
+    public static DataSourceIf constructorFromLog(CatalogLog log) throws DdlException {
         return constructorDataSource(log.getCatalogId(), log.getCatalogName(), log.getProps());
     }
 
-    private static DataSourceIf constructorDataSource(long catalogId, String name, Map<String, String> props) {
+    private static DataSourceIf constructorDataSource(long catalogId, String name, Map<String, String> props)
+            throws DdlException {
         String type = props.get("type");
         DataSourceIf dataSource;
         switch (type) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
index 3f66358d82..c9e58be38f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
@@ -124,6 +124,9 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
         return Lists.newArrayList(idToCatalog.keySet());
     }
 
+    /**
+     * Get db allow return null
+     **/
     public DatabaseIf getDbNullable(long dbId) {
         DatabaseIf db = internalDataSource.getDbNullable(dbId);
         if (db != null) {
@@ -308,7 +311,7 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
     /**
      * Reply for create catalog event.
      */
-    public void replayCreateCatalog(CatalogLog log) {
+    public void replayCreateCatalog(CatalogLog log) throws DdlException {
         writeLock();
         try {
             DataSourceIf ds = CatalogFactory.constructorFromLog(log);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
index a07450a7cc..d531b4f153 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
@@ -18,43 +18,175 @@
 package org.apache.doris.datasource;
 
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.external.EsExternalDatabase;
+import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.external.elasticsearch.EsRestClient;
+import org.apache.doris.external.elasticsearch.EsUtil;
+
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.Nullable;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 /**
  * External data source for elasticsearch
  */
+@Getter
 public class EsExternalDataSource extends ExternalDataSource {
+
+    public static final String DEFAULT_DB = "default";
+    private static final Logger LOG = LogManager.getLogger(EsExternalDataSource.class);
+    private static final String PROP_HOSTS = "elasticsearch.hosts";
+    private static final String PROP_USERNAME = "elasticsearch.username";
+    private static final String PROP_PASSWORD = "elasticsearch.password";
+    private static final String PROP_DOC_VALUE_SCAN = "elasticsearch.doc_value_scan";
+    private static final String PROP_KEYWORD_SNIFF = "elasticsearch.keyword_sniff";
+    private static final String PROP_NODES_DISCOVERY = "elasticsearch.nodes_discovery";
+    private static final String PROP_SSL = "elasticsearch.ssl";
+
+    // Cache of db name to db id.
+    private Map<String, Long> dbNameToId;
+    private Map<Long, EsExternalDatabase> idToDb;
+
+    private EsRestClient esRestClient;
+
+    private boolean initialized = false;
+
+    private String[] nodes;
+
+    private String username = "";
+
+    private String password = "";
+
+    private boolean enableDocValueScan = true;
+
+    private boolean enableKeywordSniff = true;
+
+    private boolean enableSsl = false;
+
+    private boolean enableNodesDiscovery = true;
+
     /**
      * Default constructor for EsExternalDataSource.
      */
-    public EsExternalDataSource(long catalogId, String name, Map<String, String> props) {
+    public EsExternalDataSource(long catalogId, String name, Map<String, String> props) throws DdlException {
         this.id = catalogId;
-        setName(name);
-        getDsProperty().setProperties(props);
-        setType("es");
+        this.name = name;
+        this.type = "es";
+        validate(props);
+        this.dsProperty = new DataSourceProperty();
+        this.dsProperty.setProperties(props);
+    }
+
+    private void validate(Map<String, String> properties) throws DdlException {
+        if (properties == null) {
+            throw new DdlException(
+                    "Please set properties of elasticsearch table, " + "they are: hosts, user, password, index");
+        }
+
+        if (StringUtils.isBlank(properties.get(PROP_HOSTS))) {
+            throw new DdlException("Hosts of ES table is null.");
+        }
+        nodes = properties.get(PROP_HOSTS).trim().split(",");
+
+        if (StringUtils.isNotBlank(properties.get(PROP_USERNAME))) {
+            username = properties.get(PROP_USERNAME).trim();
+        }
+
+        if (StringUtils.isNotBlank(properties.get(PROP_PASSWORD))) {
+            password = properties.get(PROP_PASSWORD).trim();
+        }
+
+        if (properties.containsKey(PROP_DOC_VALUE_SCAN)) {
+            enableDocValueScan = EsUtil.getBoolean(properties, PROP_DOC_VALUE_SCAN);
+        }
+
+        if (properties.containsKey(PROP_KEYWORD_SNIFF)) {
+            enableKeywordSniff = EsUtil.getBoolean(properties, PROP_KEYWORD_SNIFF);
+        }
+
+        if (properties.containsKey(PROP_NODES_DISCOVERY)) {
+            enableNodesDiscovery = EsUtil.getBoolean(properties, PROP_NODES_DISCOVERY);
+        }
+
+        if (properties.containsKey(PROP_SSL)) {
+            enableSsl = EsUtil.getBoolean(properties, PROP_SSL);
+            // check protocol
+            for (String seed : nodes) {
+                if (enableSsl && seed.startsWith("http://")) {
+                    throw new DdlException("if ssl_enabled is true, the https protocol must be used");
+                }
+                if (!enableSsl && seed.startsWith("https://")) {
+                    throw new DdlException("if ssl_enabled is false, the http protocol must be used");
+                }
+            }
+        }
+    }
+
+    /**
+     * Datasource can't be init when creating because the external datasource may depend on third system.
+     * So you have to make sure the client of third system is initialized before any method was called.
+     */
+    private synchronized void makeSureInitialized() {
+        if (!initialized) {
+            init();
+            initialized = true;
+        }
+    }
+
+    private void init() {
+        try {
+            validate(this.dsProperty.getProperties());
+        } catch (DdlException e) {
+            LOG.warn("validate error", e);
+        }
+        dbNameToId = Maps.newConcurrentMap();
+        idToDb = Maps.newConcurrentMap();
+        this.esRestClient = new EsRestClient(this.nodes, this.username, this.password, this.enableSsl);
+        long defaultDbId = Env.getCurrentEnv().getNextId();
+        dbNameToId.put(DEFAULT_DB, defaultDbId);
+        idToDb.put(defaultDbId, new EsExternalDatabase(this, defaultDbId, "default"));
     }
 
     @Override
     public List<String> listDatabaseNames(SessionContext ctx) {
-        return null;
+        makeSureInitialized();
+        return new ArrayList<>(dbNameToId.keySet());
     }
 
     @Override
     public List<String> listTableNames(SessionContext ctx, String dbName) {
-        return null;
+        return esRestClient.getIndexes();
+    }
+
+    @Nullable
+    @Override
+    public ExternalDatabase getDbNullable(String dbName) {
+        makeSureInitialized();
+        String realDbName = ClusterNamespace.getNameFromFullName(dbName);
+        if (!dbNameToId.containsKey(realDbName)) {
+            return null;
+        }
+        return new EsExternalDatabase(this, dbNameToId.get(realDbName), realDbName);
     }
 
     @Override
     public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
-        return false;
+        return esRestClient.existIndex(this.esRestClient.getClient(), tblName);
     }
 
     @Override
     public List<Long> getDbIds() {
-        // TODO: implement it
-        return Lists.newArrayList();
+        return Lists.newArrayList(dbNameToId.values());
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsMetaStateTracker.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsMetaStateTracker.java
index f41837fcd8..0b556e46c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsMetaStateTracker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsMetaStateTracker.java
@@ -25,9 +25,8 @@ import java.util.List;
 /**
  * It is responsible for this class to schedule all network request sent to remote ES Cluster
  * Request sequence
- * 1. GET /
- * 2. GET {index}/_mapping
- * 3. GET {index}/_search_shards
+ * 1. GET {index}/_mapping
+ * 2. GET {index}/_search_shards
  * <p>
  * note: step 1 is not necessary
  */
@@ -37,7 +36,6 @@ public class EsMetaStateTracker {
     private SearchContext searchContext;
 
     public EsMetaStateTracker(EsRestClient client, EsTable esTable) {
-        builtinSearchPhase.add(new VersionPhase(client));
         builtinSearchPhase.add(new MappingPhase(client));
         builtinSearchPhase.add(new PartitionPhase(client));
         searchContext = new SearchContext(esTable);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
index c156bb5bc9..77fbe3bff6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.external.elasticsearch;
 
+import org.apache.doris.common.util.JsonUtil;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import okhttp3.Credentials;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
@@ -25,17 +28,15 @@ import org.apache.http.HttpHeaders;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.util.Strings;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
 
 import java.io.IOException;
 import java.security.SecureRandom;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import javax.net.ssl.HostnameVerifier;
@@ -45,40 +46,41 @@ import javax.net.ssl.SSLSocketFactory;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
 
+/**
+ * For get es metadata by http/https.
+ **/
 public class EsRestClient {
 
     private static final Logger LOG = LogManager.getLogger(EsRestClient.class);
-    private ObjectMapper mapper;
-
-    {
-        mapper = new ObjectMapper();
-        mapper.configure(DeserializationConfig.Feature.USE_ANNOTATIONS, false);
-        mapper.configure(SerializationConfig.Feature.USE_ANNOTATIONS, false);
-    }
-
-    private static OkHttpClient networkClient = new OkHttpClient.Builder()
-            .readTimeout(10, TimeUnit.SECONDS)
-            .build();
+    private static OkHttpClient networkClient = new OkHttpClient.Builder().readTimeout(10, TimeUnit.SECONDS).build();
 
     private static OkHttpClient sslNetworkClient;
-
     private Request.Builder builder;
     private String[] nodes;
     private String currentNode;
     private int currentNodeIndex = 0;
     private boolean httpSslEnable;
 
+    /**
+     * For EsTable.
+     **/
     public EsRestClient(String[] nodes, String authUser, String authPassword, boolean httpSslEnable) {
         this.nodes = nodes;
         this.builder = new Request.Builder();
         if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) {
-            this.builder.addHeader(HttpHeaders.AUTHORIZATION,
-                    Credentials.basic(authUser, authPassword));
+            this.builder.addHeader(HttpHeaders.AUTHORIZATION, Credentials.basic(authUser, authPassword));
         }
         this.currentNode = nodes[currentNodeIndex];
         this.httpSslEnable = httpSslEnable;
     }
 
+    public OkHttpClient getClient() {
+        if (httpSslEnable) {
+            return getOrCreateSslNetworkClient();
+        }
+        return networkClient;
+    }
+
     private void selectNextNode() {
         currentNodeIndex++;
         // reroute, because the previously failed node may have already been restored
@@ -88,6 +90,9 @@ public class EsRestClient {
         currentNode = nodes[currentNodeIndex];
     }
 
+    /**
+     * Get http nodes.
+     **/
     public Map<String, EsNodeInfo> getHttpNodes() throws DorisEsException {
         Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes");
         if (nodesData == null) {
@@ -104,26 +109,7 @@ public class EsRestClient {
     }
 
     /**
-     * Get remote ES Cluster version
-     *
-     * @return
-     * @throws Exception
-     */
-    public EsMajorVersion version() throws DorisEsException {
-        Map<String, Object> result = get("/", null);
-        if (result == null) {
-            throw new DorisEsException("Unable to retrieve ES main cluster info.");
-        }
-        Map<String, String> versionBody = (Map<String, String>) result.get("version");
-        return EsMajorVersion.parse(versionBody.get("number"));
-    }
-
-    /**
-     * Get mapping for indexName
-     *
-     * @param indexName
-     * @return
-     * @throws Exception
+     * Get mapping for indexName.
      */
     public String getMapping(String indexName) throws DorisEsException {
         String path = indexName + "/_mapping";
@@ -134,14 +120,48 @@ public class EsRestClient {
         return indexMapping;
     }
 
+    /**
+     * Check whether index exist.
+     **/
+    public boolean existIndex(OkHttpClient httpClient, String indexName) {
+        String path = indexName + "/_mapping";
+        Response response;
+        try {
+            response = executeResponse(httpClient, path);
+            if (response.isSuccessful()) {
+                return true;
+            }
+        } catch (IOException e) {
+            LOG.warn("existIndex error", e);
+            return false;
+        }
+        return false;
+    }
 
     /**
-     * Get Shard location
-     *
-     * @param indexName
-     * @return
-     * @throws DorisEsException
-     */
+     * Get all index.
+     **/
+    public List<String> getIndexes() {
+        String indexes = execute("_cat/indices?h=index&format=json&s=index:asc");
+        if (indexes == null) {
+            throw new DorisEsException("get es indexes error");
+        }
+        List<String> ret = new ArrayList<>();
+        ArrayNode jsonNodes = JsonUtil.parseArray(indexes);
+        jsonNodes.forEach(json -> {
+            // es 7.17 has .geoip_databases, but _mapping response 400.
+            String index = json.get("index").asText();
+            if (!index.startsWith(".")) {
+                ret.add(index);
+            }
+        });
+        return ret;
+    }
+
+
+    /**
+     * Get Shard location.
+     **/
     public EsShardPartitions searchShards(String indexName) throws DorisEsException {
         String path = indexName + "/_search_shards";
         String searchShards = execute(path);
@@ -156,15 +176,25 @@ public class EsRestClient {
      **/
     private synchronized OkHttpClient getOrCreateSslNetworkClient() {
         if (sslNetworkClient == null) {
-            sslNetworkClient = new OkHttpClient.Builder()
-                    .readTimeout(10, TimeUnit.SECONDS)
+            sslNetworkClient = new OkHttpClient.Builder().readTimeout(10, TimeUnit.SECONDS)
                     .sslSocketFactory(createSSLSocketFactory(), new TrustAllCerts())
-                    .hostnameVerifier(new TrustAllHostnameVerifier())
-                    .build();
+                    .hostnameVerifier(new TrustAllHostnameVerifier()).build();
         }
         return sslNetworkClient;
     }
 
+    private Response executeResponse(OkHttpClient httpClient, String path) throws IOException {
+        currentNode = currentNode.trim();
+        if (!(currentNode.startsWith("http://") || currentNode.startsWith("https://"))) {
+            currentNode = "http://" + currentNode;
+        }
+        Request request = builder.get().url(currentNode + "/" + path).build();
+        if (LOG.isInfoEnabled()) {
+            LOG.info("es rest client request URL: {}", currentNode + "/" + path);
+        }
+        return httpClient.newCall(request).execute();
+    }
+
     /**
      * execute request for specific path,it will try again nodes.length times if it fails
      *
@@ -187,29 +217,16 @@ public class EsRestClient {
             // User may set a config like described below:
             // hosts: "http://192.168.0.1:8200, http://192.168.0.2:8200"
             // then currentNode will be "http://192.168.0.1:8200", " http://192.168.0.2:8200"
-            currentNode = currentNode.trim();
-            if (!(currentNode.startsWith("http://") || currentNode.startsWith("https://"))) {
-                currentNode = "http://" + currentNode;
-            }
-            Request request = builder.get()
-                    .url(currentNode + "/" + path)
-                    .build();
-            Response response = null;
             if (LOG.isTraceEnabled()) {
                 LOG.trace("es rest client request URL: {}", currentNode + "/" + path);
             }
-            try {
-                response = httpClient.newCall(request).execute();
+            try (Response response = executeResponse(httpClient, path)) {
                 if (response.isSuccessful()) {
                     return response.body().string();
                 }
             } catch (IOException e) {
                 LOG.warn("request node [{}] [{}] failures {}, try next nodes", currentNode, path, e);
                 scratchExceptionForThrow = new DorisEsException(e.getMessage());
-            } finally {
-                if (response != null) {
-                    response.close();
-                }
             }
             selectNextNode();
         }
@@ -226,10 +243,9 @@ public class EsRestClient {
 
     @SuppressWarnings("unchecked")
     private <T> T parseContent(String response, String key) {
-        Map<String, Object> map = Collections.emptyMap();
+        Map<String, Object> map;
         try {
-            JsonParser jsonParser = mapper.getJsonFactory().createJsonParser(response);
-            map = mapper.readValue(jsonParser, Map.class);
+            map = JsonUtil.readValue(response, Map.class);
         } catch (IOException ex) {
             LOG.error("parse es response failure: [{}]", response);
             throw new DorisEsException(ex.getMessage());
@@ -262,7 +278,7 @@ public class EsRestClient {
         SSLSocketFactory ssfFactory;
         try {
             SSLContext sc = SSLContext.getInstance("TLS");
-            sc.init(null, new TrustManager[]{new TrustAllCerts()}, new SecureRandom());
+            sc.init(null, new TrustManager[] {new TrustAllCerts()}, new SecureRandom());
             ssfFactory = sc.getSocketFactory();
         } catch (Exception e) {
             throw new DorisEsException("Errors happens when create ssl socket");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
index 2b098a669a..3537c1ad3a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
@@ -45,12 +45,16 @@ import org.apache.doris.external.elasticsearch.QueryBuilders.QueryBuilder;
 import org.apache.doris.thrift.TExprOpcode;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -58,6 +62,8 @@ import java.util.stream.Collectors;
  **/
 public class EsUtil {
 
+    private static final Logger LOG = LogManager.getLogger(EsUtil.class);
+
     /**
      * Analyze partition and distributionDesc.
      **/
@@ -143,6 +149,18 @@ public class EsUtil {
         // Elasticsearch 8.x, include_type_name parameter is removed
         if (rootSchema == null) {
             properties = (JSONObject) mappings.get("properties");
+            // Compatible es6 with no type passed in.
+            if (mappingType == null) {
+                String typeKey = (String) mappings.keySet().iterator().next();
+                JSONObject typeProps = (JSONObject) ((JSONObject) mappings.get(typeKey)).get("properties");
+                if (typeProps != null) {
+                    properties = typeProps;
+                    if (properties.containsKey("mappings")) {
+                        properties.remove("mappings");
+                        properties.remove("settings");
+                    }
+                }
+            }
         } else {
             properties = (JSONObject) rootSchema.get("properties");
         }
@@ -349,6 +367,33 @@ public class EsUtil {
         return null;
     }
 
+
+    /**
+     * Generate columns from ES Cluster.
+     **/
+    public static List<Column> genColumnsFromEs(EsRestClient client, String indexName, String mappingType) {
+        String mapping = client.getMapping(indexName);
+        JSONObject mappingProps = EsUtil.getMappingProps(indexName, mapping, mappingType);
+        Set<String> keys = (Set<String>) mappingProps.keySet();
+        List<Column> columns = new ArrayList<>();
+        for (String key : keys) {
+            JSONObject field = (JSONObject) mappingProps.get(key);
+            // Complex types are not currently supported.
+            if (field.containsKey("type")) {
+                Type type = toDorisType(field.get("type").toString());
+                if (!type.isInvalid()) {
+                    Column column = new Column();
+                    column.setName(key);
+                    column.setType(type);
+                    column.setIsKey(true);
+                    column.setIsAllowNull(true);
+                    columns.add(column);
+                }
+            }
+        }
+        return columns;
+    }
+
     /**
      * Transfer es type to doris type.
      **/
@@ -429,8 +474,7 @@ public class EsUtil {
             if (StringUtils.isNotBlank(type)) {
                 initScrollUrl.append("/").append(type);
             }
-            initScrollUrl.append("/_search?").append(filterPath).append("&terminate_after=")
-                    .append(batchSize);
+            initScrollUrl.append("/_search?").append(filterPath).append("&terminate_after=").append(batchSize);
             nextScrollUrl.append("/_search/scroll?").append(filterPath);
             return new EsUrls(null, initScrollUrl.toString(), nextScrollUrl.toString());
         } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/VersionPhase.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/VersionPhase.java
deleted file mode 100644
index 8d4e91186e..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/VersionPhase.java
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.external.elasticsearch;
-
-/**
- * Request version from remote ES Cluster. If request fails, set the version with `LATEST`
- */
-public class VersionPhase implements SearchPhase {
-
-    private EsRestClient client;
-
-    private boolean isVersionSet = false;
-
-
-    public VersionPhase(EsRestClient client) {
-        this.client = client;
-    }
-
-    @Override
-    public void preProcess(SearchContext context) {
-        if (context.esTable().esVersion() != null) {
-            isVersionSet = true;
-            context.version(context.esTable().esVersion());
-        }
-    }
-
-    @Override
-    public void execute(SearchContext context) {
-        if (isVersionSet) {
-            return;
-        }
-        EsMajorVersion version;
-        try {
-            version = client.version();
-        } catch (Throwable e) {
-            version = EsMajorVersion.LATEST;
-        }
-        context.version(version);
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
index b6dedd2eaf..d2ff066c22 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.EsTable;
 import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.RangePartitionInfo;
+import org.apache.doris.catalog.external.EsExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.external.elasticsearch.EsShardPartitions;
@@ -83,8 +84,20 @@ public class EsScanNode extends ScanNode {
     private boolean isFinalized = false;
 
     public EsScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
+        this(id, desc, planNodeName, false);
+    }
+
+    /**
+     * For multicatalog es.
+     **/
+    public EsScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, boolean esExternalTable) {
         super(id, desc, planNodeName, StatisticalType.ES_SCAN_NODE);
-        table = (EsTable) (desc.getTable());
+        if (esExternalTable) {
+            EsExternalTable externalTable = (EsExternalTable) (desc.getTable());
+            table = externalTable.getEsTable();
+        } else {
+            table = (EsTable) (desc.getTable());
+        }
         esTablePartitions = table.getEsTablePartitions();
     }
 
@@ -134,7 +147,7 @@ public class EsScanNode extends ScanNode {
         for (SlotDescriptor slotDescriptor : slotDescriptors) {
             selectedFields.add(slotDescriptor.getColumn().getName());
         }
-        if (selectedFields.size() > table.maxDocValueFields()) {
+        if (selectedFields.size() > table.getMaxDocValueFields()) {
             return 0;
         }
         Set<String> docValueFields = docValueContext.keySet();
@@ -159,14 +172,14 @@ public class EsScanNode extends ScanNode {
         properties.put(EsTable.HTTP_SSL_ENABLED, String.valueOf(table.isHttpSslEnabled()));
         TEsScanNode esScanNode = new TEsScanNode(desc.getId().asInt());
         esScanNode.setProperties(properties);
-        if (table.isDocValueScanEnable()) {
+        if (table.isEnableDocValueScan()) {
             esScanNode.setDocvalueContext(table.docValueContext());
             properties.put(EsTable.DOC_VALUES_MODE, String.valueOf(useDocValueScan(desc, table.docValueContext())));
         }
         properties.put(EsTable.ES_DSL, queryBuilder.toJson());
 
         // Be use it add es host_port and shardId to query.
-        EsUrls esUrls = EsUtil.genEsUrls(table.getIndexName(), table.getMappingType(), table.isDocValueScanEnable(),
+        EsUrls esUrls = EsUtil.genEsUrls(table.getIndexName(), table.getMappingType(), table.isEnableDocValueScan(),
                 ConnectContext.get().getSessionVariable().batchSize, msg.limit);
         if (esUrls.getSearchUrl() != null) {
             properties.put(EsTable.SEARCH_URL, esUrls.getSearchUrl());
@@ -174,7 +187,7 @@ public class EsScanNode extends ScanNode {
             properties.put(EsTable.INIT_SCROLL_URL, esUrls.getInitScrollUrl());
             properties.put(EsTable.NEXT_SCROLL_URL, esUrls.getNextScrollUrl());
         }
-        if (table.isKeywordSniffEnable() && table.fieldsContext().size() > 0) {
+        if (table.isEnableKeywordSniff() && table.fieldsContext().size() > 0) {
             esScanNode.setFieldsContext(table.fieldsContext());
         }
         msg.es_scan_node = esScanNode;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index dc9cfac50d..35b5defcec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1727,6 +1727,9 @@ public class SingleNodePlanner {
             case HMS_EXTERNAL_TABLE:
                 scanNode = new ExternalFileScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HMS_FILE_SCAN_NODE");
                 break;
+            case ES_EXTERNAL_TABLE:
+                scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "EsScanNode", true);
+                break;
             default:
                 break;
         }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/VersionPhaseTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/VersionPhaseTest.java
deleted file mode 100644
index a3228820f2..0000000000
--- a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/VersionPhaseTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.external.elasticsearch;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.EsTable;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.common.ExceptionChecker;
-
-import mockit.Expectations;
-import mockit.Injectable;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class VersionPhaseTest extends EsTestCase {
-
-    @Test
-    public void testWorkFlow(@Injectable EsRestClient client) throws Exception {
-        List<Column> columns = new ArrayList<>();
-        Column k1 = new Column("k1", PrimitiveType.BIGINT);
-        columns.add(k1);
-        EsTable esTableBefore7X = fakeEsTable("fake", "test", "doc", columns);
-        SearchContext context = new SearchContext(esTableBefore7X);
-
-        new Expectations(client) {
-            {
-                client.version();
-                minTimes = 0;
-                result = EsMajorVersion.V_6_X;
-            }
-        };
-        VersionPhase versionPhase = new VersionPhase(client);
-        ExceptionChecker.expectThrowsNoException(() -> versionPhase.preProcess(context));
-        ExceptionChecker.expectThrowsNoException(() -> versionPhase.execute(context));
-        Assert.assertTrue(context.version().on(EsMajorVersion.V_6_X));
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org