You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/05/17 13:02:29 UTC

[linkis] branch dev-1.4.0 updated: Translate engineconn-plugins-elasticsearch service classes from Scala to Java (#4531)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
     new 3989b4dcf Translate engineconn-plugins-elasticsearch  service classes from Scala to Java  (#4531)
3989b4dcf is described below

commit 3989b4dcf2f410f6ba2b081ad0510f252741b6e1
Author: ChengJie1053 <18...@163.com>
AuthorDate: Wed May 17 21:02:23 2023 +0800

    Translate engineconn-plugins-elasticsearch  service classes from Scala to Java  (#4531)
---
 .../ElasticSearchEngineConnPlugin.java             |  89 +++++++
 ...asticSearchProcessEngineConnLaunchBuilder.java} |  17 +-
 .../conf/ElasticSearchConfiguration.java           |  63 +++++
 .../conf/ElasticSearchEngineConsoleConf.java       |  64 +++++
 .../exception/EsConvertResponseException.java}     |  13 +-
 .../exception/EsParamsIllegalException.java}       |  13 +-
 .../executor/ElasticSearchEngineConnExecutor.java  | 284 +++++++++++++++++++++
 .../executor/client/ElasticSearchExecutor.java}    |  12 +-
 .../elasticsearch/executor/client/EsClient.java    |  66 +++++
 .../executor/client/EsClientFactory.java           | 220 ++++++++++++++++
 .../executor/client/EsClientImpl.java              | 106 ++++++++
 .../executor/client/EsClientOperate.java}          |  14 +-
 .../client/impl/ElasticSearchExecutorImpl.java     | 130 ++++++++++
 .../ElasticSearchEngineConnPlugin.scala            |  74 ------
 .../conf/ElasticSearchConfiguration.scala          |  49 ----
 .../conf/ElasticSearchEngineConsoleConf.scala      |  49 ----
 .../executor/ElasticSearchEngineConnExecutor.scala | 232 -----------------
 .../executor/client/ElasticSearchExecutor.scala    |  46 ----
 .../executor/client/ElasticSearchResponse.scala    |   0
 .../elasticsearch/executor/client/EsClient.scala   | 131 ----------
 .../executor/client/EsClientFactory.scala          | 185 --------------
 .../client/impl/ElasticSearchExecutorImpl.scala    | 112 --------
 .../executor/client/impl/ResponseHandlerImpl.scala |   3 +-
 23 files changed, 1066 insertions(+), 906 deletions(-)

diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/ElasticSearchEngineConnPlugin.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/ElasticSearchEngineConnPlugin.java
new file mode 100644
index 000000000..17129bd3d
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/ElasticSearchEngineConnPlugin.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch;
+
+import org.apache.linkis.engineplugin.elasticsearch.builder.ElasticSearchProcessEngineConnLaunchBuilder;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+import org.apache.linkis.engineplugin.elasticsearch.factory.ElasticSearchEngineConnFactory;
+import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin;
+import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory;
+import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder;
+import org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory;
+import org.apache.linkis.manager.engineplugin.common.resource.GenericEngineResourceFactory;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.manager.label.entity.engine.EngineType;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * EngineConn plugin entry point for the Elasticsearch engine.
+ *
+ * <p>The resource factory and the engine factory are created lazily on first request and then
+ * reused; the default label list is filled in by {@link #init(Map)}.
+ */
+public class ElasticSearchEngineConnPlugin implements EngineConnPlugin {
+
+  // volatile: both factories are published through a check / lock / re-check sequence, and the
+  // unsynchronized first read must not observe a partially constructed object.
+  private volatile EngineResourceFactory engineResourceFactory;
+
+  private volatile EngineConnFactory engineFactory;
+
+  private final List<Label<?>> defaultLabels = new ArrayList<>();
+
+  private final Object resourceLocker = new Object();
+
+  private final Object engineFactoryLocker = new Object();
+
+  /**
+   * Appends an {@link EngineTypeLabel} for {@code EngineType.ELASTICSEARCH()} with the version
+   * read from {@link ElasticSearchConfiguration#DEFAULT_VERSION} to the default labels.
+   *
+   * @param params not read by this implementation
+   */
+  @Override
+  public void init(Map<String, Object> params) {
+    EngineTypeLabel typeLabel = new EngineTypeLabel();
+    typeLabel.setEngineType(EngineType.ELASTICSEARCH().toString());
+    typeLabel.setVersion(ElasticSearchConfiguration.DEFAULT_VERSION.getValue());
+    this.defaultLabels.add(typeLabel);
+  }
+
+  /**
+   * Returns the shared {@link GenericEngineResourceFactory}, creating it on first use.
+   *
+   * @return the lazily created resource factory, never {@code null}
+   */
+  @Override
+  public EngineResourceFactory getEngineResourceFactory() {
+    // Read the volatile field once into a local so the fast path costs a single volatile read.
+    EngineResourceFactory factory = engineResourceFactory;
+    if (factory == null) {
+      synchronized (resourceLocker) {
+        factory = engineResourceFactory;
+        if (factory == null) {
+          factory = new GenericEngineResourceFactory();
+          engineResourceFactory = factory;
+        }
+      }
+    }
+    return factory;
+  }
+
+  /**
+   * Returns a launch builder for the engine process.
+   *
+   * @return a new {@link ElasticSearchProcessEngineConnLaunchBuilder} on every call
+   */
+  @Override
+  public EngineConnLaunchBuilder getEngineConnLaunchBuilder() {
+    return new ElasticSearchProcessEngineConnLaunchBuilder();
+  }
+
+  /**
+   * Returns the shared {@link ElasticSearchEngineConnFactory}, creating it on first use.
+   *
+   * @return the lazily created engine factory, never {@code null}
+   */
+  @Override
+  public EngineConnFactory getEngineConnFactory() {
+    EngineConnFactory factory = engineFactory;
+    if (factory == null) {
+      synchronized (engineFactoryLocker) {
+        factory = engineFactory;
+        if (factory == null) {
+          factory = new ElasticSearchEngineConnFactory();
+          engineFactory = factory;
+        }
+      }
+    }
+    return factory;
+  }
+
+  /**
+   * Returns the labels registered by {@link #init(Map)}.
+   *
+   * @return the internal label list itself (not a copy)
+   */
+  @Override
+  public List<Label<?>> getDefaultLabels() {
+    return defaultLabels;
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/builder/ElasticSearchProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/builder/ElasticSearchProcessEngineConnLaunchBuilder.java
similarity index 69%
rename from linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/builder/ElasticSearchProcessEngineConnLaunchBuilder.scala
rename to linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/builder/ElasticSearchProcessEngineConnLaunchBuilder.java
index a35b9107b..7094184ec 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/builder/ElasticSearchProcessEngineConnLaunchBuilder.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/builder/ElasticSearchProcessEngineConnLaunchBuilder.java
@@ -15,16 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.elasticsearch.builder
+package org.apache.linkis.engineplugin.elasticsearch.builder;
 
-import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
-import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
-import org.apache.linkis.storage.utils.StorageConfiguration
+import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder;
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
+import org.apache.linkis.storage.utils.StorageConfiguration;
 
-class ElasticSearchProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {
+public class ElasticSearchProcessEngineConnLaunchBuilder
+    extends JavaProcessEngineConnLaunchBuilder {
 
-  override def getEngineStartUser(label: UserCreatorLabel): String = {
-    StorageConfiguration.HDFS_ROOT_USER.getValue
+  @Override
+  public String getEngineStartUser(UserCreatorLabel label) {
+    return StorageConfiguration.HDFS_ROOT_USER.getValue();
   }
-
 }
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.java
new file mode 100644
index 000000000..dfd844bc9
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.conf;
+
+import org.apache.linkis.common.conf.ByteType;
+import org.apache.linkis.common.conf.CommonVars;
+
+/**
+ * Configuration keys of the Elasticsearch engine plugin.
+ *
+ * <p>Every {@link CommonVars} constant below is declared as {@code CommonVars.apply(key,
+ * defaultValue)}; the first argument is the property key and the second the value used as the
+ * default.
+ */
+public class ElasticSearchConfiguration {
+
+  // es client
+  public static final CommonVars<String> ES_CLUSTER =
+      CommonVars.apply("linkis.es.cluster", "127.0.0.1:9200");
+  public static final CommonVars<String> ES_DATASOURCE_NAME =
+      CommonVars.apply("linkis.es.datasource", "default_datasource");
+  public static final CommonVars<Boolean> ES_AUTH_CACHE =
+      CommonVars.apply("linkis.es.auth.cache", false);
+  // Credentials default to the empty string.
+  public static final CommonVars<String> ES_USERNAME = CommonVars.apply("linkis.es.username", "");
+  public static final CommonVars<String> ES_PASSWORD = CommonVars.apply("linkis.es.password", "");
+  public static final CommonVars<Boolean> ES_SNIFFER_ENABLE =
+      CommonVars.apply("linkis.es.sniffer.enable", false);
+  public static final CommonVars<String> ES_HTTP_METHOD =
+      CommonVars.apply("linkis.es.http.method", "GET");
+  public static final CommonVars<String> ES_HTTP_ENDPOINT =
+      CommonVars.apply("linkis.es.http.endpoint", "/_search");
+  public static final CommonVars<String> ES_HTTP_SQL_ENDPOINT =
+      CommonVars.apply("linkis.es.sql.endpoint", "/_sql");
+  // JSON template with a single %s placeholder.
+  public static final CommonVars<String> ES_SQL_FORMAT =
+      CommonVars.apply("linkis.es.sql.format", "{\"query\": \"%s\"}");
+  // Plain key prefix, not a CommonVars entry.
+  public static final String ES_HTTP_HEADER_PREFIX = "linkis.es.headers.";
+
+  // entrance resource
+  public static final CommonVars<Integer> ENTRANCE_MAX_JOB_INSTANCE =
+      CommonVars.apply("linkis.es.max.job.instance", 100);
+  public static final CommonVars<Integer> ENTRANCE_PROTECTED_JOB_INSTANCE =
+      CommonVars.apply("linkis.es.protected.job.instance", 20);
+  public static final CommonVars<Integer> ENGINE_DEFAULT_LIMIT =
+      CommonVars.apply("linkis.es.default.limit", 5000);
+
+  // resultSet
+  public static final CommonVars<ByteType> ENGINE_RESULT_SET_MAX_CACHE =
+      CommonVars.apply("linkis.resultSet.cache.max", new ByteType("512k"));
+
+  public static final CommonVars<Integer> ENGINE_CONCURRENT_LIMIT =
+      CommonVars.apply("linkis.engineconn.concurrent.limit", 100);
+
+  // NOTE(review): the key contains "io" rather than an elasticsearch-specific segment; it looks
+  // carried over from another plugin - confirm this key is intended before relying on it.
+  public static final CommonVars<String> DEFAULT_VERSION =
+      CommonVars.apply("linkis.engineconn.io.version", "7.6.2");
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.java
new file mode 100644
index 000000000..602aac138
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.conf;
+
+import org.apache.linkis.common.conf.Configuration;
+import org.apache.linkis.governance.common.protocol.conf.RequestQueryEngineConfig;
+import org.apache.linkis.governance.common.protocol.conf.ResponseQueryConfig;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
+import org.apache.linkis.protocol.CacheableProtocol;
+import org.apache.linkis.rpc.RPCMapCache;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link RPCMapCache} keyed by a label array that builds a {@link RequestQueryEngineConfig}
+ * request and converts the answer into a string-to-string map.
+ */
+public class ElasticSearchEngineConsoleConf extends RPCMapCache<Label[], String, String> {
+
+  public ElasticSearchEngineConsoleConf() {
+    super(Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME().getValue());
+  }
+
+  /**
+   * Builds the configuration query from the first {@link UserCreatorLabel} and the first {@link
+   * EngineTypeLabel} found in {@code labels}.
+   *
+   * @param labels labels to search; must contain one label of each of the two types
+   * @return the request carrying both labels (third constructor argument is {@code null})
+   * @throws java.util.NoSuchElementException if either label type is missing
+   */
+  @Override
+  public CacheableProtocol createRequest(Label[] labels) {
+    UserCreatorLabel userCreatorLabel = firstLabelOfType(labels, UserCreatorLabel.class);
+    EngineTypeLabel engineTypeLabel = firstLabelOfType(labels, EngineTypeLabel.class);
+    return new RequestQueryEngineConfig(userCreatorLabel, engineTypeLabel, null);
+  }
+
+  /**
+   * Extracts the key/value pairs from a query answer.
+   *
+   * @param obj the answer object
+   * @return {@code getKeyAndValue()} of a {@link ResponseQueryConfig}; an immutable empty map for
+   *     any other object
+   */
+  @Override
+  public Map<String, String> createMap(Object obj) {
+    if (obj instanceof ResponseQueryConfig) {
+      return ((ResponseQueryConfig) obj).getKeyAndValue();
+    }
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Returns the first element of {@code labels} that is an instance of {@code labelType}.
+   *
+   * <p>Keeps the exception type of the former unchecked {@code Optional.get()} but names the
+   * missing label type in the message.
+   */
+  private static <T> T firstLabelOfType(Label[] labels, Class<T> labelType) {
+    return Arrays.stream(labels)
+        .filter(labelType::isInstance)
+        .map(labelType::cast)
+        .findFirst()
+        .orElseThrow(
+            () ->
+                new java.util.NoSuchElementException(
+                    "No " + labelType.getSimpleName() + " found in the given labels"));
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsParamsIllegalException.scala b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/exception/EsConvertResponseException.java
similarity index 70%
rename from linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsParamsIllegalException.scala
rename to linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/exception/EsConvertResponseException.java
index f6c7d4131..cadb8326e 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsParamsIllegalException.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/exception/EsConvertResponseException.java
@@ -15,10 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.elasticsearch.exception
+package org.apache.linkis.engineplugin.elasticsearch.exception;
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.elasticsearch.errorcode.EasticsearchErrorCodeSummary.CLUSTER_IS_BLANK
+import org.apache.linkis.common.exception.ErrorException;
+import org.apache.linkis.engineplugin.elasticsearch.errorcode.EasticsearchErrorCodeSummary;
 
-case class EsParamsIllegalException(errorMsg: String)
-    extends ErrorException(CLUSTER_IS_BLANK.getErrorCode, errorMsg)
+/** {@link ErrorException} that always carries the {@code RESPONSE_FAIL_IS_EMPTY} error code. */
+public class EsConvertResponseException extends ErrorException {
+  /**
+   * @param errorMsg message forwarded as the second argument of the {@link ErrorException}
+   *     constructor
+   */
+  public EsConvertResponseException(String errorMsg) {
+    super(EasticsearchErrorCodeSummary.RESPONSE_FAIL_IS_EMPTY.getErrorCode(), errorMsg);
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsConvertResponseException.scala b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/exception/EsParamsIllegalException.java
similarity index 70%
rename from linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsConvertResponseException.scala
rename to linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/exception/EsParamsIllegalException.java
index 998a2d81d..8aada5ae1 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsConvertResponseException.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/exception/EsParamsIllegalException.java
@@ -15,10 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.elasticsearch.exception
+package org.apache.linkis.engineplugin.elasticsearch.exception;
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.elasticsearch.errorcode.EasticsearchErrorCodeSummary.RESPONSE_FAIL_IS_EMPTY
+import org.apache.linkis.common.exception.ErrorException;
+import org.apache.linkis.engineplugin.elasticsearch.errorcode.EasticsearchErrorCodeSummary;
 
-case class EsConvertResponseException(errorMsg: String)
-    extends ErrorException(RESPONSE_FAIL_IS_EMPTY.getErrorCode, errorMsg)
+/** {@link ErrorException} that always carries the {@code CLUSTER_IS_BLANK} error code. */
+public class EsParamsIllegalException extends ErrorException {
+  /**
+   * @param errorMsg message forwarded as the second argument of the {@link ErrorException}
+   *     constructor
+   */
+  public EsParamsIllegalException(String errorMsg) {
+    super(EasticsearchErrorCodeSummary.CLUSTER_IS_BLANK.getErrorCode(), errorMsg);
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.java
new file mode 100644
index 000000000..8d6b7e6ed
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor;
+
+import org.apache.linkis.common.io.MetaData;
+import org.apache.linkis.common.io.Record;
+import org.apache.linkis.common.io.resultset.ResultSetWriter;
+import org.apache.linkis.common.utils.OverloadUtils;
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
+import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
+import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchEngineConsoleConf;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.*;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.impl.ElasticSearchExecutorImpl;
+import org.apache.linkis.governance.common.entity.ExecutionNodeStatus;
+import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
+import org.apache.linkis.manager.common.entity.resource.LoadResource;
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.scheduler.executer.AliasOutputExecuteResponse;
+import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+import org.apache.linkis.storage.LineRecord;
+import org.apache.linkis.storage.resultset.ResultSetFactory;
+import org.apache.linkis.storage.resultset.table.TableMetaData;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.io.IOUtils;
+
+import org.springframework.util.CollectionUtils;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchEngineConnExecutor extends ConcurrentComputationExecutor {
+  private static final Logger logger =
+      LoggerFactory.getLogger(ElasticSearchEngineConnExecutor.class);
+
+  private int id;
+  private String runType;
+  private List<Label<?>> executorLabels = new ArrayList<>(2);
+
+  private Cache<String, ElasticSearchExecutor> elasticSearchExecutorCache =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(
+              (Long) EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue(), TimeUnit.MILLISECONDS)
+          .removalListener(
+              new RemovalListener<String, ElasticSearchExecutor>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<String, ElasticSearchExecutor> notification) {
+                  notification.getValue().close();
+                  EngineConnTask task = getTaskById(notification.getKey());
+                  if (!ExecutionNodeStatus.isCompleted(task.getStatus())) {
+                    killTask(notification.getKey());
+                  }
+                }
+              })
+          .maximumSize(EngineConnConstant.MAX_TASK_NUM())
+          .build();
+
+  public ElasticSearchEngineConnExecutor(int outputPrintLimit, int id, String runType) {
+    super(outputPrintLimit);
+    this.id = id;
+    this.runType = runType;
+  }
+
+  @Override
+  public void init() {
+    super.init();
+  }
+
+  @Override
+  public ExecuteResponse execute(EngineConnTask engineConnTask) {
+    Map<String, String> properties = buildRuntimeParams(engineConnTask);
+    logger.info("The elasticsearch properties is: {}", properties);
+
+    //    ElasticSearchExecutor elasticSearchExecutor = ElasticSearchExecutor.apply(runType,
+    // properties);
+    ElasticSearchExecutor elasticSearchExecutor =
+        new ElasticSearchExecutorImpl(runType, properties);
+    try {
+      elasticSearchExecutor.open();
+    } catch (Exception e) {
+      logger.error("Execute es code failed, reason:", e);
+      return new ErrorExecuteResponse("run es failed", e);
+    }
+    elasticSearchExecutorCache.put(engineConnTask.getTaskId(), elasticSearchExecutor);
+    return super.execute(engineConnTask);
+  }
+
+  @Override
+  public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) {
+    String taskId = engineExecutorContext.getJobId().get();
+    ElasticSearchExecutor elasticSearchExecutor = elasticSearchExecutorCache.getIfPresent(taskId);
+    ElasticSearchResponse elasticSearchResponse = elasticSearchExecutor.executeLine(code);
+
+    try {
+
+      if (elasticSearchResponse instanceof ElasticSearchTableResponse) {
+        ElasticSearchTableResponse tableResponse =
+            (ElasticSearchTableResponse) elasticSearchResponse;
+        TableMetaData metaData = new TableMetaData(tableResponse.columns());
+        ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
+            engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE);
+        resultSetWriter.addMetaData(metaData);
+        Arrays.asList(tableResponse.records())
+            .forEach(
+                record -> {
+                  try {
+                    resultSetWriter.addRecord(record);
+                  } catch (IOException e) {
+                    logger.warn("es addRecord failed", e);
+                    throw new RuntimeException("es addRecord failed", e);
+                  }
+                });
+        String output = resultSetWriter.toString();
+        IOUtils.closeQuietly(resultSetWriter);
+        return new AliasOutputExecuteResponse(null, output);
+      } else if (elasticSearchResponse instanceof ElasticSearchJsonResponse) {
+        ElasticSearchJsonResponse jsonResponse = (ElasticSearchJsonResponse) elasticSearchResponse;
+        ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
+            engineExecutorContext.createResultSetWriter(ResultSetFactory.TEXT_TYPE);
+        resultSetWriter.addMetaData(null);
+        Arrays.stream(jsonResponse.value().split("\\n"))
+            .forEach(
+                item -> {
+                  try {
+                    resultSetWriter.addRecord(new LineRecord(item));
+                  } catch (IOException e) {
+                    logger.warn("es addRecord failed", e);
+                    throw new RuntimeException("es addRecord failed", e);
+                  }
+                });
+        String output = resultSetWriter.toString();
+        IOUtils.closeQuietly(resultSetWriter);
+        return new AliasOutputExecuteResponse(null, output);
+      } else if (elasticSearchResponse instanceof ElasticSearchErrorResponse) {
+        ElasticSearchErrorResponse errorResponse =
+            (ElasticSearchErrorResponse) elasticSearchResponse;
+        return new ErrorExecuteResponse(errorResponse.message(), errorResponse.cause());
+      }
+    } catch (IOException e) {
+      logger.warn("es addMetaData failed", e);
+      return new ErrorExecuteResponse("es addMetaData failed", e);
+    }
+
+    return new ErrorExecuteResponse("es executeLine failed", null);
+  }
+
+  private Map<String, String> buildRuntimeParams(EngineConnTask engineConnTask) {
+    // Parameters specified at runtime
+    Map<String, String> executorProperties =
+        engineConnTask.getProperties().entrySet().stream()
+            .collect(
+                Collectors.toMap(
+                    Map.Entry::getKey, entry -> Objects.toString(entry.getValue(), null)));
+
+    // Global engine params by console
+    Map<String, String> globalConfig =
+        new ElasticSearchEngineConsoleConf().getCacheMap(engineConnTask.getLables());
+
+    if (MapUtils.isNotEmpty(executorProperties)) {
+      globalConfig.putAll(executorProperties);
+    }
+
+    return globalConfig;
+  }
+
+  @Override
+  public ExecuteResponse executeCompletely(
+      EngineExecutionContext engineExecutorContext, String code, String completedLine) {
+    return null;
+  }
+
+  @Override
+  public float progress(String taskId) {
+    return 0.0f;
+  }
+
+  @Override
+  public JobProgressInfo[] getProgressInfo(String taskId) {
+    return new JobProgressInfo[0];
+  }
+
+  @Override
+  public List<Label<?>> getExecutorLabels() {
+    return executorLabels;
+  }
+
+  @Override
+  public void setExecutorLabels(List<Label<?>> labels) {
+    if (!CollectionUtils.isEmpty(labels)) {
+      executorLabels.clear();
+      executorLabels.addAll(labels);
+    }
+  }
+
+  @Override
+  public NodeResource requestExpectedResource(NodeResource expectedResource) {
+    return null;
+  }
+
+  @Override
+  public NodeResource getCurrentNodeResource() {
+    NodeResourceUtils.appendMemoryUnitIfMissing(
+        EngineConnObject.getEngineCreationContext().getOptions());
+
+    CommonNodeResource resource = new CommonNodeResource();
+    LoadResource usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory(), 1);
+    resource.setUsedResource(usedResource);
+    return resource;
+  }
+
+  @Override
+  public String getId() {
+    return Sender.getThisServiceInstance().getInstance() + "_" + id;
+  }
+
+  @Override
+  public int getConcurrentLimit() {
+    return ElasticSearchConfiguration.ENGINE_CONCURRENT_LIMIT.getValue();
+  }
+
+  @Override
+  public void killTask(String taskId) {
+
+    ElasticSearchExecutor elasticSearchExecutor = elasticSearchExecutorCache.getIfPresent(taskId);
+    if (elasticSearchExecutor != null) {
+      elasticSearchExecutor.close();
+    }
+
+    super.killTask(taskId);
+  }
+
+  @Override
+  public void killAll() {
+    elasticSearchExecutorCache.asMap().values().forEach(e -> e.close());
+  }
+
+  @Override
+  public void transformTaskStatus(EngineConnTask task, ExecutionNodeStatus newStatus) {
+    super.transformTaskStatus(task, newStatus);
+    if (ExecutionNodeStatus.isCompleted(newStatus)) {
+      elasticSearchExecutorCache.invalidate(task.getTaskId());
+    }
+  }
+
+  @Override
+  public boolean supportCallBackLogs() {
+    return false;
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchExecutorOrder.scala b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchExecutor.java
similarity index 79%
rename from linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchExecutorOrder.scala
rename to linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchExecutor.java
index ea49f9fe2..d15e06fd0 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchExecutorOrder.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchExecutor.java
@@ -15,11 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.elasticsearch.executor
+package org.apache.linkis.engineplugin.elasticsearch.executor.client;
 
-object ElasticSearchExecutorOrder extends Enumeration {
+public interface ElasticSearchExecutor {
 
-  type ElasticSearchExecutorOrder = Value
-  val SQL = Value(1)
-  val JSON = Value(3)
+  void open() throws Exception;
+
+  ElasticSearchResponse executeLine(String code);
+
+  void close();
 }
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClient.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClient.java
new file mode 100644
index 000000000..a8f30ac87
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClient.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor.client;
+
+import java.io.IOException;
+
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.sniff.Sniffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class EsClient implements EsClientOperate {
+
+  private static final Logger logger = LoggerFactory.getLogger(EsClient.class);
+
+  private String datasourceName;
+  public RestClient client;
+  private Sniffer sniffer;
+
+  public EsClient(String datasourceName, RestClient client, Sniffer sniffer) {
+    this.datasourceName = datasourceName;
+    this.client = client;
+    this.sniffer = sniffer;
+  }
+
+  public String getDatasourceName() {
+    return datasourceName;
+  }
+
+  public RestClient getRestClient() {
+    return client;
+  }
+
+  public Sniffer getSniffer() {
+    return sniffer;
+  }
+
+  @Override
+  public void close() {
+    if (sniffer != null) {
+      sniffer.close();
+    }
+    if (client != null) {
+      try {
+        client.close();
+      } catch (IOException e) {
+        logger.warn("RestClient close warn", e); // keep the cause instead of dropping it
+      }
+    }
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientFactory.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientFactory.java
new file mode 100644
index 000000000..7fb8a0023
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientFactory.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor.client;
+
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.common.exception.ErrorException;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+import org.apache.linkis.engineplugin.elasticsearch.exception.EsParamsIllegalException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.message.BasicHeader;
+
+import java.util.*;
+
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.sniff.Sniffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.linkis.engineplugin.elasticsearch.errorcode.EasticsearchErrorCodeSummary.CLUSTER_IS_BLANK;
+
+public class EsClientFactory {
+
+  private static final Logger logger = LoggerFactory.getLogger(EsClientFactory.class);
+
+  public static EsClient getRestClient(Map<String, String> options) {
+    String key = getDatasourceName(options);
+    if (StringUtils.isBlank(key)) {
+      return defaultClient;
+    }
+
+    // ES_CLIENT_MAP is a plain LinkedHashMap, so reads and writes all happen under its lock.
+    synchronized (ES_CLIENT_MAP) {
+      EsClient cached = ES_CLIENT_MAP.get(key);
+      if (cached == null) {
+        try {
+          cached = createRestClient(options);
+          cacheClient(cached);
+        } catch (ErrorException e) {
+          logger.error("es createRestClient failed, reason:", e);
+        }
+      }
+      return cached; // still null when the client could not be created
+    }
+  }
+
+  private static int MAX_CACHE_CLIENT_SIZE = 20;
+
+  private static Map<String, EsClient> ES_CLIENT_MAP =
+      new LinkedHashMap<String, EsClient>() {
+
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<String, EsClient> eldest) {
+          if (size() > MAX_CACHE_CLIENT_SIZE) {
+            eldest.getValue().close();
+            return true;
+          } else {
+            return false;
+          }
+        }
+      };
+
+  private static String getDatasourceName(Map<String, String> options) {
+    return options.getOrDefault(ElasticSearchConfiguration.ES_DATASOURCE_NAME.key(), "");
+  }
+
+  private static void cacheClient(EsClient client) {
+    ES_CLIENT_MAP.put(client.getDatasourceName(), client);
+  }
+
+  private static EsClient createRestClient(Map<String, String> options) throws ErrorException {
+    String clusterStr = options.get(ElasticSearchConfiguration.ES_CLUSTER.key());
+
+    if (StringUtils.isBlank(clusterStr)) {
+      throw new EsParamsIllegalException(CLUSTER_IS_BLANK.getErrorDesc());
+    }
+
+    HttpHost[] cluster = getCluster(clusterStr);
+
+    if (cluster.length == 0) {
+      throw new EsParamsIllegalException(CLUSTER_IS_BLANK.getErrorDesc());
+    }
+
+    String username = options.get(ElasticSearchConfiguration.ES_USERNAME.key());
+    String password = options.get(ElasticSearchConfiguration.ES_PASSWORD.key());
+
+    if (ElasticSearchConfiguration.ES_AUTH_CACHE.getValue()) {
+      setAuthScope(cluster, username, password);
+    }
+
+    HttpHost[] httpHosts =
+        Arrays.stream(cluster)
+            .map(item -> new HttpHost(item.getHostName(), item.getPort()))
+            .toArray(HttpHost[]::new);
+    RestClientBuilder builder = RestClient.builder(httpHosts);
+
+    builder.setHttpClientConfigCallback(
+        new RestClientBuilder.HttpClientConfigCallback() {
+          @Override
+          public HttpAsyncClientBuilder customizeHttpClient(
+              HttpAsyncClientBuilder httpAsyncClientBuilder) {
+            if (!ElasticSearchConfiguration.ES_AUTH_CACHE.getValue()) {
+              httpAsyncClientBuilder.disableAuthCaching();
+            }
+            return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+          }
+        });
+
+    if (defaultHeaders != null) {
+      builder.setDefaultHeaders(defaultHeaders);
+    }
+
+    RestClient restClient = builder.build();
+    Sniffer sniffer =
+        ElasticSearchConfiguration.ES_SNIFFER_ENABLE.getValue(options)
+            ? Sniffer.builder(restClient).build()
+            : null;
+
+    return new EsClientImpl(getDatasourceName(options), restClient, sniffer);
+  }
+
+  private static CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+
+  private static EsClient defaultClient;
+
+  static {
+    String cluster = ElasticSearchConfiguration.ES_CLUSTER.getValue(); // host list, not the key
+    if (StringUtils.isBlank(cluster)) {
+      defaultClient = null;
+    } else {
+      Map<String, String> defaultOpts = new HashMap<>();
+      defaultOpts.put(ElasticSearchConfiguration.ES_CLUSTER.key(), cluster);
+      defaultOpts.put(
+          ElasticSearchConfiguration.ES_DATASOURCE_NAME.key(),
+          ElasticSearchConfiguration.ES_DATASOURCE_NAME.getValue());
+      defaultOpts.put(
+          ElasticSearchConfiguration.ES_USERNAME.key(),
+          ElasticSearchConfiguration.ES_USERNAME.getValue());
+      defaultOpts.put(
+          ElasticSearchConfiguration.ES_PASSWORD.key(),
+          ElasticSearchConfiguration.ES_PASSWORD.getValue());
+      EsClient client = null;
+      try {
+        client = createRestClient(defaultOpts);
+        cacheClient(client); // only cache a client that was actually created
+      } catch (ErrorException e) {
+        logger.error("es createRestClient failed, reason:", e);
+      }
+      defaultClient = client;
+    }
+  }
+
+  private static Header[] defaultHeaders =
+      CommonVars.properties().entrySet().stream()
+          .filter(
+              entry ->
+                  entry.getKey() != null
+                      && entry.getValue() != null
+                      && entry
+                          .getKey()
+                          .toString()
+                          .startsWith(ElasticSearchConfiguration.ES_HTTP_HEADER_PREFIX))
+          .map(entry -> new BasicHeader(entry.getKey().toString(), entry.getValue().toString()))
+          .toArray(Header[]::new);
+
+  private static HttpHost[] getCluster(String clusterStr) {
+    if (StringUtils.isNotBlank(clusterStr)) {
+      return Arrays.stream(clusterStr.split(","))
+          .map(
+              value -> {
+                String[] arr = value.replace("http://", "").split(":");
+                return new HttpHost(arr[0].trim(), Integer.parseInt(arr[1].trim()));
+              })
+          .toArray(HttpHost[]::new);
+    } else {
+      return new HttpHost[0];
+    }
+  }
+
+  private static void setAuthScope(HttpHost[] cluster, String username, String password) {
+    if (cluster != null
+        && cluster.length > 0
+        && StringUtils.isNotBlank(username)
+        && StringUtils.isNotBlank(password)) {
+      Arrays.stream(cluster)
+          .forEach(
+              host ->
+                  credentialsProvider.setCredentials(
+                      new AuthScope(
+                          host.getHostName(),
+                          host.getPort(),
+                          AuthScope.ANY_REALM,
+                          AuthScope.ANY_SCHEME),
+                      new UsernamePasswordCredentials(username, password)));
+    }
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientImpl.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientImpl.java
new file mode 100644
index 000000000..568f5ca29
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientImpl.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor.client;
+
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.auth.AUTH;
+import org.apache.http.auth.Credentials;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.message.BufferedHeader;
+import org.apache.http.util.CharArrayBuffer;
+import org.apache.http.util.EncodingUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.elasticsearch.client.Cancellable;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.ResponseListener;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.sniff.Sniffer;
+
+public class EsClientImpl extends EsClient {
+
+  public EsClientImpl(String datasourceName, RestClient client, Sniffer sniffer) {
+    super(datasourceName, client, sniffer);
+  }
+
+  @Override
+  public Cancellable execute(
+      String code, Map<String, String> options, ResponseListener responseListener) {
+    Request request = createRequest(code, options);
+    return client.performRequestAsync(request, responseListener);
+  }
+
+  private Request createRequest(String code, Map<String, String> options) {
+    String endpoint = ElasticSearchConfiguration.ES_HTTP_ENDPOINT.getValue(options);
+    String method = ElasticSearchConfiguration.ES_HTTP_METHOD.getValue(options);
+
+    Request request = new Request(method, endpoint);
+    request.setOptions(getRequestOptions(options));
+    request.setJsonEntity(code);
+    return request;
+  }
+
+  private RequestOptions getRequestOptions(Map<String, String> options) {
+    RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
+
+    String username = ElasticSearchConfiguration.ES_USERNAME.getValue(options);
+    String password = ElasticSearchConfiguration.ES_PASSWORD.getValue(options);
+
+    // convert username / password into an HTTP Basic auth header
+    if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
+      Header authHeader =
+          authenticate(
+              new UsernamePasswordCredentials(username, password), StandardCharsets.UTF_8.name());
+      builder.addHeader(authHeader.getName(), authHeader.getValue());
+    }
+
+    options.entrySet().stream()
+        .filter(
+            entry ->
+                entry.getKey() != null
+                    && entry.getValue() != null
+                    && entry.getKey().startsWith(ElasticSearchConfiguration.ES_HTTP_HEADER_PREFIX))
+        .forEach(entry -> builder.addHeader(entry.getKey(), entry.getValue()));
+
+    return builder.build();
+  }
+
+  private Header authenticate(Credentials credentials, String charset) {
+    StringBuilder tmp = new StringBuilder();
+    tmp.append(credentials.getUserPrincipal().getName());
+    tmp.append(":");
+    tmp.append(credentials.getPassword() == null ? "null" : credentials.getPassword());
+
+    byte[] base64password =
+        Base64.encodeBase64(EncodingUtils.getBytes(tmp.toString(), charset), false);
+
+    CharArrayBuffer buffer = new CharArrayBuffer(32);
+    buffer.append(AUTH.WWW_AUTH_RESP);
+    buffer.append(": Basic ");
+    buffer.append(base64password, 0, base64password.length);
+
+    return new BufferedHeader(buffer);
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsEngineException.scala b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientOperate.java
similarity index 70%
rename from linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsEngineException.scala
rename to linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientOperate.java
index 1632ae412..619974c17 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsEngineException.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientOperate.java
@@ -15,8 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.elasticsearch.exception
+package org.apache.linkis.engineplugin.elasticsearch.executor.client;
 
-import org.apache.linkis.common.exception.ErrorException
+import java.util.Map;
 
-case class EsEngineException(errorMsg: String) extends ErrorException(70114, errorMsg)
+import org.elasticsearch.client.Cancellable;
+import org.elasticsearch.client.ResponseListener;
+
+public interface EsClientOperate {
+
+  Cancellable execute(String code, Map<String, String> options, ResponseListener responseListener);
+
+  void close();
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.java
new file mode 100644
index 000000000..ba21ff56c
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor.client.impl;
+
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.*;
+import org.apache.linkis.protocol.constants.TaskConstant;
+import org.apache.linkis.storage.utils.StorageUtils;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.elasticsearch.client.Cancellable;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchExecutorImpl implements ElasticSearchExecutor {
+
+  private static final Logger logger = LoggerFactory.getLogger(ElasticSearchExecutorImpl.class);
+
+  private EsClient client;
+  private Cancellable cancelable;
+  private String user;
+  private String runType;
+  private Map<String, String> properties;
+
+  public ElasticSearchExecutorImpl(String runType, Map<String, String> properties) {
+    this.runType = runType;
+    this.properties = properties;
+  }
+
+  @Override
+  public void open() throws Exception {
+    this.client = EsClientFactory.getRestClient(properties);
+    this.user =
+        StringUtils.defaultString(
+            properties.getOrDefault(TaskConstant.UMUSER, StorageUtils.getJvmUser()));
+    switch (runType.trim().toLowerCase(Locale.getDefault())) {
+      case "essql":
+      case "sql":
+        properties.putIfAbsent(
+            ElasticSearchConfiguration.ES_HTTP_ENDPOINT.key(),
+            ElasticSearchConfiguration.ES_HTTP_SQL_ENDPOINT.getValue(properties));
+        break;
+      default:
+        break;
+    }
+  }
+
+  @Override
+  public ElasticSearchResponse executeLine(String code) {
+    String realCode = code.trim();
+    logger.info("es client begins to run {} code:\n {}", runType, realCode);
+    CountDownLatch countDown = new CountDownLatch(1);
+    // Single-slot holder written by the async listener; replaced once the request settles.
+    ElasticSearchResponse[] result = {new ElasticSearchErrorResponse("INCOMPLETE", null, null)};
+    cancelable =
+        client.execute(
+            realCode,
+            properties,
+            new ResponseListener() {
+              @Override
+              public void onSuccess(Response response) {
+                result[0] = convertResponse(response);
+                countDown.countDown();
+              }
+
+              @Override
+              public void onFailure(Exception exception) {
+                result[0] =
+                    new ElasticSearchErrorResponse(
+                        "EsEngineExecutor execute fail. ", null, exception);
+                countDown.countDown();
+              }
+            });
+    try {
+      countDown.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+      result[0] =
+          new ElasticSearchErrorResponse("EsEngineExecutor execute interrupted. ", null, e);
+    }
+    return result[0];
+  }
+
+  // convert response to executeResponse
+  private ElasticSearchResponse convertResponse(Response response) {
+    try {
+      int statusCode = response.getStatusLine().getStatusCode();
+      if (statusCode >= 200 && statusCode < 300) {
+        return ResponseHandler$.MODULE$.handle(response);
+      } else {
+        return new ElasticSearchErrorResponse(
+            "EsEngineExecutor convert response fail. response code: " + statusCode, null, null);
+      }
+    } catch (Exception e) {
+      return new ElasticSearchErrorResponse("EsEngineExecutor convert response error.", null, e);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (cancelable != null) {
+      cancelable.cancel();
+    }
+    // Only the in-flight request belongs to this executor. The EsClient was handed out by
+    // EsClientFactory.getRestClient, which keeps it cached for reuse by later callers, so
+    // closing it here would leave a dead client in that cache.
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/ElasticSearchEngineConnPlugin.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/ElasticSearchEngineConnPlugin.scala
deleted file mode 100644
index 67e2d39ce..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/ElasticSearchEngineConnPlugin.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch
-
-import org.apache.linkis.engineplugin.elasticsearch.builder.ElasticSearchProcessEngineConnLaunchBuilder
-import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration
-import org.apache.linkis.engineplugin.elasticsearch.factory.ElasticSearchEngineConnFactory
-import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin
-import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory
-import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder
-import org.apache.linkis.manager.engineplugin.common.resource.{
-  EngineResourceFactory,
-  GenericEngineResourceFactory
-}
-import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.manager.label.entity.engine.{EngineType, EngineTypeLabel}
-
-import java.util
-
-class ElasticSearchEngineConnPlugin extends EngineConnPlugin {
-
-  private var engineResourceFactory: EngineResourceFactory = _
-
-  private var engineFactory: EngineConnFactory = _
-
-  private val defaultLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]()
-
-  private val resourceLocker = new Array[Byte](0)
-
-  private val engineFactoryLocker = new Array[Byte](0)
-
-  override def init(params: util.Map[String, AnyRef]): Unit = {
-    val typeLabel = new EngineTypeLabel()
-    typeLabel.setEngineType(EngineType.ELASTICSEARCH.toString)
-    typeLabel.setVersion(ElasticSearchConfiguration.DEFAULT_VERSION.getValue)
-    this.defaultLabels.add(typeLabel)
-  }
-
-  override def getEngineResourceFactory: EngineResourceFactory = {
-    if (null == engineResourceFactory) resourceLocker.synchronized {
-      if (null == engineResourceFactory) engineResourceFactory = new GenericEngineResourceFactory
-    }
-    engineResourceFactory
-  }
-
-  override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
-    new ElasticSearchProcessEngineConnLaunchBuilder()
-  }
-
-  override def getEngineConnFactory: EngineConnFactory = {
-    if (null == engineFactory) engineFactoryLocker.synchronized {
-      if (null == engineFactory) engineFactory = new ElasticSearchEngineConnFactory
-    }
-    engineFactory
-  }
-
-  override def getDefaultLabels: util.List[Label[_]] = defaultLabels
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.scala
deleted file mode 100644
index d2909a063..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch.conf
-
-import org.apache.linkis.common.conf.{ByteType, CommonVars}
-
-object ElasticSearchConfiguration {
-
-  // es client
-  val ES_CLUSTER = CommonVars("linkis.es.cluster", "127.0.0.1:9200")
-  val ES_DATASOURCE_NAME = CommonVars("linkis.es.datasource", "default_datasource")
-  val ES_AUTH_CACHE = CommonVars("linkis.es.auth.cache", false)
-  val ES_USERNAME = CommonVars("linkis.es.username", "")
-  val ES_PASSWORD = CommonVars("linkis.es.password", "")
-  val ES_SNIFFER_ENABLE = CommonVars("linkis.es.sniffer.enable", false)
-  val ES_HTTP_METHOD = CommonVars("linkis.es.http.method", "GET")
-  val ES_HTTP_ENDPOINT = CommonVars("linkis.es.http.endpoint", "/_search")
-  val ES_HTTP_SQL_ENDPOINT = CommonVars("linkis.es.sql.endpoint", "/_sql")
-  val ES_SQL_FORMAT = CommonVars("linkis.es.sql.format", "{\"query\": \"%s\"}")
-  val ES_HTTP_HEADER_PREFIX = "linkis.es.headers."
-
-  // entrance resource
-  val ENTRANCE_MAX_JOB_INSTANCE = CommonVars("linkis.es.max.job.instance", 100)
-  val ENTRANCE_PROTECTED_JOB_INSTANCE = CommonVars("linkis.es.protected.job.instance", 20)
-  val ENGINE_DEFAULT_LIMIT = CommonVars("linkis.es.default.limit", 5000)
-
-  // resultSet
-  val ENGINE_RESULT_SET_MAX_CACHE = CommonVars("linkis.resultSet.cache.max", new ByteType("512k"))
-
-  val ENGINE_CONCURRENT_LIMIT = CommonVars[Int]("linkis.engineconn.concurrent.limit", 100)
-
-  val DEFAULT_VERSION = CommonVars[String]("linkis.engineconn.io.version", "7.6.2")
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.scala
deleted file mode 100644
index b197f2047..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch.conf
-
-import org.apache.linkis.common.conf.Configuration
-import org.apache.linkis.governance.common.protocol.conf.{
-  RequestQueryEngineConfig,
-  ResponseQueryConfig
-}
-import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
-import org.apache.linkis.protocol.CacheableProtocol
-import org.apache.linkis.rpc.RPCMapCache
-
-import java.util
-
-object ElasticSearchEngineConsoleConf
-    extends RPCMapCache[Array[Label[_]], String, String](
-      Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME.getValue
-    ) {
-
-  override protected def createRequest(labels: Array[Label[_]]): CacheableProtocol = {
-    val userCreatorLabel =
-      labels.find(_.isInstanceOf[UserCreatorLabel]).get.asInstanceOf[UserCreatorLabel]
-    val engineTypeLabel =
-      labels.find(_.isInstanceOf[EngineTypeLabel]).get.asInstanceOf[EngineTypeLabel]
-    RequestQueryEngineConfig(userCreatorLabel, engineTypeLabel)
-  }
-
-  override protected def createMap(any: Any): util.Map[String, String] = any match {
-    case response: ResponseQueryConfig => response.getKeyAndValue
-  }
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.scala
deleted file mode 100644
index 7797ac1b1..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.scala
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch.executor
-
-import org.apache.linkis.common.utils.{Logging, OverloadUtils, Utils}
-import org.apache.linkis.engineconn.common.conf.{EngineConnConf, EngineConnConstant}
-import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
-import org.apache.linkis.engineconn.computation.executor.execute.{
-  ConcurrentComputationExecutor,
-  EngineExecutionContext
-}
-import org.apache.linkis.engineconn.core.EngineConnObject
-import org.apache.linkis.engineplugin.elasticsearch.conf.{
-  ElasticSearchConfiguration,
-  ElasticSearchEngineConsoleConf
-}
-import org.apache.linkis.engineplugin.elasticsearch.executor.client.{
-  ElasticSearchErrorResponse,
-  ElasticSearchExecutor,
-  ElasticSearchJsonResponse,
-  ElasticSearchTableResponse
-}
-import org.apache.linkis.engineplugin.elasticsearch.executor.client.ElasticSearchErrorResponse
-import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
-import org.apache.linkis.manager.common.entity.resource.{
-  CommonNodeResource,
-  LoadResource,
-  NodeResource
-}
-import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
-import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
-import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.protocol.engine.JobProgressInfo
-import org.apache.linkis.rpc.Sender
-import org.apache.linkis.scheduler.executer.{
-  AliasOutputExecuteResponse,
-  ErrorExecuteResponse,
-  ExecuteResponse
-}
-import org.apache.linkis.storage.LineRecord
-import org.apache.linkis.storage.resultset.ResultSetFactory
-import org.apache.linkis.storage.resultset.table.TableMetaData
-
-import org.apache.commons.io.IOUtils
-
-import org.springframework.util.CollectionUtils
-
-import java.util
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConverters._
-
-import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
-
-class ElasticSearchEngineConnExecutor(
-    override val outputPrintLimit: Int,
-    val id: Int,
-    runType: String
-) extends ConcurrentComputationExecutor(outputPrintLimit)
-    with Logging {
-
-  private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]](2)
-
-  private val elasticSearchExecutorCache: Cache[String, ElasticSearchExecutor] = CacheBuilder
-    .newBuilder()
-    .expireAfterAccess(EngineConnConf.ENGINE_TASK_EXPIRE_TIME.getValue, TimeUnit.MILLISECONDS)
-    .removalListener(new RemovalListener[String, ElasticSearchExecutor] {
-
-      override def onRemoval(
-          notification: RemovalNotification[String, ElasticSearchExecutor]
-      ): Unit = {
-        notification.getValue.close
-        val task = getTaskById(notification.getKey)
-        if (!ExecutionNodeStatus.isCompleted(task.getStatus)) {
-          killTask(notification.getKey)
-        }
-      }
-
-    })
-    .maximumSize(EngineConnConstant.MAX_TASK_NUM)
-    .build()
-
-  override def init(): Unit = {
-    super.init()
-  }
-
-  override def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
-
-    val properties: util.Map[String, String] = buildRuntimeParams(engineConnTask)
-    logger.info(s"The elasticsearch properties is: $properties")
-
-    val elasticSearchExecutor = ElasticSearchExecutor(runType, properties)
-    elasticSearchExecutor.open
-    elasticSearchExecutorCache.put(engineConnTask.getTaskId, elasticSearchExecutor)
-    super.execute(engineConnTask)
-  }
-
-  override def executeLine(
-      engineExecutorContext: EngineExecutionContext,
-      code: String
-  ): ExecuteResponse = {
-    val taskId = engineExecutorContext.getJobId.get
-    val elasticSearchExecutor = elasticSearchExecutorCache.getIfPresent(taskId)
-    val elasticSearchResponse = elasticSearchExecutor.executeLine(code)
-
-    elasticSearchResponse match {
-      case ElasticSearchTableResponse(columns, records) =>
-        val metaData = new TableMetaData(columns)
-        val resultSetWriter =
-          engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
-        resultSetWriter.addMetaData(metaData)
-        records.foreach(record => resultSetWriter.addRecord(record))
-        val output = resultSetWriter.toString
-        Utils.tryQuietly {
-          IOUtils.closeQuietly(resultSetWriter)
-        }
-        AliasOutputExecuteResponse(null, output)
-      case ElasticSearchJsonResponse(content) =>
-        val resultSetWriter =
-          engineExecutorContext.createResultSetWriter(ResultSetFactory.TEXT_TYPE)
-        resultSetWriter.addMetaData(null)
-        content.split("\\n").foreach(item => resultSetWriter.addRecord(new LineRecord(item)))
-        val output = resultSetWriter.toString
-        Utils.tryQuietly {
-          IOUtils.closeQuietly(resultSetWriter)
-        }
-        AliasOutputExecuteResponse(null, output)
-      case ElasticSearchErrorResponse(message, body, cause) =>
-        ErrorExecuteResponse(message, cause)
-    }
-  }
-
-  private def buildRuntimeParams(engineConnTask: EngineConnTask): util.Map[String, String] = {
-
-    // parameters specified at runtime
-    var executorProperties = engineConnTask.getProperties.asInstanceOf[util.Map[String, String]]
-    if (executorProperties == null) {
-      executorProperties = new util.HashMap[String, String]()
-    }
-
-    // global  engine params by console
-    val globalConfig: util.Map[String, String] =
-      Utils.tryAndWarn(ElasticSearchEngineConsoleConf.getCacheMap(engineConnTask.getLables))
-
-    if (!executorProperties.isEmpty) {
-      globalConfig.putAll(executorProperties)
-    }
-
-    globalConfig
-  }
-
-  override def executeCompletely(
-      engineExecutorContext: EngineExecutionContext,
-      code: String,
-      completedLine: String
-  ): ExecuteResponse = null
-
-  override def progress(taskID: String): Float = 0.0f
-
-  override def getProgressInfo(taskID: String): Array[JobProgressInfo] =
-    Array.empty[JobProgressInfo]
-
-  override def getExecutorLabels(): util.List[Label[_]] = executorLabels
-
-  override def setExecutorLabels(labels: util.List[Label[_]]): Unit = {
-    if (!CollectionUtils.isEmpty(labels)) {
-      executorLabels.clear()
-      executorLabels.addAll(labels)
-    }
-  }
-
-  override def supportCallBackLogs(): Boolean = false
-
-  override def requestExpectedResource(expectedResource: NodeResource): NodeResource = null
-
-  override def getCurrentNodeResource(): NodeResource = {
-    NodeResourceUtils.appendMemoryUnitIfMissing(
-      EngineConnObject.getEngineCreationContext.getOptions
-    )
-
-    val resource = new CommonNodeResource
-    val usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory, 1)
-    resource.setUsedResource(usedResource)
-    resource
-  }
-
-  override def getId(): String = Sender.getThisServiceInstance.getInstance + s"_$id"
-
-  override def getConcurrentLimit: Int =
-    ElasticSearchConfiguration.ENGINE_CONCURRENT_LIMIT.getValue
-
-  override def killTask(taskId: String): Unit = {
-    Utils.tryAndWarn {
-      val elasticSearchExecutor = elasticSearchExecutorCache.getIfPresent(taskId)
-      if (null != elasticSearchExecutor) {
-        elasticSearchExecutor.close
-      }
-    }
-    super.killTask(taskId)
-  }
-
-  override def killAll(): Unit = {
-    elasticSearchExecutorCache
-      .asMap()
-      .values()
-      .asScala
-      .foreach(e => e.close)
-  }
-
-  override def transformTaskStatus(task: EngineConnTask, newStatus: ExecutionNodeStatus): Unit = {
-    super.transformTaskStatus(task, newStatus)
-    if (ExecutionNodeStatus.isCompleted(newStatus)) {
-      elasticSearchExecutorCache.invalidate(task.getTaskId)
-    }
-  }
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchExecutor.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchExecutor.scala
deleted file mode 100644
index 72e1953e3..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchExecutor.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch.executor.client
-
-import org.apache.linkis.common.utils.Logging
-import org.apache.linkis.engineplugin.elasticsearch.executor.client.impl.ElasticSearchExecutorImpl
-import org.apache.linkis.scheduler.executer.ExecuteResponse
-
-import java.io.IOException
-import java.util
-
-import scala.collection.JavaConverters._
-
-trait ElasticSearchExecutor extends Logging {
-
-  @throws(classOf[IOException])
-  def open: Unit
-
-  def executeLine(code: String): ElasticSearchResponse
-
-  def close: Unit
-
-}
-
-object ElasticSearchExecutor {
-
-  def apply(runType: String, properties: util.Map[String, String]): ElasticSearchExecutor = {
-    new ElasticSearchExecutorImpl(runType, properties)
-  }
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchResponse.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchResponse.scala
old mode 100644
new mode 100755
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClient.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClient.scala
deleted file mode 100644
index 8b894b466..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClient.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch.executor.client
-
-import org.apache.linkis.common.utils.Utils
-import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration._
-import org.apache.linkis.server.JMap
-
-import org.apache.commons.codec.binary.Base64
-import org.apache.commons.lang3.StringUtils
-import org.apache.http.Header
-import org.apache.http.auth.{AUTH, Credentials, UsernamePasswordCredentials}
-import org.apache.http.message.BufferedHeader
-import org.apache.http.util.{CharArrayBuffer, EncodingUtils}
-
-import java.nio.charset.StandardCharsets.UTF_8
-import java.util
-
-import scala.collection.JavaConverters._
-
-import org.elasticsearch.client.{Cancellable, Request, RequestOptions, ResponseListener, RestClient}
-import org.elasticsearch.client.sniff.Sniffer
-
-trait EsClientOperate {
-
-  def execute(
-      code: String,
-      options: util.Map[String, String],
-      responseListener: ResponseListener
-  ): Cancellable
-
-  def close(): Unit
-
-}
-
-abstract class EsClient(datasourceName: String, client: RestClient, sniffer: Sniffer)
-    extends EsClientOperate {
-
-  def getDatasourceName: String = datasourceName
-
-  def getRestClient: RestClient = client
-
-  def getSniffer: Sniffer = sniffer
-
-  override def close(): Unit = Utils.tryQuietly {
-    sniffer match {
-      case s: Sniffer => s.close()
-      case _ =>
-    }
-    client match {
-      case c: RestClient => c.close()
-      case _ =>
-    }
-  }
-
-}
-
-class EsClientImpl(datasourceName: String, client: RestClient, sniffer: Sniffer)
-    extends EsClient(datasourceName, client, sniffer) {
-
-  override def execute(
-      code: String,
-      options: util.Map[String, String],
-      responseListener: ResponseListener
-  ): Cancellable = {
-    val request = createRequest(code, options)
-    client.performRequestAsync(request, responseListener)
-  }
-
-  private def createRequest(code: String, options: util.Map[String, String]): Request = {
-    val endpoint = ES_HTTP_ENDPOINT.getValue(options)
-    val method = ES_HTTP_METHOD.getValue(options)
-    val request = new Request(method, endpoint)
-    request.setOptions(getRequestOptions(options))
-    request.setJsonEntity(code)
-    request
-  }
-
-  private def getRequestOptions(options: util.Map[String, String]): RequestOptions = {
-    val builder = RequestOptions.DEFAULT.toBuilder()
-
-    val username = ES_USERNAME.getValue(options)
-    val password = ES_PASSWORD.getValue(options)
-    // username / password convert to base auth
-    if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
-      val authHeader =
-        authenticate(new UsernamePasswordCredentials(username, password), UTF_8.name())
-      builder.addHeader(authHeader.getName, authHeader.getValue)
-    }
-
-    options.asScala
-      .filter(entry =>
-        entry._1 != null && entry._2 != null && entry._1.startsWith(ES_HTTP_HEADER_PREFIX)
-      )
-      .foreach(entry => builder.addHeader(entry._1, entry._2))
-
-    builder.build()
-  }
-
-  private def authenticate(credentials: Credentials, charset: String): Header = {
-    val tmp = new StringBuilder
-    tmp.append(credentials.getUserPrincipal.getName)
-    tmp.append(":")
-    tmp.append(
-      if (credentials.getPassword == null) "null"
-      else credentials.getPassword
-    )
-    val base64password = Base64.encodeBase64(EncodingUtils.getBytes(tmp.toString, charset), false)
-    val buffer = new CharArrayBuffer(32)
-    buffer.append(AUTH.WWW_AUTH_RESP)
-    buffer.append(": Basic ")
-    buffer.append(base64password, 0, base64password.length)
-    new BufferedHeader(buffer)
-  }
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientFactory.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientFactory.scala
deleted file mode 100644
index 30cb6690b..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientFactory.scala
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch.executor.client
-
-import org.apache.linkis.common.conf.CommonVars
-import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration._
-import org.apache.linkis.engineplugin.elasticsearch.errorcode.EasticsearchErrorCodeSummary.CLUSTER_IS_BLANK
-import org.apache.linkis.engineplugin.elasticsearch.exception.EsParamsIllegalException
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.http.{Header, HttpHost}
-import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
-import org.apache.http.client.CredentialsProvider
-import org.apache.http.impl.client.BasicCredentialsProvider
-import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
-import org.apache.http.message.BasicHeader
-
-import java.util
-import java.util.Map
-
-import scala.collection.JavaConverters._
-
-import org.elasticsearch.client.{RestClient, RestClientBuilder}
-import org.elasticsearch.client.sniff.Sniffer
-
-object EsClientFactory {
-
-  def getRestClient(options: util.Map[String, String]): EsClient = {
-    val key = getDatasourceName(options)
-    if (StringUtils.isBlank(key)) {
-      return defaultClient
-    }
-
-    if (!ES_CLIENT_MAP.containsKey(key)) {
-      ES_CLIENT_MAP synchronized {
-        if (!ES_CLIENT_MAP.containsKey(key)) {
-          cacheClient(createRestClient(options))
-        }
-      }
-    }
-
-    ES_CLIENT_MAP.get(key)
-  }
-
-  private val MAX_CACHE_CLIENT_SIZE = 20
-
-  private val ES_CLIENT_MAP: Map[String, EsClient] = new util.LinkedHashMap[String, EsClient]() {
-
-    override def removeEldestEntry(eldest: Map.Entry[String, EsClient]): Boolean =
-      if (size > MAX_CACHE_CLIENT_SIZE) {
-        eldest.getValue.close()
-        true
-      } else {
-        false
-      }
-
-  }
-
-  private def getDatasourceName(options: util.Map[String, String]): String = {
-    options.getOrDefault(ES_DATASOURCE_NAME.key, "")
-  }
-
-  private def cacheClient(client: EsClient) = {
-    ES_CLIENT_MAP.put(client.getDatasourceName, client)
-  }
-
-  private def createRestClient(options: util.Map[String, String]): EsClient = {
-    val clusterStr = options.get(ES_CLUSTER.key)
-    if (StringUtils.isBlank(clusterStr)) {
-      throw EsParamsIllegalException(CLUSTER_IS_BLANK.getErrorDesc)
-    }
-    val cluster = getCluster(clusterStr)
-    if (cluster.isEmpty) {
-      throw EsParamsIllegalException(CLUSTER_IS_BLANK.getErrorDesc)
-    }
-    val username = options.get(ES_USERNAME.key)
-    val password = options.get(ES_PASSWORD.key)
-
-    if (ES_AUTH_CACHE.getValue) {
-      setAuthScope(cluster, username, password)
-    }
-
-    val httpHosts = cluster.map(item => new HttpHost(item._1, item._2))
-    val builder = RestClient
-      .builder(httpHosts: _*)
-      .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
-        override def customizeHttpClient(
-            httpAsyncClientBuilder: HttpAsyncClientBuilder
-        ): HttpAsyncClientBuilder = {
-          if (!ES_AUTH_CACHE.getValue) {
-            httpAsyncClientBuilder.disableAuthCaching
-          }
-          //        httpClientBuilder.setDefaultRequestConfig(RequestConfig.DEFAULT)
-          //        httpClientBuilder.setDefaultConnectionConfig(ConnectionConfig.DEFAULT)
-          httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
-        }
-      })
-    if (defaultHeaders != null) {
-      builder.setDefaultHeaders(defaultHeaders)
-    }
-    val client = builder.build
-
-    val sniffer = if (ES_SNIFFER_ENABLE.getValue(options)) {
-      Sniffer.builder(client).build
-    } else null
-
-    val datasourceName = getDatasourceName(options)
-    new EsClientImpl(datasourceName, client, sniffer)
-  }
-
-  private val credentialsProvider: CredentialsProvider = new BasicCredentialsProvider()
-
-  private val defaultClient = {
-    val cluster = ES_CLUSTER.getValue
-    if (StringUtils.isBlank(cluster)) {
-      null
-    } else {
-      val defaultOpts = new util.HashMap[String, String]()
-      defaultOpts.put(ES_CLUSTER.key, cluster)
-      defaultOpts.put(ES_DATASOURCE_NAME.key, ES_DATASOURCE_NAME.getValue)
-      defaultOpts.put(ES_USERNAME.key, ES_USERNAME.getValue)
-      defaultOpts.put(ES_PASSWORD.key, ES_PASSWORD.getValue)
-      val client = createRestClient(defaultOpts)
-      cacheClient(client)
-      client
-    }
-  }
-
-  private val defaultHeaders: Array[Header] = CommonVars.properties
-    .entrySet()
-    .asScala
-    .filter(entry =>
-      entry.getKey != null && entry.getValue != null && entry.getKey.toString
-        .startsWith(ES_HTTP_HEADER_PREFIX)
-    )
-    .map(entry => new BasicHeader(entry.getKey.toString, entry.getValue.toString))
-    .toArray[Header]
-
-  // host1:port1,host2:port2 -> [(host1,port1),(host2,port2)]
-  private def getCluster(clusterStr: String): Array[(String, Int)] =
-    if (StringUtils.isNotBlank(clusterStr)) {
-      clusterStr
-        .split(",")
-        .map(value => {
-          val arr = value.replace("http://", "").split(":")
-          (arr(0).trim, arr(1).trim.toInt)
-        })
-    } else Array()
-
-  // set cluster auth
-  private def setAuthScope(
-      cluster: Array[(String, Int)],
-      username: String,
-      password: String
-  ): Unit = if (
-      cluster != null && !cluster.isEmpty
-      && StringUtils.isNotBlank(username)
-      && StringUtils.isNotBlank(password)
-  ) {
-    cluster.foreach {
-      case (host, port) =>
-        credentialsProvider.setCredentials(
-          new AuthScope(host, port, AuthScope.ANY_REALM, AuthScope.ANY_SCHEME),
-          new UsernamePasswordCredentials(username, password)
-        )
-      case _ =>
-    }
-  }
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.scala
deleted file mode 100644
index 86d10660b..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch.executor.client.impl
-
-import org.apache.linkis.common.utils.Utils
-import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration
-import org.apache.linkis.engineplugin.elasticsearch.exception.EsConvertResponseException
-import org.apache.linkis.engineplugin.elasticsearch.executor.client.{
-  ElasticSearchErrorResponse,
-  ElasticSearchExecutor,
-  ElasticSearchResponse,
-  EsClient,
-  EsClientFactory,
-  ResponseHandler
-}
-import org.apache.linkis.engineplugin.elasticsearch.executor.client.ResponseHandler
-import org.apache.linkis.protocol.constants.TaskConstant
-import org.apache.linkis.scheduler.executer.{
-  AliasOutputExecuteResponse,
-  ErrorExecuteResponse,
-  ExecuteResponse,
-  SuccessExecuteResponse
-}
-import org.apache.linkis.server.JMap
-import org.apache.linkis.storage.utils.StorageUtils
-
-import java.util
-import java.util.Locale
-import java.util.concurrent.CountDownLatch
-
-import org.elasticsearch.client.{Cancellable, Response, ResponseListener}
-
-class ElasticSearchExecutorImpl(runType: String, properties: util.Map[String, String])
-    extends ElasticSearchExecutor {
-
-  private var client: EsClient = _
-  private var cancelable: Cancellable = _
-  private var user: String = _
-
-  override def open: Unit = {
-    this.client = EsClientFactory.getRestClient(properties)
-    this.user = properties.getOrDefault(TaskConstant.UMUSER, StorageUtils.getJvmUser)
-    runType.trim.toLowerCase(Locale.getDefault) match {
-      case "essql" | "sql" =>
-        properties.putIfAbsent(
-          ElasticSearchConfiguration.ES_HTTP_ENDPOINT.key,
-          ElasticSearchConfiguration.ES_HTTP_SQL_ENDPOINT.getValue(properties)
-        )
-      case _ =>
-    }
-  }
-
-  override def executeLine(code: String): ElasticSearchResponse = {
-    val realCode = code.trim()
-    logger.info(s"es client begins to run $runType code:\n ${realCode.trim}")
-    val countDown = new CountDownLatch(1)
-    var executeResponse: ElasticSearchResponse = ElasticSearchErrorResponse("INCOMPLETE")
-    cancelable = client.execute(
-      realCode,
-      properties,
-      new ResponseListener {
-        override def onSuccess(response: Response): Unit = {
-          executeResponse = convertResponse(response)
-          countDown.countDown()
-        }
-        override def onFailure(exception: Exception): Unit = {
-          executeResponse =
-            ElasticSearchErrorResponse("EsEngineExecutor execute fail. ", null, exception)
-          countDown.countDown()
-        }
-      }
-    )
-    countDown.await()
-    executeResponse
-  }
-
-  // convert response to executeResponse
-  private def convertResponse(response: Response): ElasticSearchResponse =
-    Utils.tryCatch[ElasticSearchResponse] {
-      val statusCode = response.getStatusLine.getStatusCode
-      if (statusCode >= 200 && statusCode < 300) {
-        ResponseHandler.handle(response)
-      } else {
-        ElasticSearchErrorResponse(
-          "EsEngineExecutor convert response fail. response code: " + response.getStatusLine.getStatusCode
-        )
-      }
-    } { case t: Throwable =>
-      ElasticSearchErrorResponse("EsEngineExecutor convert response error.", null, t)
-    }
-
-  override def close: Unit = cancelable match {
-    case c: Cancellable => c.cancel()
-    case _ =>
-  }
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ResponseHandlerImpl.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ResponseHandlerImpl.scala
index 7720b7076..888f02df1 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ResponseHandlerImpl.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ResponseHandlerImpl.scala
@@ -26,7 +26,6 @@ import org.apache.linkis.engineplugin.elasticsearch.executor.client.{
   ElasticSearchTableResponse,
   ResponseHandler
 }
-import org.apache.linkis.engineplugin.elasticsearch.executor.client.ResponseHandler
 import org.apache.linkis.engineplugin.elasticsearch.executor.client.ResponseHandler._
 import org.apache.linkis.storage.domain._
 import org.apache.linkis.storage.domain.DataType.{DoubleType, StringType}
@@ -55,7 +54,7 @@ class ResponseHandlerImpl extends ResponseHandler {
     val contentBytes = EntityUtils.toByteArray(response.getEntity)
 
     if (contentBytes == null || contentBytes.isEmpty) {
-      throw EsConvertResponseException(RESPONSE_FAIL_IS_EMPTY.getErrorDesc)
+      throw new EsConvertResponseException(RESPONSE_FAIL_IS_EMPTY.getErrorDesc)
     }
 
     val jsonNode = Utils.tryCatch {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org