You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/07/17 08:07:44 UTC
[linkis] branch master updated: [Bug] Fix impala plugin for client creation (#4776)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new ec627119b [Bug] Fix impala plugin for client creation (#4776)
ec627119b is described below
commit ec627119bad6d3469cffa5503743f73c80b9c2f0
Author: Knypys <si...@qq.com>
AuthorDate: Mon Jul 17 16:07:38 2023 +0800
[Bug] Fix impala plugin for client creation (#4776)
* fix usage docs
* fix engine for linkis-storage type change
* fix impala executor bugs
* fix impala client creation
---
.../client/thrift/ImpalaThriftSessionFactory.java | 2 +-
.../impala/executor/ImpalaEngineConnExecutor.scala | 67 ++++++++--------------
2 files changed, 24 insertions(+), 45 deletions(-)
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
index fd6eb0e9f..472564253 100644
--- a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
@@ -90,7 +90,7 @@ public class ImpalaThriftSessionFactory {
}
if (sessions.isEmpty()) {
- throw new IllegalArgumentException("invalid hosts: " + StringUtils.join(hosts, ','));
+ throw new IllegalArgumentException("Invalid hosts: " + StringUtils.join(hosts, ','));
}
this.socketFactory = socketFactory;
diff --git a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
index d9cf13a3c..23cd1a0e6 100644
--- a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
@@ -17,73 +17,44 @@
package org.apache.linkis.engineplugin.impala.executor
+import org.apache.commons.collections.MapUtils
+import org.apache.commons.io.IOUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{OverloadUtils, Utils}
-import org.apache.linkis.engineconn.common.password.{
- CommandPasswordCallback,
- StaticPasswordCallback
-}
-import org.apache.linkis.engineconn.computation.executor.execute.{
- ConcurrentComputationExecutor,
- EngineExecutionContext
-}
+import org.apache.linkis.engineconn.common.password.{CommandPasswordCallback, StaticPasswordCallback}
+import org.apache.linkis.engineconn.computation.executor.execute.{ConcurrentComputationExecutor, EngineExecutionContext}
import org.apache.linkis.engineconn.core.EngineConnObject
-import org.apache.linkis.engineplugin.impala.client.{
- ExecutionListener,
- ImpalaClient,
- ImpalaResultSet
-}
import org.apache.linkis.engineplugin.impala.client.ImpalaResultSet.Row
-import org.apache.linkis.engineplugin.impala.client.exception.{
- ImpalaEngineException,
- ImpalaErrorCodeSummary
-}
+import org.apache.linkis.engineplugin.impala.client.exception.{ImpalaEngineException, ImpalaErrorCodeSummary}
import org.apache.linkis.engineplugin.impala.client.protocol.{ExecProgress, ExecStatus}
-import org.apache.linkis.engineplugin.impala.client.thrift.{
- ImpalaThriftClient,
- ImpalaThriftSessionFactory
-}
+import org.apache.linkis.engineplugin.impala.client.thrift.{ImpalaThriftClient, ImpalaThriftSessionFactory}
+import org.apache.linkis.engineplugin.impala.client.{ExecutionListener, ImpalaClient, ImpalaResultSet}
import org.apache.linkis.engineplugin.impala.conf.ImpalaConfiguration._
import org.apache.linkis.engineplugin.impala.conf.ImpalaEngineConfig
import org.apache.linkis.governance.common.paser.SQLCodeParser
-import org.apache.linkis.manager.common.entity.resource.{
- CommonNodeResource,
- LoadResource,
- NodeResource
-}
+import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, LoadResource, NodeResource}
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.rpc.Sender
-import org.apache.linkis.scheduler.executer.{
- CompletedExecuteResponse,
- ErrorExecuteResponse,
- ExecuteResponse,
- SuccessExecuteResponse
-}
+import org.apache.linkis.scheduler.executer.{CompletedExecuteResponse, ErrorExecuteResponse, ExecuteResponse, SuccessExecuteResponse}
import org.apache.linkis.storage.domain.Column
import org.apache.linkis.storage.resultset.ResultSetFactory
import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
-
-import org.apache.commons.io.IOUtils
-import org.apache.commons.lang3.StringUtils
-import org.apache.commons.lang3.exception.ExceptionUtils
-
import org.springframework.util.CollectionUtils
-import javax.net.SocketFactory
-import javax.net.ssl._
-import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback}
-
import java.io.FileInputStream
import java.security.KeyStore
import java.util
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Consumer
-
+import javax.net.SocketFactory
+import javax.net.ssl._
+import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback}
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
class ImpalaEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
extends ConcurrentComputationExecutor(outputPrintLimit) {
@@ -308,7 +279,7 @@ class ImpalaEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
engineExecutionContext.getLabels.find(_.isInstanceOf[EngineTypeLabel]).get
var configMap: util.Map[String, String] = null
if (userCreatorLabel != null && engineTypeLabel != null) {
- configMap = Utils.tryAndError(
+ configMap = Utils.tryAndWarn(
ImpalaEngineConfig.getCacheMap(
(
userCreatorLabel.asInstanceOf[UserCreatorLabel],
@@ -317,6 +288,14 @@ class ImpalaEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
)
)
}
+ if (configMap == null) {
+ configMap = new util.HashMap[String, String]()
+ }
+
+ val properties = engineExecutionContext.getProperties.asInstanceOf[util.Map[String, String]]
+ if (MapUtils.isNotEmpty(properties)) {
+ configMap.putAll(properties)
+ }
val impalaServers = IMPALA_SERVERS.getValue(configMap)
val impalaMaxConnections = IMPALA_MAX_CONNECTIONS.getValue(configMap)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org