You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/07/07 01:04:18 UTC

[incubator-doris] branch master updated: [Doris On ES][Refactor] refactor and enhance ES sync meta logic (#4012)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ba38e3  [Doris On ES][Refactor] refactor and enhance ES sync meta logic (#4012)
3ba38e3 is described below

commit 3ba38e3381d2af245f1ef6f806bec050dbe4e233
Author: Yunfeng,Wu <wu...@baidu.com>
AuthorDate: Tue Jul 7 09:04:05 2020 +0800

    [Doris On ES][Refactor] refactor and enhance ES sync meta logic (#4012)
    
    After PR #3454 was merged, we should refactor and reorganize some of the logic so that Doris On ES can be iterated on sustainably over the long term.
    To facilitate code review, I will divide this work into multiple PRs (some other WIP work also needs careful thought).
    
    This PR includes:
    
    1. introduce SearchContext to hold all the state we need
    2. divide the meta-sync logic into three phases
    3. modify some of the processing logic
    4. introduce version-detection logic for future use
---
 .../java/org/apache/doris/catalog/EsTable.java     |  74 +++-------
 .../external/elasticsearch/DorisEsException.java   |   5 +-
 .../external/elasticsearch/EsMajorVersion.java     |  27 +++-
 .../external/elasticsearch/EsMetaStateTracker.java |  57 ++++++++
 .../doris/external/elasticsearch/EsNodeInfo.java   |   2 +-
 .../doris/external/elasticsearch/EsRepository.java |   9 +-
 .../doris/external/elasticsearch/EsRestClient.java |  81 ++++++++---
 .../external/elasticsearch/EsShardPartitions.java  |   7 +-
 .../{EsFieldInfos.java => MappingPhase.java}       | 161 +++++++++------------
 .../external/elasticsearch/PartitionPhase.java     |  51 +++++++
 .../external/elasticsearch/SearchContext.java      | 145 +++++++++++++++++++
 .../{DorisEsException.java => SearchPhase.java}    |  23 ++-
 .../doris/external/elasticsearch/VersionPhase.java |  55 +++++++
 .../external/elasticsearch/EsRepositoryTest.java   | 128 ----------------
 .../doris/external/elasticsearch/EsUtilTest.java   |  40 ++---
 15 files changed, 526 insertions(+), 339 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/src/main/java/org/apache/doris/catalog/EsTable.java
index 2a26d51..c885fb5 100644
--- a/fe/src/main/java/org/apache/doris/catalog/EsTable.java
+++ b/fe/src/main/java/org/apache/doris/catalog/EsTable.java
@@ -20,21 +20,19 @@ package org.apache.doris.catalog;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
-import org.apache.doris.external.elasticsearch.EsFieldInfos;
 import org.apache.doris.external.elasticsearch.EsMajorVersion;
-import org.apache.doris.external.elasticsearch.EsNodeInfo;
+import org.apache.doris.external.elasticsearch.EsMetaStateTracker;
 import org.apache.doris.external.elasticsearch.EsRestClient;
-import org.apache.doris.external.elasticsearch.EsShardPartitions;
 import org.apache.doris.external.elasticsearch.EsTablePartitions;
 import org.apache.doris.thrift.TEsTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
+import com.google.common.base.Strings;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import com.google.common.base.Strings;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -83,27 +81,6 @@ public class EsTable extends Table {
 
     private Map<String, String> tableContext = new HashMap<>();
 
-    // used to indicate which fields can get from ES docavalue
-    // because elasticsearch can have "fields" feature, field can have
-    // two or more types, the first type maybe have not docvalue but other
-    // can have, such as (text field not have docvalue, but keyword can have):
-    // "properties": {
-    //      "city": {
-    //        "type": "text",
-    //        "fields": {
-    //          "raw": {
-    //            "type":  "keyword"
-    //          }
-    //        }
-    //      }
-    //    }
-    // then the docvalue context provided the mapping between the select field and real request field :
-    // {"city": "city.raw"}
-    // use select city from table, if enable the docvalue, we will fetch the `city` field value from `city.raw`
-    private Map<String, String> docValueContext = new HashMap<>();
-
-    private Map<String, String> fieldsContext = new HashMap<>();
-
     // record the latest and recently exception when sync ES table metadata (mapping, shard location)
     private Throwable lastMetaDataSyncException = null;
 
@@ -118,21 +95,13 @@ public class EsTable extends Table {
         validate(properties);
     }
 
-    public void addFieldInfos(EsFieldInfos esFieldInfos) {
-        if (enableKeywordSniff && esFieldInfos.getFieldsContext() != null) {
-            fieldsContext = esFieldInfos.getFieldsContext();
-        }
-        if (enableDocValueScan && esFieldInfos.getDocValueContext() != null) {
-            docValueContext = esFieldInfos.getDocValueContext();
-        }
-    }
 
     public Map<String, String> fieldsContext() {
-        return fieldsContext;
+        return esMetaStateTracker.searchContext().fetchFieldsContext();
     }
 
     public Map<String, String> docValueContext() {
-        return docValueContext;
+        return esMetaStateTracker.searchContext().docValueFieldsContext();
     }
 
     public boolean isDocValueScanEnable() {
@@ -179,9 +148,12 @@ public class EsTable extends Table {
         if (properties.containsKey(VERSION)) {
             try {
                 majorVersion = EsMajorVersion.parse(properties.get(VERSION).trim());
+                if (majorVersion.before(EsMajorVersion.V_5_X)) {
+                    throw new DdlException("Unsupported/Unknown ES Cluster version [" + properties.get(VERSION) + "] ");
+                }
             } catch (Exception e) {
                 throw new DdlException("fail to parse ES major version, version= "
-                        + properties.get(VERSION).trim() + ", shoud be like '6.5.3' ");
+                        + properties.get(VERSION).trim() + ", should be like '6.5.3' ");
             }
         }
 
@@ -399,6 +371,10 @@ public class EsTable extends Table {
         this.esTablePartitions = esTablePartitions;
     }
 
+    public EsMajorVersion esVersion() {
+        return majorVersion;
+    }
+
     public Throwable getLastMetaDataSyncException() {
         return lastMetaDataSyncException;
     }
@@ -407,24 +383,20 @@ public class EsTable extends Table {
         this.lastMetaDataSyncException = lastMetaDataSyncException;
     }
 
+    private EsMetaStateTracker esMetaStateTracker;
+
     /**
-     * sync es index meta from remote
+     * sync es index meta from remote ES Cluster
+     *
      * @param client esRestClient
      */
-    public void syncESIndexMeta(EsRestClient client) {
+    public void syncTableMetaData(EsRestClient client) {
+        if (esMetaStateTracker == null) {
+            esMetaStateTracker = new EsMetaStateTracker(client, this);
+        }
         try {
-            EsFieldInfos fieldInfos = client.getFieldInfos(this.indexName, this.mappingType, this.fullSchema);
-            EsShardPartitions esShardPartitions = client.getShardPartitions(this.indexName);
-            Map<String, EsNodeInfo> nodesInfo = client.getHttpNodes();
-            if (this.enableKeywordSniff || this.enableDocValueScan) {
-                addFieldInfos(fieldInfos);
-            }
-
-            this.esTablePartitions = EsTablePartitions.fromShardPartitions(this, esShardPartitions);
-
-            if (EsTable.TRANSPORT_HTTP.equals(getTransport())) {
-                this.esTablePartitions.addHttpAddress(nodesInfo);
-            }
+            esMetaStateTracker.run();
+            this.esTablePartitions = esMetaStateTracker.searchContext().tablePartitions();
         } catch (Throwable e) {
             LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", this.name, e);
             this.esTablePartitions = null;
diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java
index c1ea1f4..dd7964d 100644
--- a/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java
@@ -17,11 +17,8 @@
 
 package org.apache.doris.external.elasticsearch;
 
-import org.apache.doris.common.UserException;
 
-public class DorisEsException extends UserException {
-
-    private static final long serialVersionUID = 7912833584319374692L;
+public class DorisEsException extends RuntimeException {
 
     public DorisEsException(String msg) {
         super(msg);
diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsMajorVersion.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsMajorVersion.java
index 2319f58..d16fbd8 100644
--- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsMajorVersion.java
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsMajorVersion.java
@@ -20,14 +20,18 @@ package org.apache.doris.external.elasticsearch;
 
 /**
  * Elasticsearch major version information, useful to check client's query compatibility with the Rest API.
- *
+ * <p>
  * reference es-hadoop:
- *
  */
 public class EsMajorVersion {
+
+    public static final EsMajorVersion V_0_X = new EsMajorVersion((byte) 0, "0.x");
+    public static final EsMajorVersion V_1_X = new EsMajorVersion((byte) 1, "1.x");
+    public static final EsMajorVersion V_2_X = new EsMajorVersion((byte) 2, "2.x");
     public static final EsMajorVersion V_5_X = new EsMajorVersion((byte) 5, "5.x");
     public static final EsMajorVersion V_6_X = new EsMajorVersion((byte) 6, "6.x");
     public static final EsMajorVersion V_7_X = new EsMajorVersion((byte) 7, "7.x");
+    public static final EsMajorVersion V_8_X = new EsMajorVersion((byte) 8, "8.x");
     public static final EsMajorVersion LATEST = V_7_X;
 
     public final byte major;
@@ -62,7 +66,16 @@ public class EsMajorVersion {
         return version.major >= major;
     }
 
-    public static EsMajorVersion parse(String version) throws Exception {
+    public static EsMajorVersion parse(String version) throws DorisEsException {
+        if (version.startsWith("0.")) {
+            return new EsMajorVersion((byte) 0, version);
+        }
+        if (version.startsWith("1.")) {
+            return new EsMajorVersion((byte) 1, version);
+        }
+        if (version.startsWith("2.")) {
+            return new EsMajorVersion((byte) 2, version);
+        }
         if (version.startsWith("5.")) {
             return new EsMajorVersion((byte) 5, version);
         }
@@ -72,8 +85,12 @@ public class EsMajorVersion {
         if (version.startsWith("7.")) {
             return new EsMajorVersion((byte) 7, version);
         }
-        throw new Exception("Unsupported/Unknown Elasticsearch version [" + version + "]." +
-                "Highest supported version is [" + LATEST.version + "]. You may need to upgrade ES-Hadoop.");
+        // used for the next released ES version
+        if (version.startsWith("8.")) {
+            return new EsMajorVersion((byte) 8, version);
+        }
+        throw new DorisEsException("Unsupported/Unknown ES Cluster version [" + version + "]." +
+                "Highest supported version is [" + LATEST.version + "].");
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsMetaStateTracker.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsMetaStateTracker.java
new file mode 100644
index 0000000..f41837f
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsMetaStateTracker.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.external.elasticsearch;
+
+import org.apache.doris.catalog.EsTable;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * It is responsible for this class to schedule all network request sent to remote ES Cluster
+ * Request sequence
+ * 1. GET /
+ * 2. GET {index}/_mapping
+ * 3. GET {index}/_search_shards
+ * <p>
+ * note: step 1 is not necessary
+ */
+public class EsMetaStateTracker {
+
+    private List<SearchPhase> builtinSearchPhase = new LinkedList<>();
+    private SearchContext searchContext;
+
+    public EsMetaStateTracker(EsRestClient client, EsTable esTable) {
+        builtinSearchPhase.add(new VersionPhase(client));
+        builtinSearchPhase.add(new MappingPhase(client));
+        builtinSearchPhase.add(new PartitionPhase(client));
+        searchContext = new SearchContext(esTable);
+    }
+
+    public SearchContext searchContext() {
+        return searchContext;
+    }
+
+    public void run() throws DorisEsException {
+        for (SearchPhase searchPhase : builtinSearchPhase) {
+            searchPhase.preProcess(searchContext);
+            searchPhase.execute(searchContext);
+            searchPhase.postProcess(searchContext);
+        }
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java
index 4e11f9d..73d8daf 100644
--- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java
@@ -38,7 +38,7 @@ public class EsNodeInfo {
     private boolean hasThrift;
     private TNetworkAddress thriftAddress;
 
-    public EsNodeInfo(String id, Map<String, Object> map) throws Exception {
+    public EsNodeInfo(String id, Map<String, Object> map) throws DorisEsException {
         this.id = id;
         EsMajorVersion version = EsMajorVersion.parse((String) map.get("version"));
         this.name = (String) map.get("name");
diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java
index 2cfcf4c..98e895e 100644
--- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.external.elasticsearch;
 
+
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.EsTable;
@@ -24,7 +25,9 @@ import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Table.TableType;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.MasterDaemon;
+
 import com.google.common.collect.Maps;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -33,7 +36,8 @@ import java.util.Map;
 
 
 /**
- * It is used to call es api to get shard allocation state
+ * It is responsible for loading all ES external table's meta-data such as `fields`, `partitions` periodically,
+ * playing the `repo` role at Doris On ES
  */
 public class EsRepository extends MasterDaemon {
 
@@ -69,8 +73,7 @@ public class EsRepository extends MasterDaemon {
     protected void runAfterCatalogReady() {
         for (EsTable esTable : esTables.values()) {
             try {
-                EsRestClient client = esClients.get(esTable.getId());
-                esTable.syncESIndexMeta(client);
+                esTable.syncTableMetaData(esClients.get(esTable.getId()));
             } catch (Throwable e) {
                 LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", esTable.getName(), e);
                 esTable.setEsTablePartitions(null);
diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
index dd7f93f..f2868aa 100644
--- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.external.elasticsearch;
 
-import org.apache.doris.catalog.Column;
 import org.apache.http.HttpHeaders;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -26,37 +25,38 @@ import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+
 import okhttp3.Credentials;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
 
 public class EsRestClient {
-    
+
     private static final Logger LOG = LogManager.getLogger(EsRestClient.class);
     private ObjectMapper mapper;
-    
+
     {
         mapper = new ObjectMapper();
         mapper.configure(DeserializationConfig.Feature.USE_ANNOTATIONS, false);
         mapper.configure(SerializationConfig.Feature.USE_ANNOTATIONS, false);
     }
-    
+
     private static OkHttpClient networkClient = new OkHttpClient.Builder()
             .readTimeout(10, TimeUnit.SECONDS)
             .build();
-    
+
     private Request.Builder builder;
     private String[] nodes;
     private String currentNode;
     private int currentNodeIndex = 0;
-    
+
     public EsRestClient(String[] nodes, String authUser, String authPassword) {
         this.nodes = nodes;
         this.builder = new Request.Builder();
@@ -66,7 +66,7 @@ public class EsRestClient {
         }
         this.currentNode = nodes[currentNodeIndex];
     }
-    
+
     private void selectNextNode() {
         currentNodeIndex++;
         // reroute, because the previously failed node may have already been restored
@@ -75,8 +75,8 @@ public class EsRestClient {
         }
         currentNode = nodes[currentNodeIndex];
     }
-    
-    public Map<String, EsNodeInfo> getHttpNodes() throws Exception {
+
+    public Map<String, EsNodeInfo> getHttpNodes() throws DorisEsException {
         Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes");
         if (nodesData == null) {
             return Collections.emptyMap();
@@ -90,35 +90,67 @@ public class EsRestClient {
         }
         return nodesMap;
     }
-    
-    public EsFieldInfos getFieldInfos(String indexName, String docType, List<Column> colList) throws Exception {
+
+    /**
+     * Get remote ES Cluster version
+     *
+     * @return
+     * @throws Exception
+     */
+    public EsMajorVersion version() throws DorisEsException {
+        Map<String, Object> result = get("/", null);
+        if (result == null) {
+            throw new DorisEsException("Unable to retrieve ES main cluster info.");
+        }
+        Map<String, String> versionBody = (Map<String, String>) result.get("version");
+        return EsMajorVersion.parse(versionBody.get("number"));
+    }
+
+    /**
+     * Get mapping for indexName
+     *
+     * @param indexName
+     * @return
+     * @throws Exception
+     */
+    public String getMapping(String indexName, boolean includeTypeName) throws DorisEsException {
         String path = indexName + "/_mapping";
+        if (includeTypeName) {
+            path += "?include_type_name=true";
+        }
         String indexMapping = execute(path);
         if (indexMapping == null) {
-            throw new DorisEsException( "index[" + indexName + "] not found for the Elasticsearch Cluster");
+            throw new DorisEsException("index[" + indexName + "] not found");
         }
-        return EsFieldInfos.fromMapping(colList, indexName, indexMapping, docType);
+        return indexMapping;
     }
 
-    
-    public EsShardPartitions getShardPartitions(String indexName) throws Exception {
+
+    /**
+     * Get Shard location
+     *
+     * @param indexName
+     * @return
+     * @throws DorisEsException
+     */
+    public EsShardPartitions searchShards(String indexName) throws DorisEsException {
         String path = indexName + "/_search_shards";
         String searchShards = execute(path);
         if (searchShards == null) {
-            throw new DorisEsException( "index[" + indexName + "] search_shards not found for the Elasticsearch Cluster");
+            throw new DorisEsException("request index [" + indexName + "] search_shards failure");
         }
         return EsShardPartitions.findShardPartitions(indexName, searchShards);
     }
-    
+
     /**
      * execute request for specific path,it will try again nodes.length times if it fails
      *
      * @param path the path must not leading with '/'
      * @return response
      */
-    private String execute(String path) throws Exception {
+    private String execute(String path) throws DorisEsException {
         int retrySize = nodes.length;
-        Exception scratchExceptionForThrow = null;
+        DorisEsException scratchExceptionForThrow = null;
         for (int i = 0; i < retrySize; i++) {
             // maybe should add HTTP schema to the address
             // actually, at this time we can only process http protocol
@@ -144,7 +176,7 @@ public class EsRestClient {
                 }
             } catch (IOException e) {
                 LOG.warn("request node [{}] [{}] failures {}, try next nodes", currentNode, path, e);
-                scratchExceptionForThrow = e;
+                scratchExceptionForThrow = new DorisEsException(e.getMessage());
             } finally {
                 if (response != null) {
                     response.close();
@@ -158,11 +190,11 @@ public class EsRestClient {
         }
         return null;
     }
-    
-    public <T> T get(String q, String key) throws Exception {
+
+    public <T> T get(String q, String key) throws DorisEsException {
         return parseContent(execute(q), key);
     }
-    
+
     @SuppressWarnings("unchecked")
     private <T> T parseContent(String response, String key) {
         Map<String, Object> map = Collections.emptyMap();
@@ -171,6 +203,7 @@ public class EsRestClient {
             map = mapper.readValue(jsonParser, Map.class);
         } catch (IOException ex) {
             LOG.error("parse es response failure: [{}]", response);
+            throw new DorisEsException(ex.getMessage());
         }
         return (T) (key != null ? map.get(key) : map);
     }
diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java
index 5caa6c0..41687cf 100644
--- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java
@@ -59,7 +59,8 @@ public class EsShardPartitions {
      * @return shardRoutings is used for searching
      */
     public static EsShardPartitions findShardPartitions(String indexName, String searchShards) throws DorisEsException {
-        EsShardPartitions indexState = new EsShardPartitions(indexName);
+
+        EsShardPartitions partitions = new EsShardPartitions(indexName);
         JSONObject jsonObject = new JSONObject(searchShards);
         JSONArray shards = jsonObject.getJSONArray("shards");
         int length = shards.length();
@@ -87,9 +88,9 @@ public class EsShardPartitions {
             if (singleShardRouting.isEmpty()) {
                 LOG.warn("could not find a healthy allocation for [{}][{}]", indexName, i);
             }
-            indexState.addShardRouting(i, singleShardRouting);
+            partitions.addShardRouting(i, singleShardRouting);
         }
-        return indexState;
+        return partitions;
     }
 
     public void addHttpAddress(Map<String, EsNodeInfo> nodesInfo) {
diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java
similarity index 51%
rename from fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java
rename to fe/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java
index 5edb80b..b6d16e2 100644
--- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java
@@ -19,122 +19,94 @@ package org.apache.doris.external.elasticsearch;
 
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.EsTable;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+
 import org.json.JSONObject;
-import java.util.HashMap;
+
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 
 /**
- * It is used to hold the field information obtained from es, currently including the fields and docValue,
- * it will eventually be added to the EsTable
- **/
-public class EsFieldInfos {
-    
-    private static final Logger LOG = LogManager.getLogger(EsFieldInfos.class);
-
-    // userId => userId.keyword
-    private Map<String, String> fieldsContext;
-
-    // city => city.raw
-    private Map<String, String> docValueContext;
-    
-    public EsFieldInfos(Map<String, String> fieldsContext, Map<String, String> docValueContext) {
-        this.fieldsContext = fieldsContext;
-        this.docValueContext = docValueContext;
+ * Get index mapping from remote ES Cluster, and resolved `keyword` and `doc_values` field
+ * Later we can use it to parse all relevant indexes
+ */
+public class MappingPhase implements SearchPhase {
+
+    private EsRestClient client;
+
+    // json response for `{index}/_mapping` API
+    private String jsonMapping;
+
+    private boolean includeTypeName = false;
+
+    public MappingPhase(EsRestClient client) {
+        this.client = client;
     }
-    
-    public Map<String, String> getFieldsContext() {
-        return fieldsContext;
+
+    @Override
+    public void preProcess(SearchContext context) {
+        if (context.version() != null && context.version().onOrAfter(EsMajorVersion.V_7_X)) {
+            includeTypeName = true;
+        }
+    }
+
+    @Override
+    public void execute(SearchContext context) throws DorisEsException {
+        jsonMapping = client.getMapping(context.sourceIndex(), includeTypeName);
     }
-    
-    public Map<String, String> getDocValueContext() {
-        return docValueContext;
+
+    @Override
+    public void postProcess(SearchContext context) {
+        resolveFields(context, jsonMapping);
     }
-    
+
+
     /**
      * Parse the required field information from the json
-     * @param colList table column
-     * @param indexName indexName(alias or really name)
-     * @param indexMapping the return value of _mapping
-     * @param docType The docType used by the index
-     * @return fieldsContext and docValueContext
+     *
+     * @param searchContext the current associated column searchContext
+     * @param indexMapping  the return value of _mapping
+     * @return fetchFieldsContext and docValueFieldsContext
      * @throws Exception
      */
-    public static EsFieldInfos fromMapping(List<Column> colList, String indexName, String indexMapping, String docType) throws DorisEsException {
+    public void resolveFields(SearchContext searchContext, String indexMapping) throws DorisEsException {
         JSONObject jsonObject = new JSONObject(indexMapping);
         // the indexName use alias takes the first mapping
         Iterator<String> keys = jsonObject.keys();
         String docKey = keys.next();
         JSONObject docData = jsonObject.optJSONObject(docKey);
-        //{
-        //  "mappings": {
-        //    "doc": {
-        //      "dynamic": "strict",
-        //      "properties": {
-        //        "time": {
-        //          "type": "long"
-        //        },
-        //        "type": {
-        //          "type": "keyword"
-        //        },
-        //        "userId": {
-        //          "type": "text",
-        //          "fields": {
-        //            "keyword": {
-        //              "type": "keyword"
-        //            }
-        //          }
-        //        }
-        //      }
-        //    }
-        //  }
-        //}
         JSONObject mappings = docData.optJSONObject("mappings");
-        JSONObject rootSchema = mappings.optJSONObject(docType);
+        JSONObject rootSchema = mappings.optJSONObject(searchContext.type());
         JSONObject properties;
-        // no type in es7
+        // After (include) 7.x, type was removed from ES mapping, default type is `_doc`
+        // https://www.elastic.co/guide/en/elasticsearch/reference/7.0/removal-of-types.html
         if (rootSchema == null) {
+            if (searchContext.type().equals("_doc") == false) {
+                throw new DorisEsException("index[" + searchContext.sourceIndex() + "]'s type must be exists, "
+                        + " and after ES7.x type must be `_doc`, but found ["
+                        + searchContext.type() + "], for table ["
+                        + searchContext.esTable().getName() + "]");
+            }
             properties = mappings.optJSONObject("properties");
         } else {
             properties = rootSchema.optJSONObject("properties");
         }
         if (properties == null) {
-            throw new DorisEsException( "index[" + indexName + "] type[" + docType + "] mapping not found for the Elasticsearch Cluster");
-        }
-        return parseProperties(colList, properties);
-    }
-
-    // get fields information in properties
-    private static EsFieldInfos parseProperties(List<Column> colList, JSONObject properties) {
-        if (properties == null) {
-            return null;
+            throw new DorisEsException("index[" + searchContext.sourceIndex() + "] type[" + searchContext.type() + "] mapping not found for the ES Cluster");
         }
-        Map<String, String> fieldsMap = new HashMap<>();
-        Map<String, String> docValueMap = new HashMap<>();
-        for (Column col : colList) {
+        for (Column col : searchContext.columns()) {
             String colName = col.getName();
+            // if column exists in Doris Table but no found in ES's mapping, we choose to ignore this situation?
             if (!properties.has(colName)) {
                 continue;
             }
             JSONObject fieldObject = properties.optJSONObject(colName);
-            String keywordField = getKeywordField(fieldObject, colName);
-            if (StringUtils.isNotEmpty(keywordField)) {
-                fieldsMap.put(colName, keywordField);
-            }
-            String docValueField = getDocValueField(fieldObject, colName);
-            if (StringUtils.isNotEmpty(docValueField)) {
-                docValueMap.put(colName, docValueField);
-            }
+
+            resolveKeywordFields(searchContext, fieldObject, colName);
+            resolveDocValuesFields(searchContext, fieldObject, colName);
         }
-        return new EsFieldInfos(fieldsMap, docValueMap);
     }
 
     // get a field of keyword type in the fields
-    private static String getKeywordField(JSONObject fieldObject, String colName) {
+    private void resolveKeywordFields(SearchContext searchContext, JSONObject fieldObject, String colName) {
         String fieldType = fieldObject.optString("type");
         // string-type field used keyword type to generate predicate
         // if text field type seen, we should use the `field` keyword type?
@@ -145,15 +117,14 @@ public class EsFieldInfos {
                     JSONObject innerTypeObject = fieldsObject.optJSONObject(key);
                     // just for text type
                     if ("keyword".equals(innerTypeObject.optString("type"))) {
-                        return colName + "." + key;
+                        searchContext.fetchFieldsContext().put(colName, colName + "." + key);
                     }
                 }
             }
         }
-        return null;
     }
-    
-    private static String getDocValueField(JSONObject fieldObject, String colName) {
+
+    private void resolveDocValuesFields(SearchContext searchContext, JSONObject fieldObject, String colName) {
         String fieldType = fieldObject.optString("type");
         String docValueField = null;
         if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) {
@@ -175,16 +146,16 @@ public class EsFieldInfos {
                     }
                 }
             }
-            return docValueField;
-        }
-        // set doc_value = false manually
-        if (fieldObject.has("doc_values")) {
-            boolean docValue = fieldObject.optBoolean("doc_values");
-            if (!docValue) {
-                return docValueField;
+        } else {
+            // set doc_value = false manually
+            if (fieldObject.has("doc_values")) {
+                boolean docValue = fieldObject.optBoolean("doc_values");
+                if (!docValue) {
+                    return;
+                }
             }
+            docValueField = colName;
         }
-        docValueField = colName;
-        return docValueField;
+        searchContext.docValueFieldsContext().put(colName, docValueField);
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java
new file mode 100644
index 0000000..de1bb76
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.external.elasticsearch;
+
+import org.apache.doris.catalog.EsTable;
+
+import java.util.Map;
+
+/**
+ * Fetches the search shards of the source index (and the HTTP node info) from the remote ES cluster
+ */
+public class PartitionPhase implements SearchPhase {
+
+    private EsRestClient client;
+    private EsShardPartitions shardPartitions;
+    private Map<String, EsNodeInfo> nodesInfo;
+
+    public PartitionPhase(EsRestClient client) {
+        this.client = client;
+    }
+
+    @Override
+    public void execute(SearchContext context) throws DorisEsException {
+        shardPartitions = client.searchShards(context.sourceIndex());
+        nodesInfo = client.getHttpNodes();
+    }
+
+
+    @Override
+    public void postProcess(SearchContext context) throws DorisEsException {
+        context.partitions(shardPartitions);
+        if (EsTable.TRANSPORT_HTTP.equals(context.esTable().getTransport())) {
+            context.partitions().addHttpAddress(nodesInfo);
+        }
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java
new file mode 100644
index 0000000..06b4c7d
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.external.elasticsearch;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.EsTable;
+
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class encapsulates the state needed to execute a query on an ES table, such as fields, doc_values,
+ * the resolved index, search shards, etc.
+ * In the future, more state or runtime information will be added to this class, such as
+ * the query builder, slice scroll context, aggregation info, etc.
+ **/
+public class SearchContext {
+
+    private static final Logger LOG = LogManager.getLogger(SearchContext.class);
+
+    // fetch string field value from not analyzed fields : userId => userId.keyword
+    // this is activated when `enable_keyword_sniff = true`
+    private Map<String, String> fetchFieldsContext = Maps.newHashMap();
+    // used to indicate which fields can be fetched from ES doc_values.
+    // because Elasticsearch has the multi-"fields" feature, a field can have
+    // two or more types; the first type may not have doc_values while another
+    // one does, e.g. (a text field has no doc_values, but a keyword sub-field does):
+    // "properties": {
+    //      "city": {
+    //        "type": "text",
+    //        "fields": {
+    //          "raw": {
+    //            "type":  "keyword"
+    //          }
+    //        }
+    //      }
+    //    }
+    // the doc_values context then provides the mapping between the selected field and the field actually requested:
+    // {"city": "city.raw"}
+    // for `select city from table`, if doc_values scanning is enabled, the `city` value is fetched from `city.raw`.
+    // fetching field values from doc_values is activated when `enable_docvalue_scan = true`
+    private Map<String, String> docValueFieldsContext = Maps.newHashMap();
+
+    // sourceIndex is the name of index when creating ES external table
+    private final String sourceIndex;
+
+    // when `sourceIndex` is an alias or a wildcard pattern, it may match two or more indices;
+    // `resolvedIndices` holds the matched underlying indices
+    private List<String> resolvedIndices = Collections.emptyList();
+
+    // `type` of the `sourceIndex`
+    private final String type;
+
+
+    private EsTable table;
+
+    // all columns which user created for ES external table
+    private final List<Column> fullSchema;
+
+    // represent `resolvedIndices`'s searchable shards
+    private EsShardPartitions shardPartitions;
+
+    // the ES cluster version
+    private EsMajorVersion version;
+
+
+    public SearchContext(EsTable table) {
+        this.table = table;
+        fullSchema = table.getFullSchema();
+        sourceIndex = table.getIndexName();
+        type = table.getMappingType();
+    }
+
+
+    public String sourceIndex() {
+        return sourceIndex;
+    }
+
+    public List<String> resolvedIndices() {
+        return resolvedIndices;
+    }
+
+
+    public String type() {
+        return type;
+    }
+
+    public List<Column> columns() {
+        return fullSchema;
+    }
+
+    public EsTable esTable() {
+        return table;
+    }
+
+    public Map<String, String> fetchFieldsContext() {
+        return fetchFieldsContext;
+    }
+
+    public Map<String, String> docValueFieldsContext() {
+        return docValueFieldsContext;
+    }
+
+    public void version(EsMajorVersion version) {
+        this.version = version;
+    }
+
+    public EsMajorVersion version() {
+        return version;
+    }
+
+    public void partitions(EsShardPartitions shardPartitions) {
+        this.shardPartitions = shardPartitions;
+    }
+
+    public EsShardPartitions partitions() {
+        return shardPartitions;
+    }
+
+    // this will be refactored soon
+    public EsTablePartitions tablePartitions() throws Exception {
+        return EsTablePartitions.fromShardPartitions(table, shardPartitions);
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/SearchPhase.java
similarity index 61%
copy from fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java
copy to fe/src/main/java/org/apache/doris/external/elasticsearch/SearchPhase.java
index c1ea1f4..928524d 100644
--- a/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/SearchPhase.java
@@ -17,13 +17,26 @@
 
 package org.apache.doris.external.elasticsearch;
 
-import org.apache.doris.common.UserException;
 
-public class DorisEsException extends UserException {
+/**
+ * Represents one phase of fetching index metadata from ES over the network, e.g. get mapping, get shard locations, etc.
+ */
+public interface SearchPhase {
 
-    private static final long serialVersionUID = 7912833584319374692L;
+    /**
+     * Performs pre processing of the search context before the execute.
+     */
+    default void preProcess(SearchContext context) {
+    }
+
+    /**
+     * Executes the search phase
+     */
+    void execute(SearchContext context);
 
-    public DorisEsException(String msg) {
-        super(msg);
+    /**
+     * Performs post processing of the search context after the execute.
+     */
+    default void postProcess(SearchContext context) {
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/VersionPhase.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/VersionPhase.java
new file mode 100644
index 0000000..8d4e911
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/VersionPhase.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.external.elasticsearch;
+
+/**
+ * Requests the version from the remote ES cluster. If the request fails, the version falls back to `LATEST`.
+ */
+public class VersionPhase implements SearchPhase {
+
+    private EsRestClient client;
+
+    private boolean isVersionSet = false;
+
+
+    public VersionPhase(EsRestClient client) {
+        this.client = client;
+    }
+
+    @Override
+    public void preProcess(SearchContext context) {
+        if (context.esTable().esVersion() != null) {
+            isVersionSet = true;
+            context.version(context.esTable().esVersion());
+        }
+    }
+
+    @Override
+    public void execute(SearchContext context) {
+        if (isVersionSet) {
+            return;
+        }
+        EsMajorVersion version;
+        try {
+            version = client.version();
+        } catch (Throwable e) {
+            version = EsMajorVersion.LATEST;
+        }
+        context.version(version);
+    }
+}
diff --git a/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java b/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java
deleted file mode 100644
index c367ec0..0000000
--- a/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.external.elasticsearch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import org.apache.doris.catalog.Catalog;
-import org.apache.doris.catalog.CatalogTestUtil;
-import org.apache.doris.catalog.EsTable;
-import org.apache.doris.catalog.FakeCatalog;
-import org.apache.doris.catalog.FakeEditLog;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.FeMetaVersion;
-import org.apache.doris.meta.MetaContext;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URISyntaxException;
-
-public class EsRepositoryTest {
-    
-    private static FakeEditLog fakeEditLog;
-    private static FakeCatalog fakeCatalog;
-    private static Catalog masterCatalog;
-    private static String mappingsStr = "";
-    private static String es7MappingsStr = "";
-    private static String searchShardsStr = "";
-    private EsRepository esRepository;
-    private EsRestClient fakeClient;
-    
-    @BeforeClass
-    public static void init() throws IOException, InstantiationException, IllegalAccessException,
-            IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException,
-            URISyntaxException {
-        fakeEditLog = new FakeEditLog();
-        fakeCatalog = new FakeCatalog();
-        masterCatalog = CatalogTestUtil.createTestCatalog();
-        MetaContext metaContext = new MetaContext();
-        metaContext.setMetaVersion(FeMetaVersion.VERSION_40);
-        metaContext.setThreadLocalInfo();
-        // masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
-        FakeCatalog.setCatalog(masterCatalog);
-        mappingsStr = loadJsonFromFile("data/es/mappings.json");
-        es7MappingsStr = loadJsonFromFile("data/es/es7_mappings.json");
-        searchShardsStr = loadJsonFromFile("data/es/search_shards.json");
-    }
-    
-    @Before
-    public void setUp() {
-        esRepository = new EsRepository();
-        fakeClient = new EsRestClient(new String[]{"localhost:9200"}, null, null);
-    }
-    
-    @Test
-    public void testSetEsTableContext() throws Exception {
-        EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
-                .getDb(CatalogTestUtil.testDb1)
-                .getTable(CatalogTestUtil.testEsTableId1);
-        // es5
-        EsFieldInfos fieldInfos = EsFieldInfos.fromMapping(esTable.getFullSchema(), esTable.getIndexName(), mappingsStr, esTable.getMappingType());
-        esTable.addFieldInfos(fieldInfos);
-        assertEquals("userId.keyword", esTable.fieldsContext().get("userId"));
-        assertEquals("userId.keyword", esTable.docValueContext().get("userId"));
-        // es7
-        EsFieldInfos fieldInfos7 = EsFieldInfos.fromMapping(esTable.getFullSchema(), esTable.getIndexName(), es7MappingsStr, "");
-        assertEquals("userId.keyword", fieldInfos7.getFieldsContext().get("userId"));
-        assertEquals("userId.keyword", fieldInfos7.getDocValueContext().get("userId"));
-        
-    }
-    
-    @Test(expected = DorisEsException.class)
-    public void testSetErrorType() throws Exception {
-        EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
-                .getDb(CatalogTestUtil.testDb1)
-                .getTable(CatalogTestUtil.testEsTableId1);
-        // error type
-        EsFieldInfos.fromMapping(esTable.getFullSchema(), esTable.getIndexName(), mappingsStr, "errorType");
-    }
-    
-    @Test
-    public void testSetTableState() throws DorisEsException, DdlException {
-        EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
-                .getDb(CatalogTestUtil.testDb1)
-                .getTable(CatalogTestUtil.testEsTableId1);
-        EsShardPartitions esShardPartitions = EsShardPartitions.findShardPartitions(esTable.getIndexName(), searchShardsStr);
-        EsTablePartitions esTablePartitions = EsTablePartitions.fromShardPartitions(esTable, esShardPartitions);
-        assertNotNull(esTablePartitions);
-        assertEquals(1, esTablePartitions.getUnPartitionedIndexStates().size());
-        assertEquals(5, esTablePartitions.getEsShardPartitions("indexa").getShardRoutings().size());
-    }
-    
-    private static String loadJsonFromFile(String fileName) throws IOException, URISyntaxException {
-        File file = new File(EsRepositoryTest.class.getClassLoader().getResource(fileName).toURI());
-        InputStream is = new FileInputStream(file);
-        BufferedReader br = new BufferedReader(new InputStreamReader(is));
-        StringBuilder jsonStr = new StringBuilder();
-        String line = "";
-        while ((line = br.readLine()) != null)  {
-            jsonStr.append(line);
-        }
-        br.close();
-        is.close();
-        return jsonStr.toString();
-    }
-}
\ No newline at end of file
diff --git a/fe/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java b/fe/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
index b611ae4..21d4909 100644
--- a/fe/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
+++ b/fe/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
@@ -27,38 +27,38 @@ import org.junit.Test;
 
 public class EsUtilTest {
 
-    private String jsonStr = "{\"settings\": {\n" 
-            + "               \"index\": {\n" 
-            + "                  \"bpack\": {\n" 
-            + "                     \"partition\": {\n" 
-            + "                        \"upperbound\": \"12\"\n" 
-            + "                     }\n" 
-            + "                  },\n" 
-            + "                  \"number_of_shards\": \"5\",\n" 
-            + "                  \"provided_name\": \"indexa\",\n" 
-            + "                  \"creation_date\": \"1539328532060\",\n" 
-            + "                  \"number_of_replicas\": \"1\",\n" 
-            + "                  \"uuid\": \"plNNtKiiQ9-n6NpNskFzhQ\",\n" 
-            + "                  \"version\": {\n" 
-            + "                     \"created\": \"5050099\"\n" 
-            + "                  }\n" 
-            + "               }\n" 
+    private String jsonStr = "{\"settings\": {\n"
+            + "               \"index\": {\n"
+            + "                  \"bpack\": {\n"
+            + "                     \"partition\": {\n"
+            + "                        \"upperbound\": \"12\"\n"
+            + "                     }\n"
+            + "                  },\n"
+            + "                  \"number_of_shards\": \"5\",\n"
+            + "                  \"provided_name\": \"indexa\",\n"
+            + "                  \"creation_date\": \"1539328532060\",\n"
+            + "                  \"number_of_replicas\": \"1\",\n"
+            + "                  \"uuid\": \"plNNtKiiQ9-n6NpNskFzhQ\",\n"
+            + "                  \"version\": {\n"
+            + "                     \"created\": \"5050099\"\n"
+            + "                  }\n"
+            + "               }\n"
             + "            }}";
-    
+
     @Test
     public void testGetJsonObject() {
         JSONObject json = new JSONObject(jsonStr);
         JSONObject upperBoundSetting = EsUtil.getJsonObject(json, "settings.index.bpack.partition", 0);
         assertTrue(upperBoundSetting.has("upperbound"));
         assertEquals("12", upperBoundSetting.getString("upperbound"));
-        
+
         JSONObject unExistKey = EsUtil.getJsonObject(json, "set", 0);
         assertNull(unExistKey);
-        
+
         JSONObject singleKey = EsUtil.getJsonObject(json, "settings", 0);
         assertTrue(singleKey.has("index"));
     }
-    
+
     @Test(expected = JSONException.class)
     public void testGetJsonObjectWithException() {
         JSONObject json = new JSONObject(jsonStr);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org