You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/08/16 08:32:07 UTC

[incubator-linkis] branch dev-1.2.0 updated: ES Engine Optimization (#2795)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
     new 91d79927d ES Engine Optimization (#2795)
91d79927d is described below

commit 91d79927dc54b47f577091e55cfa81b07ba828f5
Author: aiceflower <ki...@sina.com>
AuthorDate: Tue Aug 16 16:32:02 2022 +0800

    ES Engine Optimization (#2795)
    
    * Fixed the console configuration not working
    * Change the ES datasource parameters to be consistent with others
---
 .../conf/ElasticSearchConfiguration.scala          |  2 +-
 .../conf/ElasticSearchEngineConsoleConf.scala      | 39 ++++++++++++++++++++++
 .../executer/ElasticSearchEngineConnExecutor.scala | 36 +++++++++++++++++---
 .../executer/client/ElasticSearchExecutor.scala    | 16 +++------
 .../elasticsearch/executer/client/EsClient.scala   |  9 ++---
 .../executer/client/EsClientFactory.scala          |  7 ++--
 .../client/impl/ElasticSearchExecutorImpl.scala    |  4 ++-
 7 files changed, 86 insertions(+), 27 deletions(-)

diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.scala
index 7855d7bf7..d138b60f7 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchConfiguration.scala
@@ -22,7 +22,7 @@ object ElasticSearchConfiguration {
 
   // es client
   val ES_CLUSTER = CommonVars("linkis.es.cluster", "127.0.0.1:9200")
-  val ES_DATASOURCE_NAME = CommonVars("linkis.datasource", "default_datasource")
+  val ES_DATASOURCE_NAME = CommonVars("linkis.es.datasource", "default_datasource")
   val ES_AUTH_CACHE = CommonVars("linkis.es.auth.cache", false)
   val ES_USERNAME = CommonVars("linkis.es.username", "")
   val ES_PASSWORD = CommonVars("linkis.es.password", "")
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.scala
new file mode 100644
index 000000000..fff0fb97a
--- /dev/null
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/conf/ElasticSearchEngineConsoleConf.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.elasticsearch.conf
+
+import java.util
+
+import org.apache.linkis.common.conf.Configuration
+import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfig, ResponseQueryConfig}
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
+import org.apache.linkis.protocol.CacheableProtocol
+import org.apache.linkis.rpc.RPCMapCache
+
+object ElasticSearchEngineConsoleConf extends RPCMapCache[Array[Label[_]], String, String](Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME.getValue) {
+
+  override protected def createRequest(labels: Array[Label[_]]): CacheableProtocol = {
+    val userCreatorLabel = labels.find(_.isInstanceOf[UserCreatorLabel]).get.asInstanceOf[UserCreatorLabel]
+    val engineTypeLabel = labels.find(_.isInstanceOf[EngineTypeLabel]).get.asInstanceOf[EngineTypeLabel]
+    RequestQueryEngineConfig(userCreatorLabel, engineTypeLabel)
+  }
+
+  override protected def createMap(any: Any): util.Map[String, String] = any match {
+    case response: ResponseQueryConfig => response.getKeyAndValue
+  }
+}
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/ElasticSearchEngineConnExecutor.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/ElasticSearchEngineConnExecutor.scala
index bacbd2b83..20b8f269c 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/ElasticSearchEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/ElasticSearchEngineConnExecutor.scala
@@ -18,13 +18,16 @@ package org.apache.linkis.engineplugin.elasticsearch.executer
 
 import java.util
 import java.util.concurrent.TimeUnit
+
 import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
+import org.apache.commons.io.IOUtils
 import org.apache.linkis.common.utils.{Logging, OverloadUtils, Utils}
 import org.apache.linkis.engineconn.common.conf.{EngineConnConf, EngineConnConstant}
 import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
 import org.apache.linkis.engineconn.computation.executor.execute.{ConcurrentComputationExecutor, EngineExecutionContext}
 import org.apache.linkis.engineconn.core.EngineConnObject
-import org.apache.linkis.engineplugin.elasticsearch.executer.client.ElasticSearchErrorResponse
+import org.apache.linkis.engineplugin.elasticsearch.conf.{ElasticSearchConfiguration, ElasticSearchEngineConsoleConf}
+import org.apache.linkis.engineplugin.elasticsearch.executer.client.{ElasticSearchErrorResponse, ElasticSearchExecutor, ElasticSearchJsonResponse, ElasticSearchTableResponse}
 import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
 import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, LoadResource, NodeResource}
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
@@ -35,9 +38,6 @@ import org.apache.linkis.scheduler.executer.{AliasOutputExecuteResponse, ErrorEx
 import org.apache.linkis.storage.LineRecord
 import org.apache.linkis.storage.resultset.ResultSetFactory
 import org.apache.linkis.storage.resultset.table.TableMetaData
-import org.apache.commons.io.IOUtils
-import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration
-import org.apache.linkis.engineplugin.elasticsearch.executer.client.{ElasticSearchErrorResponse, ElasticSearchExecutor, ElasticSearchJsonResponse, ElasticSearchTableResponse}
 import org.springframework.util.CollectionUtils
 
 import scala.collection.JavaConverters._
@@ -64,7 +64,11 @@ class ElasticSearchEngineConnExecutor(override val outputPrintLimit: Int, val id
   }
 
   override def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
-    val elasticSearchExecutor = ElasticSearchExecutor(runType, engineConnTask.getProperties)
+
+    val properties: util.Map[String, String] = buildRuntimeParams(engineConnTask)
+    logger.info(s"The elasticsearch properties is: $properties")
+
+    val elasticSearchExecutor = ElasticSearchExecutor(runType, properties)
     elasticSearchExecutor.open
     elasticSearchExecutorCache.put(engineConnTask.getTaskId, elasticSearchExecutor)
     super.execute(engineConnTask)
@@ -73,6 +77,7 @@ class ElasticSearchEngineConnExecutor(override val outputPrintLimit: Int, val id
   override def executeLine(engineExecutorContext: EngineExecutionContext, code: String): ExecuteResponse = {
     val taskId = engineExecutorContext.getJobId.get
     val elasticSearchExecutor = elasticSearchExecutorCache.getIfPresent(taskId)
+
     val elasticSearchResponse = elasticSearchExecutor.executeLine(code)
 
     elasticSearchResponse match {
@@ -100,6 +105,26 @@ class ElasticSearchEngineConnExecutor(override val outputPrintLimit: Int, val id
     }
   }
 
+
+  private def buildRuntimeParams(engineConnTask: EngineConnTask): util.Map[String, String] = {
+
+    // parameters specified at runtime
+    var executorProperties = engineConnTask.getProperties.asInstanceOf[util.Map[String, String]]
+    if (executorProperties == null) {
+      executorProperties = new util.HashMap[String, String]()
+    }
+
+    // global  engine params by console
+    val globalConfig: util.Map[String, String] = Utils.tryAndWarn(ElasticSearchEngineConsoleConf.getCacheMap(engineConnTask.getLables))
+
+    if (!executorProperties.isEmpty) {
+      globalConfig.putAll(executorProperties)
+    }
+
+    globalConfig
+  }
+
+
   override def executeCompletely(engineExecutorContext: EngineExecutionContext, code: String, completedLine: String): ExecuteResponse = null
 
   override def progress(taskID: String): Float = 0.0f
@@ -121,6 +146,7 @@ class ElasticSearchEngineConnExecutor(override val outputPrintLimit: Int, val id
 
   override def getCurrentNodeResource(): NodeResource = {
     val properties = EngineConnObject.getEngineCreationContext.getOptions
+
     if (properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
       val settingClientMemory = properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
       if (!settingClientMemory.toLowerCase().endsWith("g")) {
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/ElasticSearchExecutor.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/ElasticSearchExecutor.scala
index f1e874e06..77631c147 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/ElasticSearchExecutor.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/ElasticSearchExecutor.scala
@@ -18,16 +18,14 @@ package org.apache.linkis.engineplugin.elasticsearch.executer.client
 
 import java.io.IOException
 import java.util
+
 import org.apache.linkis.common.utils.Logging
-import org.apache.linkis.scheduler.executer.ExecuteResponse
 import org.apache.linkis.engineplugin.elasticsearch.executer.client.impl.ElasticSearchExecutorImpl
 
-import scala.collection.JavaConverters._
-
 trait ElasticSearchExecutor extends Logging {
 
   @throws(classOf[IOException])
-  def open : Unit
+  def open: Unit
 
   def executeLine(code: String): ElasticSearchResponse
 
@@ -37,14 +35,8 @@ trait ElasticSearchExecutor extends Logging {
 
 object ElasticSearchExecutor {
 
-  def apply(runType: String, properties: util.Map[String, Object]): ElasticSearchExecutor = {
-    val newProperties = new util.HashMap[String, String]()
-    properties.asScala.foreach {
-      case (key: String, value: Object) if value != null =>
-        newProperties.put(key, String.valueOf(value))
-      case _ =>
-    }
-    new ElasticSearchExecutorImpl(runType, newProperties)
+  def apply(runType: String, properties: util.Map[String, String]): ElasticSearchExecutor = {
+    new ElasticSearchExecutorImpl(runType, properties)
   }
 
 }
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClient.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClient.scala
index 48cec78f6..7707862e1 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClient.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClient.scala
@@ -17,6 +17,7 @@
 package org.apache.linkis.engineplugin.elasticsearch.executer.client
 
 import java.nio.charset.StandardCharsets.UTF_8
+import java.util
 
 import org.apache.linkis.common.utils.Utils
 import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration._
@@ -34,7 +35,7 @@ import scala.collection.JavaConverters._
 
 trait EsClientOperate {
 
-  def execute(code: String, options: JMap[String, String], responseListener: ResponseListener): Cancellable
+  def execute(code: String, options: util.Map[String, String], responseListener: ResponseListener): Cancellable
 
   def close(): Unit
 
@@ -64,12 +65,12 @@ abstract class EsClient(datasourceName: String, client: RestClient, sniffer: Sni
 class EsClientImpl(datasourceName: String, client: RestClient, sniffer: Sniffer)
   extends EsClient(datasourceName, client, sniffer) {
 
-  override def execute(code: String, options: JMap[String, String], responseListener: ResponseListener): Cancellable = {
+  override def execute(code: String, options: util.Map[String, String], responseListener: ResponseListener): Cancellable = {
     val request = createRequest(code, options)
     client.performRequestAsync(request, responseListener)
   }
 
-  private def createRequest(code: String, options: JMap[String, String]): Request = {
+  private def createRequest(code: String, options: util.Map[String, String]): Request = {
     val endpoint = ES_HTTP_ENDPOINT.getValue(options)
     val method = ES_HTTP_METHOD.getValue(options)
     val request = new Request(method, endpoint)
@@ -78,7 +79,7 @@ class EsClientImpl(datasourceName: String, client: RestClient, sniffer: Sniffer)
     request
   }
 
-  private def getRequestOptions(options: JMap[String, String]): RequestOptions = {
+  private def getRequestOptions(options: util.Map[String, String]): RequestOptions = {
     val builder = RequestOptions.DEFAULT.toBuilder()
 
     val username = ES_USERNAME.getValue(options)
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClientFactory.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClientFactory.scala
index 463d2f2d3..6573536b8 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClientFactory.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/EsClientFactory.scala
@@ -20,7 +20,6 @@ import java.util
 import java.util.Map
 import org.apache.linkis.common.conf.CommonVars
 import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration._
-import org.apache.linkis.server.JMap
 import org.apache.commons.lang3.StringUtils
 import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
 import org.apache.http.client.CredentialsProvider
@@ -36,7 +35,7 @@ import scala.collection.JavaConverters._
 
 object EsClientFactory {
 
-  def getRestClient(options: JMap[String, String]): EsClient = {
+  def getRestClient(options: util.Map[String, String]): EsClient = {
     val key = getDatasourceName(options)
     if (StringUtils.isBlank(key)) {
       return defaultClient
@@ -63,7 +62,7 @@ object EsClientFactory {
     }
   }
 
-  private def getDatasourceName(options: JMap[String, String]): String = {
+  private def getDatasourceName(options: util.Map[String, String]): String = {
     options.getOrDefault(ES_DATASOURCE_NAME.key, "")
   }
 
@@ -71,7 +70,7 @@ object EsClientFactory {
     ES_CLIENT_MAP.put(client.getDatasourceName, client)
   }
 
-  private def createRestClient(options: JMap[String, String]): EsClient = {
+  private def createRestClient(options: util.Map[String, String]): EsClient = {
     val clusterStr = options.get(ES_CLUSTER.key)
     if (StringUtils.isBlank(clusterStr)) {
       throw EsParamsIllegalException("cluster is blank!")
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/impl/ElasticSearchExecutorImpl.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/impl/ElasticSearchExecutorImpl.scala
index adf46bc46..9cfaa5968 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/impl/ElasticSearchExecutorImpl.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executer/client/impl/ElasticSearchExecutorImpl.scala
@@ -16,7 +16,9 @@
  */
 package org.apache.linkis.engineplugin.elasticsearch.executer.client.impl
 
+import java.util
 import java.util.concurrent.CountDownLatch
+
 import org.apache.linkis.common.utils.Utils
 import org.apache.linkis.engineplugin.elasticsearch.executer.client.ResponseHandler
 import org.apache.linkis.protocol.constants.TaskConstant
@@ -28,7 +30,7 @@ import org.apache.linkis.engineplugin.elasticsearch.exception.EsConvertResponseE
 import org.apache.linkis.engineplugin.elasticsearch.executer.client.{ElasticSearchErrorResponse, ElasticSearchExecutor, ElasticSearchResponse, EsClient, EsClientFactory, ResponseHandler}
 import org.elasticsearch.client.{Cancellable, Response, ResponseListener}
 
-class ElasticSearchExecutorImpl(runType: String, properties: JMap[String, String]) extends ElasticSearchExecutor {
+class ElasticSearchExecutorImpl(runType: String, properties: util.Map[String, String]) extends ElasticSearchExecutor {
 
   private var client: EsClient = _
   private var cancelable: Cancellable = _


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org