You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@linkis.apache.org by "peacewong (via GitHub)" <gi...@apache.org> on 2023/05/16 12:15:50 UTC

[GitHub] [linkis] peacewong commented on a diff in pull request #4531: Translate engineconn-plugins-elasticsearch service classes from Scala to Java

peacewong commented on code in PR #4531:
URL: https://github.com/apache/linkis/pull/4531#discussion_r1195058023


##########
linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor.client.impl;
+
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.*;
+import org.apache.linkis.protocol.constants.TaskConstant;
+import org.apache.linkis.storage.utils.StorageUtils;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.elasticsearch.client.Cancellable;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchExecutorImpl implements ElasticSearchExecutor {
+
+  private static final Logger logger = LoggerFactory.getLogger(ElasticSearchExecutorImpl.class);
+
+  private EsClient client;
+  private Cancellable cancelable;
+  private String user;
+  private String runType;
+  private Map<String, String> properties;
+
+  public ElasticSearchExecutorImpl(String runType, Map<String, String> properties) {
+    this.runType = runType;
+    this.properties = properties;
+  }
+
+  @Override
+  public void open() throws Exception {
+    this.client = EsClientFactory.getRestClient(properties);
+    this.user =
+        StringUtils.defaultString(
+            properties.getOrDefault(TaskConstant.UMUSER, StorageUtils.getJvmUser()));
+    switch (runType.trim().toLowerCase(Locale.getDefault())) {
+      case "essql":
+      case "sql":
+        properties.putIfAbsent(
+            ElasticSearchConfiguration.ES_HTTP_ENDPOINT.key(),
+            ElasticSearchConfiguration.ES_HTTP_SQL_ENDPOINT.getValue(properties));
+        break;
+      default:
+        break;
+    }
+  }
+
+  @Override
+  public ElasticSearchResponse executeLine(String code) {
+    String realCode = code.trim();
+    logger.info("es client begins to run {} code:\n {}", runType, realCode.trim());
+    CountDownLatch countDown = new CountDownLatch(1);
+    ElasticSearchResponse[] executeResponse = {
+      new ElasticSearchErrorResponse("INCOMPLETE", null, null)
+    };
+    cancelable =
+        client.execute(
+            realCode,
+            properties,
+            new ResponseListener() {
+              @Override
+              public void onSuccess(Response response) {
+                executeResponse[0] = convertResponse(response);
+                countDown.countDown();
+              }
+
+              @Override
+              public void onFailure(Exception exception) {
+                executeResponse[0] =
+                    new ElasticSearchErrorResponse(
+                        "EsEngineExecutor execute fail. ", null, exception);
+                countDown.countDown();
+              }
+            });
+    try {
+      countDown.await();
+    } catch (InterruptedException e) {
+      executeResponse[0] =
+          new ElasticSearchErrorResponse("EsEngineExecutor execute interrupted. ", null, e);
+    }
+    return executeResponse[0];
+  }
+
+  // convert response to executeResponse
+  private ElasticSearchResponse convertResponse(Response response) {
+    int statusCode = response.getStatusLine().getStatusCode();
+    if (statusCode >= 200 && statusCode < 300) {
+      return ResponseHandler$.MODULE$.handle(response);

Review Comment:
   Exceptions need to be caught here; if an exception is thrown, an ElasticSearchErrorResponse should be returned.



##########
linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor.client.impl;
+
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.*;
+import org.apache.linkis.protocol.constants.TaskConstant;
+import org.apache.linkis.storage.utils.StorageUtils;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.elasticsearch.client.Cancellable;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchExecutorImpl implements ElasticSearchExecutor {
+
+  private static final Logger logger = LoggerFactory.getLogger(ElasticSearchExecutorImpl.class);
+
+  private EsClient client;
+  private Cancellable cancelable;
+  private String user;
+  private String runType;
+  private Map<String, String> properties;
+
+  public ElasticSearchExecutorImpl(String runType, Map<String, String> properties) {
+    this.runType = runType;
+    this.properties = properties;
+  }
+
+  @Override
+  public void open() throws Exception {
+    this.client = EsClientFactory.getRestClient(properties);
+    this.user =
+        StringUtils.defaultString(
+            properties.getOrDefault(TaskConstant.UMUSER, StorageUtils.getJvmUser()));
+    switch (runType.trim().toLowerCase(Locale.getDefault())) {
+      case "essql":
+      case "sql":
+        properties.putIfAbsent(
+            ElasticSearchConfiguration.ES_HTTP_ENDPOINT.key(),
+            ElasticSearchConfiguration.ES_HTTP_SQL_ENDPOINT.getValue(properties));
+        break;
+      default:
+        break;
+    }
+  }
+
+  @Override
+  public ElasticSearchResponse executeLine(String code) {
+    String realCode = code.trim();
+    logger.info("es client begins to run {} code:\n {}", runType, realCode.trim());
+    CountDownLatch countDown = new CountDownLatch(1);
+    ElasticSearchResponse[] executeResponse = {

Review Comment:
   Does this need to be an array?



##########
linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor;
+
+import org.apache.linkis.common.io.MetaData;
+import org.apache.linkis.common.io.Record;
+import org.apache.linkis.common.io.resultset.ResultSetWriter;
+import org.apache.linkis.common.utils.OverloadUtils;
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
+import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
+import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchEngineConsoleConf;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.*;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.impl.ElasticSearchExecutorImpl;
+import org.apache.linkis.governance.common.entity.ExecutionNodeStatus;
+import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
+import org.apache.linkis.manager.common.entity.resource.LoadResource;
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.scheduler.executer.AliasOutputExecuteResponse;
+import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+import org.apache.linkis.storage.LineRecord;
+import org.apache.linkis.storage.resultset.ResultSetFactory;
+import org.apache.linkis.storage.resultset.table.TableMetaData;
+
+import org.apache.commons.io.IOUtils;
+
+import org.springframework.util.CollectionUtils;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchEngineConnExecutor extends ConcurrentComputationExecutor {
+  private static final Logger logger =
+      LoggerFactory.getLogger(ElasticSearchEngineConnExecutor.class);
+
+  private int id;
+  private String runType;
+  private List<Label<?>> executorLabels = new ArrayList<>(2);
+
+  private Cache<String, ElasticSearchExecutor> elasticSearchExecutorCache =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(
+              (Long) EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue(), TimeUnit.MILLISECONDS)
+          .removalListener(
+              new RemovalListener<String, ElasticSearchExecutor>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<String, ElasticSearchExecutor> notification) {
+                  notification.getValue().close();
+                  EngineConnTask task = getTaskById(notification.getKey());
+                  if (!ExecutionNodeStatus.isCompleted(task.getStatus())) {
+                    killTask(notification.getKey());
+                  }
+                }
+              })
+          .maximumSize(EngineConnConstant.MAX_TASK_NUM())
+          .build();
+
+  public ElasticSearchEngineConnExecutor(int outputPrintLimit, int id, String runType) {
+    super(outputPrintLimit);
+    this.id = id;
+    this.runType = runType;
+  }
+
+  @Override
+  public void init() {
+    super.init();
+  }
+
+  @Override
+  public ExecuteResponse execute(EngineConnTask engineConnTask) {
+    Map<String, String> properties = buildRuntimeParams(engineConnTask);
+    logger.info("The elasticsearch properties is: {}", properties);
+
+    //    ElasticSearchExecutor elasticSearchExecutor = ElasticSearchExecutor.apply(runType,
+    // properties);
+    ElasticSearchExecutor elasticSearchExecutor =
+        new ElasticSearchExecutorImpl(runType, properties);
+    try {
+      elasticSearchExecutor.open();
+    } catch (Exception e) {
+      logger.error("Execute es code failed, reason:", e);
+      return new ErrorExecuteResponse("run es failed", e);
+    }
+    elasticSearchExecutorCache.put(engineConnTask.getTaskId(), elasticSearchExecutor);
+    return super.execute(engineConnTask);
+  }
+
+  @Override
+  public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) {
+    String taskId = engineExecutorContext.getJobId().get();
+    ElasticSearchExecutor elasticSearchExecutor = elasticSearchExecutorCache.getIfPresent(taskId);
+    ElasticSearchResponse elasticSearchResponse = elasticSearchExecutor.executeLine(code);
+
+    try {
+
+      if (elasticSearchResponse instanceof ElasticSearchTableResponse) {
+        ElasticSearchTableResponse tableResponse =
+            (ElasticSearchTableResponse) elasticSearchResponse;
+        TableMetaData metaData = new TableMetaData(tableResponse.columns());
+        ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
+            engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE);
+        resultSetWriter.addMetaData(metaData);
+        Arrays.asList(tableResponse.records())
+            .forEach(
+                record -> {
+                  try {
+                    resultSetWriter.addRecord(record);
+                  } catch (IOException e) {
+                    logger.warn("es addRecord failed", e);
+                  }
+                });
+        String output = resultSetWriter.toString();
+        IOUtils.closeQuietly(resultSetWriter);
+        return new AliasOutputExecuteResponse(null, output);
+      } else if (elasticSearchResponse instanceof ElasticSearchJsonResponse) {
+        ElasticSearchJsonResponse jsonResponse = (ElasticSearchJsonResponse) elasticSearchResponse;
+        ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
+            engineExecutorContext.createResultSetWriter(ResultSetFactory.TEXT_TYPE);
+        resultSetWriter.addMetaData(null);
+        Arrays.stream(jsonResponse.value().split("\\n"))
+            .forEach(
+                item -> {
+                  try {
+                    resultSetWriter.addRecord(new LineRecord(item));
+                  } catch (IOException e) {
+                    logger.warn("es addRecord failed", e);
+                  }
+                });
+        String output = resultSetWriter.toString();
+        IOUtils.closeQuietly(resultSetWriter);
+        return new AliasOutputExecuteResponse(null, output);
+      } else if (elasticSearchResponse instanceof ElasticSearchErrorResponse) {
+        ElasticSearchErrorResponse errorResponse =
+            (ElasticSearchErrorResponse) elasticSearchResponse;
+        return new ErrorExecuteResponse(errorResponse.message(), errorResponse.cause());
+      }
+    } catch (IOException e) {
+      logger.warn("es addMetaData failed", e);

Review Comment:
   If an exception is caught here, an ErrorExecuteResponse needs to be returned instead of only logging a warning.



##########
linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor;
+
+import org.apache.linkis.common.io.MetaData;
+import org.apache.linkis.common.io.Record;
+import org.apache.linkis.common.io.resultset.ResultSetWriter;
+import org.apache.linkis.common.utils.OverloadUtils;
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
+import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
+import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchEngineConsoleConf;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.*;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.impl.ElasticSearchExecutorImpl;
+import org.apache.linkis.governance.common.entity.ExecutionNodeStatus;
+import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
+import org.apache.linkis.manager.common.entity.resource.LoadResource;
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.scheduler.executer.AliasOutputExecuteResponse;
+import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+import org.apache.linkis.storage.LineRecord;
+import org.apache.linkis.storage.resultset.ResultSetFactory;
+import org.apache.linkis.storage.resultset.table.TableMetaData;
+
+import org.apache.commons.io.IOUtils;
+
+import org.springframework.util.CollectionUtils;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchEngineConnExecutor extends ConcurrentComputationExecutor {
+  private static final Logger logger =
+      LoggerFactory.getLogger(ElasticSearchEngineConnExecutor.class);
+
+  private int id;
+  private String runType;
+  private List<Label<?>> executorLabels = new ArrayList<>(2);
+
+  private Cache<String, ElasticSearchExecutor> elasticSearchExecutorCache =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(
+              (Long) EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue(), TimeUnit.MILLISECONDS)
+          .removalListener(
+              new RemovalListener<String, ElasticSearchExecutor>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<String, ElasticSearchExecutor> notification) {
+                  notification.getValue().close();
+                  EngineConnTask task = getTaskById(notification.getKey());
+                  if (!ExecutionNodeStatus.isCompleted(task.getStatus())) {
+                    killTask(notification.getKey());
+                  }
+                }
+              })
+          .maximumSize(EngineConnConstant.MAX_TASK_NUM())
+          .build();
+
+  public ElasticSearchEngineConnExecutor(int outputPrintLimit, int id, String runType) {
+    super(outputPrintLimit);
+    this.id = id;
+    this.runType = runType;
+  }
+
+  @Override
+  public void init() {
+    super.init();
+  }
+
+  @Override
+  public ExecuteResponse execute(EngineConnTask engineConnTask) {
+    Map<String, String> properties = buildRuntimeParams(engineConnTask);
+    logger.info("The elasticsearch properties is: {}", properties);
+
+    //    ElasticSearchExecutor elasticSearchExecutor = ElasticSearchExecutor.apply(runType,
+    // properties);
+    ElasticSearchExecutor elasticSearchExecutor =
+        new ElasticSearchExecutorImpl(runType, properties);
+    try {
+      elasticSearchExecutor.open();
+    } catch (Exception e) {
+      logger.error("Execute es code failed, reason:", e);
+      return new ErrorExecuteResponse("run es failed", e);
+    }
+    elasticSearchExecutorCache.put(engineConnTask.getTaskId(), elasticSearchExecutor);
+    return super.execute(engineConnTask);
+  }
+
+  @Override
+  public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) {
+    String taskId = engineExecutorContext.getJobId().get();
+    ElasticSearchExecutor elasticSearchExecutor = elasticSearchExecutorCache.getIfPresent(taskId);
+    ElasticSearchResponse elasticSearchResponse = elasticSearchExecutor.executeLine(code);
+
+    try {
+
+      if (elasticSearchResponse instanceof ElasticSearchTableResponse) {
+        ElasticSearchTableResponse tableResponse =
+            (ElasticSearchTableResponse) elasticSearchResponse;
+        TableMetaData metaData = new TableMetaData(tableResponse.columns());
+        ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
+            engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE);
+        resultSetWriter.addMetaData(metaData);
+        Arrays.asList(tableResponse.records())
+            .forEach(
+                record -> {
+                  try {
+                    resultSetWriter.addRecord(record);
+                  } catch (IOException e) {
+                    logger.warn("es addRecord failed", e);
+                  }
+                });
+        String output = resultSetWriter.toString();
+        IOUtils.closeQuietly(resultSetWriter);
+        return new AliasOutputExecuteResponse(null, output);
+      } else if (elasticSearchResponse instanceof ElasticSearchJsonResponse) {
+        ElasticSearchJsonResponse jsonResponse = (ElasticSearchJsonResponse) elasticSearchResponse;
+        ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
+            engineExecutorContext.createResultSetWriter(ResultSetFactory.TEXT_TYPE);
+        resultSetWriter.addMetaData(null);
+        Arrays.stream(jsonResponse.value().split("\\n"))
+            .forEach(
+                item -> {
+                  try {
+                    resultSetWriter.addRecord(new LineRecord(item));
+                  } catch (IOException e) {
+                    logger.warn("es addRecord failed", e);

Review Comment:
   If addRecord throws an exception here, an ErrorExecuteResponse needs to be returned instead of only logging a warning.



##########
linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor;
+
+import org.apache.linkis.common.io.MetaData;
+import org.apache.linkis.common.io.Record;
+import org.apache.linkis.common.io.resultset.ResultSetWriter;
+import org.apache.linkis.common.utils.OverloadUtils;
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
+import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
+import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchEngineConsoleConf;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.*;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.impl.ElasticSearchExecutorImpl;
+import org.apache.linkis.governance.common.entity.ExecutionNodeStatus;
+import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
+import org.apache.linkis.manager.common.entity.resource.LoadResource;
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.scheduler.executer.AliasOutputExecuteResponse;
+import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+import org.apache.linkis.storage.LineRecord;
+import org.apache.linkis.storage.resultset.ResultSetFactory;
+import org.apache.linkis.storage.resultset.table.TableMetaData;
+
+import org.apache.commons.io.IOUtils;
+
+import org.springframework.util.CollectionUtils;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchEngineConnExecutor extends ConcurrentComputationExecutor {
+  private static final Logger logger =
+      LoggerFactory.getLogger(ElasticSearchEngineConnExecutor.class);
+
+  private int id;
+  private String runType;
+  private List<Label<?>> executorLabels = new ArrayList<>(2);
+
+  private Cache<String, ElasticSearchExecutor> elasticSearchExecutorCache =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(
+              (Long) EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue(), TimeUnit.MILLISECONDS)
+          .removalListener(
+              new RemovalListener<String, ElasticSearchExecutor>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<String, ElasticSearchExecutor> notification) {
+                  notification.getValue().close();
+                  EngineConnTask task = getTaskById(notification.getKey());
+                  if (!ExecutionNodeStatus.isCompleted(task.getStatus())) {
+                    killTask(notification.getKey());
+                  }
+                }
+              })
+          .maximumSize(EngineConnConstant.MAX_TASK_NUM())
+          .build();
+
+  public ElasticSearchEngineConnExecutor(int outputPrintLimit, int id, String runType) {
+    super(outputPrintLimit);
+    this.id = id;
+    this.runType = runType;
+  }
+
+  @Override
+  public void init() {
+    super.init();
+  }
+
+  @Override
+  public ExecuteResponse execute(EngineConnTask engineConnTask) {
+    Map<String, String> properties = buildRuntimeParams(engineConnTask);
+    logger.info("The elasticsearch properties is: {}", properties);
+
+    //    ElasticSearchExecutor elasticSearchExecutor = ElasticSearchExecutor.apply(runType,
+    // properties);
+    ElasticSearchExecutor elasticSearchExecutor =
+        new ElasticSearchExecutorImpl(runType, properties);
+    try {
+      elasticSearchExecutor.open();
+    } catch (Exception e) {
+      logger.error("Execute es code failed, reason:", e);
+      return new ErrorExecuteResponse("run es failed", e);
+    }
+    elasticSearchExecutorCache.put(engineConnTask.getTaskId(), elasticSearchExecutor);
+    return super.execute(engineConnTask);
+  }
+
+  @Override
+  public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) {
+    String taskId = engineExecutorContext.getJobId().get();
+    ElasticSearchExecutor elasticSearchExecutor = elasticSearchExecutorCache.getIfPresent(taskId);
+    ElasticSearchResponse elasticSearchResponse = elasticSearchExecutor.executeLine(code);
+
+    try {
+
+      if (elasticSearchResponse instanceof ElasticSearchTableResponse) {
+        ElasticSearchTableResponse tableResponse =
+            (ElasticSearchTableResponse) elasticSearchResponse;
+        TableMetaData metaData = new TableMetaData(tableResponse.columns());
+        ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
+            engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE);
+        resultSetWriter.addMetaData(metaData);
+        Arrays.asList(tableResponse.records())
+            .forEach(
+                record -> {
+                  try {
+                    resultSetWriter.addRecord(record);
+                  } catch (IOException e) {
+                    logger.warn("es addRecord failed", e);
+                  }
+                });
+        String output = resultSetWriter.toString();
+        IOUtils.closeQuietly(resultSetWriter);
+        return new AliasOutputExecuteResponse(null, output);
+      } else if (elasticSearchResponse instanceof ElasticSearchJsonResponse) {
+        ElasticSearchJsonResponse jsonResponse = (ElasticSearchJsonResponse) elasticSearchResponse;
+        ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
+            engineExecutorContext.createResultSetWriter(ResultSetFactory.TEXT_TYPE);
+        resultSetWriter.addMetaData(null);
+        Arrays.stream(jsonResponse.value().split("\\n"))
+            .forEach(
+                item -> {
+                  try {
+                    resultSetWriter.addRecord(new LineRecord(item));
+                  } catch (IOException e) {
+                    logger.warn("es addRecord failed", e);
+                  }
+                });
+        String output = resultSetWriter.toString();
+        IOUtils.closeQuietly(resultSetWriter);
+        return new AliasOutputExecuteResponse(null, output);
+      } else if (elasticSearchResponse instanceof ElasticSearchErrorResponse) {
+        ElasticSearchErrorResponse errorResponse =
+            (ElasticSearchErrorResponse) elasticSearchResponse;
+        return new ErrorExecuteResponse(errorResponse.message(), errorResponse.cause());
+      }
+    } catch (IOException e) {
+      logger.warn("es addMetaData failed", e);
+    }
+
+    return null;

Review Comment:
   An ErrorExecuteResponse needs to be returned here instead of null.



##########
linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor;
+
+import org.apache.linkis.common.io.MetaData;
+import org.apache.linkis.common.io.Record;
+import org.apache.linkis.common.io.resultset.ResultSetWriter;
+import org.apache.linkis.common.utils.OverloadUtils;
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
+import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
+import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchEngineConsoleConf;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.*;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.impl.ElasticSearchExecutorImpl;
+import org.apache.linkis.governance.common.entity.ExecutionNodeStatus;
+import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
+import org.apache.linkis.manager.common.entity.resource.LoadResource;
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.scheduler.executer.AliasOutputExecuteResponse;
+import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+import org.apache.linkis.storage.LineRecord;
+import org.apache.linkis.storage.resultset.ResultSetFactory;
+import org.apache.linkis.storage.resultset.table.TableMetaData;
+
+import org.apache.commons.io.IOUtils;
+
+import org.springframework.util.CollectionUtils;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchEngineConnExecutor extends ConcurrentComputationExecutor {
+  private static final Logger logger =
+      LoggerFactory.getLogger(ElasticSearchEngineConnExecutor.class);
+
+  private int id;
+  private String runType;
+  private List<Label<?>> executorLabels = new ArrayList<>(2);
+
+  private Cache<String, ElasticSearchExecutor> elasticSearchExecutorCache =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(
+              (Long) EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue(), TimeUnit.MILLISECONDS)
+          .removalListener(
+              new RemovalListener<String, ElasticSearchExecutor>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<String, ElasticSearchExecutor> notification) {
+                  notification.getValue().close();
+                  EngineConnTask task = getTaskById(notification.getKey());
+                  if (!ExecutionNodeStatus.isCompleted(task.getStatus())) {
+                    killTask(notification.getKey());
+                  }
+                }
+              })
+          .maximumSize(EngineConnConstant.MAX_TASK_NUM())
+          .build();
+
+  public ElasticSearchEngineConnExecutor(int outputPrintLimit, int id, String runType) {
+    super(outputPrintLimit);
+    this.id = id;
+    this.runType = runType;
+  }
+
+  @Override
+  public void init() {
+    super.init();
+  }
+
+  @Override
+  public ExecuteResponse execute(EngineConnTask engineConnTask) {
+    Map<String, String> properties = buildRuntimeParams(engineConnTask);
+    logger.info("The elasticsearch properties is: {}", properties);
+
+    //    ElasticSearchExecutor elasticSearchExecutor = ElasticSearchExecutor.apply(runType,
+    // properties);
+    ElasticSearchExecutor elasticSearchExecutor =
+        new ElasticSearchExecutorImpl(runType, properties);
+    try {
+      elasticSearchExecutor.open();
+    } catch (Exception e) {
+      logger.error("Execute es code failed, reason:", e);
+      return new ErrorExecuteResponse("run es failed", e);
+    }
+    elasticSearchExecutorCache.put(engineConnTask.getTaskId(), elasticSearchExecutor);
+    return super.execute(engineConnTask);
+  }
+
+  @Override
+  public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) {
+    String taskId = engineExecutorContext.getJobId().get();
+    ElasticSearchExecutor elasticSearchExecutor = elasticSearchExecutorCache.getIfPresent(taskId);
+    ElasticSearchResponse elasticSearchResponse = elasticSearchExecutor.executeLine(code);
+
+    try {
+
+      if (elasticSearchResponse instanceof ElasticSearchTableResponse) {
+        ElasticSearchTableResponse tableResponse =
+            (ElasticSearchTableResponse) elasticSearchResponse;
+        TableMetaData metaData = new TableMetaData(tableResponse.columns());
+        ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
+            engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE);
+        resultSetWriter.addMetaData(metaData);
+        Arrays.asList(tableResponse.records())
+            .forEach(
+                record -> {
+                  try {
+                    resultSetWriter.addRecord(record);
+                  } catch (IOException e) {
+                    logger.warn("es addRecord failed", e);

Review Comment:
   If an exception occurs here, an ErrorExecuteResponse needs to be returned rather than only logging a warning.



##########
linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor;
+
+import org.apache.linkis.common.io.MetaData;
+import org.apache.linkis.common.io.Record;
+import org.apache.linkis.common.io.resultset.ResultSetWriter;
+import org.apache.linkis.common.utils.OverloadUtils;
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
+import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
+import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchEngineConsoleConf;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.*;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.impl.ElasticSearchExecutorImpl;
+import org.apache.linkis.governance.common.entity.ExecutionNodeStatus;
+import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
+import org.apache.linkis.manager.common.entity.resource.LoadResource;
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.scheduler.executer.AliasOutputExecuteResponse;
+import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+import org.apache.linkis.storage.LineRecord;
+import org.apache.linkis.storage.resultset.ResultSetFactory;
+import org.apache.linkis.storage.resultset.table.TableMetaData;
+
+import org.apache.commons.io.IOUtils;
+
+import org.springframework.util.CollectionUtils;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchEngineConnExecutor extends ConcurrentComputationExecutor {
+  private static final Logger logger =
+      LoggerFactory.getLogger(ElasticSearchEngineConnExecutor.class);
+
+  private int id;
+  private String runType;
+  private List<Label<?>> executorLabels = new ArrayList<>(2);
+
+  private Cache<String, ElasticSearchExecutor> elasticSearchExecutorCache =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(
+              (Long) EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue(), TimeUnit.MILLISECONDS)
+          .removalListener(
+              new RemovalListener<String, ElasticSearchExecutor>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<String, ElasticSearchExecutor> notification) {
+                  notification.getValue().close();
+                  EngineConnTask task = getTaskById(notification.getKey());
+                  if (!ExecutionNodeStatus.isCompleted(task.getStatus())) {
+                    killTask(notification.getKey());
+                  }
+                }
+              })
+          .maximumSize(EngineConnConstant.MAX_TASK_NUM())
+          .build();
+
+  public ElasticSearchEngineConnExecutor(int outputPrintLimit, int id, String runType) {
+    super(outputPrintLimit);
+    this.id = id;
+    this.runType = runType;
+  }
+
+  @Override
+  public void init() {
+    super.init();
+  }
+
+  @Override
+  public ExecuteResponse execute(EngineConnTask engineConnTask) {
+    Map<String, String> properties = buildRuntimeParams(engineConnTask);
+    logger.info("The elasticsearch properties is: {}", properties);
+
+    //    ElasticSearchExecutor elasticSearchExecutor = ElasticSearchExecutor.apply(runType,
+    // properties);
+    ElasticSearchExecutor elasticSearchExecutor =
+        new ElasticSearchExecutorImpl(runType, properties);
+    try {
+      elasticSearchExecutor.open();
+    } catch (Exception e) {
+      logger.error("Execute es code failed, reason:", e);
+      return new ErrorExecuteResponse("run es failed", e);
+    }
+    elasticSearchExecutorCache.put(engineConnTask.getTaskId(), elasticSearchExecutor);
+    return super.execute(engineConnTask);
+  }
+
+  @Override
+  public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) {
+    String taskId = engineExecutorContext.getJobId().get();
+    ElasticSearchExecutor elasticSearchExecutor = elasticSearchExecutorCache.getIfPresent(taskId);
+    ElasticSearchResponse elasticSearchResponse = elasticSearchExecutor.executeLine(code);
+
+    try {
+
+      if (elasticSearchResponse instanceof ElasticSearchTableResponse) {
+        ElasticSearchTableResponse tableResponse =
+            (ElasticSearchTableResponse) elasticSearchResponse;
+        TableMetaData metaData = new TableMetaData(tableResponse.columns());
+        ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
+            engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE);
+        resultSetWriter.addMetaData(metaData);
+        Arrays.asList(tableResponse.records())
+            .forEach(
+                record -> {
+                  try {
+                    resultSetWriter.addRecord(record);
+                  } catch (IOException e) {
+                    logger.warn("es addRecord failed", e);
+                  }
+                });
+        String output = resultSetWriter.toString();
+        IOUtils.closeQuietly(resultSetWriter);
+        return new AliasOutputExecuteResponse(null, output);
+      } else if (elasticSearchResponse instanceof ElasticSearchJsonResponse) {
+        ElasticSearchJsonResponse jsonResponse = (ElasticSearchJsonResponse) elasticSearchResponse;
+        ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
+            engineExecutorContext.createResultSetWriter(ResultSetFactory.TEXT_TYPE);
+        resultSetWriter.addMetaData(null);
+        Arrays.stream(jsonResponse.value().split("\\n"))
+            .forEach(
+                item -> {
+                  try {
+                    resultSetWriter.addRecord(new LineRecord(item));
+                  } catch (IOException e) {
+                    logger.warn("es addRecord failed", e);
+                  }
+                });
+        String output = resultSetWriter.toString();
+        IOUtils.closeQuietly(resultSetWriter);
+        return new AliasOutputExecuteResponse(null, output);
+      } else if (elasticSearchResponse instanceof ElasticSearchErrorResponse) {
+        ElasticSearchErrorResponse errorResponse =
+            (ElasticSearchErrorResponse) elasticSearchResponse;
+        return new ErrorExecuteResponse(errorResponse.message(), errorResponse.cause());
+      }
+    } catch (IOException e) {
+      logger.warn("es addMetaData failed", e);
+    }
+
+    return null;
+  }
+
+  private Map<String, String> buildRuntimeParams(EngineConnTask engineConnTask) {
+    // Parameters specified at runtime
+    Map<String, String> executorProperties =
+        engineConnTask.getProperties().entrySet().stream()
+            .collect(
+                Collectors.toMap(
+                    Map.Entry::getKey, entry -> Objects.toString(entry.getValue(), null)));
+    if (executorProperties == null) {

Review Comment:
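   The null check should come first: check the task properties for null before streaming them, not after the map has been collected.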



##########
linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientImpl.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor.client;
+
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.auth.AUTH;
+import org.apache.http.auth.Credentials;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.message.BufferedHeader;
+import org.apache.http.util.CharArrayBuffer;
+import org.apache.http.util.EncodingUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.elasticsearch.client.Cancellable;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.ResponseListener;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.sniff.Sniffer;
+
+public class EsClientImpl extends EsClient {
+  private RestClient client;

Review Comment:
   EsClient already has a client variable, so it does not need to be declared again here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@linkis.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@linkis.apache.org
For additional commands, e-mail: notifications-help@linkis.apache.org