You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/08/16 08:32:07 UTC
[incubator-linkis] branch dev-1.2.0 updated: ES Engine Optimization (#2795)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
new 91d79927d ES Engine Optimization (#2795)
91d79927d is described below
commit 91d79927dc54b47f577091e55cfa81b07ba828f5
Author: aiceflower <ki...@sina.com>
AuthorDate: Tue Aug 16 16:32:02 2022 +0800
ES Engine Optimization (#2795)
* Fixed the console configuration not working
* Change the ES datasource parameters to be consistent with others
---
.../conf/ElasticSearchConfiguration.scala | 2 +-
.../conf/ElasticSearchEngineConsoleConf.scala | 39 ++++++++++++++++++++++
.../executer/ElasticSearchEngineConnExecutor.scala | 36 +++++++++++++++++---
.../executer/client/ElasticSearchExecutor.scala | 16 +++------
.../elasticsearch/executer/client/EsClient.scala | 9 ++---
.../executer/client/EsClientFactory.scala | 7 ++--
.../client/impl/ElasticSearchExecutorImpl.scala | 4 ++-
7 files changed, 86 insertions(+), 27 deletions(-)
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.scala
index 7855d7bf7..d138b60f7 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.scala
@@ -22,7 +22,7 @@ object ElasticSearchConfiguration {
// es client
val ES_CLUSTER = CommonVars("linkis.es.cluster", "127.0.0.1:9200")
- val ES_DATASOURCE_NAME = CommonVars("linkis.datasource", "default_datasource")
+ val ES_DATASOURCE_NAME = CommonVars("linkis.es.datasource", "default_datasource")
val ES_AUTH_CACHE = CommonVars("linkis.es.auth.cache", false)
val ES_USERNAME = CommonVars("linkis.es.username", "")
val ES_PASSWORD = CommonVars("linkis.es.password", "")
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.scala
new file mode 100644
index 000000000..fff0fb97a
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.elasticsearch.conf
+
+import java.util
+
+import org.apache.linkis.common.conf.Configuration
+import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfig, ResponseQueryConfig}
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
+import org.apache.linkis.protocol.CacheableProtocol
+import org.apache.linkis.rpc.RPCMapCache
+
+object ElasticSearchEngineConsoleConf extends RPCMapCache[Array[Label[_]], String, String](Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME.getValue) {
+
+ override protected def createRequest(labels: Array[Label[_]]): CacheableProtocol = {
+ val userCreatorLabel = labels.find(_.isInstanceOf[UserCreatorLabel]).get.asInstanceOf[UserCreatorLabel]
+ val engineTypeLabel = labels.find(_.isInstanceOf[EngineTypeLabel]).get.asInstanceOf[EngineTypeLabel]
+ RequestQueryEngineConfig(userCreatorLabel, engineTypeLabel)
+ }
+
+ override protected def createMap(any: Any): util.Map[String, String] = any match {
+ case response: ResponseQueryConfig => response.getKeyAndValue
+ }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/ElasticSearchEngineConnExecutor.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/ElasticSearchEngineConnExecutor.scala
index bacbd2b83..20b8f269c 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/ElasticSearchEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/ElasticSearchEngineConnExecutor.scala
@@ -18,13 +18,16 @@ package org.apache.linkis.engineplugin.elasticsearch.executer
import java.util
import java.util.concurrent.TimeUnit
+
import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
+import org.apache.commons.io.IOUtils
import org.apache.linkis.common.utils.{Logging, OverloadUtils, Utils}
import org.apache.linkis.engineconn.common.conf.{EngineConnConf, EngineConnConstant}
import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
import org.apache.linkis.engineconn.computation.executor.execute.{ConcurrentComputationExecutor, EngineExecutionContext}
import org.apache.linkis.engineconn.core.EngineConnObject
-import org.apache.linkis.engineplugin.elasticsearch.executer.client.ElasticSearchErrorResponse
+import org.apache.linkis.engineplugin.elasticsearch.conf.{ElasticSearchConfiguration, ElasticSearchEngineConsoleConf}
+import org.apache.linkis.engineplugin.elasticsearch.executer.client.{ElasticSearchErrorResponse, ElasticSearchExecutor, ElasticSearchJsonResponse, ElasticSearchTableResponse}
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, LoadResource, NodeResource}
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
@@ -35,9 +38,6 @@ import org.apache.linkis.scheduler.executer.{AliasOutputExecuteResponse, ErrorEx
import org.apache.linkis.storage.LineRecord
import org.apache.linkis.storage.resultset.ResultSetFactory
import org.apache.linkis.storage.resultset.table.TableMetaData
-import org.apache.commons.io.IOUtils
-import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration
-import org.apache.linkis.engineplugin.elasticsearch.executer.client.{ElasticSearchErrorResponse, ElasticSearchExecutor, ElasticSearchJsonResponse, ElasticSearchTableResponse}
import org.springframework.util.CollectionUtils
import scala.collection.JavaConverters._
@@ -64,7 +64,11 @@ class ElasticSearchEngineConnExecutor(override val outputPrintLimit: Int, val id
}
override def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
- val elasticSearchExecutor = ElasticSearchExecutor(runType, engineConnTask.getProperties)
+
+ val properties: util.Map[String, String] = buildRuntimeParams(engineConnTask)
+ logger.info(s"The elasticsearch properties is: $properties")
+
+ val elasticSearchExecutor = ElasticSearchExecutor(runType, properties)
elasticSearchExecutor.open
elasticSearchExecutorCache.put(engineConnTask.getTaskId, elasticSearchExecutor)
super.execute(engineConnTask)
@@ -73,6 +77,7 @@ class ElasticSearchEngineConnExecutor(override val outputPrintLimit: Int, val id
override def executeLine(engineExecutorContext: EngineExecutionContext, code: String): ExecuteResponse = {
val taskId = engineExecutorContext.getJobId.get
val elasticSearchExecutor = elasticSearchExecutorCache.getIfPresent(taskId)
+
val elasticSearchResponse = elasticSearchExecutor.executeLine(code)
elasticSearchResponse match {
@@ -100,6 +105,26 @@ class ElasticSearchEngineConnExecutor(override val outputPrintLimit: Int, val id
}
}
+
+ private def buildRuntimeParams(engineConnTask: EngineConnTask): util.Map[String, String] = {
+
+ // parameters specified at runtime
+ var executorProperties = engineConnTask.getProperties.asInstanceOf[util.Map[String, String]]
+ if (executorProperties == null) {
+ executorProperties = new util.HashMap[String, String]()
+ }
+
+ // global engine params by console
+ val globalConfig: util.Map[String, String] = Utils.tryAndWarn(ElasticSearchEngineConsoleConf.getCacheMap(engineConnTask.getLables))
+
+ if (!executorProperties.isEmpty) {
+ globalConfig.putAll(executorProperties)
+ }
+
+ globalConfig
+ }
+
+
override def executeCompletely(engineExecutorContext: EngineExecutionContext, code: String, completedLine: String): ExecuteResponse = null
override def progress(taskID: String): Float = 0.0f
@@ -121,6 +146,7 @@ class ElasticSearchEngineConnExecutor(override val outputPrintLimit: Int, val id
override def getCurrentNodeResource(): NodeResource = {
val properties = EngineConnObject.getEngineCreationContext.getOptions
+
if (properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
val settingClientMemory = properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
if (!settingClientMemory.toLowerCase().endsWith("g")) {
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/ElasticSearchExecutor.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/ElasticSearchExecutor.scala
index f1e874e06..77631c147 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/ElasticSearchExecutor.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/ElasticSearchExecutor.scala
@@ -18,16 +18,14 @@ package org.apache.linkis.engineplugin.elasticsearch.executer.client
import java.io.IOException
import java.util
+
import org.apache.linkis.common.utils.Logging
-import org.apache.linkis.scheduler.executer.ExecuteResponse
import org.apache.linkis.engineplugin.elasticsearch.executer.client.impl.ElasticSearchExecutorImpl
-import scala.collection.JavaConverters._
-
trait ElasticSearchExecutor extends Logging {
@throws(classOf[IOException])
- def open : Unit
+ def open: Unit
def executeLine(code: String): ElasticSearchResponse
@@ -37,14 +35,8 @@ trait ElasticSearchExecutor extends Logging {
object ElasticSearchExecutor {
- def apply(runType: String, properties: util.Map[String, Object]): ElasticSearchExecutor = {
- val newProperties = new util.HashMap[String, String]()
- properties.asScala.foreach {
- case (key: String, value: Object) if value != null =>
- newProperties.put(key, String.valueOf(value))
- case _ =>
- }
- new ElasticSearchExecutorImpl(runType, newProperties)
+ def apply(runType: String, properties: util.Map[String, String]): ElasticSearchExecutor = {
+ new ElasticSearchExecutorImpl(runType, properties)
}
}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClient.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClient.scala
index 48cec78f6..7707862e1 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClient.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClient.scala
@@ -17,6 +17,7 @@
package org.apache.linkis.engineplugin.elasticsearch.executer.client
import java.nio.charset.StandardCharsets.UTF_8
+import java.util
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration._
@@ -34,7 +35,7 @@ import scala.collection.JavaConverters._
trait EsClientOperate {
- def execute(code: String, options: JMap[String, String], responseListener: ResponseListener): Cancellable
+ def execute(code: String, options: util.Map[String, String], responseListener: ResponseListener): Cancellable
def close(): Unit
@@ -64,12 +65,12 @@ abstract class EsClient(datasourceName: String, client: RestClient, sniffer: Sni
class EsClientImpl(datasourceName: String, client: RestClient, sniffer: Sniffer)
extends EsClient(datasourceName, client, sniffer) {
- override def execute(code: String, options: JMap[String, String], responseListener: ResponseListener): Cancellable = {
+ override def execute(code: String, options: util.Map[String, String], responseListener: ResponseListener): Cancellable = {
val request = createRequest(code, options)
client.performRequestAsync(request, responseListener)
}
- private def createRequest(code: String, options: JMap[String, String]): Request = {
+ private def createRequest(code: String, options: util.Map[String, String]): Request = {
val endpoint = ES_HTTP_ENDPOINT.getValue(options)
val method = ES_HTTP_METHOD.getValue(options)
val request = new Request(method, endpoint)
@@ -78,7 +79,7 @@ class EsClientImpl(datasourceName: String, client: RestClient, sniffer: Sniffer)
request
}
- private def getRequestOptions(options: JMap[String, String]): RequestOptions = {
+ private def getRequestOptions(options: util.Map[String, String]): RequestOptions = {
val builder = RequestOptions.DEFAULT.toBuilder()
val username = ES_USERNAME.getValue(options)
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClientFactory.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClientFactory.scala
index 463d2f2d3..6573536b8 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClientFactory.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClientFactory.scala
@@ -20,7 +20,6 @@ import java.util
import java.util.Map
import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration._
-import org.apache.linkis.server.JMap
import org.apache.commons.lang3.StringUtils
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.client.CredentialsProvider
@@ -36,7 +35,7 @@ import scala.collection.JavaConverters._
object EsClientFactory {
- def getRestClient(options: JMap[String, String]): EsClient = {
+ def getRestClient(options: util.Map[String, String]): EsClient = {
val key = getDatasourceName(options)
if (StringUtils.isBlank(key)) {
return defaultClient
@@ -63,7 +62,7 @@ object EsClientFactory {
}
}
- private def getDatasourceName(options: JMap[String, String]): String = {
+ private def getDatasourceName(options: util.Map[String, String]): String = {
options.getOrDefault(ES_DATASOURCE_NAME.key, "")
}
@@ -71,7 +70,7 @@ object EsClientFactory {
ES_CLIENT_MAP.put(client.getDatasourceName, client)
}
- private def createRestClient(options: JMap[String, String]): EsClient = {
+ private def createRestClient(options: util.Map[String, String]): EsClient = {
val clusterStr = options.get(ES_CLUSTER.key)
if (StringUtils.isBlank(clusterStr)) {
throw EsParamsIllegalException("cluster is blank!")
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/impl/ElasticSearchExecutorImpl.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/impl/ElasticSearchExecutorImpl.scala
index adf46bc46..9cfaa5968 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/impl/ElasticSearchExecutorImpl.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/impl/ElasticSearchExecutorImpl.scala
@@ -16,7 +16,9 @@
*/
package org.apache.linkis.engineplugin.elasticsearch.executer.client.impl
+import java.util
import java.util.concurrent.CountDownLatch
+
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.engineplugin.elasticsearch.executer.client.ResponseHandler
import org.apache.linkis.protocol.constants.TaskConstant
@@ -28,7 +30,7 @@ import org.apache.linkis.engineplugin.elasticsearch.exception.EsConvertResponseE
import org.apache.linkis.engineplugin.elasticsearch.executer.client.{ElasticSearchErrorResponse, ElasticSearchExecutor, ElasticSearchResponse, EsClient, EsClientFactory, ResponseHandler}
import org.elasticsearch.client.{Cancellable, Response, ResponseListener}
-class ElasticSearchExecutorImpl(runType: String, properties: JMap[String, String]) extends ElasticSearchExecutor {
+class ElasticSearchExecutorImpl(runType: String, properties: util.Map[String, String]) extends ElasticSearchExecutor {
private var client: EsClient = _
private var cancelable: Cancellable = _
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org