You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/07/17 08:07:44 UTC

[linkis] branch master updated: [Bug] Fix impala plugin for client creation (#4776)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new ec627119b [Bug] Fix impala plugin for client creation (#4776)
ec627119b is described below

commit ec627119bad6d3469cffa5503743f73c80b9c2f0
Author: Knypys <si...@qq.com>
AuthorDate: Mon Jul 17 16:07:38 2023 +0800

    [Bug] Fix impala plugin for client creation (#4776)
    
    * fix usage docs
    
    * fix engine for linkis-storage type change
    
    * fix impala executor bugs
    
    * fix impala client creation
---
 .../client/thrift/ImpalaThriftSessionFactory.java  |  2 +-
 .../impala/executor/ImpalaEngineConnExecutor.scala | 67 ++++++++--------------
 2 files changed, 24 insertions(+), 45 deletions(-)

diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
index fd6eb0e9f..472564253 100644
--- a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
@@ -90,7 +90,7 @@ public class ImpalaThriftSessionFactory {
     }
 
     if (sessions.isEmpty()) {
-      throw new IllegalArgumentException("invalid hosts: " + StringUtils.join(hosts, ','));
+      throw new IllegalArgumentException("Invalid hosts: " + StringUtils.join(hosts, ','));
     }
 
     this.socketFactory = socketFactory;
diff --git a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
index d9cf13a3c..23cd1a0e6 100644
--- a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
@@ -17,73 +17,44 @@
 
 package org.apache.linkis.engineplugin.impala.executor
 
+import org.apache.commons.collections.MapUtils
+import org.apache.commons.io.IOUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{OverloadUtils, Utils}
-import org.apache.linkis.engineconn.common.password.{
-  CommandPasswordCallback,
-  StaticPasswordCallback
-}
-import org.apache.linkis.engineconn.computation.executor.execute.{
-  ConcurrentComputationExecutor,
-  EngineExecutionContext
-}
+import org.apache.linkis.engineconn.common.password.{CommandPasswordCallback, StaticPasswordCallback}
+import org.apache.linkis.engineconn.computation.executor.execute.{ConcurrentComputationExecutor, EngineExecutionContext}
 import org.apache.linkis.engineconn.core.EngineConnObject
-import org.apache.linkis.engineplugin.impala.client.{
-  ExecutionListener,
-  ImpalaClient,
-  ImpalaResultSet
-}
 import org.apache.linkis.engineplugin.impala.client.ImpalaResultSet.Row
-import org.apache.linkis.engineplugin.impala.client.exception.{
-  ImpalaEngineException,
-  ImpalaErrorCodeSummary
-}
+import org.apache.linkis.engineplugin.impala.client.exception.{ImpalaEngineException, ImpalaErrorCodeSummary}
 import org.apache.linkis.engineplugin.impala.client.protocol.{ExecProgress, ExecStatus}
-import org.apache.linkis.engineplugin.impala.client.thrift.{
-  ImpalaThriftClient,
-  ImpalaThriftSessionFactory
-}
+import org.apache.linkis.engineplugin.impala.client.thrift.{ImpalaThriftClient, ImpalaThriftSessionFactory}
+import org.apache.linkis.engineplugin.impala.client.{ExecutionListener, ImpalaClient, ImpalaResultSet}
 import org.apache.linkis.engineplugin.impala.conf.ImpalaConfiguration._
 import org.apache.linkis.engineplugin.impala.conf.ImpalaEngineConfig
 import org.apache.linkis.governance.common.paser.SQLCodeParser
-import org.apache.linkis.manager.common.entity.resource.{
-  CommonNodeResource,
-  LoadResource,
-  NodeResource
-}
+import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, LoadResource, NodeResource}
 import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
 import org.apache.linkis.protocol.engine.JobProgressInfo
 import org.apache.linkis.rpc.Sender
-import org.apache.linkis.scheduler.executer.{
-  CompletedExecuteResponse,
-  ErrorExecuteResponse,
-  ExecuteResponse,
-  SuccessExecuteResponse
-}
+import org.apache.linkis.scheduler.executer.{CompletedExecuteResponse, ErrorExecuteResponse, ExecuteResponse, SuccessExecuteResponse}
 import org.apache.linkis.storage.domain.Column
 import org.apache.linkis.storage.resultset.ResultSetFactory
 import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
-
-import org.apache.commons.io.IOUtils
-import org.apache.commons.lang3.StringUtils
-import org.apache.commons.lang3.exception.ExceptionUtils
-
 import org.springframework.util.CollectionUtils
 
-import javax.net.SocketFactory
-import javax.net.ssl._
-import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback}
-
 import java.io.FileInputStream
 import java.security.KeyStore
 import java.util
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Consumer
-
+import javax.net.SocketFactory
+import javax.net.ssl._
+import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback}
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
 
 class ImpalaEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
     extends ConcurrentComputationExecutor(outputPrintLimit) {
@@ -308,7 +279,7 @@ class ImpalaEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
       engineExecutionContext.getLabels.find(_.isInstanceOf[EngineTypeLabel]).get
     var configMap: util.Map[String, String] = null
     if (userCreatorLabel != null && engineTypeLabel != null) {
-      configMap = Utils.tryAndError(
+      configMap = Utils.tryAndWarn(
         ImpalaEngineConfig.getCacheMap(
           (
             userCreatorLabel.asInstanceOf[UserCreatorLabel],
@@ -317,6 +288,14 @@ class ImpalaEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
         )
       )
     }
+    if (configMap == null) {
+      configMap = new util.HashMap[String, String]()
+    }
+
+    val properties = engineExecutionContext.getProperties.asInstanceOf[util.Map[String, String]]
+    if (MapUtils.isNotEmpty(properties)) {
+      configMap.putAll(properties)
+    }
 
     val impalaServers = IMPALA_SERVERS.getValue(configMap)
     val impalaMaxConnections = IMPALA_MAX_CONNECTIONS.getValue(configMap)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org