You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/03/29 12:59:15 UTC

[linkis] branch dev-1.4.0 updated: feat: HiveEngineConn supports concurrency.(#4175) (#4359)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
     new 6d1eed14c feat: HiveEngineConn supports concurrency.(#4175) (#4359)
6d1eed14c is described below

commit 6d1eed14c77ff193eb6c75b9687763f449825fab
Author: CharlieYan <16...@users.noreply.github.com>
AuthorDate: Wed Mar 29 20:59:09 2023 +0800

    feat: HiveEngineConn supports concurrency.(#4175) (#4359)
    
    * feat: HiveEngineConn supports concurrency.(#4175)
---
 .../main/resources/linkis-engineconn.properties    |   5 +-
 .../hive/conf/HiveEngineConfiguration.scala        |   7 +
 .../hive/creation/HiveEngineConnFactory.scala      |  77 ++-
 .../engineplugin/hive/entity/HiveSession.scala     |  11 +-
 .../HiveEngineConcurrentConnExecutor.scala         | 536 +++++++++++++++++++++
 .../physical/CodeLogicalUnitExecTask.scala         |   1 +
 6 files changed, 620 insertions(+), 17 deletions(-)

diff --git a/linkis-engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
index a2135181f..c320a444b 100644
--- a/linkis-engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
+++ b/linkis-engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
@@ -24,4 +24,7 @@ wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.execu
 wds.linkis.engineconn.maintain.enable=true
 
 #Depending on the engine selected in HIVE_ENGINE_TYPE, control the function called when canceling the task in scripts.
-wds.linkis.hive.engine.type=mr
\ No newline at end of file
+wds.linkis.hive.engine.type=mr
+
+# support parallelism execution
+wds.linkis.engineconn.support.parallelism=true
\ No newline at end of file
diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala
index 047157b78..a7ac85a8c 100644
--- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala
+++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala
@@ -40,4 +40,11 @@ object HiveEngineConfiguration {
   ).getValue
 
   val HIVE_ENGINE_TYPE = CommonVars[String]("wds.linkis.hive.engine.type", "mr").getValue
+
+  val HIVE_ENGINE_CONCURRENT_LIMIT =
+    CommonVars[Int]("linkis.hive.engineconn.concurrent.limit", 10).getValue
+
+  val HIVE_ENGINE_CONCURRENT_SUPPORT =
+    CommonVars[Boolean]("linkis.hive.engineconn.concurrent.support", false).getValue
+
 }
diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
index 9a4697597..7cbcd6e2a 100644
--- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
@@ -24,11 +24,18 @@ import org.apache.linkis.engineconn.computation.executor.creation.ComputationSin
 import org.apache.linkis.engineconn.executor.entity.LabelExecutor
 import org.apache.linkis.engineplugin.hive.common.HiveUtils
 import org.apache.linkis.engineplugin.hive.conf.HiveEngineConfiguration
-import org.apache.linkis.engineplugin.hive.entity.HiveSession
+import org.apache.linkis.engineplugin.hive.entity.{
+  AbstractHiveSession,
+  HiveConcurrentSession,
+  HiveSession
+}
 import org.apache.linkis.engineplugin.hive.errorcode.HiveErrorCodeSummary.CREATE_HIVE_EXECUTOR_ERROR
 import org.apache.linkis.engineplugin.hive.errorcode.HiveErrorCodeSummary.HIVE_EXEC_JAR_ERROR
 import org.apache.linkis.engineplugin.hive.exception.HiveSessionStartFailedException
-import org.apache.linkis.engineplugin.hive.executor.HiveEngineConnExecutor
+import org.apache.linkis.engineplugin.hive.executor.{
+  HiveEngineConcurrentConnExecutor,
+  HiveEngineConnExecutor
+}
 import org.apache.linkis.hadoop.common.utils.HDFSUtils
 import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType}
 import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
@@ -38,9 +45,11 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.security.UserGroupInformation
 
 import java.io.{ByteArrayOutputStream, PrintStream}
 import java.security.PrivilegedExceptionAction
+import java.util
 
 import scala.collection.JavaConverters._
 
@@ -63,6 +72,14 @@ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory w
           hiveSession.hiveConf,
           hiveSession.baos
         )
+      case hiveConcurrentSession: HiveConcurrentSession =>
+        new HiveEngineConcurrentConnExecutor(
+          id,
+          hiveConcurrentSession.sessionState,
+          hiveConcurrentSession.ugi,
+          hiveConcurrentSession.hiveConf,
+          hiveConcurrentSession.baos
+        )
       case _ =>
         throw HiveSessionStartFailedException(
           CREATE_HIVE_EXECUTOR_ERROR.getErrorCode,
@@ -73,8 +90,48 @@ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory w
 
   override protected def createEngineConnSession(
       engineCreationContext: EngineCreationContext
-  ): HiveSession = {
-    val options = engineCreationContext.getOptions
+  ): AbstractHiveSession = {
+    // if hive engine support concurrent, return HiveConcurrentSession
+    if (HiveEngineConfiguration.HIVE_ENGINE_CONCURRENT_SUPPORT) {
+      return doCreateHiveConcurrentSession(engineCreationContext.getOptions)
+    }
+
+    // return HiveSession
+    doCreateHiveSession(engineCreationContext.getOptions)
+  }
+
+  def doCreateHiveConcurrentSession(options: util.Map[String, String]): HiveConcurrentSession = {
+    val hiveConf: HiveConf = getHiveConf(options)
+    val ugi = HDFSUtils.getUserGroupInformation(Utils.getJvmUser)
+    val baos = new ByteArrayOutputStream()
+    val sessionState: SessionState = getSessionState(hiveConf, ugi, baos)
+    HiveConcurrentSession(sessionState, ugi, hiveConf, baos)
+  }
+
+  def doCreateHiveSession(options: util.Map[String, String]): HiveSession = {
+    val hiveConf: HiveConf = getHiveConf(options)
+    val ugi = HDFSUtils.getUserGroupInformation(Utils.getJvmUser)
+    val baos = new ByteArrayOutputStream()
+    val sessionState: SessionState = getSessionState(hiveConf, ugi, baos)
+    HiveSession(sessionState, ugi, hiveConf, baos)
+  }
+
+  private def getSessionState(
+      hiveConf: HiveConf,
+      ugi: UserGroupInformation,
+      baos: ByteArrayOutputStream
+  ) = {
+    val sessionState: SessionState = ugi.doAs(new PrivilegedExceptionAction[SessionState] {
+      override def run(): SessionState = new SessionState(hiveConf)
+    })
+    sessionState.out = new PrintStream(baos, true, "utf-8")
+    sessionState.info = new PrintStream(System.out, true, "utf-8")
+    sessionState.err = new PrintStream(System.out, true, "utf-8")
+    SessionState.start(sessionState)
+    sessionState
+  }
+
+  private def getHiveConf(options: util.Map[String, String]) = {
     val hiveConf: HiveConf = HiveUtils.getHiveConf
     hiveConf.setVar(
       HiveConf.ConfVars.HIVEJAR,
@@ -126,17 +183,7 @@ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory w
      */
     // enable hive.stats.collect.scancols
     hiveConf.setBoolean("hive.stats.collect.scancols", true)
-    val ugi = HDFSUtils.getUserGroupInformation(Utils.getJvmUser)
-    val sessionState: SessionState = ugi.doAs(new PrivilegedExceptionAction[SessionState] {
-      override def run(): SessionState = new SessionState(hiveConf)
-    })
-    val baos = new ByteArrayOutputStream()
-    sessionState.out = new PrintStream(baos, true, "utf-8")
-    sessionState.info = new PrintStream(System.out, true, "utf-8")
-    sessionState.err = new PrintStream(System.out, true, "utf-8")
-    SessionState.start(sessionState)
-
-    HiveSession(sessionState, ugi, hiveConf, baos)
+    hiveConf
   }
 
   override protected def getEngineConnType: EngineType = EngineType.HIVE
diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/entity/HiveSession.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/entity/HiveSession.scala
index c75478925..95aebac7e 100644
--- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/entity/HiveSession.scala
+++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/entity/HiveSession.scala
@@ -23,9 +23,18 @@ import org.apache.hadoop.security.UserGroupInformation
 
 import java.io.ByteArrayOutputStream
 
+abstract class AbstractHiveSession
+
 case class HiveSession(
     sessionState: SessionState,
     ugi: UserGroupInformation,
     hiveConf: HiveConf,
     baos: ByteArrayOutputStream = null
-)
+) extends AbstractHiveSession
+
+case class HiveConcurrentSession(
+    sessionState: SessionState,
+    ugi: UserGroupInformation,
+    hiveConf: HiveConf,
+    baos: ByteArrayOutputStream = null
+) extends AbstractHiveSession
diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
new file mode 100644
index 000000000..80b2145f8
--- /dev/null
+++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.hive.executor
+
+import org.apache.linkis.common.exception.ErrorException
+import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
+import org.apache.linkis.engineconn.computation.executor.execute.{
+  ConcurrentComputationExecutor,
+  EngineExecutionContext
+}
+import org.apache.linkis.engineconn.core.EngineConnObject
+import org.apache.linkis.engineconn.executor.entity.{ConcurrentExecutor, ResourceFetchExecutor}
+import org.apache.linkis.engineplugin.hive.conf.{Counters, HiveEngineConfiguration}
+import org.apache.linkis.engineplugin.hive.creation.HiveEngineConnFactory
+import org.apache.linkis.engineplugin.hive.cs.CSHiveHelper
+import org.apache.linkis.engineplugin.hive.errorcode.HiveErrorCodeSummary.COMPILE_HIVE_QUERY_ERROR
+import org.apache.linkis.engineplugin.hive.errorcode.HiveErrorCodeSummary.GET_FIELD_SCHEMAS_ERROR
+import org.apache.linkis.engineplugin.hive.exception.HiveQueryFailedException
+import org.apache.linkis.governance.common.paser.SQLCodeParser
+import org.apache.linkis.governance.common.utils.JobUtils
+import org.apache.linkis.hadoop.common.conf.HadoopConf
+import org.apache.linkis.manager.common.entity.resource.{
+  CommonNodeResource,
+  LoadInstanceResource,
+  NodeResource
+}
+import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.protocol.engine.JobProgressInfo
+import org.apache.linkis.scheduler.executer.{
+  CompletedExecuteResponse,
+  ErrorExecuteResponse,
+  ExecuteResponse,
+  SuccessExecuteResponse
+}
+import org.apache.linkis.storage.domain.{Column, DataType}
+import org.apache.linkis.storage.resultset.ResultSetFactory
+import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper
+import org.apache.hadoop.hive.ql.processors.{
+  CommandProcessor,
+  CommandProcessorFactory,
+  CommandProcessorResponse
+}
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.mapred.{JobStatus, RunningJob}
+import org.apache.hadoop.security.UserGroupInformation
+
+import java.io.ByteArrayOutputStream
+import java.security.PrivilegedExceptionAction
+import java.util
+import java.util.concurrent.{
+  Callable,
+  ConcurrentHashMap,
+  LinkedBlockingQueue,
+  ThreadPoolExecutor,
+  TimeUnit
+}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.slf4j.LoggerFactory
+
+class HiveEngineConcurrentConnExecutor(
+    id: Int,
+    sessionState: SessionState,
+    ugi: UserGroupInformation,
+    hiveConf: HiveConf,
+    baos: ByteArrayOutputStream = null
+) extends ConcurrentComputationExecutor
+    with ResourceFetchExecutor {
+
+  private val LOG = LoggerFactory.getLogger(getClass)
+
+  private val namePrefix: String = "HiveEngineExecutor_"
+
+  private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]()
+
+  private val driverCache: util.Map[String, HiveDriverProxy] =
+    new ConcurrentHashMap[String, HiveDriverProxy]()
+
+  private val applicationStringName = "application"
+
+  private val splitter = "_"
+
+  private var backgroundOperationPool: ThreadPoolExecutor = _
+
+  override def init(): Unit = {
+    LOG.info(s"Ready to change engine state!")
+    if (HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) {
+      System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
+    }
+    setCodeParser(new SQLCodeParser)
+
+    val queue = new LinkedBlockingQueue[Runnable](100)
+    backgroundOperationPool = new ThreadPoolExecutor(
+      100,
+      100,
+      10,
+      TimeUnit.SECONDS,
+      queue,
+      new ThreadFactoryBuilder().setNameFormat("Hive-Background-Pool-%d").build
+    )
+    backgroundOperationPool.allowCoreThreadTimeOut(true)
+    super.init()
+  }
+
+  override def executeLine(
+      engineExecutorContext: EngineExecutionContext,
+      code: String
+  ): ExecuteResponse = {
+    LOG.info(s"HiveEngineConcurrentConnExecutor Ready to executeLine: $code")
+    val taskId: String = engineExecutorContext.getJobId.get
+    CSHiveHelper.setContextIDInfoToHiveConf(engineExecutorContext, hiveConf)
+
+    val realCode = code.trim()
+
+    LOG.info(s"hive client begins to run hql code:\n ${realCode.trim}")
+    val jobId = JobUtils.getJobIdFromMap(engineExecutorContext.getProperties)
+    if (StringUtils.isNotBlank(jobId)) {
+      LOG.info(s"set mapreduce.job.tags=LINKIS_$jobId")
+      hiveConf.set("mapreduce.job.tags", s"LINKIS_$jobId")
+    }
+    if (realCode.trim.length > 500) {
+      engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim.substring(0, 500)} ...")
+    } else engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim}")
+    val tokens = realCode.trim.split("""\s+""")
+
+    val operation = new Callable[ExecuteResponse] {
+      override def call(): ExecuteResponse = {
+        SessionState.setCurrentSessionState(sessionState)
+        sessionState.setLastCommand(code)
+
+        val proc = CommandProcessorFactory.get(tokens, hiveConf)
+        LOG.debug("ugi is " + ugi.getUserName)
+        ugi.doAs(new PrivilegedExceptionAction[ExecuteResponse]() {
+          override def run(): ExecuteResponse = {
+            proc match {
+              case any if HiveDriverProxy.isDriver(any) =>
+                logger.info(s"driver is $any")
+
+                val driver = new HiveDriverProxy(any)
+                driverCache.put(taskId, driver)
+                executeHQL(
+                  engineExecutorContext.getJobId.get,
+                  engineExecutorContext,
+                  realCode,
+                  driver
+                )
+              case _ =>
+                val resp = proc.run(realCode.substring(tokens(0).length).trim)
+                val result = new String(baos.toByteArray)
+                logger.info("RESULT => {}", result)
+                engineExecutorContext.appendStdout(result)
+                baos.reset()
+                if (resp.getResponseCode != 0) {
+                  onComplete()
+                  throw resp.getException
+                }
+                onComplete()
+                SuccessExecuteResponse()
+            }
+          }
+        })
+      }
+    }
+
+    val future = backgroundOperationPool.submit(operation)
+    future.get()
+  }
+
+  def logMemoryCache(): Unit = {
+    logger.info(s"logMemoryCache running driver number: ${driverCache.size()}")
+    for (driverEntry <- driverCache.asScala) {
+      logger.info(s"running driver with taskId : ${driverEntry._1} .")
+    }
+  }
+
+  private def executeHQL(
+      taskId: String,
+      engineExecutorContext: EngineExecutionContext,
+      realCode: String,
+      driver: HiveDriverProxy
+  ): ExecuteResponse = {
+    var needRetry: Boolean = true
+    var tryCount: Int = 0
+    var hasResult: Boolean = false
+    var rows: Int = 0
+    var columnCount: Int = 0
+
+    while (needRetry) {
+      needRetry = false
+      driver.setTryCount(tryCount + 1)
+      val startTime = System.currentTimeMillis()
+
+      try {
+        val hiveResponse: CommandProcessorResponse =
+          if (!HiveDriverProxy.isIDriver(driver.getDriver())) {
+            var compileRet = -1
+            Utils.tryCatch {
+              compileRet = driver.compile(realCode)
+              logger.info(s"driver compile realCode : ${realCode} finished, status : ${compileRet}")
+              if (0 != compileRet) {
+                logger.warn(s"compile realCode : ${realCode} error status : ${compileRet}")
+                throw HiveQueryFailedException(
+                  COMPILE_HIVE_QUERY_ERROR.getErrorCode,
+                  COMPILE_HIVE_QUERY_ERROR.getErrorDesc
+                )
+              }
+
+              val queryPlan = driver.getPlan()
+              val numberOfJobs = Utilities.getMRTasks(queryPlan.getRootTasks).size
+              if (numberOfJobs > 0) {
+                engineExecutorContext.appendStdout(
+                  s"Your hive taskId: $taskId has $numberOfJobs MR jobs to do"
+                )
+              }
+
+              logger.info(s"there are ${numberOfJobs} jobs.")
+            } {
+              case e: Exception => logger.warn("obtain hive execute query plan failed,", e)
+              case t: Throwable => logger.warn("obtain hive execute query plan failed,", t)
+            }
+
+            driver.run(realCode, compileRet == 0)
+          } else {
+            driver.run(realCode)
+          }
+        if (hiveResponse.getResponseCode != 0) {
+          LOG.error("Hive query failed, response code is {}", hiveResponse.getResponseCode)
+          // todo check uncleared context ?
+          return ErrorExecuteResponse(hiveResponse.getErrorMessage, hiveResponse.getException)
+        }
+
+        engineExecutorContext.appendStdout(
+          s"Time taken: ${ByteTimeUtils.msDurationToString(System.currentTimeMillis() - startTime)}, begin to fetch results."
+        )
+        LOG.info(
+          s"$getId >> Time taken: ${ByteTimeUtils.msDurationToString(System.currentTimeMillis() - startTime)}, begin to fetch results."
+        )
+
+        val fieldSchemas =
+          if (hiveResponse.getSchema != null) hiveResponse.getSchema.getFieldSchemas
+          else if (driver.getSchema != null) {
+            driver.getSchema.getFieldSchemas
+          } else {
+            throw HiveQueryFailedException(
+              GET_FIELD_SCHEMAS_ERROR.getErrorCode,
+              GET_FIELD_SCHEMAS_ERROR.getErrorDesc
+            )
+          }
+        LOG.debug("fieldSchemas are " + fieldSchemas)
+        if (fieldSchemas == null || isNoResultSql(realCode)) {
+          // IOUtils.closeQuietly(resultSetWriter)
+          onComplete()
+
+          return SuccessExecuteResponse()
+        }
+        // get column data
+        val metaData: TableMetaData =
+          getResultMetaData(fieldSchemas, engineExecutorContext.getEnableResultsetMetaWithTableName)
+        // send result
+        rows = sendResultSet(engineExecutorContext, driver, metaData)
+        columnCount = if (fieldSchemas != null) fieldSchemas.size() else 0
+        hasResult = true
+
+      } catch {
+        case e if HiveDriverProxy.isCommandNeedRetryException(e) =>
+          tryCount += 1
+          needRetry = true
+          onComplete()
+
+          LOG.warn("Retry hive query with a different approach...")
+        case t: Throwable =>
+          LOG.error(s"query failed, reason : ", t)
+          onComplete()
+          return ErrorExecuteResponse(t.getMessage, t)
+      } finally {
+        driverCache.remove(taskId)
+        logMemoryCache()
+      }
+    }
+    if (hasResult) {
+      engineExecutorContext.appendStdout(s"Fetched  $columnCount col(s) : $rows row(s) in hive")
+      LOG.info(s"$getId >> Fetched  $columnCount col(s) : $rows row(s) in hive")
+    }
+
+    onComplete()
+    SuccessExecuteResponse()
+  }
+
+  private def sendResultSet(
+      engineExecutorContext: EngineExecutionContext,
+      driver: HiveDriverProxy,
+      metaData: TableMetaData
+  ): Int = {
+    val resultSetWriter = engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
+    resultSetWriter.addMetaData(metaData)
+    val colLength = metaData.columns.length
+    val result = new util.ArrayList[String]()
+    var rows = 0
+    while (driver.getResults(result)) {
+      val scalaResult: mutable.Buffer[String] = result.asScala
+      scalaResult foreach { s =>
+        val arr: Array[String] = s.split("\t")
+        val arrAny: ArrayBuffer[Any] = new ArrayBuffer[Any]()
+        if (arr.length > colLength) {
+          logger.error(
+            s"""There is a \t tab in the result of hive code query, hive cannot cut it, please use spark to execute(查询的结果中有\t制表符,hive不能进行切割,请使用spark执行)"""
+          )
+          throw new ErrorException(
+            60078,
+            """There is a \t tab in the result of your query, hive cannot cut it, please use spark to execute(您查询的结果中有\t制表符,hive不能进行切割,请使用spark执行)"""
+          )
+        }
+        if (arr.length == colLength) arr foreach arrAny.asJava.add
+        else if (arr.length == 0) for (i <- 1 to colLength) arrAny.asJava add ""
+        else {
+          val i = colLength - arr.length
+          arr foreach arrAny.asJava.add
+          for (i <- 1 to i) arrAny.asJava add ""
+        }
+        resultSetWriter.addRecord(new TableRecord(arrAny.toArray))
+      }
+      rows += result.size
+      result.clear()
+    }
+    engineExecutorContext.sendResultSet(resultSetWriter)
+    rows
+  }
+
+  private def getResultMetaData(
+      fieldSchemas: util.List[FieldSchema],
+      useTableName: Boolean
+  ): TableMetaData = {
+    var results: util.List[FieldSchema] = null
+    val nameSet = new mutable.HashSet[String]()
+    val cleanSchema = new util.ArrayList[FieldSchema]()
+    fieldSchemas.asScala foreach { fieldSchema =>
+      val name = fieldSchema.getName
+      if (name.split('.').length == 2) {
+        nameSet.add(name.split('.')(1))
+        cleanSchema.asScala += new FieldSchema(
+          name.split('.')(1),
+          fieldSchema.getType,
+          fieldSchema.getComment
+        )
+      }
+    }
+    if (nameSet.size < fieldSchemas.asScala.length) {
+      results = fieldSchemas
+    } else {
+      if (useTableName) {
+        results = fieldSchemas
+      } else {
+        results = cleanSchema
+      }
+    }
+
+    val columns = results.asScala
+      .map(result =>
+        Column(result.getName, DataType.toDataType(result.getType.toLowerCase()), result.getComment)
+      )
+      .toArray[Column]
+    val metaData = new TableMetaData(columns)
+    metaData
+  }
+
+  private def isNoResultSql(sql: String): Boolean = {
+    if (sql.trim.startsWith("create table") || sql.trim.startsWith("drop table")) true else false
+  }
+
+  /**
+   * Before the job is completed, all the remaining contents of the singleSqlProgressMap should be
+   * changed to success
+   */
+  private def onComplete(): Unit = {}
+
+  override def executeCompletely(
+      engineExecutorContext: EngineExecutionContext,
+      code: String,
+      completedLine: String
+  ): ExecuteResponse = {
+    val completeCode = code + completedLine
+    executeLine(engineExecutorContext, completeCode)
+  }
+
+  override def close(): Unit = {
+    killAll()
+
+    if (backgroundOperationPool != null) {
+      backgroundOperationPool.shutdown()
+      try backgroundOperationPool.awaitTermination(10, TimeUnit.SECONDS)
+      catch {
+        case e: InterruptedException =>
+          LOG.warn(
+            "HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + 10 + " seconds has been exceeded. RUNNING background operations will be shut down",
+            e
+          )
+      }
+      backgroundOperationPool = null
+    }
+    super.close()
+  }
+
+  override def FetchResource: util.HashMap[String, ResourceWithStatus] = {
+    val resourceMap = new util.HashMap[String, ResourceWithStatus]()
+    HadoopJobExecHelper.runningJobs.asScala.foreach(yarnJob => {
+      val counters = yarnJob.getCounters
+      if (counters != null) {
+        val millsMap = counters.getCounter(Counters.MILLIS_MAPS)
+        val millsReduces = counters.getCounter(Counters.MILLIS_REDUCES)
+        val totalMapCores = counters.getCounter(Counters.VCORES_MILLIS_MAPS)
+        val totalReducesCores = counters.getCounter(Counters.VCORES_MILLIS_REDUCES)
+        val totalMapMBMemory = counters.getCounter(Counters.MB_MILLIS_MAPS)
+        val totalReducesMBMemory = counters.getCounter(Counters.MB_MILLIS_REDUCES)
+        var avgCores = 0
+        var avgMemory = 0L
+        if (millsMap > 0 && millsReduces > 0) {
+          avgCores = Math.ceil(totalMapCores / millsMap + totalReducesCores / millsReduces).toInt
+          avgMemory = Math
+            .ceil(
+              totalMapMBMemory * 1024 * 1024 / millsMap + totalReducesMBMemory.toLong * 1024 * 1024 / millsReduces
+            )
+            .toLong
+          val yarnResource = new ResourceWithStatus(
+            avgMemory,
+            avgCores,
+            0,
+            JobStatus.getJobRunState(yarnJob.getJobStatus.getRunState),
+            "default"
+          )
+          val applicationId =
+            applicationStringName + splitter + yarnJob.getID.getJtIdentifier + splitter + yarnJob.getID.getId
+          resourceMap.put(applicationId, yarnResource)
+        }
+      }
+    })
+    resourceMap
+  }
+
+  override def progress(taskID: String): Float = 0.0f
+
+  override def getProgressInfo(taskID: String): Array[JobProgressInfo] =
+    Array.empty[JobProgressInfo]
+
+  override def killTask(taskID: String): Unit = {
+    cleanup(taskID)
+    super.killTask(taskID)
+  }
+
+  override def getConcurrentLimit: Int = HiveEngineConfiguration.HIVE_ENGINE_CONCURRENT_LIMIT
+
+  override def killAll(): Unit = {
+    val iterator = driverCache.entrySet().iterator()
+    while (iterator.hasNext) {
+      val entry = iterator.next()
+      val taskID = entry.getKey
+      cleanup(taskID)
+
+      super.killTask(taskID)
+      iterator.remove()
+    }
+
+    sessionState.deleteTmpOutputFile()
+    sessionState.deleteTmpErrOutputFile()
+    sessionState.close()
+  }
+
+  private def cleanup(taskID: String) = {
+    val driver = driverCache.get(taskID)
+    if (driver == null) LOG.warn(s"do cleanup taskId :${taskID} driver is null.")
+    else {
+      driver.close()
+      driverCache.remove(taskID)
+    }
+
+    LOG.info(s"hive begins to kill job with id : ${taskID}")
+    // Configure the engine through the wds.linkis.hive.engine.type parameter to control the way the task is killed
+    LOG.info(s"hive engine type :${HiveEngineConfiguration.HIVE_ENGINE_TYPE}")
+    LOG.info("hive killed job successfully")
+  }
+
+  override def supportCallBackLogs(): Boolean = {
+    // todo
+    true
+  }
+
+  override def getExecutorLabels(): util.List[Label[_]] = executorLabels
+
+  override def setExecutorLabels(labels: util.List[Label[_]]): Unit = {
+    if (null != labels) {
+      executorLabels.clear()
+      executorLabels.addAll(labels)
+    }
+  }
+
+  override def requestExpectedResource(expectedResource: NodeResource): NodeResource = {
+    null
+  }
+
+  override def getCurrentNodeResource(): NodeResource = {
+    val resource = new CommonNodeResource
+    resource.setUsedResource(
+      NodeResourceUtils
+        .applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
+    )
+    resource
+  }
+
+  override def getId(): String = namePrefix + id
+}
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
index bd665cd34..5bd577333 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
@@ -98,6 +98,7 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask], children: Array[ExecTask
     if (executor.isDefined && !isCanceled) {
       val requestTask = toRequestTask
       val codeExecutor = executor.get
+
       val msg = if (codeExecutor.getEngineConnExecutor.isReuse()) {
         s"Succeed to reuse ec : ${codeExecutor.getEngineConnExecutor.getServiceInstance}"
       } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org