You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/05/17 13:02:29 UTC
[linkis] branch dev-1.4.0 updated: Translate engineconn-plugins-elasticsearch service classes from Scala to Java (#4531)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new 3989b4dcf Translate engineconn-plugins-elasticsearch service classes from Scala to Java (#4531)
3989b4dcf is described below
commit 3989b4dcf2f410f6ba2b081ad0510f252741b6e1
Author: ChengJie1053 <18...@163.com>
AuthorDate: Wed May 17 21:02:23 2023 +0800
Translate engineconn-plugins-elasticsearch service classes from Scala to Java (#4531)
---
.../ElasticSearchEngineConnPlugin.java | 89 +++++++
...asticSearchProcessEngineConnLaunchBuilder.java} | 17 +-
.../conf/ElasticSearchConfiguration.java | 63 +++++
.../conf/ElasticSearchEngineConsoleConf.java | 64 +++++
.../exception/EsConvertResponseException.java} | 13 +-
.../exception/EsParamsIllegalException.java} | 13 +-
.../executor/ElasticSearchEngineConnExecutor.java | 284 +++++++++++++++++++++
.../executor/client/ElasticSearchExecutor.java} | 12 +-
.../elasticsearch/executor/client/EsClient.java | 66 +++++
.../executor/client/EsClientFactory.java | 220 ++++++++++++++++
.../executor/client/EsClientImpl.java | 106 ++++++++
.../executor/client/EsClientOperate.java} | 14 +-
.../client/impl/ElasticSearchExecutorImpl.java | 130 ++++++++++
.../ElasticSearchEngineConnPlugin.scala | 74 ------
.../conf/ElasticSearchConfiguration.scala | 49 ----
.../conf/ElasticSearchEngineConsoleConf.scala | 49 ----
.../executor/ElasticSearchEngineConnExecutor.scala | 232 -----------------
.../executor/client/ElasticSearchExecutor.scala | 46 ----
.../executor/client/ElasticSearchResponse.scala | 0
.../elasticsearch/executor/client/EsClient.scala | 131 ----------
.../executor/client/EsClientFactory.scala | 185 --------------
.../client/impl/ElasticSearchExecutorImpl.scala | 112 --------
.../executor/client/impl/ResponseHandlerImpl.scala | 3 +-
23 files changed, 1066 insertions(+), 906 deletions(-)
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/ElasticSearchEngineConnPlugin.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/ElasticSearchEngineConnPlugin.java
new file mode 100644
index 000000000..17129bd3d
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/ElasticSearchEngineConnPlugin.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch;
+
+import org.apache.linkis.engineplugin.elasticsearch.builder.ElasticSearchProcessEngineConnLaunchBuilder;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+import org.apache.linkis.engineplugin.elasticsearch.factory.ElasticSearchEngineConnFactory;
+import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin;
+import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory;
+import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder;
+import org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory;
+import org.apache.linkis.manager.engineplugin.common.resource.GenericEngineResourceFactory;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.manager.label.entity.engine.EngineType;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Engine-conn plugin entry point for the Elasticsearch engine.
+ *
+ * <p>{@link #init(Map)} registers the default engine-type label. The resource factory and the
+ * engine-conn factory are created lazily on first request and reused afterwards; a new launch
+ * builder is returned on every call.
+ */
+public class ElasticSearchEngineConnPlugin implements EngineConnPlugin {
+
+  // Both factory fields are read outside their lock (first null check below), so they must be
+  // volatile: without it another thread may observe a non-null reference to an instance whose
+  // construction is not yet visible.
+  private volatile EngineResourceFactory engineResourceFactory;
+
+  private volatile EngineConnFactory engineFactory;
+
+  private final List<Label<?>> defaultLabels = new ArrayList<>();
+
+  private final Object resourceLocker = new Object();
+
+  private final Object engineFactoryLocker = new Object();
+
+  /**
+   * Adds an {@link EngineTypeLabel} for the Elasticsearch engine type, using the configured
+   * default version, to the default labels.
+   *
+   * @param params plugin parameters; not read by this implementation
+   */
+  @Override
+  public void init(Map<String, Object> params) {
+    EngineTypeLabel typeLabel = new EngineTypeLabel();
+    typeLabel.setEngineType(EngineType.ELASTICSEARCH().toString());
+    typeLabel.setVersion(ElasticSearchConfiguration.DEFAULT_VERSION.getValue());
+    this.defaultLabels.add(typeLabel);
+  }
+
+  /**
+   * Returns the shared resource factory, creating a {@link GenericEngineResourceFactory} on the
+   * first call.
+   *
+   * @return the lazily created resource factory, never {@code null}
+   */
+  @Override
+  public EngineResourceFactory getEngineResourceFactory() {
+    EngineResourceFactory factory = engineResourceFactory;
+    if (factory == null) {
+      synchronized (resourceLocker) {
+        factory = engineResourceFactory;
+        if (factory == null) {
+          factory = new GenericEngineResourceFactory();
+          engineResourceFactory = factory;
+        }
+      }
+    }
+    return factory;
+  }
+
+  /**
+   * Creates a launch builder for the Elasticsearch engine process.
+   *
+   * @return a new {@link ElasticSearchProcessEngineConnLaunchBuilder} on every call
+   */
+  @Override
+  public EngineConnLaunchBuilder getEngineConnLaunchBuilder() {
+    return new ElasticSearchProcessEngineConnLaunchBuilder();
+  }
+
+  /**
+   * Returns the shared engine-conn factory, creating an {@link ElasticSearchEngineConnFactory} on
+   * the first call.
+   *
+   * @return the lazily created engine-conn factory, never {@code null}
+   */
+  @Override
+  public EngineConnFactory getEngineConnFactory() {
+    EngineConnFactory factory = engineFactory;
+    if (factory == null) {
+      synchronized (engineFactoryLocker) {
+        factory = engineFactory;
+        if (factory == null) {
+          factory = new ElasticSearchEngineConnFactory();
+          engineFactory = factory;
+        }
+      }
+    }
+    return factory;
+  }
+
+  /**
+   * Returns the labels registered by {@link #init(Map)}.
+   *
+   * @return the internal (mutable) list of default labels
+   */
+  @Override
+  public List<Label<?>> getDefaultLabels() {
+    return defaultLabels;
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/builder/ElasticSearchProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/builder/ElasticSearchProcessEngineConnLaunchBuilder.java
similarity index 69%
rename from linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/builder/ElasticSearchProcessEngineConnLaunchBuilder.scala
rename to linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/builder/ElasticSearchProcessEngineConnLaunchBuilder.java
index a35b9107b..7094184ec 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/builder/ElasticSearchProcessEngineConnLaunchBuilder.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/builder/ElasticSearchProcessEngineConnLaunchBuilder.java
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.elasticsearch.builder
+package org.apache.linkis.engineplugin.elasticsearch.builder;
-import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
-import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
-import org.apache.linkis.storage.utils.StorageConfiguration
+import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder;
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
+import org.apache.linkis.storage.utils.StorageConfiguration;
-class ElasticSearchProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {
+public class ElasticSearchProcessEngineConnLaunchBuilder
+ extends JavaProcessEngineConnLaunchBuilder {
- override def getEngineStartUser(label: UserCreatorLabel): String = {
- StorageConfiguration.HDFS_ROOT_USER.getValue
+ @Override
+ public String getEngineStartUser(UserCreatorLabel label) {
+ return StorageConfiguration.HDFS_ROOT_USER.getValue();
}
-
}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.java
new file mode 100644
index 000000000..dfd844bc9
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.conf;
+
+import org.apache.linkis.common.conf.ByteType;
+import org.apache.linkis.common.conf.CommonVars;
+
+/**
+ * Configuration keys (with their default values) read by the Elasticsearch engine plugin.
+ *
+ * <p>This is a constants holder only; it is not meant to be instantiated or extended.
+ */
+public final class ElasticSearchConfiguration {
+
+  private ElasticSearchConfiguration() {
+    // constants holder, no instances
+  }
+
+  // es client
+  public static final CommonVars<String> ES_CLUSTER =
+      CommonVars.apply("linkis.es.cluster", "127.0.0.1:9200");
+  public static final CommonVars<String> ES_DATASOURCE_NAME =
+      CommonVars.apply("linkis.es.datasource", "default_datasource");
+  public static final CommonVars<Boolean> ES_AUTH_CACHE =
+      CommonVars.apply("linkis.es.auth.cache", false);
+  public static final CommonVars<String> ES_USERNAME = CommonVars.apply("linkis.es.username", "");
+  public static final CommonVars<String> ES_PASSWORD = CommonVars.apply("linkis.es.password", "");
+  public static final CommonVars<Boolean> ES_SNIFFER_ENABLE =
+      CommonVars.apply("linkis.es.sniffer.enable", false);
+  public static final CommonVars<String> ES_HTTP_METHOD =
+      CommonVars.apply("linkis.es.http.method", "GET");
+  public static final CommonVars<String> ES_HTTP_ENDPOINT =
+      CommonVars.apply("linkis.es.http.endpoint", "/_search");
+  public static final CommonVars<String> ES_HTTP_SQL_ENDPOINT =
+      CommonVars.apply("linkis.es.sql.endpoint", "/_sql");
+  // Format template with a single %s placeholder for the SQL text.
+  public static final CommonVars<String> ES_SQL_FORMAT =
+      CommonVars.apply("linkis.es.sql.format", "{\"query\": \"%s\"}");
+  // Key prefix (not a CommonVars); presumably marks properties forwarded as HTTP headers.
+  public static final String ES_HTTP_HEADER_PREFIX = "linkis.es.headers.";
+
+  // entrance resource
+  public static final CommonVars<Integer> ENTRANCE_MAX_JOB_INSTANCE =
+      CommonVars.apply("linkis.es.max.job.instance", 100);
+  public static final CommonVars<Integer> ENTRANCE_PROTECTED_JOB_INSTANCE =
+      CommonVars.apply("linkis.es.protected.job.instance", 20);
+  public static final CommonVars<Integer> ENGINE_DEFAULT_LIMIT =
+      CommonVars.apply("linkis.es.default.limit", 5000);
+
+  // resultSet
+  public static final CommonVars<ByteType> ENGINE_RESULT_SET_MAX_CACHE =
+      CommonVars.apply("linkis.resultSet.cache.max", new ByteType("512k"));
+
+  public static final CommonVars<Integer> ENGINE_CONCURRENT_LIMIT =
+      CommonVars.apply("linkis.engineconn.concurrent.limit", 100);
+
+  // Used as the version of the default engine-type label.
+  public static final CommonVars<String> DEFAULT_VERSION =
+      CommonVars.apply("linkis.engineconn.io.version", "7.6.2");
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.java
new file mode 100644
index 000000000..602aac138
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.conf;
+
+import org.apache.linkis.common.conf.Configuration;
+import org.apache.linkis.governance.common.protocol.conf.RequestQueryEngineConfig;
+import org.apache.linkis.governance.common.protocol.conf.ResponseQueryConfig;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
+import org.apache.linkis.protocol.CacheableProtocol;
+import org.apache.linkis.rpc.RPCMapCache;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link RPCMapCache} specialization that asks the console configuration service for the engine
+ * configuration matching a set of labels.
+ */
+public class ElasticSearchEngineConsoleConf extends RPCMapCache<Label[], String, String> {
+
+  public ElasticSearchEngineConsoleConf() {
+    super(Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME().getValue());
+  }
+
+  /**
+   * Builds the configuration query from the first {@link UserCreatorLabel} and the first {@link
+   * EngineTypeLabel} found in {@code labels}.
+   *
+   * @param labels labels attached to the task; must contain both label kinds
+   * @return the request carrying both labels (third constructor argument is {@code null})
+   * @throws java.util.NoSuchElementException if either label kind is missing
+   */
+  @Override
+  public CacheableProtocol createRequest(Label[] labels) {
+    UserCreatorLabel userCreatorLabel = firstLabelOfType(labels, UserCreatorLabel.class);
+    EngineTypeLabel engineTypeLabel = firstLabelOfType(labels, EngineTypeLabel.class);
+    return new RequestQueryEngineConfig(userCreatorLabel, engineTypeLabel, null);
+  }
+
+  /**
+   * Converts the RPC answer into a key/value map.
+   *
+   * @param obj the response object received for the request
+   * @return the key/value pairs of a {@link ResponseQueryConfig}; an immutable empty map for any
+   *     other response
+   */
+  @Override
+  public Map<String, String> createMap(Object obj) {
+    if (obj instanceof ResponseQueryConfig) {
+      return ((ResponseQueryConfig) obj).getKeyAndValue();
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
+  /**
+   * Returns the first element of {@code labels} that is an instance of {@code type}.
+   *
+   * <p>Fails with the same exception type an unchecked {@code Optional.get()} would raise, but
+   * with a message naming the missing label kind.
+   */
+  private static <T> T firstLabelOfType(Label[] labels, Class<T> type) {
+    return Arrays.stream(labels)
+        .filter(type::isInstance)
+        .map(type::cast)
+        .findFirst()
+        .orElseThrow(
+            () ->
+                new java.util.NoSuchElementException(
+                    "No "
+                        + type.getSimpleName()
+                        + " found in labels: "
+                        + Arrays.toString(labels)));
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsParamsIllegalException.scala b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/exception/EsConvertResponseException.java
similarity index 70%
rename from linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsParamsIllegalException.scala
rename to linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/exception/EsConvertResponseException.java
index f6c7d4131..cadb8326e 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsParamsIllegalException.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/exception/EsConvertResponseException.java
@@ -15,10 +15,13 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.elasticsearch.exception
+package org.apache.linkis.engineplugin.elasticsearch.exception;
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.elasticsearch.errorcode.EasticsearchErrorCodeSummary.CLUSTER_IS_BLANK
+import org.apache.linkis.common.exception.ErrorException;
+import org.apache.linkis.engineplugin.elasticsearch.errorcode.EasticsearchErrorCodeSummary;
-case class EsParamsIllegalException(errorMsg: String)
- extends ErrorException(CLUSTER_IS_BLANK.getErrorCode, errorMsg)
+/** Error raised with the {@code RESPONSE_FAIL_IS_EMPTY} error code of the Elasticsearch plugin. */
+public class EsConvertResponseException extends ErrorException {
+
+  /**
+   * @param errorMsg description passed to {@link ErrorException} as the error message
+   */
+  public EsConvertResponseException(String errorMsg) {
+    super(responseFailErrorCode(), errorMsg);
+  }
+
+  /** Looks up the error code reported by every instance of this exception. */
+  private static int responseFailErrorCode() {
+    return EasticsearchErrorCodeSummary.RESPONSE_FAIL_IS_EMPTY.getErrorCode();
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsConvertResponseException.scala b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/exception/EsParamsIllegalException.java
similarity index 70%
rename from linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsConvertResponseException.scala
rename to linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/exception/EsParamsIllegalException.java
index 998a2d81d..8aada5ae1 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsConvertResponseException.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/exception/EsParamsIllegalException.java
@@ -15,10 +15,13 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.elasticsearch.exception
+package org.apache.linkis.engineplugin.elasticsearch.exception;
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.elasticsearch.errorcode.EasticsearchErrorCodeSummary.RESPONSE_FAIL_IS_EMPTY
+import org.apache.linkis.common.exception.ErrorException;
+import org.apache.linkis.engineplugin.elasticsearch.errorcode.EasticsearchErrorCodeSummary;
-case class EsConvertResponseException(errorMsg: String)
- extends ErrorException(RESPONSE_FAIL_IS_EMPTY.getErrorCode, errorMsg)
+/** Error raised with the {@code CLUSTER_IS_BLANK} error code of the Elasticsearch plugin. */
+public class EsParamsIllegalException extends ErrorException {
+
+  /**
+   * @param errorMsg description passed to {@link ErrorException} as the error message
+   */
+  public EsParamsIllegalException(String errorMsg) {
+    super(clusterIsBlankErrorCode(), errorMsg);
+  }
+
+  /** Looks up the error code reported by every instance of this exception. */
+  private static int clusterIsBlankErrorCode() {
+    return EasticsearchErrorCodeSummary.CLUSTER_IS_BLANK.getErrorCode();
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.java
new file mode 100644
index 000000000..8d6b7e6ed
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor;
+
+import org.apache.linkis.common.io.MetaData;
+import org.apache.linkis.common.io.Record;
+import org.apache.linkis.common.io.resultset.ResultSetWriter;
+import org.apache.linkis.common.utils.OverloadUtils;
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
+import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
+import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchEngineConsoleConf;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.*;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.impl.ElasticSearchExecutorImpl;
+import org.apache.linkis.governance.common.entity.ExecutionNodeStatus;
+import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
+import org.apache.linkis.manager.common.entity.resource.LoadResource;
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.scheduler.executer.AliasOutputExecuteResponse;
+import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+import org.apache.linkis.storage.LineRecord;
+import org.apache.linkis.storage.resultset.ResultSetFactory;
+import org.apache.linkis.storage.resultset.table.TableMetaData;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.io.IOUtils;
+
+import org.springframework.util.CollectionUtils;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Concurrent engine-conn executor that runs Elasticsearch code for Linkis tasks.
+ *
+ * <p>{@link #execute(EngineConnTask)} opens one {@link ElasticSearchExecutor} per task and stores
+ * it in a cache keyed by task id; {@link #executeLine(EngineExecutionContext, String)} looks it up
+ * again to run each piece of code. The per-task executor is closed when its cache entry is removed
+ * or when the task is killed.
+ */
+public class ElasticSearchEngineConnExecutor extends ConcurrentComputationExecutor {
+  private static final Logger logger =
+      LoggerFactory.getLogger(ElasticSearchEngineConnExecutor.class);
+
+  private final int id;
+  private final String runType;
+  private final List<Label<?>> executorLabels = new ArrayList<>(2);
+
+  // Per-task executors. Entries expire after the configured idle time; on removal the executor is
+  // closed and a task that has not completed yet is killed.
+  private Cache<String, ElasticSearchExecutor> elasticSearchExecutorCache =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(
+              (Long) EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue(), TimeUnit.MILLISECONDS)
+          .removalListener(
+              new RemovalListener<String, ElasticSearchExecutor>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<String, ElasticSearchExecutor> notification) {
+                  notification.getValue().close();
+                  EngineConnTask task = getTaskById(notification.getKey());
+                  // NOTE(review): getTaskById presumably returns null once the task itself is no
+                  // longer tracked; skip the kill then instead of failing inside the listener.
+                  if (task != null && !ExecutionNodeStatus.isCompleted(task.getStatus())) {
+                    killTask(notification.getKey());
+                  }
+                }
+              })
+          .maximumSize(EngineConnConstant.MAX_TASK_NUM())
+          .build();
+
+  /**
+   * @param outputPrintLimit forwarded to the parent executor
+   * @param id numeric suffix used by {@link #getId()}
+   * @param runType run type handed to every {@link ElasticSearchExecutorImpl} created by this
+   *     executor
+   */
+  public ElasticSearchEngineConnExecutor(int outputPrintLimit, int id, String runType) {
+    super(outputPrintLimit);
+    this.id = id;
+    this.runType = runType;
+  }
+
+  @Override
+  public void init() {
+    super.init();
+  }
+
+  /**
+   * Opens an Elasticsearch executor for the task, registers it under the task id and delegates the
+   * actual execution to the parent class.
+   *
+   * @param engineConnTask the task to run
+   * @return an {@link ErrorExecuteResponse} if the executor cannot be opened, otherwise the result
+   *     of the parent implementation
+   */
+  @Override
+  public ExecuteResponse execute(EngineConnTask engineConnTask) {
+    Map<String, String> properties = buildRuntimeParams(engineConnTask);
+    // NOTE(review): the merged properties may contain credentials such as linkis.es.password;
+    // consider masking them before logging.
+    logger.info("The elasticsearch properties is: {}", properties);
+
+    ElasticSearchExecutor elasticSearchExecutor =
+        new ElasticSearchExecutorImpl(runType, properties);
+    try {
+      elasticSearchExecutor.open();
+    } catch (Exception e) {
+      logger.error("Execute es code failed, reason:", e);
+      return new ErrorExecuteResponse("run es failed", e);
+    }
+    elasticSearchExecutorCache.put(engineConnTask.getTaskId(), elasticSearchExecutor);
+    return super.execute(engineConnTask);
+  }
+
+  /**
+   * Runs one piece of code on the executor registered for the current job and writes the answer
+   * into a result set.
+   *
+   * @param engineExecutorContext execution context supplying the job id and result-set writers
+   * @param code the code to run
+   * @return an {@link AliasOutputExecuteResponse} for table and JSON answers, otherwise an {@link
+   *     ErrorExecuteResponse}
+   */
+  @Override
+  public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) {
+    String taskId = engineExecutorContext.getJobId().get();
+    ElasticSearchExecutor elasticSearchExecutor = elasticSearchExecutorCache.getIfPresent(taskId);
+    if (elasticSearchExecutor == null) {
+      // The cache entry can be gone (expired, evicted or invalidated); report it instead of
+      // failing with a NullPointerException.
+      return new ErrorExecuteResponse("es executor not found for task " + taskId, null);
+    }
+    ElasticSearchResponse elasticSearchResponse = elasticSearchExecutor.executeLine(code);
+
+    try {
+      if (elasticSearchResponse instanceof ElasticSearchTableResponse) {
+        return writeTableResult(
+            engineExecutorContext, (ElasticSearchTableResponse) elasticSearchResponse);
+      } else if (elasticSearchResponse instanceof ElasticSearchJsonResponse) {
+        return writeJsonResult(
+            engineExecutorContext, (ElasticSearchJsonResponse) elasticSearchResponse);
+      } else if (elasticSearchResponse instanceof ElasticSearchErrorResponse) {
+        ElasticSearchErrorResponse errorResponse =
+            (ElasticSearchErrorResponse) elasticSearchResponse;
+        return new ErrorExecuteResponse(errorResponse.message(), errorResponse.cause());
+      }
+    } catch (IOException e) {
+      logger.warn("es addMetaData failed", e);
+      return new ErrorExecuteResponse("es addMetaData failed", e);
+    }
+
+    return new ErrorExecuteResponse("es executeLine failed", null);
+  }
+
+  /** Writes a table answer (metadata plus every record) into a table result set. */
+  private ExecuteResponse writeTableResult(
+      EngineExecutionContext engineExecutorContext, ElasticSearchTableResponse tableResponse)
+      throws IOException {
+    ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
+        engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE);
+    try {
+      resultSetWriter.addMetaData(new TableMetaData(tableResponse.columns()));
+      Arrays.asList(tableResponse.records())
+          .forEach(
+              record -> {
+                try {
+                  resultSetWriter.addRecord(record);
+                } catch (IOException e) {
+                  logger.warn("es addRecord failed", e);
+                  throw new RuntimeException("es addRecord failed", e);
+                }
+              });
+      return new AliasOutputExecuteResponse(null, resultSetWriter.toString());
+    } finally {
+      // Close on every path so a failed write does not leak the writer.
+      IOUtils.closeQuietly(resultSetWriter);
+    }
+  }
+
+  /** Writes a JSON answer, one line record per text line, into a text result set. */
+  private ExecuteResponse writeJsonResult(
+      EngineExecutionContext engineExecutorContext, ElasticSearchJsonResponse jsonResponse)
+      throws IOException {
+    ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
+        engineExecutorContext.createResultSetWriter(ResultSetFactory.TEXT_TYPE);
+    try {
+      resultSetWriter.addMetaData(null);
+      Arrays.stream(jsonResponse.value().split("\\n"))
+          .forEach(
+              item -> {
+                try {
+                  resultSetWriter.addRecord(new LineRecord(item));
+                } catch (IOException e) {
+                  logger.warn("es addRecord failed", e);
+                  throw new RuntimeException("es addRecord failed", e);
+                }
+              });
+      return new AliasOutputExecuteResponse(null, resultSetWriter.toString());
+    } finally {
+      // Close on every path so a failed write does not leak the writer.
+      IOUtils.closeQuietly(resultSetWriter);
+    }
+  }
+
+  /**
+   * Merges the console-side engine configuration with the properties of the task; task properties
+   * win on key conflicts.
+   *
+   * @param engineConnTask task whose labels select the console configuration and whose properties
+   *     are applied on top
+   * @return a new mutable map owned by the caller
+   */
+  private Map<String, String> buildRuntimeParams(EngineConnTask engineConnTask) {
+    // Always work on a private copy: the map handed out by the console conf can be immutable
+    // (its createMap falls back to Collections.emptyMap()) and must not be modified in place.
+    Map<String, String> runtimeParams = new HashMap<>();
+
+    // Global engine params by console
+    Map<String, String> globalConfig =
+        new ElasticSearchEngineConsoleConf().getCacheMap(engineConnTask.getLables());
+    if (MapUtils.isNotEmpty(globalConfig)) {
+      runtimeParams.putAll(globalConfig);
+    }
+
+    // Parameters specified at runtime. Copied with a plain put because Collectors.toMap throws a
+    // NullPointerException for null values, which Objects.toString(value, null) can produce.
+    engineConnTask
+        .getProperties()
+        .forEach((key, value) -> runtimeParams.put(key, Objects.toString(value, null)));
+
+    return runtimeParams;
+  }
+
+  /** Not implemented for this engine; always returns {@code null}. */
+  @Override
+  public ExecuteResponse executeCompletely(
+      EngineExecutionContext engineExecutorContext, String code, String completedLine) {
+    return null;
+  }
+
+  /** Progress is not tracked; always {@code 0.0f}. */
+  @Override
+  public float progress(String taskId) {
+    return 0.0f;
+  }
+
+  /** Progress details are not tracked; always an empty array. */
+  @Override
+  public JobProgressInfo[] getProgressInfo(String taskId) {
+    return new JobProgressInfo[0];
+  }
+
+  @Override
+  public List<Label<?>> getExecutorLabels() {
+    return executorLabels;
+  }
+
+  /** Replaces the executor labels; a {@code null} or empty list leaves them untouched. */
+  @Override
+  public void setExecutorLabels(List<Label<?>> labels) {
+    if (!CollectionUtils.isEmpty(labels)) {
+      executorLabels.clear();
+      executorLabels.addAll(labels);
+    }
+  }
+
+  /** Not implemented for this engine; always returns {@code null}. */
+  @Override
+  public NodeResource requestExpectedResource(NodeResource expectedResource) {
+    return null;
+  }
+
+  /**
+   * Reports the resource used by this engine as a {@link LoadResource} built from the process max
+   * memory and the constant {@code 1}.
+   */
+  @Override
+  public NodeResource getCurrentNodeResource() {
+    NodeResourceUtils.appendMemoryUnitIfMissing(
+        EngineConnObject.getEngineCreationContext().getOptions());
+
+    CommonNodeResource resource = new CommonNodeResource();
+    LoadResource usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory(), 1);
+    resource.setUsedResource(usedResource);
+    return resource;
+  }
+
+  /** Returns the service instance string of this process followed by {@code "_"} and the id. */
+  @Override
+  public String getId() {
+    return Sender.getThisServiceInstance().getInstance() + "_" + id;
+  }
+
+  @Override
+  public int getConcurrentLimit() {
+    return ElasticSearchConfiguration.ENGINE_CONCURRENT_LIMIT.getValue();
+  }
+
+  /** Closes the executor registered for the task, if any, before delegating to the parent. */
+  @Override
+  public void killTask(String taskId) {
+    ElasticSearchExecutor elasticSearchExecutor = elasticSearchExecutorCache.getIfPresent(taskId);
+    if (elasticSearchExecutor != null) {
+      elasticSearchExecutor.close();
+    }
+
+    super.killTask(taskId);
+  }
+
+  /** Closes every executor currently held in the cache. */
+  @Override
+  public void killAll() {
+    elasticSearchExecutorCache.asMap().values().forEach(e -> e.close());
+  }
+
+  /** Drops the task's executor from the cache once the task reaches a completed status. */
+  @Override
+  public void transformTaskStatus(EngineConnTask task, ExecutionNodeStatus newStatus) {
+    super.transformTaskStatus(task, newStatus);
+    if (ExecutionNodeStatus.isCompleted(newStatus)) {
+      elasticSearchExecutorCache.invalidate(task.getTaskId());
+    }
+  }
+
+  @Override
+  public boolean supportCallBackLogs() {
+    return false;
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchExecutorOrder.scala b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchExecutor.java
similarity index 79%
rename from linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchExecutorOrder.scala
rename to linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchExecutor.java
index ea49f9fe2..d15e06fd0 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchExecutorOrder.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchExecutor.java
@@ -15,11 +15,13 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.elasticsearch.executor
+package org.apache.linkis.engineplugin.elasticsearch.executor.client;
-object ElasticSearchExecutorOrder extends Enumeration {
+public interface ElasticSearchExecutor {
- type ElasticSearchExecutorOrder = Value
- val SQL = Value(1)
- val JSON = Value(3)
+ void open() throws Exception;
+
+ ElasticSearchResponse executeLine(String code);
+
+ void close();
}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClient.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClient.java
new file mode 100644
index 000000000..a8f30ac87
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClient.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor.client;
+
+import java.io.IOException;
+
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.sniff.Sniffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for the Elasticsearch clients handed out by this plugin. It bundles a low-level
+ * {@link RestClient}, an optional {@link Sniffer} and the name of the datasource they belong to.
+ *
+ * <p>Subclasses supply the {@code execute} operation declared by {@link EsClientOperate}; this
+ * class only stores the collaborators and releases them in {@link #close()}.
+ */
+public abstract class EsClient implements EsClientOperate {
+
+  private static final Logger logger = LoggerFactory.getLogger(EsClient.class);
+
+  private final String datasourceName;
+
+  // Left public and non-final on purpose: subclasses in this package read it directly and other
+  // callers may rely on the field being accessible.
+  public RestClient client;
+
+  private final Sniffer sniffer;
+
+  /**
+   * @param datasourceName name identifying the datasource this client talks to
+   * @param client low-level REST client; {@link #close()} tolerates {@code null}
+   * @param sniffer sniffer attached to {@code client}, or {@code null} when sniffing is not used
+   */
+  public EsClient(String datasourceName, RestClient client, Sniffer sniffer) {
+    this.datasourceName = datasourceName;
+    this.client = client;
+    this.sniffer = sniffer;
+  }
+
+  /** @return the datasource name given at construction time */
+  public String getDatasourceName() {
+    return datasourceName;
+  }
+
+  /** @return the wrapped low-level REST client */
+  public RestClient getRestClient() {
+    return client;
+  }
+
+  /** @return the sniffer given at construction time, possibly {@code null} */
+  public Sniffer getSniffer() {
+    return sniffer;
+  }
+
+  /**
+   * Closes the sniffer first and then the REST client. A failure while closing the REST client is
+   * logged and not rethrown.
+   */
+  @Override
+  public void close() {
+    try {
+      if (sniffer != null) {
+        sniffer.close();
+      }
+    } finally {
+      // Runs even if the sniffer failed to close, so the REST client is never leaked.
+      if (client != null) {
+        try {
+          client.close();
+        } catch (IOException e) {
+          // Pass the exception along so the reason for the failed close ends up in the log.
+          logger.warn("RestClient close warn", e);
+        }
+      }
+    }
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientFactory.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientFactory.java
new file mode 100644
index 000000000..7fb8a0023
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientFactory.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor.client;
+
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.common.exception.ErrorException;
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+import org.apache.linkis.engineplugin.elasticsearch.exception.EsParamsIllegalException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.message.BasicHeader;
+
+import java.util.*;
+
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.sniff.Sniffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.linkis.engineplugin.elasticsearch.errorcode.EasticsearchErrorCodeSummary.CLUSTER_IS_BLANK;
+
+/**
+ * Static factory and cache for {@link EsClient} instances, keyed by datasource name.
+ *
+ * <p>One default client is built during class initialisation from the global {@link
+ * ElasticSearchConfiguration} values; further clients are created on demand from per-call
+ * options and kept in a bounded cache that closes the entries it evicts.
+ */
+public class EsClientFactory {
+
+  private static final Logger logger = LoggerFactory.getLogger(EsClientFactory.class);
+
+  /**
+   * Returns the client for the datasource named in {@code options}, creating and caching it on
+   * first use.
+   *
+   * @param options connection options; the datasource name is looked up under the key of {@code
+   *     ElasticSearchConfiguration.ES_DATASOURCE_NAME}
+   * @return the default client when no datasource name is present (this is {@code null} if the
+   *     default client could not be built); otherwise the cached or newly created client, or
+   *     {@code null} when creation failed with an {@link ErrorException} (the failure is logged)
+   */
+  public static EsClient getRestClient(Map<String, String> options) {
+    String key = getDatasourceName(options);
+    if (StringUtils.isBlank(key)) {
+      return defaultClient;
+    }
+
+    // The cache is a plain LinkedHashMap, so reads must take the same lock as writes; an
+    // unsynchronised lookup could observe the map in the middle of an insert or eviction.
+    synchronized (ES_CLIENT_MAP) {
+      EsClient cached = ES_CLIENT_MAP.get(key);
+      if (cached == null) {
+        try {
+          cached = createRestClient(options);
+          cacheClient(cached);
+        } catch (ErrorException e) {
+          logger.error("es createRestClient failed, reason:", e);
+        }
+      }
+      return cached;
+    }
+  }
+
+  /** Upper bound on the number of cached clients. */
+  private static final int MAX_CACHE_CLIENT_SIZE = 20;
+
+  // Insertion-ordered map (no-arg LinkedHashMap constructor): once the size limit is exceeded the
+  // entry that was inserted first is closed and dropped.
+  private static final Map<String, EsClient> ES_CLIENT_MAP =
+      new LinkedHashMap<String, EsClient>() {
+
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<String, EsClient> eldest) {
+          if (size() > MAX_CACHE_CLIENT_SIZE) {
+            eldest.getValue().close();
+            return true;
+          } else {
+            return false;
+          }
+        }
+      };
+
+  /** Reads the datasource name from {@code options}; yields an empty string when it is absent. */
+  private static String getDatasourceName(Map<String, String> options) {
+    return options.getOrDefault(ElasticSearchConfiguration.ES_DATASOURCE_NAME.key(), "");
+  }
+
+  /** Stores {@code client} in the cache under its own datasource name. */
+  private static void cacheClient(EsClient client) {
+    ES_CLIENT_MAP.put(client.getDatasourceName(), client);
+  }
+
+  /**
+   * Builds a new client from {@code options}.
+   *
+   * @param options must contain the cluster address list; user name and password are optional
+   * @return a freshly built client, with a sniffer attached only when sniffing is enabled
+   * @throws ErrorException when the cluster option is blank or yields no hosts
+   */
+  private static EsClient createRestClient(Map<String, String> options) throws ErrorException {
+    String clusterStr = options.get(ElasticSearchConfiguration.ES_CLUSTER.key());
+
+    if (StringUtils.isBlank(clusterStr)) {
+      throw new EsParamsIllegalException(CLUSTER_IS_BLANK.getErrorDesc());
+    }
+
+    HttpHost[] cluster = getCluster(clusterStr);
+
+    if (cluster.length == 0) {
+      throw new EsParamsIllegalException(CLUSTER_IS_BLANK.getErrorDesc());
+    }
+
+    String username = options.get(ElasticSearchConfiguration.ES_USERNAME.key());
+    String password = options.get(ElasticSearchConfiguration.ES_PASSWORD.key());
+
+    // Credentials are only registered with the shared provider when auth caching is enabled.
+    if (ElasticSearchConfiguration.ES_AUTH_CACHE.getValue()) {
+      setAuthScope(cluster, username, password);
+    }
+
+    HttpHost[] httpHosts =
+        Arrays.stream(cluster)
+            .map(item -> new HttpHost(item.getHostName(), item.getPort()))
+            .toArray(HttpHost[]::new);
+    RestClientBuilder builder = RestClient.builder(httpHosts);
+
+    builder.setHttpClientConfigCallback(
+        new RestClientBuilder.HttpClientConfigCallback() {
+          @Override
+          public HttpAsyncClientBuilder customizeHttpClient(
+              HttpAsyncClientBuilder httpAsyncClientBuilder) {
+            if (!ElasticSearchConfiguration.ES_AUTH_CACHE.getValue()) {
+              httpAsyncClientBuilder.disableAuthCaching();
+            }
+            return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+          }
+        });
+
+    // defaultHeaders is declared below the static initializer, so it is still null while the
+    // default client is being built; hence the null check.
+    if (defaultHeaders != null) {
+      builder.setDefaultHeaders(defaultHeaders);
+    }
+
+    RestClient restClient = builder.build();
+    Sniffer sniffer =
+        ElasticSearchConfiguration.ES_SNIFFER_ENABLE.getValue(options)
+            ? Sniffer.builder(restClient).build()
+            : null;
+
+    return new EsClientImpl(getDatasourceName(options), restClient, sniffer);
+  }
+
+  /** Credentials provider shared by every client this factory builds. */
+  private static final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+
+  /** Client built from the global configuration; {@code null} when it could not be created. */
+  private static final EsClient defaultClient;
+
+  static {
+    // Use the configured cluster address list, not the name of the property that holds it.
+    String cluster = ElasticSearchConfiguration.ES_CLUSTER.getValue();
+    EsClient client = null;
+    if (StringUtils.isNotBlank(cluster)) {
+      Map<String, String> defaultOpts = new HashMap<>();
+      defaultOpts.put(ElasticSearchConfiguration.ES_CLUSTER.key(), cluster);
+      defaultOpts.put(
+          ElasticSearchConfiguration.ES_DATASOURCE_NAME.key(),
+          ElasticSearchConfiguration.ES_DATASOURCE_NAME.getValue());
+      defaultOpts.put(
+          ElasticSearchConfiguration.ES_USERNAME.key(),
+          ElasticSearchConfiguration.ES_USERNAME.getValue());
+      defaultOpts.put(
+          ElasticSearchConfiguration.ES_PASSWORD.key(),
+          ElasticSearchConfiguration.ES_PASSWORD.getValue());
+      try {
+        client = createRestClient(defaultOpts);
+        // Only cache a client that was actually created; caching null would fail class init.
+        cacheClient(client);
+      } catch (ErrorException e) {
+        logger.error("es createRestClient failed, reason:", e);
+      }
+    }
+    defaultClient = client;
+  }
+
+  // Headers taken from every global property whose name starts with the ES header prefix; the
+  // full property name is used as the header name.
+  private static final Header[] defaultHeaders =
+      CommonVars.properties().entrySet().stream()
+          .filter(
+              entry ->
+                  entry.getKey() != null
+                      && entry.getValue() != null
+                      && entry
+                          .getKey()
+                          .toString()
+                          .startsWith(ElasticSearchConfiguration.ES_HTTP_HEADER_PREFIX))
+          .map(entry -> new BasicHeader(entry.getKey().toString(), entry.getValue().toString()))
+          .toArray(Header[]::new);
+
+  /**
+   * Parses a comma-separated list of {@code host:port} entries (an optional {@code http://}
+   * prefix is stripped). Blank entries are skipped; every remaining entry must carry a numeric
+   * port.
+   *
+   * @param clusterStr the raw address list, may be blank
+   * @return one {@link HttpHost} per entry, or an empty array for blank input
+   */
+  private static HttpHost[] getCluster(String clusterStr) {
+    if (StringUtils.isNotBlank(clusterStr)) {
+      return Arrays.stream(clusterStr.split(","))
+          // Tolerate stray separators such as "a:9200,,b:9200".
+          .filter(StringUtils::isNotBlank)
+          .map(
+              value -> {
+                String[] arr = value.replace("http://", "").split(":");
+                return new HttpHost(arr[0].trim(), Integer.parseInt(arr[1].trim()));
+              })
+          .toArray(HttpHost[]::new);
+    } else {
+      return new HttpHost[0];
+    }
+  }
+
+  /**
+   * Registers {@code username}/{@code password} with the shared credentials provider for every
+   * host in {@code cluster}. Does nothing when the host list is empty or either credential is
+   * blank.
+   */
+  private static void setAuthScope(HttpHost[] cluster, String username, String password) {
+    if (cluster != null
+        && cluster.length > 0
+        && StringUtils.isNotBlank(username)
+        && StringUtils.isNotBlank(password)) {
+      Arrays.stream(cluster)
+          .forEach(
+              host ->
+                  credentialsProvider.setCredentials(
+                      new AuthScope(
+                          host.getHostName(),
+                          host.getPort(),
+                          AuthScope.ANY_REALM,
+                          AuthScope.ANY_SCHEME),
+                      new UsernamePasswordCredentials(username, password)));
+    }
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientImpl.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientImpl.java
new file mode 100644
index 000000000..568f5ca29
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientImpl.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor.client;
+
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.auth.AUTH;
+import org.apache.http.auth.Credentials;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.message.BufferedHeader;
+import org.apache.http.util.CharArrayBuffer;
+import org.apache.http.util.EncodingUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.elasticsearch.client.Cancellable;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.ResponseListener;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.sniff.Sniffer;
+
+/**
+ * {@link EsClient} that submits each piece of code as the JSON body of an asynchronous HTTP
+ * request, with method, endpoint, credentials and extra headers taken from the call options.
+ */
+public class EsClientImpl extends EsClient {
+
+  public EsClientImpl(String datasourceName, RestClient client, Sniffer sniffer) {
+    super(datasourceName, client, sniffer);
+  }
+
+  /**
+   * Sends {@code code} asynchronously and hands the outcome to {@code responseListener}.
+   *
+   * @return a handle that can cancel the in-flight request
+   */
+  @Override
+  public Cancellable execute(
+      String code, Map<String, String> options, ResponseListener responseListener) {
+    return client.performRequestAsync(createRequest(code, options), responseListener);
+  }
+
+  /** Assembles the HTTP request: configured method and endpoint, {@code code} as JSON body. */
+  private Request createRequest(String code, Map<String, String> options) {
+    String path = ElasticSearchConfiguration.ES_HTTP_ENDPOINT.getValue(options);
+    String verb = ElasticSearchConfiguration.ES_HTTP_METHOD.getValue(options);
+
+    Request esRequest = new Request(verb, path);
+    esRequest.setOptions(getRequestOptions(options));
+    esRequest.setJsonEntity(code);
+    return esRequest;
+  }
+
+  /**
+   * Builds the per-request options: a basic-auth header when both user name and password are
+   * present, followed by every option whose key starts with the ES header prefix.
+   */
+  private RequestOptions getRequestOptions(Map<String, String> options) {
+    RequestOptions.Builder optionsBuilder = RequestOptions.DEFAULT.toBuilder();
+
+    String user = ElasticSearchConfiguration.ES_USERNAME.getValue(options);
+    String secret = ElasticSearchConfiguration.ES_PASSWORD.getValue(options);
+
+    // username / password convert to base auth
+    boolean hasCredentials = StringUtils.isNotBlank(user) && StringUtils.isNotBlank(secret);
+    if (hasCredentials) {
+      Header basicAuth =
+          authenticate(
+              new UsernamePasswordCredentials(user, secret), StandardCharsets.UTF_8.name());
+      optionsBuilder.addHeader(basicAuth.getName(), basicAuth.getValue());
+    }
+
+    for (Map.Entry<String, String> option : options.entrySet()) {
+      String name = option.getKey();
+      String value = option.getValue();
+      if (name == null || value == null) {
+        continue;
+      }
+      if (name.startsWith(ElasticSearchConfiguration.ES_HTTP_HEADER_PREFIX)) {
+        optionsBuilder.addHeader(name, value);
+      }
+    }
+
+    return optionsBuilder.build();
+  }
+
+  /**
+   * Encodes {@code credentials} as an HTTP basic authorization header; a missing password is
+   * encoded as the literal text {@code null}.
+   */
+  private Header authenticate(Credentials credentials, String charset) {
+    String principal = credentials.getUserPrincipal().getName();
+    String secret = credentials.getPassword();
+    String userAndSecret = principal + ":" + (secret == null ? "null" : secret);
+
+    byte[] encoded = Base64.encodeBase64(EncodingUtils.getBytes(userAndSecret, charset), false);
+
+    CharArrayBuffer headerLine = new CharArrayBuffer(32);
+    headerLine.append(AUTH.WWW_AUTH_RESP);
+    headerLine.append(": Basic ");
+    headerLine.append(encoded, 0, encoded.length);
+
+    return new BufferedHeader(headerLine);
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsEngineException.scala b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientOperate.java
similarity index 70%
rename from linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsEngineException.scala
rename to linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientOperate.java
index 1632ae412..619974c17 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/exception/EsEngineException.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientOperate.java
@@ -15,8 +15,16 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.elasticsearch.exception
+package org.apache.linkis.engineplugin.elasticsearch.executor.client;
-import org.apache.linkis.common.exception.ErrorException
+import java.util.Map;
-case class EsEngineException(errorMsg: String) extends ErrorException(70114, errorMsg)
+import org.elasticsearch.client.Cancellable;
+import org.elasticsearch.client.ResponseListener;
+
+public interface EsClientOperate { // operations an Elasticsearch client wrapper in this package must offer
+ /**
+  * Submits {@code code} for execution and reports the outcome through {@code responseListener}.
+  *
+  * <p>NOTE(review): the {@code EsClientImpl} added in this commit sends {@code code} as the JSON
+  * body of an asynchronous request; other implementations are not visible here.
+  *
+  * @param code the request body to run
+  * @param options per-call settings
+  * @param responseListener callback notified of the result
+  * @return a handle for cancelling the submitted request
+  */
+ Cancellable execute(String code, Map<String, String> options, ResponseListener responseListener);
+ /** Releases the resources held by this client. */
+ void close();
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.java
new file mode 100644
index 000000000..ba21ff56c
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.elasticsearch.executor.client.impl;
+
+import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
+import org.apache.linkis.engineplugin.elasticsearch.executor.client.*;
+import org.apache.linkis.protocol.constants.TaskConstant;
+import org.apache.linkis.storage.utils.StorageUtils;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.elasticsearch.client.Cancellable;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchExecutorImpl implements ElasticSearchExecutor {
+
+ private static final Logger logger = LoggerFactory.getLogger(ElasticSearchExecutorImpl.class);
+
+ private EsClient client;
+ private Cancellable cancelable;
+ private String user;
+ private String runType;
+ private Map<String, String> properties;
+
+ public ElasticSearchExecutorImpl(String runType, Map<String, String> properties) {
+ this.runType = runType;
+ this.properties = properties;
+ }
+
+ @Override
+ public void open() throws Exception {
+ this.client = EsClientFactory.getRestClient(properties);
+ this.user =
+ StringUtils.defaultString(
+ properties.getOrDefault(TaskConstant.UMUSER, StorageUtils.getJvmUser()));
+ switch (runType.trim().toLowerCase(Locale.getDefault())) {
+ case "essql":
+ case "sql":
+ properties.putIfAbsent(
+ ElasticSearchConfiguration.ES_HTTP_ENDPOINT.key(),
+ ElasticSearchConfiguration.ES_HTTP_SQL_ENDPOINT.getValue(properties));
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Override
+ public ElasticSearchResponse executeLine(String code) {
+ String realCode = code.trim();
+ logger.info("es client begins to run {} code:\n {}", runType, realCode.trim());
+ CountDownLatch countDown = new CountDownLatch(1);
+ ElasticSearchResponse[] executeResponse = {
+ new ElasticSearchErrorResponse("INCOMPLETE", null, null)
+ };
+ cancelable =
+ client.execute(
+ realCode,
+ properties,
+ new ResponseListener() {
+ @Override
+ public void onSuccess(Response response) {
+ executeResponse[0] = convertResponse(response);
+ countDown.countDown();
+ }
+
+ @Override
+ public void onFailure(Exception exception) {
+ executeResponse[0] =
+ new ElasticSearchErrorResponse(
+ "EsEngineExecutor execute fail. ", null, exception);
+ countDown.countDown();
+ }
+ });
+ try {
+ countDown.await();
+ } catch (InterruptedException e) {
+ executeResponse[0] =
+ new ElasticSearchErrorResponse("EsEngineExecutor execute interrupted. ", null, e);
+ }
+ return executeResponse[0];
+ }
+
+ // convert response to executeResponse
+ private ElasticSearchResponse convertResponse(Response response) {
+ try {
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode >= 200 && statusCode < 300) {
+ return ResponseHandler$.MODULE$.handle(response);
+ } else {
+ return new ElasticSearchErrorResponse(
+ "EsEngineExecutor convert response fail. response code: " + statusCode, null, null);
+ }
+ } catch (Exception e) {
+ return new ElasticSearchErrorResponse("EsEngineExecutor convert response error.", null, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (cancelable != null) {
+ cancelable.cancel();
+ }
+ if (client != null) {
+ client.close();
+ }
+ }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/ElasticSearchEngineConnPlugin.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/ElasticSearchEngineConnPlugin.scala
deleted file mode 100644
index 67e2d39ce..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/ElasticSearchEngineConnPlugin.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch
-
-import org.apache.linkis.engineplugin.elasticsearch.builder.ElasticSearchProcessEngineConnLaunchBuilder
-import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration
-import org.apache.linkis.engineplugin.elasticsearch.factory.ElasticSearchEngineConnFactory
-import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin
-import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory
-import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder
-import org.apache.linkis.manager.engineplugin.common.resource.{
- EngineResourceFactory,
- GenericEngineResourceFactory
-}
-import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.manager.label.entity.engine.{EngineType, EngineTypeLabel}
-
-import java.util
-
-class ElasticSearchEngineConnPlugin extends EngineConnPlugin {
-
- private var engineResourceFactory: EngineResourceFactory = _
-
- private var engineFactory: EngineConnFactory = _
-
- private val defaultLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]()
-
- private val resourceLocker = new Array[Byte](0)
-
- private val engineFactoryLocker = new Array[Byte](0)
-
- override def init(params: util.Map[String, AnyRef]): Unit = {
- val typeLabel = new EngineTypeLabel()
- typeLabel.setEngineType(EngineType.ELASTICSEARCH.toString)
- typeLabel.setVersion(ElasticSearchConfiguration.DEFAULT_VERSION.getValue)
- this.defaultLabels.add(typeLabel)
- }
-
- override def getEngineResourceFactory: EngineResourceFactory = {
- if (null == engineResourceFactory) resourceLocker.synchronized {
- if (null == engineResourceFactory) engineResourceFactory = new GenericEngineResourceFactory
- }
- engineResourceFactory
- }
-
- override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
- new ElasticSearchProcessEngineConnLaunchBuilder()
- }
-
- override def getEngineConnFactory: EngineConnFactory = {
- if (null == engineFactory) engineFactoryLocker.synchronized {
- if (null == engineFactory) engineFactory = new ElasticSearchEngineConnFactory
- }
- engineFactory
- }
-
- override def getDefaultLabels: util.List[Label[_]] = defaultLabels
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.scala
deleted file mode 100644
index d2909a063..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch.conf
-
-import org.apache.linkis.common.conf.{ByteType, CommonVars}
-
-object ElasticSearchConfiguration {
-
- // es client
- val ES_CLUSTER = CommonVars("linkis.es.cluster", "127.0.0.1:9200")
- val ES_DATASOURCE_NAME = CommonVars("linkis.es.datasource", "default_datasource")
- val ES_AUTH_CACHE = CommonVars("linkis.es.auth.cache", false)
- val ES_USERNAME = CommonVars("linkis.es.username", "")
- val ES_PASSWORD = CommonVars("linkis.es.password", "")
- val ES_SNIFFER_ENABLE = CommonVars("linkis.es.sniffer.enable", false)
- val ES_HTTP_METHOD = CommonVars("linkis.es.http.method", "GET")
- val ES_HTTP_ENDPOINT = CommonVars("linkis.es.http.endpoint", "/_search")
- val ES_HTTP_SQL_ENDPOINT = CommonVars("linkis.es.sql.endpoint", "/_sql")
- val ES_SQL_FORMAT = CommonVars("linkis.es.sql.format", "{\"query\": \"%s\"}")
- val ES_HTTP_HEADER_PREFIX = "linkis.es.headers."
-
- // entrance resource
- val ENTRANCE_MAX_JOB_INSTANCE = CommonVars("linkis.es.max.job.instance", 100)
- val ENTRANCE_PROTECTED_JOB_INSTANCE = CommonVars("linkis.es.protected.job.instance", 20)
- val ENGINE_DEFAULT_LIMIT = CommonVars("linkis.es.default.limit", 5000)
-
- // resultSet
- val ENGINE_RESULT_SET_MAX_CACHE = CommonVars("linkis.resultSet.cache.max", new ByteType("512k"))
-
- val ENGINE_CONCURRENT_LIMIT = CommonVars[Int]("linkis.engineconn.concurrent.limit", 100)
-
- val DEFAULT_VERSION = CommonVars[String]("linkis.engineconn.io.version", "7.6.2")
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.scala
deleted file mode 100644
index b197f2047..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch.conf
-
-import org.apache.linkis.common.conf.Configuration
-import org.apache.linkis.governance.common.protocol.conf.{
- RequestQueryEngineConfig,
- ResponseQueryConfig
-}
-import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
-import org.apache.linkis.protocol.CacheableProtocol
-import org.apache.linkis.rpc.RPCMapCache
-
-import java.util
-
-object ElasticSearchEngineConsoleConf
- extends RPCMapCache[Array[Label[_]], String, String](
- Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME.getValue
- ) {
-
- override protected def createRequest(labels: Array[Label[_]]): CacheableProtocol = {
- val userCreatorLabel =
- labels.find(_.isInstanceOf[UserCreatorLabel]).get.asInstanceOf[UserCreatorLabel]
- val engineTypeLabel =
- labels.find(_.isInstanceOf[EngineTypeLabel]).get.asInstanceOf[EngineTypeLabel]
- RequestQueryEngineConfig(userCreatorLabel, engineTypeLabel)
- }
-
- override protected def createMap(any: Any): util.Map[String, String] = any match {
- case response: ResponseQueryConfig => response.getKeyAndValue
- }
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.scala
deleted file mode 100644
index 7797ac1b1..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.scala
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch.executor
-
-import org.apache.linkis.common.utils.{Logging, OverloadUtils, Utils}
-import org.apache.linkis.engineconn.common.conf.{EngineConnConf, EngineConnConstant}
-import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
-import org.apache.linkis.engineconn.computation.executor.execute.{
- ConcurrentComputationExecutor,
- EngineExecutionContext
-}
-import org.apache.linkis.engineconn.core.EngineConnObject
-import org.apache.linkis.engineplugin.elasticsearch.conf.{
- ElasticSearchConfiguration,
- ElasticSearchEngineConsoleConf
-}
-import org.apache.linkis.engineplugin.elasticsearch.executor.client.{
- ElasticSearchErrorResponse,
- ElasticSearchExecutor,
- ElasticSearchJsonResponse,
- ElasticSearchTableResponse
-}
-import org.apache.linkis.engineplugin.elasticsearch.executor.client.ElasticSearchErrorResponse
-import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
-import org.apache.linkis.manager.common.entity.resource.{
- CommonNodeResource,
- LoadResource,
- NodeResource
-}
-import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
-import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
-import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.protocol.engine.JobProgressInfo
-import org.apache.linkis.rpc.Sender
-import org.apache.linkis.scheduler.executer.{
- AliasOutputExecuteResponse,
- ErrorExecuteResponse,
- ExecuteResponse
-}
-import org.apache.linkis.storage.LineRecord
-import org.apache.linkis.storage.resultset.ResultSetFactory
-import org.apache.linkis.storage.resultset.table.TableMetaData
-
-import org.apache.commons.io.IOUtils
-
-import org.springframework.util.CollectionUtils
-
-import java.util
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConverters._
-
-import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
-
-class ElasticSearchEngineConnExecutor(
- override val outputPrintLimit: Int,
- val id: Int,
- runType: String
-) extends ConcurrentComputationExecutor(outputPrintLimit)
- with Logging {
-
- private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]](2)
-
- private val elasticSearchExecutorCache: Cache[String, ElasticSearchExecutor] = CacheBuilder
- .newBuilder()
- .expireAfterAccess(EngineConnConf.ENGINE_TASK_EXPIRE_TIME.getValue, TimeUnit.MILLISECONDS)
- .removalListener(new RemovalListener[String, ElasticSearchExecutor] {
-
- override def onRemoval(
- notification: RemovalNotification[String, ElasticSearchExecutor]
- ): Unit = {
- notification.getValue.close
- val task = getTaskById(notification.getKey)
- if (!ExecutionNodeStatus.isCompleted(task.getStatus)) {
- killTask(notification.getKey)
- }
- }
-
- })
- .maximumSize(EngineConnConstant.MAX_TASK_NUM)
- .build()
-
- override def init(): Unit = {
- super.init()
- }
-
- override def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
-
- val properties: util.Map[String, String] = buildRuntimeParams(engineConnTask)
- logger.info(s"The elasticsearch properties is: $properties")
-
- val elasticSearchExecutor = ElasticSearchExecutor(runType, properties)
- elasticSearchExecutor.open
- elasticSearchExecutorCache.put(engineConnTask.getTaskId, elasticSearchExecutor)
- super.execute(engineConnTask)
- }
-
- override def executeLine(
- engineExecutorContext: EngineExecutionContext,
- code: String
- ): ExecuteResponse = {
- val taskId = engineExecutorContext.getJobId.get
- val elasticSearchExecutor = elasticSearchExecutorCache.getIfPresent(taskId)
- val elasticSearchResponse = elasticSearchExecutor.executeLine(code)
-
- elasticSearchResponse match {
- case ElasticSearchTableResponse(columns, records) =>
- val metaData = new TableMetaData(columns)
- val resultSetWriter =
- engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
- resultSetWriter.addMetaData(metaData)
- records.foreach(record => resultSetWriter.addRecord(record))
- val output = resultSetWriter.toString
- Utils.tryQuietly {
- IOUtils.closeQuietly(resultSetWriter)
- }
- AliasOutputExecuteResponse(null, output)
- case ElasticSearchJsonResponse(content) =>
- val resultSetWriter =
- engineExecutorContext.createResultSetWriter(ResultSetFactory.TEXT_TYPE)
- resultSetWriter.addMetaData(null)
- content.split("\\n").foreach(item => resultSetWriter.addRecord(new LineRecord(item)))
- val output = resultSetWriter.toString
- Utils.tryQuietly {
- IOUtils.closeQuietly(resultSetWriter)
- }
- AliasOutputExecuteResponse(null, output)
- case ElasticSearchErrorResponse(message, body, cause) =>
- ErrorExecuteResponse(message, cause)
- }
- }
-
- private def buildRuntimeParams(engineConnTask: EngineConnTask): util.Map[String, String] = {
-
- // parameters specified at runtime
- var executorProperties = engineConnTask.getProperties.asInstanceOf[util.Map[String, String]]
- if (executorProperties == null) {
- executorProperties = new util.HashMap[String, String]()
- }
-
- // global engine params by console
- val globalConfig: util.Map[String, String] =
- Utils.tryAndWarn(ElasticSearchEngineConsoleConf.getCacheMap(engineConnTask.getLables))
-
- if (!executorProperties.isEmpty) {
- globalConfig.putAll(executorProperties)
- }
-
- globalConfig
- }
-
- override def executeCompletely(
- engineExecutorContext: EngineExecutionContext,
- code: String,
- completedLine: String
- ): ExecuteResponse = null
-
- override def progress(taskID: String): Float = 0.0f
-
- override def getProgressInfo(taskID: String): Array[JobProgressInfo] =
- Array.empty[JobProgressInfo]
-
- override def getExecutorLabels(): util.List[Label[_]] = executorLabels
-
- override def setExecutorLabels(labels: util.List[Label[_]]): Unit = {
- if (!CollectionUtils.isEmpty(labels)) {
- executorLabels.clear()
- executorLabels.addAll(labels)
- }
- }
-
- override def supportCallBackLogs(): Boolean = false
-
- override def requestExpectedResource(expectedResource: NodeResource): NodeResource = null
-
- override def getCurrentNodeResource(): NodeResource = {
- NodeResourceUtils.appendMemoryUnitIfMissing(
- EngineConnObject.getEngineCreationContext.getOptions
- )
-
- val resource = new CommonNodeResource
- val usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory, 1)
- resource.setUsedResource(usedResource)
- resource
- }
-
- override def getId(): String = Sender.getThisServiceInstance.getInstance + s"_$id"
-
- override def getConcurrentLimit: Int =
- ElasticSearchConfiguration.ENGINE_CONCURRENT_LIMIT.getValue
-
- override def killTask(taskId: String): Unit = {
- Utils.tryAndWarn {
- val elasticSearchExecutor = elasticSearchExecutorCache.getIfPresent(taskId)
- if (null != elasticSearchExecutor) {
- elasticSearchExecutor.close
- }
- }
- super.killTask(taskId)
- }
-
- override def killAll(): Unit = {
- elasticSearchExecutorCache
- .asMap()
- .values()
- .asScala
- .foreach(e => e.close)
- }
-
- override def transformTaskStatus(task: EngineConnTask, newStatus: ExecutionNodeStatus): Unit = {
- super.transformTaskStatus(task, newStatus)
- if (ExecutionNodeStatus.isCompleted(newStatus)) {
- elasticSearchExecutorCache.invalidate(task.getTaskId)
- }
- }
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchExecutor.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchExecutor.scala
deleted file mode 100644
index 72e1953e3..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchExecutor.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch.executor.client
-
-import org.apache.linkis.common.utils.Logging
-import org.apache.linkis.engineplugin.elasticsearch.executor.client.impl.ElasticSearchExecutorImpl
-import org.apache.linkis.scheduler.executer.ExecuteResponse
-
-import java.io.IOException
-import java.util
-
-import scala.collection.JavaConverters._
-
-trait ElasticSearchExecutor extends Logging {
-
- @throws(classOf[IOException])
- def open: Unit
-
- def executeLine(code: String): ElasticSearchResponse
-
- def close: Unit
-
-}
-
-object ElasticSearchExecutor {
-
- def apply(runType: String, properties: util.Map[String, String]): ElasticSearchExecutor = {
- new ElasticSearchExecutorImpl(runType, properties)
- }
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchResponse.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/ElasticSearchResponse.scala
old mode 100644
new mode 100755
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClient.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClient.scala
deleted file mode 100644
index 8b894b466..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClient.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch.executor.client
-
-import org.apache.linkis.common.utils.Utils
-import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration._
-import org.apache.linkis.server.JMap
-
-import org.apache.commons.codec.binary.Base64
-import org.apache.commons.lang3.StringUtils
-import org.apache.http.Header
-import org.apache.http.auth.{AUTH, Credentials, UsernamePasswordCredentials}
-import org.apache.http.message.BufferedHeader
-import org.apache.http.util.{CharArrayBuffer, EncodingUtils}
-
-import java.nio.charset.StandardCharsets.UTF_8
-import java.util
-
-import scala.collection.JavaConverters._
-
-import org.elasticsearch.client.{Cancellable, Request, RequestOptions, ResponseListener, RestClient}
-import org.elasticsearch.client.sniff.Sniffer
-
-trait EsClientOperate {
-
- def execute(
- code: String,
- options: util.Map[String, String],
- responseListener: ResponseListener
- ): Cancellable
-
- def close(): Unit
-
-}
-
-abstract class EsClient(datasourceName: String, client: RestClient, sniffer: Sniffer)
- extends EsClientOperate {
-
- def getDatasourceName: String = datasourceName
-
- def getRestClient: RestClient = client
-
- def getSniffer: Sniffer = sniffer
-
- override def close(): Unit = Utils.tryQuietly {
- sniffer match {
- case s: Sniffer => s.close()
- case _ =>
- }
- client match {
- case c: RestClient => c.close()
- case _ =>
- }
- }
-
-}
-
-class EsClientImpl(datasourceName: String, client: RestClient, sniffer: Sniffer)
- extends EsClient(datasourceName, client, sniffer) {
-
- override def execute(
- code: String,
- options: util.Map[String, String],
- responseListener: ResponseListener
- ): Cancellable = {
- val request = createRequest(code, options)
- client.performRequestAsync(request, responseListener)
- }
-
- private def createRequest(code: String, options: util.Map[String, String]): Request = {
- val endpoint = ES_HTTP_ENDPOINT.getValue(options)
- val method = ES_HTTP_METHOD.getValue(options)
- val request = new Request(method, endpoint)
- request.setOptions(getRequestOptions(options))
- request.setJsonEntity(code)
- request
- }
-
- private def getRequestOptions(options: util.Map[String, String]): RequestOptions = {
- val builder = RequestOptions.DEFAULT.toBuilder()
-
- val username = ES_USERNAME.getValue(options)
- val password = ES_PASSWORD.getValue(options)
- // username / password convert to base auth
- if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
- val authHeader =
- authenticate(new UsernamePasswordCredentials(username, password), UTF_8.name())
- builder.addHeader(authHeader.getName, authHeader.getValue)
- }
-
- options.asScala
- .filter(entry =>
- entry._1 != null && entry._2 != null && entry._1.startsWith(ES_HTTP_HEADER_PREFIX)
- )
- .foreach(entry => builder.addHeader(entry._1, entry._2))
-
- builder.build()
- }
-
- private def authenticate(credentials: Credentials, charset: String): Header = {
- val tmp = new StringBuilder
- tmp.append(credentials.getUserPrincipal.getName)
- tmp.append(":")
- tmp.append(
- if (credentials.getPassword == null) "null"
- else credentials.getPassword
- )
- val base64password = Base64.encodeBase64(EncodingUtils.getBytes(tmp.toString, charset), false)
- val buffer = new CharArrayBuffer(32)
- buffer.append(AUTH.WWW_AUTH_RESP)
- buffer.append(": Basic ")
- buffer.append(base64password, 0, base64password.length)
- new BufferedHeader(buffer)
- }
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientFactory.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientFactory.scala
deleted file mode 100644
index 30cb6690b..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/EsClientFactory.scala
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch.executor.client
-
-import org.apache.linkis.common.conf.CommonVars
-import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration._
-import org.apache.linkis.engineplugin.elasticsearch.errorcode.EasticsearchErrorCodeSummary.CLUSTER_IS_BLANK
-import org.apache.linkis.engineplugin.elasticsearch.exception.EsParamsIllegalException
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.http.{Header, HttpHost}
-import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
-import org.apache.http.client.CredentialsProvider
-import org.apache.http.impl.client.BasicCredentialsProvider
-import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
-import org.apache.http.message.BasicHeader
-
-import java.util
-import java.util.Map
-
-import scala.collection.JavaConverters._
-
-import org.elasticsearch.client.{RestClient, RestClientBuilder}
-import org.elasticsearch.client.sniff.Sniffer
-
-object EsClientFactory {
-
- def getRestClient(options: util.Map[String, String]): EsClient = {
- val key = getDatasourceName(options)
- if (StringUtils.isBlank(key)) {
- return defaultClient
- }
-
- if (!ES_CLIENT_MAP.containsKey(key)) {
- ES_CLIENT_MAP synchronized {
- if (!ES_CLIENT_MAP.containsKey(key)) {
- cacheClient(createRestClient(options))
- }
- }
- }
-
- ES_CLIENT_MAP.get(key)
- }
-
- private val MAX_CACHE_CLIENT_SIZE = 20
-
- private val ES_CLIENT_MAP: Map[String, EsClient] = new util.LinkedHashMap[String, EsClient]() {
-
- override def removeEldestEntry(eldest: Map.Entry[String, EsClient]): Boolean =
- if (size > MAX_CACHE_CLIENT_SIZE) {
- eldest.getValue.close()
- true
- } else {
- false
- }
-
- }
-
- private def getDatasourceName(options: util.Map[String, String]): String = {
- options.getOrDefault(ES_DATASOURCE_NAME.key, "")
- }
-
- private def cacheClient(client: EsClient) = {
- ES_CLIENT_MAP.put(client.getDatasourceName, client)
- }
-
- private def createRestClient(options: util.Map[String, String]): EsClient = {
- val clusterStr = options.get(ES_CLUSTER.key)
- if (StringUtils.isBlank(clusterStr)) {
- throw EsParamsIllegalException(CLUSTER_IS_BLANK.getErrorDesc)
- }
- val cluster = getCluster(clusterStr)
- if (cluster.isEmpty) {
- throw EsParamsIllegalException(CLUSTER_IS_BLANK.getErrorDesc)
- }
- val username = options.get(ES_USERNAME.key)
- val password = options.get(ES_PASSWORD.key)
-
- if (ES_AUTH_CACHE.getValue) {
- setAuthScope(cluster, username, password)
- }
-
- val httpHosts = cluster.map(item => new HttpHost(item._1, item._2))
- val builder = RestClient
- .builder(httpHosts: _*)
- .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
- override def customizeHttpClient(
- httpAsyncClientBuilder: HttpAsyncClientBuilder
- ): HttpAsyncClientBuilder = {
- if (!ES_AUTH_CACHE.getValue) {
- httpAsyncClientBuilder.disableAuthCaching
- }
- // httpClientBuilder.setDefaultRequestConfig(RequestConfig.DEFAULT)
- // httpClientBuilder.setDefaultConnectionConfig(ConnectionConfig.DEFAULT)
- httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
- }
- })
- if (defaultHeaders != null) {
- builder.setDefaultHeaders(defaultHeaders)
- }
- val client = builder.build
-
- val sniffer = if (ES_SNIFFER_ENABLE.getValue(options)) {
- Sniffer.builder(client).build
- } else null
-
- val datasourceName = getDatasourceName(options)
- new EsClientImpl(datasourceName, client, sniffer)
- }
-
- private val credentialsProvider: CredentialsProvider = new BasicCredentialsProvider()
-
- private val defaultClient = {
- val cluster = ES_CLUSTER.getValue
- if (StringUtils.isBlank(cluster)) {
- null
- } else {
- val defaultOpts = new util.HashMap[String, String]()
- defaultOpts.put(ES_CLUSTER.key, cluster)
- defaultOpts.put(ES_DATASOURCE_NAME.key, ES_DATASOURCE_NAME.getValue)
- defaultOpts.put(ES_USERNAME.key, ES_USERNAME.getValue)
- defaultOpts.put(ES_PASSWORD.key, ES_PASSWORD.getValue)
- val client = createRestClient(defaultOpts)
- cacheClient(client)
- client
- }
- }
-
- private val defaultHeaders: Array[Header] = CommonVars.properties
- .entrySet()
- .asScala
- .filter(entry =>
- entry.getKey != null && entry.getValue != null && entry.getKey.toString
- .startsWith(ES_HTTP_HEADER_PREFIX)
- )
- .map(entry => new BasicHeader(entry.getKey.toString, entry.getValue.toString))
- .toArray[Header]
-
- // host1:port1,host2:port2 -> [(host1,port1),(host2,port2)]
- private def getCluster(clusterStr: String): Array[(String, Int)] =
- if (StringUtils.isNotBlank(clusterStr)) {
- clusterStr
- .split(",")
- .map(value => {
- val arr = value.replace("http://", "").split(":")
- (arr(0).trim, arr(1).trim.toInt)
- })
- } else Array()
-
- // set cluster auth
- private def setAuthScope(
- cluster: Array[(String, Int)],
- username: String,
- password: String
- ): Unit = if (
- cluster != null && !cluster.isEmpty
- && StringUtils.isNotBlank(username)
- && StringUtils.isNotBlank(password)
- ) {
- cluster.foreach {
- case (host, port) =>
- credentialsProvider.setCredentials(
- new AuthScope(host, port, AuthScope.ANY_REALM, AuthScope.ANY_SCHEME),
- new UsernamePasswordCredentials(username, password)
- )
- case _ =>
- }
- }
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.scala
deleted file mode 100644
index 86d10660b..000000000
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.elasticsearch.executor.client.impl
-
-import org.apache.linkis.common.utils.Utils
-import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration
-import org.apache.linkis.engineplugin.elasticsearch.exception.EsConvertResponseException
-import org.apache.linkis.engineplugin.elasticsearch.executor.client.{
- ElasticSearchErrorResponse,
- ElasticSearchExecutor,
- ElasticSearchResponse,
- EsClient,
- EsClientFactory,
- ResponseHandler
-}
-import org.apache.linkis.engineplugin.elasticsearch.executor.client.ResponseHandler
-import org.apache.linkis.protocol.constants.TaskConstant
-import org.apache.linkis.scheduler.executer.{
- AliasOutputExecuteResponse,
- ErrorExecuteResponse,
- ExecuteResponse,
- SuccessExecuteResponse
-}
-import org.apache.linkis.server.JMap
-import org.apache.linkis.storage.utils.StorageUtils
-
-import java.util
-import java.util.Locale
-import java.util.concurrent.CountDownLatch
-
-import org.elasticsearch.client.{Cancellable, Response, ResponseListener}
-
-class ElasticSearchExecutorImpl(runType: String, properties: util.Map[String, String])
- extends ElasticSearchExecutor {
-
- private var client: EsClient = _
- private var cancelable: Cancellable = _
- private var user: String = _
-
- override def open: Unit = {
- this.client = EsClientFactory.getRestClient(properties)
- this.user = properties.getOrDefault(TaskConstant.UMUSER, StorageUtils.getJvmUser)
- runType.trim.toLowerCase(Locale.getDefault) match {
- case "essql" | "sql" =>
- properties.putIfAbsent(
- ElasticSearchConfiguration.ES_HTTP_ENDPOINT.key,
- ElasticSearchConfiguration.ES_HTTP_SQL_ENDPOINT.getValue(properties)
- )
- case _ =>
- }
- }
-
- override def executeLine(code: String): ElasticSearchResponse = {
- val realCode = code.trim()
- logger.info(s"es client begins to run $runType code:\n ${realCode.trim}")
- val countDown = new CountDownLatch(1)
- var executeResponse: ElasticSearchResponse = ElasticSearchErrorResponse("INCOMPLETE")
- cancelable = client.execute(
- realCode,
- properties,
- new ResponseListener {
- override def onSuccess(response: Response): Unit = {
- executeResponse = convertResponse(response)
- countDown.countDown()
- }
- override def onFailure(exception: Exception): Unit = {
- executeResponse =
- ElasticSearchErrorResponse("EsEngineExecutor execute fail. ", null, exception)
- countDown.countDown()
- }
- }
- )
- countDown.await()
- executeResponse
- }
-
- // convert response to executeResponse
- private def convertResponse(response: Response): ElasticSearchResponse =
- Utils.tryCatch[ElasticSearchResponse] {
- val statusCode = response.getStatusLine.getStatusCode
- if (statusCode >= 200 && statusCode < 300) {
- ResponseHandler.handle(response)
- } else {
- ElasticSearchErrorResponse(
- "EsEngineExecutor convert response fail. response code: " + response.getStatusLine.getStatusCode
- )
- }
- } { case t: Throwable =>
- ElasticSearchErrorResponse("EsEngineExecutor convert response error.", null, t)
- }
-
- override def close: Unit = cancelable match {
- case c: Cancellable => c.cancel()
- case _ =>
- }
-
-}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ResponseHandlerImpl.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ResponseHandlerImpl.scala
index 7720b7076..888f02df1 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ResponseHandlerImpl.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ResponseHandlerImpl.scala
@@ -26,7 +26,6 @@ import org.apache.linkis.engineplugin.elasticsearch.executor.client.{
ElasticSearchTableResponse,
ResponseHandler
}
-import org.apache.linkis.engineplugin.elasticsearch.executor.client.ResponseHandler
import org.apache.linkis.engineplugin.elasticsearch.executor.client.ResponseHandler._
import org.apache.linkis.storage.domain._
import org.apache.linkis.storage.domain.DataType.{DoubleType, StringType}
@@ -55,7 +54,7 @@ class ResponseHandlerImpl extends ResponseHandler {
val contentBytes = EntityUtils.toByteArray(response.getEntity)
if (contentBytes == null || contentBytes.isEmpty) {
- throw EsConvertResponseException(RESPONSE_FAIL_IS_EMPTY.getErrorDesc)
+ throw new EsConvertResponseException(RESPONSE_FAIL_IS_EMPTY.getErrorDesc)
}
val jsonNode = Utils.tryCatch {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org