You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/04/25 04:19:03 UTC

[linkis] branch dev-1.4.0 updated: add impala engineconn-plugin (#4458)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
     new 265d5d3fe add impala engineconn-plugin (#4458)
265d5d3fe is described below

commit 265d5d3fe65f3533ed43c9dba67c5daa2cb210ea
Author: Knypys <si...@qq.com>
AuthorDate: Tue Apr 25 12:18:53 2023 +0800

    add impala engineconn-plugin (#4458)
    
    * add impala engine plugin
---
 .../common}/password/CommandPasswordCallback.java  |   4 +-
 .../common}/password/StaticPasswordCallback.java   |   2 +-
 .../linkis/manager/am/conf/AMConfiguration.scala   |   4 +-
 .../manager/label/entity/engine/EngineType.scala   |   2 +
 .../manager/label/entity/engine/RunType.scala      |   2 +-
 linkis-engineconn-plugins/impala/pom.xml           | 140 ++++++
 .../impala/src/main/assembly/distribution.xml      |  70 +++
 .../impala/client/ExecutionListener.java}          |  27 +-
 .../engineplugin/impala/client/ImpalaClient.java   |  77 +++
 .../impala/client/ImpalaResultSet.java}            |  43 +-
 .../client/exception/ImpalaEngineException.java    |  52 +++
 .../client/exception/ImpalaErrorCodeSummary.java   |  51 ++
 .../impala/client/protocol/ExecHandler.java        |  72 +++
 .../impala/client/protocol/ExecProgress.java}      |  33 +-
 .../impala/client/protocol/ExecStatus.java         |  72 +++
 .../impala/client/protocol/ExecSummary.java}       |  31 +-
 .../impala/client/protocol/QueryColumn.java}       |  27 +-
 .../impala/client/thrift/ImpalaThriftClient.java   | 278 +++++++++++
 .../client/thrift/ImpalaThriftExecution.java       | 186 ++++++++
 .../client/thrift/ImpalaThriftResultSetV7.java     | 361 +++++++++++++++
 .../impala/client/thrift/ImpalaThriftSession.java  | 104 +++++
 .../client/thrift/ImpalaThriftSessionFactory.java  | 196 ++++++++
 .../impala/client/util/ThriftUtil.java             |  65 +++
 .../main/resources/linkis-engineconn.properties    |  23 +
 .../impala/src/main/resources/log4j2.xml           |  89 ++++
 .../impala/ImpalaEngineConnPlugin.scala            |  66 +++
 .../ImpalaProcessEngineConnLaunchBuilder.scala}    |  29 +-
 .../impala/conf/ImpalaConfiguration.scala          |  68 +++
 .../impala/conf/ImpalaEngineConfig.scala           |  48 ++
 .../impala/executor/ImpalaEngineConnExecutor.scala | 514 +++++++++++++++++++++
 .../impala/factory/ImpalaEngineConnFactory.scala   |  44 ++
 .../executer/TestImpalaEngineConnExecutor.scala    | 144 ++++++
 linkis-engineconn-plugins/pom.xml                  |   1 +
 .../trino/executor/TrinoEngineConnExecutor.scala   |   9 +-
 pom.xml                                            |   1 +
 tool/dependencies/known-dependencies.txt           |   2 +
 36 files changed, 2839 insertions(+), 98 deletions(-)

diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/CommandPasswordCallback.java b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/java/org/apache/linkis/engineconn/common/password/CommandPasswordCallback.java
similarity index 93%
rename from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/CommandPasswordCallback.java
rename to linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/java/org/apache/linkis/engineconn/common/password/CommandPasswordCallback.java
index c26e0790c..8956f9842 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/CommandPasswordCallback.java
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/java/org/apache/linkis/engineconn/common/password/CommandPasswordCallback.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineconn.common.password;
 
 import org.apache.commons.io.IOUtils;
 
@@ -65,7 +65,7 @@ public class CommandPasswordCallback extends PasswordCallback {
     } catch (RuntimeException e) {
       throw e;
     } catch (Exception e) {
-      throw new RuntimeException("Failed to authenticate with command: " + prompt, e);
+      throw new RuntimeException("Failed to get password by command: " + prompt, e);
     } finally {
       if (process != null) {
         process.destroy();
diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/java/org/apache/linkis/engineconn/common/password/StaticPasswordCallback.java
similarity index 95%
copy from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
copy to linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/java/org/apache/linkis/engineconn/common/password/StaticPasswordCallback.java
index 6d198bbae..f7b1cc4ba 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/java/org/apache/linkis/engineconn/common/password/StaticPasswordCallback.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineconn.common.password;
 
 import javax.security.auth.callback.PasswordCallback;
 
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala
index c730edf0f..acc5afc25 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala
@@ -17,7 +17,7 @@
 
 package org.apache.linkis.manager.am.conf
 
-import org.apache.linkis.common.conf.{CommonVars, Configuration, TimeType}
+import org.apache.linkis.common.conf.{CommonVars, TimeType}
 import org.apache.linkis.common.utils.Utils
 import org.apache.linkis.manager.common.entity.enumeration.MaintainType
 
@@ -57,7 +57,7 @@ object AMConfiguration {
 
   val MULTI_USER_ENGINE_TYPES = CommonVars(
     "wds.linkis.multi.user.engine.types",
-    "jdbc,es,presto,io_file,appconn,openlookeng,trino"
+    "jdbc,es,presto,io_file,appconn,openlookeng,trino,impala"
   )
 
   val ALLOW_BATCH_KILL_ENGINE_TYPES =
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
index 47cf78b7f..d47bb8ec3 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
@@ -61,6 +61,8 @@ object EngineType extends Enumeration with Logging {
 
   val SEATUNNEL = Value("seatunnel")
 
+  val IMPALA = Value("impala")
+
   def mapFsTypeToEngineType(fsType: String): String = {
     fsType match {
       case "file" =>
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
index f8ba133f3..0dee150d2 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
@@ -43,11 +43,11 @@ object RunType extends Enumeration {
 
   val TRINO_SQL = Value("tsql")
 
-
   val SEATUNNEL_FLINK_SQL = Value("sfsql")
   val SEATUNNEL_FLINK = Value("sflink")
   val SEATUNNEL_SPARK = Value("sspark")
 
   val DATA_CALC = Value("data_calc") // spark datacalc (ETL)
 
+  val IMPALA_SQL = Value("isql")
 }
diff --git a/linkis-engineconn-plugins/impala/pom.xml b/linkis-engineconn-plugins/impala/pom.xml
new file mode 100644
index 000000000..1557d3ef1
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/pom.xml
@@ -0,0 +1,140 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~ 
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~ 
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.linkis</groupId>
+    <artifactId>linkis</artifactId>
+    <version>${revision}</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>linkis-engineplugin-impala</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.linkis</groupId>
+      <artifactId>linkis-engineconn-plugin-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.linkis</groupId>
+      <artifactId>linkis-computation-engineconn</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.linkis</groupId>
+      <artifactId>linkis-storage</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.linkis</groupId>
+      <artifactId>linkis-common</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.linkis</groupId>
+      <artifactId>linkis-rpc</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>0.15.0</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- impala -->
+    <dependency>
+      <groupId>org.apache.impala</groupId>
+      <artifactId>impala-frontend</artifactId>
+      <version>${impala.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.impala</groupId>
+      <artifactId>impala-minimal-hive-exec</artifactId>
+      <version>${impala.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+  <repositories>
+    <repository>
+      <id>cloudera.releases.https</id>
+      <name>Cloudera Repository</name>
+      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <inherited>false</inherited>
+        <configuration>
+          <skipAssembly>false</skipAssembly>
+          <finalName>out</finalName>
+          <appendAssemblyId>false</appendAssemblyId>
+          <attach>false</attach>
+          <descriptors>
+            <descriptor>src/main/assembly/distribution.xml</descriptor>
+          </descriptors>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <phase>package</phase>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/distribution.xml</descriptor>
+              </descriptors>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/linkis-engineconn-plugins/impala/src/main/assembly/distribution.xml b/linkis-engineconn-plugins/impala/src/main/assembly/distribution.xml
new file mode 100644
index 000000000..5462a670d
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/assembly/distribution.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.1.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.1.1 https://maven.apache.org/xsd/assembly-2.1.1.xsd">
+    <id>linkis-manager-enginePlugin-impala</id>
+    <formats>
+        <format>dir</format>
+        <format>zip</format>
+    </formats>
+    <includeBaseDirectory>true</includeBaseDirectory>
+    <baseDirectory>impala</baseDirectory>
+
+    <dependencySets>
+        <dependencySet>
+            <!-- Bundle this module's artifact together with its transitive dependencies -->
+            <!-- into the version-specific lib directory of the impala engine plugin. -->
+            <outputDirectory>/dist/v${impala.version}/lib</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <useTransitiveDependencies>true</useTransitiveDependencies>
+            <unpack>false</unpack>
+            <useStrictFiltering>false</useStrictFiltering>
+            <useTransitiveFiltering>true</useTransitiveFiltering>
+
+        </dependencySet>
+    </dependencySets>
+
+    <fileSets>
+        <fileSet>
+            <directory>${basedir}/src/main/resources</directory>
+            <includes>
+                <include>linkis-engineconn.properties</include>
+                <include>log4j2.xml</include>
+            </includes>
+            <fileMode>0777</fileMode>
+            <outputDirectory>/dist/v${impala.version}/conf</outputDirectory>
+            <lineEnding>unix</lineEnding>
+        </fileSet>
+
+        <fileSet>
+            <directory>${basedir}/target</directory>
+            <includes>
+                <include>*.jar</include>
+            </includes>
+            <excludes>
+                <exclude>*doc.jar</exclude>
+            </excludes>
+            <fileMode>0777</fileMode>
+            <outputDirectory>/plugin/${impala.version}</outputDirectory>
+        </fileSet>
+
+    </fileSets>
+
+</assembly>
+
diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ExecutionListener.java
similarity index 60%
copy from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
copy to linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ExecutionListener.java
index 6d198bbae..c34c6b092 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ExecutionListener.java
@@ -15,26 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineplugin.impala.client;
 
-import javax.security.auth.callback.PasswordCallback;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecProgress;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecStatus;
 
-public class StaticPasswordCallback extends PasswordCallback {
+import java.util.List;
 
-  private final char[] password;
+public interface ExecutionListener {
+  void created(String queryId);
 
-  public StaticPasswordCallback(String prompt, boolean echoOn) {
-    super(prompt, echoOn);
-    this.password = prompt.toCharArray();
-  }
+  void success(ImpalaResultSet resultSet);
 
-  public StaticPasswordCallback(String prompt) {
-    super(prompt, false);
-    this.password = prompt.toCharArray();
-  }
+  void error(ExecStatus status);
 
-  @Override
-  public char[] getPassword() {
-    return password;
-  }
+  void progress(ExecProgress progress);
+
+  void message(List<String> messages);
 }
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ImpalaClient.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ImpalaClient.java
new file mode 100644
index 000000000..1f7bc93e7
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ImpalaClient.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client;
+
+import org.apache.linkis.engineplugin.impala.client.exception.ImpalaEngineException;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecProgress;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecStatus;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecSummary;
+
+import java.util.Map;
+
+public interface ImpalaClient extends AutoCloseable {
+
+  /**
+   * execute async with listener
+   *
+   * @return queryId
+   */
+  default String executeAsync(String sql, ExecutionListener executionListener)
+      throws ImpalaEngineException {
+    return executeAsync(sql, executionListener, null);
+  }
+
+  /**
+   * execute async with listener and options
+   *
+   * @return queryId
+   */
+  String executeAsync(
+      String sql, ExecutionListener executionListener, Map<String, String> queryOptions)
+      throws ImpalaEngineException;
+
+  /** execute with listener */
+  default void execute(String sql, ExecutionListener executionListener)
+      throws ImpalaEngineException, InterruptedException {
+    execute(sql, executionListener, null);
+  }
+
+  /** execute with listener and options */
+  void execute(String sql, ExecutionListener executionListener, Map<String, String> queryOptions)
+      throws ImpalaEngineException, InterruptedException;
+
+  /** cancel execution */
+  void cancel(String queryId) throws ImpalaEngineException;
+
+  /** get execution summary */
+  ExecSummary getExecSummary(String queryId) throws ImpalaEngineException;
+
+  /** get execution summary progress */
+  ExecProgress getExecProgress(String queryId) throws ImpalaEngineException;
+
+  /** get execution status */
+  ExecStatus getExecStatus(String queryId) throws ImpalaEngineException;
+
+  void setQueryOption(String key, String value) throws ImpalaEngineException;
+
+  Map<String, String> getQueryOptions() throws ImpalaEngineException;
+
+  void unsetQueryOption(String key) throws ImpalaEngineException;
+
+  int getRunningExecutionCount();
+}
diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ImpalaResultSet.java
similarity index 55%
copy from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
copy to linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ImpalaResultSet.java
index 6d198bbae..9390b7481 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ImpalaResultSet.java
@@ -15,26 +15,41 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineplugin.impala.client;
 
-import javax.security.auth.callback.PasswordCallback;
+import org.apache.hive.service.rpc.thrift.TTypeDesc;
 
-public class StaticPasswordCallback extends PasswordCallback {
+import java.util.List;
 
-  private final char[] password;
+public interface ImpalaResultSet extends AutoCloseable {
 
-  public StaticPasswordCallback(String prompt, boolean echoOn) {
-    super(prompt, echoOn);
-    this.password = prompt.toCharArray();
-  }
+  Row next();
+
+  int getColumnSize();
+
+  List<? extends Column> getColumns();
+
+  interface Row {
+    Object[] getValues();
+
+    Object getObject(int columnIndex);
+
+    <T> T getObject(int columnIndex, Class<T> clasz);
 
-  public StaticPasswordCallback(String prompt) {
-    super(prompt, false);
-    this.password = prompt.toCharArray();
+    String getString(int columnIndex);
+
+    Short getShort(int columnIndex);
+
+    Integer getInteger(int columnIndex);
+
+    Long getLong(int columnIndex);
   }
 
-  @Override
-  public char[] getPassword() {
-    return password;
+  interface Column {
+    int getIndex();
+
+    String getName();
+
+    TTypeDesc getType();
   }
 }
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/exception/ImpalaEngineException.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/exception/ImpalaEngineException.java
new file mode 100644
index 000000000..f2ec47f7b
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/exception/ImpalaEngineException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.exception;
+
+public class ImpalaEngineException extends RuntimeException {
+
+  /** Serialization version identifier. */
+  private static final long serialVersionUID = 5045965784519089392L;
+
+  public ImpalaEngineException(String message) {
+    super(message);
+  }
+
+  public ImpalaEngineException(Exception exception) {
+    super(exception);
+  }
+
+  public ImpalaEngineException(String message, Exception exception) {
+    super(message, exception);
+  }
+
+  public static ImpalaEngineException of(ImpalaErrorCodeSummary code) {
+    return new ImpalaEngineException(code.getErrorDesc());
+  }
+
+  public static ImpalaEngineException of(ImpalaErrorCodeSummary code, String message) {
+    return new ImpalaEngineException(String.format("%s, %s", code.getErrorDesc(), message));
+  }
+
+  public static ImpalaEngineException of(ImpalaErrorCodeSummary code, Exception exception) {
+    return new ImpalaEngineException(code.getErrorDesc(), exception);
+  }
+
+  public static ImpalaEngineException of(Exception exception) {
+    return new ImpalaEngineException(exception);
+  }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/exception/ImpalaErrorCodeSummary.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/exception/ImpalaErrorCodeSummary.java
new file mode 100644
index 000000000..1511130bb
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/exception/ImpalaErrorCodeSummary.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.exception;
+
+import org.apache.linkis.common.errorcode.ErrorCodeUtils;
+import org.apache.linkis.common.errorcode.LinkisErrorCode;
+
+public enum ImpalaErrorCodeSummary implements LinkisErrorCode {
+  ClosedError(29040, "session is closed(当前会话已断开)"),
+  ExecutionError(29041, "server report an error(执行失败)"),
+  RequestError(29042, "failed to send request to server(请求提交失败)"),
+  StillRunningError(29043, "target is still running(任务正在运行中)"),
+  InvalidHandleError(29044, "current handle is invalid(当前会话不存在)"),
+  ParallelLimitError(29045, "reach the parallel limit(当前已达到最大并行度配置)"),
+  LoginError(29046, "failed to login to target server(服务器认证失败)");
+
+  private final int errorCode;
+
+  private final String errorDesc;
+
+  ImpalaErrorCodeSummary(int errorCode, String errorDesc) {
+    ErrorCodeUtils.validateErrorCode(errorCode, 29000, 29999);
+    this.errorCode = errorCode;
+    this.errorDesc = errorDesc;
+  }
+
+  @Override
+  public int getErrorCode() {
+    return errorCode;
+  }
+
+  @Override
+  public String getErrorDesc() {
+    return errorDesc;
+  }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecHandler.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecHandler.java
new file mode 100644
index 000000000..430c3bc99
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.protocol;
+
+import org.apache.linkis.engineplugin.impala.client.ExecutionListener;
+
+public abstract class ExecHandler<T> implements AutoCloseable {
+  private String queryId;
+  private T handler;
+  private ExecutionListener executionListener = null;
+  private int errors;
+
+  private boolean isQueued;
+
+  public ExecHandler() {}
+
+  public ExecHandler(String queryId, T handler, ExecutionListener executionListener) {
+    this.queryId = queryId;
+    this.handler = handler;
+
+    if (executionListener != null) {
+      this.executionListener = executionListener;
+      executionListener.created(queryId);
+    }
+
+    this.errors = 0;
+    this.isQueued = false;
+  }
+
+  public int markError() {
+    return ++errors;
+  }
+
+  public String getQueryId() {
+    return queryId;
+  }
+
+  public T getHandler() {
+    return handler;
+  }
+
+  public ExecutionListener getResultListener() {
+    return executionListener;
+  }
+
+  public int getErrors() {
+    return errors;
+  }
+
+  public boolean isQueued() {
+    return isQueued;
+  }
+
+  public void setQueued(boolean queued) {
+    isQueued = queued;
+  }
+}
diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecProgress.java
similarity index 52%
copy from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
copy to linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecProgress.java
index 6d198bbae..078889624 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecProgress.java
@@ -15,26 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineplugin.impala.client.protocol;
 
-import javax.security.auth.callback.PasswordCallback;
+/** Value object describing scan progress: total and completed scan ranges plus a node count. */
+public class ExecProgress {
+  /** Placeholder used when no real progress is available (total -1, completed 0, nodes -1). */
+  public static final ExecProgress DEFAULT_PROGRESS = new ExecProgress(-1, 0, -1);
 
-public class StaticPasswordCallback extends PasswordCallback {
+  private long totalScanRanges;
+  private long completedScanRanges;
+  private int runningNodes;
 
-  private final char[] password;
+  public ExecProgress() {}
 
-  public StaticPasswordCallback(String prompt, boolean echoOn) {
-    super(prompt, echoOn);
-    this.password = prompt.toCharArray();
+  public ExecProgress(long totalScanRanges, long completedScanRanges, int runningNodes) {
+    super();
+    this.totalScanRanges = totalScanRanges;
+    this.completedScanRanges = completedScanRanges;
+    this.runningNodes = runningNodes;
   }
 
-  public StaticPasswordCallback(String prompt) {
-    super(prompt, false);
-    this.password = prompt.toCharArray();
+  public long getTotalScanRanges() {
+    return totalScanRanges;
   }
 
-  @Override
-  public char[] getPassword() {
-    return password;
+  public long getCompletedScanRanges() {
+    return completedScanRanges;
+  }
+
+  public int getRunningNodes() {
+    return runningNodes;
   }
 }
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecStatus.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecStatus.java
new file mode 100644
index 000000000..26f6dc824
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecStatus.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.protocol;
+
+import org.apache.hive.service.rpc.thrift.TOperationState;
+
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Snapshot of an operation's state: numeric state code, state name and an optional error message.
+ *
+ * <p>The code is compared against {@link TOperationState} values to classify the state as active
+ * or failed.
+ */
+public class ExecStatus {
+  /** State codes reported as failed by {@link #hasError()}. */
+  private static final Set<Integer> ERROR_STATE =
+      ImmutableSet.of(
+          TOperationState.ERROR_STATE.getValue(),
+          TOperationState.CLOSED_STATE.getValue(),
+          TOperationState.CANCELED_STATE.getValue(),
+          TOperationState.UKNOWN_STATE.getValue());
+
+  /** State codes reported as still in flight by {@link #isActive()}. */
+  private static final Set<Integer> ACTIVE_STATE =
+      ImmutableSet.of(
+          TOperationState.INITIALIZED_STATE.getValue(),
+          TOperationState.RUNNING_STATE.getValue(),
+          TOperationState.PENDING_STATE.getValue());
+
+  private int code;
+  private String name;
+  private String errorMessage;
+
+  /** Creates a status whose fields keep their default values. */
+  public ExecStatus() {}
+
+  /**
+   * @param code numeric state code
+   * @param name name of the state
+   * @param errorMessage error message attached to the state, may be {@code null}
+   */
+  public ExecStatus(int code, String name, String errorMessage) {
+    this.errorMessage = errorMessage;
+    this.name = name;
+    this.code = code;
+  }
+
+  /** @return {@code true} when the code is one of the initialized, running or pending states */
+  public boolean isActive() {
+    final boolean active = ACTIVE_STATE.contains(code);
+    return active;
+  }
+
+  /** @return {@code true} when the code is one of the error, closed, canceled or unknown states */
+  public boolean hasError() {
+    final boolean failed = ERROR_STATE.contains(code);
+    return failed;
+  }
+
+  /** @return numeric state code */
+  public int getCode() {
+    return code;
+  }
+
+  /** @return name of the state */
+  public String getName() {
+    return name;
+  }
+
+  /** @return error message attached to the state, may be {@code null} */
+  public String getErrorMessage() {
+    return errorMessage;
+  }
+}
diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecSummary.java
similarity index 60%
copy from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
copy to linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecSummary.java
index 6d198bbae..9ff786a5d 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecSummary.java
@@ -15,26 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineplugin.impala.client.protocol;
 
-import javax.security.auth.callback.PasswordCallback;
+/** Value object bundling an execution's status, its scan progress and a node count. */
+public class ExecSummary {
+  private ExecStatus status;
+  private ExecProgress progress;
+  private int nodeNum;
 
-public class StaticPasswordCallback extends PasswordCallback {
+  public ExecSummary() {}
 
-  private final char[] password;
+  public ExecSummary(ExecStatus status, ExecProgress progress, int nodeNum) {
+    super();
+    this.status = status;
+    this.progress = progress;
+    this.nodeNum = nodeNum;
+  }
 
-  public StaticPasswordCallback(String prompt, boolean echoOn) {
-    super(prompt, echoOn);
-    this.password = prompt.toCharArray();
+  public ExecStatus getStatus() {
+    return status;
   }
 
-  public StaticPasswordCallback(String prompt) {
-    super(prompt, false);
-    this.password = prompt.toCharArray();
+  public ExecProgress getProgress() {
+    return progress;
   }
 
-  @Override
-  public char[] getPassword() {
-    return password;
+  public int getNodeNum() {
+    return nodeNum;
   }
 }
diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/QueryColumn.java
similarity index 61%
copy from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
copy to linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/QueryColumn.java
index 6d198bbae..9a0fc2231 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/QueryColumn.java
@@ -15,26 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineplugin.impala.client.protocol;
 
-import javax.security.auth.callback.PasswordCallback;
+/** Value object pairing a column label with its index. */
+public class QueryColumn {
+  private String label;
+  // NOTE(review): whether the index is 0- or 1-based is not visible here; confirm against callers
+  private int index;
 
-public class StaticPasswordCallback extends PasswordCallback {
+  public QueryColumn() {}
 
-  private final char[] password;
-
-  public StaticPasswordCallback(String prompt, boolean echoOn) {
-    super(prompt, echoOn);
-    this.password = prompt.toCharArray();
+  public QueryColumn(String label, int index) {
+    super();
+    this.label = label;
+    this.index = index;
   }
 
-  public StaticPasswordCallback(String prompt) {
-    super(prompt, false);
-    this.password = prompt.toCharArray();
+  public String getLabel() {
+    return label;
   }
 
-  @Override
-  public char[] getPassword() {
-    return password;
+  public int getIndex() {
+    return index;
   }
 }
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftClient.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftClient.java
new file mode 100644
index 000000000..63d363fa9
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftClient.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.thrift;
+
+import org.apache.linkis.engineplugin.impala.client.ExecutionListener;
+import org.apache.linkis.engineplugin.impala.client.ImpalaClient;
+import org.apache.linkis.engineplugin.impala.client.exception.ImpalaEngineException;
+import org.apache.linkis.engineplugin.impala.client.exception.ImpalaErrorCodeSummary;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecProgress;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecStatus;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecSummary;
+import org.apache.linkis.engineplugin.impala.client.util.ThriftUtil;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.apache.impala.thrift.ImpalaHiveServer2Service;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ImpalaThriftClient extends TimerTask implements ImpalaClient {
+
+  public static final Logger LOG = LoggerFactory.getLogger(ImpalaThriftClient.class.getName());
+
+  private final Map<String, String> queryOptions;
+  private final Map<String, ImpalaThriftExecution> executions;
+  private final ImpalaThriftSessionFactory sessionFactory;
+  private final ScheduledExecutorService executorService;
+  private final long heartBeatsMillis;
+
+  private int batchSize = 1000;
+  private long queryTimeoutMillis = 3600 * 1000;
+  private volatile boolean closed;
+
+  public ImpalaThriftClient(ImpalaThriftSessionFactory sessionFactory, int heartBeatsInSeconds) {
+    this.sessionFactory = sessionFactory;
+    this.executions = new ConcurrentHashMap<>();
+    this.queryOptions = new ConcurrentHashMap<>();
+    this.closed = false;
+
+    this.executorService = Executors.newSingleThreadScheduledExecutor();
+
+    this.heartBeatsMillis = Math.max(heartBeatsInSeconds, 1);
+    executorService.schedule(this, heartBeatsInSeconds, TimeUnit.SECONDS);
+  }
+
+  public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  public void setQueryTimeoutInSeconds(int queryTimeout) {
+    this.queryTimeoutMillis = 1000L * queryTimeout;
+  }
+
+  @Override
+  public synchronized void close() {
+    closed = true;
+    executorService.shutdownNow();
+    for (ImpalaThriftExecution execution : executions.values()) {
+      execution.cancel();
+    }
+  }
+
+  private ImpalaThriftExecution submit(
+      String sql,
+      Map<String, String> queryOptions,
+      ExecutionListener executionListener,
+      boolean sync)
+      throws TException, ImpalaEngineException, InterruptedException, IOException {
+    if (closed) {
+      throw ImpalaEngineException.of(ImpalaErrorCodeSummary.ClosedError);
+    }
+
+    ImpalaThriftSession impalaSession = sessionFactory.openSession();
+
+    TSessionHandle session = impalaSession.session();
+    ImpalaHiveServer2Service.Client client = impalaSession.client();
+
+    TExecuteStatementReq req = new TExecuteStatementReq(session, sql);
+    req.setRunAsync(false);
+
+    Map<String, String> options = new TreeMap<>(this.queryOptions);
+    if (queryOptions != null && queryOptions.size() > 0) {
+      options.putAll(queryOptions);
+    }
+    req.setConfOverlay(options);
+
+    TExecuteStatementResp res = client.ExecuteStatement(req);
+    ThriftUtil.checkStatus(res.getStatus(), executionListener);
+
+    TOperationHandle operation = res.getOperationHandle();
+    if (operation == null) {
+      throw ImpalaEngineException.of(ImpalaErrorCodeSummary.RequestError);
+    }
+
+    String queryId = ThriftUtil.convertUniqueId(operation.getOperationId().getGuid());
+    return new ImpalaThriftExecution(impalaSession, operation, queryId, executionListener, sync);
+  }
+
+  private boolean progress(ImpalaThriftExecution execution) {
+    try {
+      ExecSummary summary = execution.getExecSummary();
+
+      ExecutionListener listener = execution.getListener();
+      ExecStatus status = summary.getStatus();
+      if (status.isActive()) {
+        /* executing progress */
+        if (listener != null && summary.getProgress() != null) {
+          listener.progress(summary.getProgress());
+        }
+
+        /* check execution timeout */
+        return queryTimeoutMillis <= 0
+            || System.currentTimeMillis() - execution.getTimestamp() < queryTimeoutMillis;
+      }
+
+      if (status.hasError()) {
+        if (listener != null) {
+          listener.error(summary.getStatus());
+        }
+      } else {
+        execution.tryFetchResult(batchSize);
+      }
+      return false;
+    } catch (ImpalaEngineException | TException e) {
+      LOG.warn("Progress failed for: {}", execution.getQueryId(), e);
+      if (execution.errorIncrement() >= 3) {
+        return false;
+      }
+
+      /* check execution timeout */
+      return queryTimeoutMillis <= 0
+          || System.currentTimeMillis() - execution.getTimestamp() < queryTimeoutMillis;
+    }
+  }
+
+  @Override
+  public void execute(
+      String sql, ExecutionListener executionListener, Map<String, String> queryOptions)
+      throws ImpalaEngineException, InterruptedException {
+    try (ImpalaThriftExecution execution = submit(sql, queryOptions, executionListener, true)) {
+      executions.put(execution.getQueryId(), execution);
+      while (progress(execution)) {
+        Thread.sleep(heartBeatsMillis);
+      }
+    } catch (IOException | TException e) {
+      throw ImpalaEngineException.of(ImpalaErrorCodeSummary.RequestError, e);
+    }
+  }
+
+  @Override
+  public String executeAsync(
+      String sql, ExecutionListener executionListener, Map<String, String> queryOptions)
+      throws ImpalaEngineException {
+    try {
+      ImpalaThriftExecution execution = submit(sql, queryOptions, executionListener, false);
+      String queryId = execution.getQueryId();
+      executions.put(queryId, execution);
+      return queryId;
+    } catch (TException | InterruptedException | IOException e) {
+      throw ImpalaEngineException.of(ImpalaErrorCodeSummary.RequestError, e);
+    }
+  }
+
+  @Override
+  public void cancel(String queryId) {
+    cancel(executions.get(queryId));
+  }
+
+  private void cancel(ImpalaThriftExecution execution) {
+    if (execution != null) {
+      execution.cancel();
+    }
+  }
+
+  @Override
+  public ExecSummary getExecSummary(String queryId) throws ImpalaEngineException {
+    ImpalaThriftExecution execution = executions.get(queryId);
+    if (execution != null) {
+      try {
+        return execution.getExecSummary();
+      } catch (TException e) {
+        throw ImpalaEngineException.of(ImpalaErrorCodeSummary.RequestError, e);
+      }
+    }
+    throw new IllegalArgumentException("query not found: " + queryId);
+  }
+
+  @Override
+  public ExecProgress getExecProgress(String queryId) throws ImpalaEngineException {
+    ExecSummary summary = getExecSummary(queryId);
+    if (summary != null) {
+      return summary.getProgress();
+    }
+    return null;
+  }
+
+  @Override
+  public ExecStatus getExecStatus(String queryId) throws ImpalaEngineException {
+    ImpalaThriftExecution execution = executions.get(queryId);
+    if (execution != null) {
+      try {
+        return execution.getExecStatus();
+      } catch (TException e) {
+        e.printStackTrace();
+      }
+    }
+    throw new IllegalArgumentException("query not found: " + queryId);
+  }
+
+  @Override
+  public void setQueryOption(String key, String value) {
+    if (StringUtils.isNotBlank(value)) {
+      queryOptions.put(key, value);
+    }
+  }
+
+  @Override
+  public void unsetQueryOption(String key) {
+    queryOptions.remove(key);
+  }
+
+  @Override
+  public void run() {
+    final ArrayList<ImpalaThriftExecution> runningExecutions = new ArrayList<>(executions.values());
+    LOG.debug("Impala client heart beats, {} running executions", runningExecutions.size());
+    for (ImpalaThriftExecution execution : runningExecutions) {
+      /* sync execution, skipping */
+      if (execution.isSync()) {
+        continue;
+      }
+
+      if (execution.isClosed() || !progress(execution)) {
+        executions.remove(execution.getQueryId());
+      }
+    }
+  }
+
+  @Override
+  public int getRunningExecutionCount() {
+    return sessionFactory.openedSessionCount();
+  }
+
+  @Override
+  public Map<String, String> getQueryOptions() {
+    return ImmutableMap.copyOf(queryOptions);
+  }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftExecution.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftExecution.java
new file mode 100644
index 000000000..a8c3dde24
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftExecution.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.thrift;
+
+import org.apache.linkis.engineplugin.impala.client.ExecutionListener;
+import org.apache.linkis.engineplugin.impala.client.exception.ImpalaEngineException;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecProgress;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecStatus;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecSummary;
+import org.apache.linkis.engineplugin.impala.client.util.ThriftUtil;
+
+import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
+import org.apache.hive.service.rpc.thrift.TCloseOperationReq;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationState;
+import org.apache.impala.thrift.TExecProgress;
+import org.apache.impala.thrift.TExecSummary;
+import org.apache.impala.thrift.TGetExecSummaryReq;
+import org.apache.impala.thrift.TGetExecSummaryResp;
+import org.apache.thrift.TException;
+
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * One statement running on an Impala session: holds the session, the operation handle and the
+ * listener, and offers status polling, result fetching, cancel and close.
+ */
+public class ImpalaThriftExecution implements AutoCloseable {
+  public static final Logger LOG = LoggerFactory.getLogger(ImpalaThriftExecution.class.getName());
+
+  private final ImpalaThriftSession impalaSession;
+  private final TOperationHandle operation;
+  private final String queryId;
+  private final ExecutionListener listener;
+  private final boolean sync;
+
+  /** Creation time in epoch milliseconds. */
+  private final long timestamp;
+  /** Number of errors recorded through {@link #errorIncrement()}. */
+  private int errors;
+
+  private volatile boolean closed;
+
+  /**
+   * Creates the execution and, when a listener is given, reports the query id to it through
+   * {@code created}.
+   *
+   * @param impalaSession session the operation runs on; closed together with this execution
+   * @param operation handle of the submitted operation
+   * @param queryId id of the query
+   * @param listener listener to notify, may be null
+   * @param sync value later returned by {@link #isSync()}
+   */
+  public ImpalaThriftExecution(
+      ImpalaThriftSession impalaSession,
+      TOperationHandle operation,
+      String queryId,
+      ExecutionListener listener,
+      boolean sync) {
+    this.impalaSession = impalaSession;
+    this.operation = operation;
+    this.queryId = queryId;
+    this.listener = listener;
+    this.sync = sync;
+
+    if (listener != null) {
+      listener.created(queryId);
+    }
+
+    this.timestamp = System.currentTimeMillis();
+    this.errors = 0;
+  }
+
+  /** @return the listener passed at construction, may be null */
+  public ExecutionListener getListener() {
+    return listener;
+  }
+
+  /** @return creation time of this execution in epoch milliseconds */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * Hands the result set to the listener's {@code success} callback when the operation has one.
+   * The result set is closed as soon as the callback returns. Nothing happens when the operation
+   * reports no result set.
+   *
+   * @param batchSize maximum number of rows requested per fetch
+   */
+  public void tryFetchResult(int batchSize) {
+    if (operation.isHasResultSet()) {
+      try (ImpalaThriftResultSetV7 resultSetV7 =
+          new ImpalaThriftResultSetV7(impalaSession.client(), operation, batchSize)) {
+        if (listener != null) {
+          listener.success(resultSetV7);
+        }
+      }
+    }
+  }
+
+  /** @return the error count after recording one more error */
+  public synchronized int errorIncrement() {
+    return ++errors;
+  }
+
+  public String getQueryId() {
+    return queryId;
+  }
+
+  /**
+   * Closes the operation on the server and then the session. Calls after the first are no-ops.
+   * A failure to close the operation is logged and does not prevent the session from closing.
+   */
+  @Override
+  public synchronized void close() {
+    if (closed) {
+      return;
+    }
+    closed = true;
+
+    try {
+      TCloseOperationReq closeReq = new TCloseOperationReq();
+      closeReq.setOperationHandle(operation);
+      impalaSession.client().CloseOperation(closeReq);
+    } catch (Exception e) {
+      LOG.error("Failed to close the operation", e);
+    } finally {
+      impalaSession.close();
+    }
+  }
+
+  /**
+   * Requests cancellation of the operation and then closes this execution.
+   *
+   * @return false when the execution was already closed, true otherwise
+   */
+  public synchronized boolean cancel() {
+    if (closed) {
+      return false;
+    }
+
+    try {
+      TCancelOperationReq cancelReq = new TCancelOperationReq();
+      cancelReq.setOperationHandle(operation);
+      impalaSession.client().CancelOperation(cancelReq);
+    } catch (Exception e) {
+      LOG.error("Failed to safely cancel the query", e);
+    }
+    close();
+    return true;
+  }
+
+  public boolean isClosed() {
+    return closed;
+  }
+
+  /**
+   * Asks the server for the operation's current state.
+   *
+   * @return state code, state name and error message as reported by the server
+   */
+  public synchronized ExecStatus getExecStatus() throws TException, ImpalaEngineException {
+    TGetOperationStatusReq statusReq = new TGetOperationStatusReq(operation);
+    TGetOperationStatusResp statusResp = impalaSession.client().GetOperationStatus(statusReq);
+    ThriftUtil.checkStatus(statusResp.getStatus());
+
+    TOperationState operationState = statusResp.getOperationState();
+
+    return new ExecStatus(
+        operationState.getValue(), operationState.name(), statusResp.getErrorMessage());
+  }
+
+  /**
+   * Returns the status together with progress details. The execution summary is requested only
+   * while the status is active; otherwise the default progress and a node count of -1 are
+   * returned. While the query is queued, the queue reason is sent to the listener's {@code
+   * message} callback and the default progress is kept.
+   */
+  public synchronized ExecSummary getExecSummary() throws TException, ImpalaEngineException {
+    ExecProgress progress = ExecProgress.DEFAULT_PROGRESS;
+    int nodeNum = -1;
+
+    ExecStatus status = getExecStatus();
+    if (status.isActive()) {
+      TGetExecSummaryReq getExecSummaryReq = new TGetExecSummaryReq();
+      getExecSummaryReq.setOperationHandle(operation);
+      getExecSummaryReq.setSessionHandle(impalaSession.session());
+      TGetExecSummaryResp execSummaryResp =
+          impalaSession.client().GetExecSummary(getExecSummaryReq);
+
+      ThriftUtil.checkStatus(execSummaryResp.getStatus());
+      TExecSummary summary = execSummaryResp.getSummary();
+      /* guard against a response without a summary; the defaults are kept in that case */
+      if (summary != null) {
+        nodeNum = summary.getNodesSize();
+        if (summary.isIs_queued()) {
+          if (listener != null) {
+            listener.message(Lists.newArrayList(summary.getQueued_reason()));
+          }
+        } else {
+          TExecProgress p = summary.getProgress();
+          if (p != null) {
+            progress = new ExecProgress(p.total_scan_ranges, p.num_completed_scan_ranges, nodeNum);
+          }
+        }
+      }
+    }
+
+    return new ExecSummary(status, progress, nodeNum);
+  }
+
+  /** @return the sync flag passed at construction */
+  public boolean isSync() {
+    return sync;
+  }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftResultSetV7.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftResultSetV7.java
new file mode 100644
index 000000000..28751f5b4
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftResultSetV7.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.thrift;
+
+import org.apache.linkis.engineplugin.impala.client.ImpalaResultSet;
+import org.apache.linkis.engineplugin.impala.client.util.ThriftUtil;
+
+import org.apache.hive.service.rpc.thrift.TBinaryColumn;
+import org.apache.hive.service.rpc.thrift.TBoolColumn;
+import org.apache.hive.service.rpc.thrift.TByteColumn;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.hive.service.rpc.thrift.TColumnDesc;
+import org.apache.hive.service.rpc.thrift.TDoubleColumn;
+import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
+import org.apache.hive.service.rpc.thrift.TFetchResultsResp;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataReq;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataResp;
+import org.apache.hive.service.rpc.thrift.TI16Column;
+import org.apache.hive.service.rpc.thrift.TI32Column;
+import org.apache.hive.service.rpc.thrift.TI64Column;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TStatus;
+import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.hive.service.rpc.thrift.TStringColumn;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
+import org.apache.hive.service.rpc.thrift.TTypeDesc;
+import org.apache.impala.thrift.ImpalaHiveServer2Service;
+import org.apache.thrift.TException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public class ImpalaThriftResultSetV7 implements ImpalaResultSet {
+
+  /*
+   * bit mask, used to mask the byte value for in position bit
+   */
+  private static final byte[] bitMask = {
+    (byte) 0x01,
+    (byte) 0x02,
+    (byte) 0x04,
+    (byte) 0x08,
+    (byte) 0x10,
+    (byte) 0x20,
+    (byte) 0x40,
+    (byte) 0x80
+  };
+
+  private final ImpalaHiveServer2Service.Client client;
+  private final TOperationHandle operation;
+  private final TFetchResultsReq fetchReq;
+
+  private volatile int rowIndex;
+  private volatile int cols;
+  private volatile boolean closed;
+
+  private volatile boolean hasMoreRow;
+
+  private volatile Iterator<?>[] iterators = null;
+  private volatile byte[][] nulls = null;
+  private volatile int[] lengths = null;
+  private volatile List<Column> columns = null;
+
+  /**
+   * Prepares a result set over the given operation; no request is sent here.
+   *
+   * @param client Thrift client used for later fetch and metadata requests
+   * @param operation handle of the operation whose rows are read
+   * @param batchSize maximum number of rows requested per fetch
+   */
+  public ImpalaThriftResultSetV7(
+      ImpalaHiveServer2Service.Client client, TOperationHandle operation, int batchSize) {
+    TFetchResultsReq request = new TFetchResultsReq();
+    request.setMaxRows(batchSize);
+    request.setOperationHandle(operation);
+
+    this.client = client;
+    this.operation = operation;
+    this.fetchReq = request;
+
+    /* nothing fetched yet: no current row, no columns, but rows may be available remotely */
+    this.cols = 0;
+    this.rowIndex = -1;
+    this.hasMoreRow = true;
+  }
+
+  /**
+   * Stores the value iterator, the null bitmap and the value count of one fetched column in the
+   * per-column buffers at the given position.
+   *
+   * @param column column of the fetched row set
+   * @param index position of the column in the buffers
+   */
+  private void unpackColumn(TColumn column, int index) {
+    Iterator<?> iterator = null;
+    byte[] nulls = null;
+    int length = 0;
+    switch (column.getSetField()) {
+      case BINARY_VAL:
+        TBinaryColumn binVal = column.getBinaryVal();
+        iterator = binVal.getValuesIterator();
+        nulls = binVal.getNulls();
+        length = binVal.getValuesSize();
+        break;
+      case BOOL_VAL:
+        TBoolColumn boolVal = column.getBoolVal();
+        iterator = boolVal.getValuesIterator();
+        nulls = boolVal.getNulls();
+        length = boolVal.getValuesSize();
+        break;
+      case BYTE_VAL:
+        TByteColumn byteVal = column.getByteVal();
+        iterator = byteVal.getValuesIterator();
+        nulls = byteVal.getNulls();
+        length = byteVal.getValuesSize();
+        break;
+      case DOUBLE_VAL:
+        TDoubleColumn doubleVal = column.getDoubleVal();
+        iterator = doubleVal.getValuesIterator();
+        nulls = doubleVal.getNulls();
+        length = doubleVal.getValuesSize();
+        break;
+      case I16_VAL:
+        TI16Column i16Val = column.getI16Val();
+        iterator = i16Val.getValuesIterator();
+        nulls = i16Val.getNulls();
+        length = i16Val.getValuesSize();
+        break;
+      case I32_VAL:
+        TI32Column i32Val = column.getI32Val();
+        iterator = i32Val.getValuesIterator();
+        nulls = i32Val.getNulls();
+        length = i32Val.getValuesSize();
+        break;
+      case I64_VAL:
+        TI64Column i64Val = column.getI64Val();
+        iterator = i64Val.getValuesIterator();
+        nulls = i64Val.getNulls();
+        length = i64Val.getValuesSize();
+        break;
+      case STRING_VAL:
+        TStringColumn stringVal = column.getStringVal();
+        iterator = stringVal.getValuesIterator();
+        nulls = stringVal.getNulls();
+        length = stringVal.getValuesSize();
+        break;
+    }
+
+    this.iterators[index] = iterator;
+    this.nulls[index] = nulls;
+    this.lengths[index] = length;
+  }
+
+  /**
+   * Fetches the next batch from the remote server into the per-column buffers.
+   *
+   * <p>The buffers are reset first. Fetch requests are then repeated until a response carries at
+   * least one value in the first column or the server reports that no more rows exist.
+   *
+   * @throws RuntimeException when the server answers with a failing status or the Thrift call
+   *     fails
+   */
+  private void fetch() {
+    try {
+      hasMoreRow = false;
+      iterators = null;
+      nulls = null;
+      lengths = null;
+
+      do {
+        /*
+         * request data
+         */
+        TFetchResultsResp resp = client.FetchResults(fetchReq);
+        TStatus status = resp.getStatus();
+
+        /*
+         * request error
+         */
+        if (status.getStatusCode().getValue() > TStatusCode.SUCCESS_WITH_INFO_STATUS.getValue()) {
+          throw new RuntimeException("Failed to fetch result(s). " + status);
+        }
+
+        hasMoreRow = resp.isHasMoreRows();
+
+        TRowSet result = resp.getResults();
+        List<TColumn> list = result.getColumns();
+        if (list != null) {
+          /* a new batch starts: the row cursor restarts before its first row */
+          rowIndex = -1;
+
+          if (iterators == null) {
+            cols = list.size();
+            iterators = new Iterator[cols];
+            /*
+             * nulls matrix, indicate which [cols][row] is null value
+             */
+            nulls = new byte[cols][];
+            lengths = new int[cols];
+          }
+
+          /*
+           * fill the buffers
+           */
+          int i = 0;
+          for (TColumn item : list) {
+            unpackColumn(item, i);
+            ++i;
+          }
+        }
+
+        /* stop as soon as the first column holds at least one value */
+        if (lengths != null && lengths[0] > 0) {
+          break;
+        }
+      } while (hasMoreRow);
+
+    } catch (TException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns the next row, fetching another batch from the server when the local buffer is
+   * exhausted and the server reported more rows.
+   *
+   * @return the next row, or {@code null} when the result set is closed or has no more rows (in
+   *     the latter case the result set is closed as a side effect)
+   */
+  @Override
+  public synchronized Row next() {
+    if (closed) {
+      return null;
+    }
+
+    /*
+     * fetch remote data when nothing is buffered locally; the first column drives the iteration,
+     * and a buffer without columns counts as empty
+     */
+    boolean buffered = iterators != null && iterators.length > 0 && iterators[0].hasNext();
+    if (!buffered && hasMoreRow) {
+      fetch();
+      buffered = iterators != null && iterators.length > 0 && iterators[0].hasNext();
+    }
+
+    /*
+     * build local row
+     */
+    if (buffered) {
+      ++rowIndex;
+      int byteIndex = rowIndex / 8;
+      Object[] values = new Object[iterators.length];
+      for (int i = 0; i < iterators.length; ++i) {
+        values[i] = iterators[i].next();
+
+        /*
+         * use nulls matrix for recognize null value; a missing bitmap, or one that is shorter
+         * than the batch, is read as "not null" for the rows it does not cover
+         */
+        byte[] bitmap = nulls[i];
+        if (bitmap != null
+            && byteIndex < bitmap.length
+            && (bitmap[byteIndex] & bitMask[rowIndex % 8]) != 0) {
+          values[i] = null;
+        }
+      }
+      return new Row(values);
+    }
+
+    close();
+    return null;
+  }
+
+  /** Returns the number of columns reported by {@link #getColumns()}. */
+  @Override
+  public int getColumnSize() {
+    List<Column> described = getColumns();
+    return described.size();
+  }
+
+  /**
+   * Returns the column descriptors of this result set, requesting the result set metadata from
+   * the server on first use and caching the outcome.
+   *
+   * @return an unmodifiable list of columns with 1-based indexes; empty when the server returns
+   *     no schema
+   * @throws RuntimeException wrapping any failure of the metadata request or its status check
+   */
+  @Override
+  public synchronized List<Column> getColumns() {
+    if (columns != null) {
+      return columns;
+    }
+
+    List<Column> resolved = Collections.emptyList();
+    try {
+      TGetResultSetMetadataResp metadataResp =
+          client.GetResultSetMetadata(new TGetResultSetMetadataReq(operation));
+      ThriftUtil.checkStatus(metadataResp.getStatus());
+
+      TTableSchema tableSchema = metadataResp.getSchema();
+      if (tableSchema != null) {
+        List<TColumnDesc> descriptors = tableSchema.getColumns();
+        List<Column> built = new ArrayList<>(descriptors.size());
+        for (TColumnDesc descriptor : descriptors) {
+          /* column indexes start at 1 */
+          built.add(
+              new Column(built.size() + 1, descriptor.getColumnName(), descriptor.getTypeDesc()));
+        }
+        resolved = Collections.unmodifiableList(built);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    columns = resolved;
+    return columns;
+  }
+
+  /**
+   * Marks this result set as closed. Only the local flag is changed here; no request is sent to
+   * the server. Once the flag is set, {@code next()} returns {@code null}.
+   */
+  @Override
+  public void close() {
+    this.closed = true;
+  }
+
+  /** Immutable description of one result column: its index, its name and its thrift type. */
+  public static class Column implements ImpalaResultSet.Column {
+    private final int index;
+    private final String name;
+    private final TTypeDesc type;
+
+    /**
+     * @param index position of the column as assigned by the creator of this object
+     * @param name column name
+     * @param type thrift type descriptor of the column
+     */
+    protected Column(int index, String name, TTypeDesc type) {
+      this.index = index;
+      this.name = name;
+      this.type = type;
+    }
+
+    /** Returns the index given at construction time. */
+    @Override
+    public int getIndex() {
+      return index;
+    }
+
+    /** Returns the column name given at construction time. */
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    /** Returns the thrift type descriptor given at construction time. */
+    @Override
+    public TTypeDesc getType() {
+      return type;
+    }
+  }
+
+  public static class Row implements ImpalaResultSet.Row {
+    private final Object[] values;
+
+    protected Row(Object[] values) {
+      this.values = values;
+    }
+
+    @Override
+    public Object[] getValues() {
+      return values;
+    }
+
+    @Override
+    public Object getObject(int columnIndex) {
+      return values[columnIndex - 1];
+    }
+
+    @Override
+    public <T> T getObject(int columnIndex, Class<T> type) {
+      return type.cast(values[columnIndex - 1]);
+    }
+
+    @Override
+    public String getString(int columnIndex) {
+      return (String) values[columnIndex - 1];
+    }
+
+    @Override
+    public Short getShort(int columnIndex) {
+      return (Short) values[columnIndex - 1];
+    }
+
+    @Override
+    public Integer getInteger(int columnIndex) {
+      return (Integer) values[columnIndex - 1];
+    }
+
+    @Override
+    public Long getLong(int columnIndex) {
+      return (Long) values[columnIndex - 1];
+    }
+  }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSession.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSession.java
new file mode 100644
index 000000000..e585b7f73
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSession.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.thrift;
+
+import org.apache.linkis.engineplugin.impala.client.exception.ImpalaEngineException;
+import org.apache.linkis.engineplugin.impala.client.util.ThriftUtil;
+
+import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
+import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
+import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
+import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.apache.impala.thrift.ImpalaHiveServer2Service;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TTransport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A single HiveServer2-protocol session against one Impala server, bound to one thrift transport.
+ *
+ * <p>The session is opened in the constructor and closed, together with the transport, by {@link
+ * #close()}. The close callback supplied by the creator runs exactly once, on the first call to
+ * {@link #close()}.
+ */
+public class ImpalaThriftSession implements AutoCloseable {
+  public static final Logger LOG = LoggerFactory.getLogger(ImpalaThriftSession.class.getName());
+
+  private final String target;
+  private final TTransport transport;
+  private final Runnable closeCallback;
+  private final ImpalaHiveServer2Service.Client client;
+  private final TSessionHandle session;
+
+  private volatile boolean closed;
+
+  /**
+   * Opens a session over the given transport using protocol version V7.
+   *
+   * @param target name of the server this session talks to, used in log messages
+   * @param transport thrift transport used for all calls; it is not closed here when opening
+   *     fails, so the caller stays responsible for it in that case
+   * @param closeCallback invoked once when the session is closed
+   * @throws TException if the OpenSession call fails or the server answers with an error status
+   */
+  public ImpalaThriftSession(String target, TTransport transport, Runnable closeCallback)
+      throws TException {
+    this.target = target;
+    this.transport = transport;
+    this.closeCallback = closeCallback;
+    this.closed = false;
+
+    this.client = new ImpalaHiveServer2Service.Client(new TBinaryProtocol(transport));
+
+    /* open session */
+    TOpenSessionReq openReq = new TOpenSessionReq();
+    openReq.setClient_protocol(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7);
+    TOpenSessionResp openResp = client.OpenSession(openReq);
+
+    /* a rejected open must not yield a session object without a usable handle */
+    try {
+      ThriftUtil.checkStatus(openResp.getStatus());
+    } catch (ImpalaEngineException e) {
+      throw new TException("Failed to open session for " + target, e);
+    }
+    this.session = openResp.getSessionHandle();
+  }
+
+  /** Returns the session handle received from the server. */
+  public TSessionHandle session() {
+    return session;
+  }
+
+  /** Returns the thrift client bound to this session's transport. */
+  public ImpalaHiveServer2Service.Client client() {
+    return client;
+  }
+
+  /** Returns the server name given at construction time. */
+  public String target() {
+    return target;
+  }
+
+  /** Sessions are compared by identity only. */
+  @Override
+  public boolean equals(Object obj) {
+    return this == obj;
+  }
+
+  /**
+   * Closes the remote session (best effort), then the transport, then runs the close callback.
+   * Calling it again has no effect. A failure to close the remote session is logged and does not
+   * prevent the transport from being closed or the callback from running.
+   */
+  @Override
+  public synchronized void close() {
+    if (closed) {
+      return;
+    }
+    closed = true;
+
+    try {
+      /* only talk to the server when there is a session handle to close */
+      if (session != null && session.isSetSessionId()) {
+        try {
+          TCloseSessionReq req = new TCloseSessionReq(session);
+          TCloseSessionResp res = client.CloseSession(req);
+          ThriftUtil.checkStatus(res.getStatus());
+        } catch (TException | ImpalaEngineException e) {
+          LOG.error("Failed to safely close the session for {}", target, e);
+        }
+      }
+    } finally {
+      /* the transport is released even when no session handle was ever set */
+      try {
+        transport.close();
+      } finally {
+        closeCallback.run();
+      }
+    }
+  }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
new file mode 100644
index 000000000..fd6eb0e9f
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.thrift;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+
+import javax.net.SocketFactory;
+import javax.security.auth.callback.CallbackHandler;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates {@link ImpalaThriftSession}s against a fixed set of servers.
+ *
+ * <p>The total number of open sessions is bounded by a semaphore sized {@code maxConnections}.
+ * Each new session goes to the server that currently has the fewest sessions; ties are broken by
+ * a random number drawn once per server.
+ */
+public class ImpalaThriftSessionFactory {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ImpalaThriftSessionFactory.class.getName());
+
+  protected final Map<String, SessionHolder> sessions;
+  protected final int maxConnections;
+  protected final Semaphore semaphore;
+  protected final SocketFactory socketFactory;
+  /** sasl properties */
+  protected final CallbackHandler saslCallbackHandler;
+
+  protected final String saslMechanism;
+  protected final String saslAuthorizationId;
+  protected final String saslProtocol;
+  protected final Map<String, String> saslProperties;
+  /** timeout in seconds; multiplied into milliseconds when applied to the socket */
+  protected int connectionTimeout = 30;
+
+  /** Creates a factory without SASL. */
+  public ImpalaThriftSessionFactory(
+      String[] hosts, int maxConnections, SocketFactory socketFactory) {
+    this(hosts, maxConnections, socketFactory, null, null, null, null, null);
+  }
+
+  /**
+   * @param hosts servers in {@code host:port} form; entries that do not match are ignored
+   * @param maxConnections upper bound of concurrently open sessions; values below 1 become 1
+   * @param socketFactory factory for the raw sockets; {@code null} selects the default factory
+   * @param saslMechanism passed to the SASL transport
+   * @param saslAuthorizationId passed to the SASL transport
+   * @param saslProtocol passed to the SASL transport
+   * @param saslProperties passed to the SASL transport
+   * @param saslCallbackHandler when non-null, sessions use a SASL transport
+   * @throws IllegalArgumentException if no entry of {@code hosts} is usable
+   */
+  public ImpalaThriftSessionFactory(
+      String[] hosts,
+      int maxConnections,
+      SocketFactory socketFactory,
+      String saslMechanism,
+      String saslAuthorizationId,
+      String saslProtocol,
+      Map<String, String> saslProperties,
+      CallbackHandler saslCallbackHandler) {
+    if (socketFactory == null) {
+      socketFactory = SocketFactory.getDefault();
+    }
+
+    if (maxConnections <= 0) {
+      LOG.warn("Invalid maxConnections value: {}, set to 1", maxConnections);
+      maxConnections = 1;
+    }
+
+    sessions = new TreeMap<>();
+    for (String host : hosts) {
+      String[] split = StringUtils.split(host, ":", 2);
+      if (split.length == 2
+          && StringUtils.isNotBlank(split[0])
+          && StringUtils.isNumeric(split[1])) {
+        SessionHolder holder = new SessionHolder(host, split[0], Integer.parseInt(split[1]));
+        sessions.put(host, holder);
+      }
+    }
+
+    if (sessions.isEmpty()) {
+      throw new IllegalArgumentException("invalid hosts: " + StringUtils.join(hosts, ','));
+    }
+
+    this.socketFactory = socketFactory;
+    this.maxConnections = maxConnections;
+    this.semaphore = new Semaphore(maxConnections);
+
+    this.saslMechanism = saslMechanism;
+    this.saslAuthorizationId = saslAuthorizationId;
+    this.saslProtocol = saslProtocol;
+    this.saslProperties = saslProperties;
+    this.saslCallbackHandler = saslCallbackHandler;
+  }
+
+  /** Returns the number of permits currently taken from the connection semaphore. */
+  public int openedSessionCount() {
+    return maxConnections - semaphore.availablePermits();
+  }
+
+  /**
+   * Opens a new session on the server with the fewest sessions.
+   *
+   * <p>Waits up to five minutes for a free connection slot. If anything fails after the slot was
+   * taken, the socket/transport created so far is closed and the slot is returned before the
+   * failure is propagated.
+   *
+   * @return the opened session; closing it returns the connection slot
+   * @throws IllegalStateException if no slot became free in time
+   * @throws InterruptedException if interrupted while waiting for a slot
+   * @throws IOException if the socket cannot be created
+   * @throws TException if the transport or the session cannot be opened
+   */
+  public ImpalaThriftSession openSession() throws InterruptedException, IOException, TException {
+    if (!semaphore.tryAcquire(5, TimeUnit.MINUTES)) {
+      String format =
+          String.format(
+              "Connection pool for [%s] is exhausted, max allow: %d",
+              StringUtils.join(sessions.keySet(), ","), maxConnections);
+      LOG.error(format);
+      throw new IllegalStateException(format);
+    }
+
+    Socket socket = null;
+    TTransport tTransport = null;
+    boolean opened = false;
+    try {
+      /*
+       * get random holder with minimal connections
+       */
+      SessionHolder holder =
+          sessions.values().stream()
+              .min(
+                  Comparator.<SessionHolder>comparingInt(o -> o.connections)
+                      .thenComparingInt(o -> o.randomInt))
+              .get();
+
+      /*
+       * create socket
+       */
+      socket = socketFactory.createSocket(holder.host, holder.port);
+
+      /*
+       * create thrift socket
+       */
+      TSocket tSocket = new TSocket(socket);
+      tSocket.setTimeout(connectionTimeout * 1000);
+      socket.setSoTimeout(connectionTimeout * 2000);
+
+      if (saslCallbackHandler != null) {
+        tTransport =
+            new TSaslClientTransport(
+                saslMechanism,
+                saslAuthorizationId,
+                saslProtocol,
+                holder.name,
+                saslProperties,
+                saslCallbackHandler,
+                tSocket);
+
+      } else {
+        tTransport = tSocket;
+      }
+
+      if (!tTransport.isOpen()) {
+        tTransport.open();
+      }
+
+      ImpalaThriftSession impalaSession =
+          new ImpalaThriftSession(holder.name, tTransport, holder::release);
+      holder.acquire();
+      opened = true;
+      return impalaSession;
+    } finally {
+      if (!opened) {
+        /* no session owns the permit or the connection, so give both back here */
+        closeQuietly(tTransport, socket);
+        semaphore.release();
+      }
+    }
+  }
+
+  /** Closes whatever part of a half-built connection exists, logging instead of throwing. */
+  private static void closeQuietly(TTransport transport, Socket socket) {
+    if (transport != null) {
+      try {
+        transport.close();
+      } catch (RuntimeException e) {
+        LOG.warn("Failed to close transport after unsuccessful session open", e);
+      }
+    }
+    if (socket != null) {
+      try {
+        socket.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close socket after unsuccessful session open", e);
+      }
+    }
+  }
+
+  /** Per-server bookkeeping: address plus the number of sessions currently open on it. */
+  private class SessionHolder {
+    private final String name;
+    private final String host;
+    private final int port;
+    /** drawn once per holder; used only to break ties between equally loaded servers */
+    private final int randomInt = RandomUtils.nextInt();
+    private volatile int connections;
+
+    SessionHolder(String name, String host, int port) {
+      this.name = name;
+      this.host = host;
+      this.port = port;
+    }
+
+    /** Returns one connection slot to the factory and decrements this server's counter. */
+    void release() {
+      semaphore.release();
+
+      synchronized (this) {
+        --connections;
+      }
+    }
+
+    /** Increments this server's counter; the semaphore permit is taken by the caller. */
+    void acquire() {
+      synchronized (this) {
+        ++connections;
+      }
+    }
+  }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/util/ThriftUtil.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/util/ThriftUtil.java
new file mode 100644
index 000000000..667eca628
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/util/ThriftUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.util;
+
+import org.apache.linkis.engineplugin.impala.client.ExecutionListener;
+import org.apache.linkis.engineplugin.impala.client.exception.ImpalaEngineException;
+import org.apache.linkis.engineplugin.impala.client.exception.ImpalaErrorCodeSummary;
+
+import org.apache.hive.service.rpc.thrift.TStatus;
+
+/** Helpers for thrift status handling and for rendering ids as hex text. */
+public class ThriftUtil {
+  private static final char[] hexCode = "0123456789abcdef".toCharArray();
+
+  /**
+   * Translates a thrift status into an exception where applicable.
+   *
+   * @param status status returned by a thrift call
+   * @param executionListener receives the info messages of a SUCCESS_WITH_INFO status; may be
+   *     {@code null}, in which case the messages are dropped
+   * @throws ImpalaEngineException for STILL_EXECUTING, ERROR and INVALID_HANDLE statuses
+   */
+  public static void checkStatus(TStatus status, ExecutionListener executionListener)
+      throws ImpalaEngineException {
+    switch (status.getStatusCode()) {
+      case SUCCESS_STATUS:
+        break;
+      case SUCCESS_WITH_INFO_STATUS:
+        if (executionListener == null) {
+          break;
+        }
+        executionListener.message(status.getInfoMessages());
+        break;
+      case INVALID_HANDLE_STATUS:
+        throw ImpalaEngineException.of(ImpalaErrorCodeSummary.InvalidHandleError);
+      case ERROR_STATUS:
+        throw ImpalaEngineException.of(
+            ImpalaErrorCodeSummary.ExecutionError, status.getErrorMessage());
+      case STILL_EXECUTING_STATUS:
+        throw ImpalaEngineException.of(ImpalaErrorCodeSummary.StillRunningError);
+      default:
+        /* every other status code is accepted silently */
+        break;
+    }
+  }
+
+  /** Same as {@link #checkStatus(TStatus, ExecutionListener)} without a listener. */
+  public static void checkStatus(TStatus status) throws ImpalaEngineException {
+    checkStatus(status, null);
+  }
+
+  /**
+   * Renders the first 16 bytes of {@code b} as two hex groups separated by a colon: bytes 7 down
+   * to 0, then bytes 15 down to 8, two lowercase hex digits per byte.
+   */
+  public static String convertUniqueId(byte[] b) {
+    StringBuilder text = new StringBuilder(33);
+    for (int i = 7; i >= 0; --i) {
+      appendHex(text, b[i]);
+    }
+    text.append(':');
+    for (int i = 15; i >= 8; --i) {
+      appendHex(text, b[i]);
+    }
+    return text.toString();
+  }
+
+  /** Appends the high and the low nibble of {@code value} as hex digits. */
+  private static void appendHex(StringBuilder text, byte value) {
+    text.append(hexCode[(value >> 4) & 0xF]);
+    text.append(hexCode[value & 0xF]);
+  }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/impala/src/main/resources/linkis-engineconn.properties
new file mode 100644
index 000000000..18dacf8db
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/resources/linkis-engineconn.properties
@@ -0,0 +1,23 @@
+# 
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+wds.linkis.server.version=v1
+#wds.linkis.engineconn.debug.enable=true
+#wds.linkis.keytab.enable=true
+wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.impala.ImpalaEngineConnPlugin
+wds.linkis.engineconn.support.parallelism=true
+wds.linkis.rpc.cache.expire.time=0
+
+linkis.impala.servers=127.0.0.1:21050
diff --git a/linkis-engineconn-plugins/impala/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/impala/src/main/resources/log4j2.xml
new file mode 100644
index 000000000..391418350
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/resources/log4j2.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~ 
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~ 
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration status="error" monitorInterval="30">
+    <appenders>
+        <RollingFile name="RollingFile" append="true" fileName="${env:LOG_DIRS:-logs}/stdout"
+                     filePattern="${env:LOG_DIRS:-logs}/$${date:yyyy-MM}/linkis-log-%d{yyyy-MM-dd-hh}-%i.log">
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+            <Policies>
+                <SizeBasedTriggeringPolicy size="100MB"/>
+            </Policies>
+            <DefaultRolloverStrategy max="10"/>
+        </RollingFile>
+
+        <File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS:-logs}/yarnApp.log">
+            <RegexFilter regex=".*application_.*" onMatch="ACCEPT" onMismatch="DENY"/>
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+        </File>
+
+        <Send name="Send" >
+            <Filters>
+                <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
+            </Filters>
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+        </Send>
+
+        <Send name="SendPackage" >
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+        </Send>
+
+        <Console name="stderr" target="SYSTEM_ERR">
+            <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" />
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
+        </Console>
+    </appenders>
+
+    <loggers>
+        <root level="INFO">
+            <appender-ref ref="stderr"/>
+            <appender-ref ref="RollingFile"/>
+            <appender-ref ref="Send"/>
+        </root>
+        <!-- The logger name must equal the class name exactly; a trailing space inside the attribute would keep this entry from ever matching. -->
+        <logger name="org.springframework.boot.diagnostics.LoggingFailureAnalysisReporter" level="error" additivity="true">
+            <appender-ref ref="stderr"/>
+        </logger>
+        <logger name="com.netflix.discovery" level="warn" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.apache.hadoop.yarn" level="warn" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.springframework" level="warn" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.apache.linkis.server.security" level="warn" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.apache.hadoop.hdfs.KeyProviderCache" level="fatal" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.spark_project.jetty" level="ERROR" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.eclipse.jetty" level="ERROR" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.springframework" level="ERROR" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.reflections.Reflections" level="ERROR" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+    </loggers>
+</configuration>
diff --git a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/ImpalaEngineConnPlugin.scala b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/ImpalaEngineConnPlugin.scala
new file mode 100644
index 000000000..c5462e566
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/ImpalaEngineConnPlugin.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala
+
+import org.apache.linkis.engineplugin.impala.builder.ImpalaProcessEngineConnLaunchBuilder
+import org.apache.linkis.engineplugin.impala.factory.ImpalaEngineConnFactory
+import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin
+import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory
+import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder
+import org.apache.linkis.manager.engineplugin.common.resource.{
+  EngineResourceFactory,
+  GenericEngineResourceFactory
+}
+import org.apache.linkis.manager.label.entity.Label
+
+import java.util
+
+/**
+ * Engine plugin entry point for Impala. Hands out the resource factory, the launch builder and
+ * the engine factory; the two factories are created on first use and then reused.
+ */
+class ImpalaEngineConnPlugin extends EngineConnPlugin {
+  private val resourceLocker = new Object()
+
+  private val engineFactoryLocker = new Object()
+
+  // volatile so that the unsynchronized null check below observes a published instance
+  @volatile private var engineResourceFactory: EngineResourceFactory = _
+
+  @volatile private var engineFactory: EngineConnFactory = _
+
+  private val defaultLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]()
+
+  /** Nothing to initialize; the parameters are ignored. */
+  override def init(params: util.Map[String, AnyRef]): Unit = {}
+
+  /** Returns the shared resource factory, creating it on first use. */
+  override def getEngineResourceFactory: EngineResourceFactory = {
+    if (null == engineResourceFactory) resourceLocker synchronized {
+      // re-check under the lock: another thread may have created the instance while we waited
+      if (null == engineResourceFactory) {
+        engineResourceFactory = new GenericEngineResourceFactory
+      }
+    }
+    engineResourceFactory
+  }
+
+  /** Returns a new launch builder on every call. */
+  override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
+    new ImpalaProcessEngineConnLaunchBuilder
+  }
+
+  /** Returns the shared engine factory, creating it on first use. */
+  override def getEngineConnFactory: EngineConnFactory = {
+    if (null == engineFactory) engineFactoryLocker synchronized {
+      // re-check under the lock: another thread may have created the instance while we waited
+      if (null == engineFactory) {
+        engineFactory = new ImpalaEngineConnFactory
+      }
+    }
+    engineFactory
+  }
+
+  /** Returns the (initially empty) list of default labels. */
+  override def getDefaultLabels: util.List[Label[_]] = defaultLabels
+
+}
diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/builder/ImpalaProcessEngineConnLaunchBuilder.scala
similarity index 54%
rename from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
rename to linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/builder/ImpalaProcessEngineConnLaunchBuilder.scala
index 6d198bbae..ba0218e33 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
+++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/builder/ImpalaProcessEngineConnLaunchBuilder.scala
@@ -15,26 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineplugin.impala.builder
 
-import javax.security.auth.callback.PasswordCallback;
+import org.apache.linkis.engineplugin.impala.conf.ImpalaConfiguration
+import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
 
-public class StaticPasswordCallback extends PasswordCallback {
+class ImpalaProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {
 
-  private final char[] password;
-
-  public StaticPasswordCallback(String prompt, boolean echoOn) {
-    super(prompt, echoOn);
-    this.password = prompt.toCharArray();
-  }
-
-  public StaticPasswordCallback(String prompt) {
-    super(prompt, false);
-    this.password = prompt.toCharArray();
+  override def getEngineStartUser(label: UserCreatorLabel): String = {
+    if (ImpalaConfiguration.IMPALA_USER_ISOLATION_MODE.getValue) {
+      /* using user label if user isolation is on */
+      label.getUser
+    } else {
+      ImpalaConfiguration.IMPALA_ENGINE_USER.getValue
+    }
   }
 
-  @Override
-  public char[] getPassword() {
-    return password;
-  }
 }
diff --git a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/conf/ImpalaConfiguration.scala b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/conf/ImpalaConfiguration.scala
new file mode 100644
index 000000000..d0eeaa886
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/conf/ImpalaConfiguration.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.conf
+
+import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.storage.utils.StorageConfiguration
+
+import java.security.KeyStore
+
+/**
+ * Configuration entries of the Impala engine plugin. Every entry is a `CommonVars` made of a
+ * property key and the default used when the key is not set.
+ */
+object ImpalaConfiguration {
+
+  val ENGINE_CONCURRENT_LIMIT = CommonVars[Int]("linkis.engineconn.concurrent.limit", 100)
+
+  val DEFAULT_LIMIT = CommonVars[Int]("linkis.impala.default.limit", 5000)
+
+  // When true, the launch builder starts the engine as the user taken from the user label;
+  // otherwise IMPALA_ENGINE_USER is used.
+  val IMPALA_USER_ISOLATION_MODE =
+    CommonVars[Boolean]("linkis.impala.user.isolation.mode", false)
+
+  // Defaults to the value of StorageConfiguration.HDFS_ROOT_USER, read when this object is
+  // initialized.
+  val IMPALA_ENGINE_USER =
+    CommonVars("linkis.impala.engine.user", StorageConfiguration.HDFS_ROOT_USER.getValue)
+
+  // Server list and connection bound; the shipped default points at a single local server.
+  val IMPALA_SERVERS = CommonVars[String]("linkis.impala.servers", "127.0.0.1:21050")
+  val IMPALA_MAX_CONNECTIONS = CommonVars[Int]("linkis.impala.maxConnections", 10)
+
+  // SSL settings: disabled by default, no key store or trust store configured by default.
+  val IMPALA_SSL_ENABLE = CommonVars[Boolean]("linkis.impala.ssl.enable", false)
+  val IMPALA_SSL_KEYSTORE = CommonVars[String]("linkis.impala.ssl.keystore", null)
+
+  val IMPALA_SSL_KEYSTORE_TYPE =
+    CommonVars[String]("linkis.impala.ssl.keystore.type", KeyStore.getDefaultType)
+
+  val IMPALA_SSL_KEYSTORE_PASSWORD = CommonVars[String]("linkis.impala.ssl.keystore.password", null)
+  val IMPALA_SSL_TRUSTSTORE = CommonVars[String]("linkis.impala.ssl.truststore", null)
+
+  val IMPALA_SSL_TRUSTSTORE_TYPE =
+    CommonVars[String]("linkis.impala.ssl.truststore.type", KeyStore.getDefaultType)
+
+  val IMPALA_SSL_TRUSTSTORE_PASSWORD =
+    CommonVars[String]("linkis.impala.ssl.truststore.password", null)
+
+  // SASL settings: disabled by default.
+  val IMPALA_SASL_ENABLE = CommonVars[Boolean]("linkis.impala.sasl.enable", false)
+  val IMPALA_SASL_MECHANISM = CommonVars[String]("linkis.impala.sasl.mechanism", "PLAIN")
+  val IMPALA_SASL_AUTHORIZATION_ID = CommonVars[String]("linkis.impala.sasl.authorizationId", null)
+  val IMPALA_SASL_PROTOCOL = CommonVars[String]("linkis.impala.sasl.protocol", "LDAP")
+  val IMPALA_SASL_PROPERTIES = CommonVars[String]("linkis.impala.sasl.properties", null)
+  // The default is the value IMPALA_ENGINE_USER holds when this line is evaluated, so this
+  // declaration has to stay below IMPALA_ENGINE_USER.
+  val IMPALA_SASL_USERNAME = CommonVars("linkis.impala.sasl.username", IMPALA_ENGINE_USER.getValue)
+  val IMPALA_SASL_PASSWORD = CommonVars[String]("linkis.impala.sasl.password", null)
+  // NOTE(review): presumably a command whose output supplies the password - confirm against the
+  // code that reads this entry.
+  val IMPALA_SASL_PASSWORD_CMD = CommonVars[String]("linkis.impala.sasl.password.cmd", null)
+
+  // Query execution settings. NOTE(review): the "seconds" keys are presumably durations in
+  // seconds and a timeout of 0 presumably means "no timeout" - confirm against the executor.
+  val IMPALA_HEARTBEAT_SECONDS = CommonVars[Int]("linkis.impala.heartbeat.seconds", 1)
+  val IMPALA_QUERY_TIMEOUT_SECONDS = CommonVars[Int]("linkis.impala.query.timeout.seconds", 0)
+  val IMPALA_QUERY_BATCH_SIZE = CommonVars[Int]("linkis.impala.query.batchSize", 1000)
+  val IMPALA_QUERY_OPTIONS = CommonVars[String]("linkis.impala.query.options", null)
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/conf/ImpalaEngineConfig.scala b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/conf/ImpalaEngineConfig.scala
new file mode 100644
index 000000000..c069aa568
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/conf/ImpalaEngineConfig.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.conf
+
+import org.apache.linkis.common.conf.Configuration
+import org.apache.linkis.governance.common.protocol.conf.{
+  RequestQueryEngineConfigWithGlobalConfig,
+  ResponseQueryConfig
+}
+import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
+import org.apache.linkis.protocol.CacheableProtocol
+import org.apache.linkis.rpc.RPCMapCache
+
+import java.util
+
+/**
+ * RPC-backed map cache keyed by a (UserCreatorLabel, EngineTypeLabel) pair, holding String keys
+ * and String values. The cache is constructed with the value of
+ * Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME (presumably the name of the
+ * service that answers the request; confirm against RPCMapCache).
+ */
+object ImpalaEngineConfig
+    extends RPCMapCache[(UserCreatorLabel, EngineTypeLabel), String, String](
+      Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME.getValue
+    ) {
+
+  /** Builds the configuration query for the given (user-creator, engine-type) label pair. */
+  override protected def createRequest(
+      labelTuple: (UserCreatorLabel, EngineTypeLabel)
+  ): CacheableProtocol = {
+    val userCreatorLabel = labelTuple._1
+    val engineTypeLabel = labelTuple._2
+    RequestQueryEngineConfigWithGlobalConfig(userCreatorLabel, engineTypeLabel)
+  }
+
+  /**
+   * Extracts the key/value map from a ResponseQueryConfig; any other response object yields
+   * null.
+   */
+  override protected def createMap(any: Any): util.Map[String, String] = {
+    if (any.isInstanceOf[ResponseQueryConfig]) {
+      any.asInstanceOf[ResponseQueryConfig].getKeyAndValue
+    } else {
+      null
+    }
+  }
+
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
new file mode 100644
index 000000000..2a597dd11
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.executor
+
+import org.apache.linkis.common.log.LogUtils
+import org.apache.linkis.common.utils.{OverloadUtils, Utils}
+import org.apache.linkis.engineconn.common.password.{
+  CommandPasswordCallback,
+  StaticPasswordCallback
+}
+import org.apache.linkis.engineconn.computation.executor.execute.{
+  ConcurrentComputationExecutor,
+  EngineExecutionContext
+}
+import org.apache.linkis.engineconn.core.EngineConnObject
+import org.apache.linkis.engineplugin.impala.client.{
+  ExecutionListener,
+  ImpalaClient,
+  ImpalaResultSet
+}
+import org.apache.linkis.engineplugin.impala.client.ImpalaResultSet.Row
+import org.apache.linkis.engineplugin.impala.client.exception.{
+  ImpalaEngineException,
+  ImpalaErrorCodeSummary
+}
+import org.apache.linkis.engineplugin.impala.client.protocol.{ExecProgress, ExecStatus}
+import org.apache.linkis.engineplugin.impala.client.thrift.{
+  ImpalaThriftClient,
+  ImpalaThriftSessionFactory
+}
+import org.apache.linkis.engineplugin.impala.conf.ImpalaConfiguration._
+import org.apache.linkis.engineplugin.impala.conf.ImpalaEngineConfig
+import org.apache.linkis.governance.common.paser.SQLCodeParser
+import org.apache.linkis.manager.common.entity.resource.{
+  CommonNodeResource,
+  LoadResource,
+  NodeResource
+}
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
+import org.apache.linkis.protocol.engine.JobProgressInfo
+import org.apache.linkis.rpc.Sender
+import org.apache.linkis.scheduler.executer.{
+  CompletedExecuteResponse,
+  ErrorExecuteResponse,
+  ExecuteResponse,
+  SuccessExecuteResponse
+}
+import org.apache.linkis.storage.domain.Column
+import org.apache.linkis.storage.resultset.ResultSetFactory
+import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
+
+import org.apache.commons.io.IOUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
+
+import org.springframework.util.CollectionUtils
+
+import javax.net.SocketFactory
+import javax.net.ssl._
+import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback}
+
+import java.io.FileInputStream
+import java.security.KeyStore
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.Consumer
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+class ImpalaEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
+    extends ConcurrentComputationExecutor(outputPrintLimit) {
+
+  // Labels attached to this executor; replaced wholesale by setExecutorLabels.
+  private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]](2)
+
+  // ImpalaClient instances held by this executor; every entry is closed in close().
+  private val impalaClients: util.Map[String, ImpalaClient] =
+    new ConcurrentHashMap[String, ImpalaClient]()
+
+  // Executions currently in flight, keyed by task id. An entry is registered for the duration of
+  // executeLine and is consulted by progress, getProgressInfo, killTask and killAll.
+  private val taskExecutions: util.Map[String, TaskExecution] =
+    new ConcurrentHashMap[String, TaskExecution]()
+
+  /** Installs a SQLCodeParser as the code parser, then runs the parent initialisation. */
+  override def init: Unit = {
+    val sqlParser = new SQLCodeParser
+    setCodeParser(sqlParser)
+    super.init
+  }
+
+  /**
+   * Runs one statement through the Impala client and returns the response recorded by the
+   * task's listener.
+   *
+   * Blank code is replaced by "SELECT 1". The TaskExecution is registered in taskExecutions for
+   * the duration of the call and always removed afterwards. A failure raised by the client is
+   * written to the task's stdout and converted into an ErrorExecuteResponse.
+   *
+   * @param engineExecutionContext context of the running job; its job id must be defined
+   * @param code the statement to execute
+   */
+  override def executeLine(
+      engineExecutionContext: EngineExecutionContext,
+      code: String
+  ): ExecuteResponse = {
+    val realCode = if (StringUtils.isNotBlank(code)) code.trim else "SELECT 1"
+
+    logger.info(s"impala client begins to run code:\n $realCode")
+    val taskId = engineExecutionContext.getJobId.get
+
+    val impalaClient = getOrCreateImpalaClient(engineExecutionContext)
+    val taskExecution: TaskExecution = TaskExecution(taskId, engineExecutionContext, impalaClient)
+
+    // Reports the failure on the task's stdout and wraps it in an error response.
+    def onFailure(exception: Throwable): CompletedExecuteResponse = {
+      val stackTrace = ExceptionUtils.getStackTrace(exception)
+      engineExecutionContext.appendStdout(LogUtils.generateERROR(stackTrace))
+      ErrorExecuteResponse(ExceptionUtils.getMessage(exception), exception)
+    }
+
+    Utils.tryFinally {
+      taskExecutions.put(taskId, taskExecution)
+      Utils.tryCatch[CompletedExecuteResponse] {
+        impalaClient.execute(realCode, taskExecution)
+        taskExecution.response
+      }(onFailure)
+    } {
+      taskExecutions.remove(taskId)
+    }
+  }
+
+  /** Progress of the given task as reported by its TaskExecution, or 0 when it is not running. */
+  override def progress(taskId: String): Float = {
+    Option(taskExecutions.get(taskId))
+      .map(_.progressFloat)
+      .getOrElse(0f)
+  }
+
+  /** Progress details of the given task, or an empty array when the task is not registered. */
+  override def getProgressInfo(taskId: String): Array[JobProgressInfo] = {
+    Option(taskExecutions.get(taskId)) match {
+      case Some(taskExecution) => taskExecution.progressArray
+      case None => Array.empty[JobProgressInfo]
+    }
+  }
+
+  /** Kills the registered execution of the task, if any, then delegates to the parent. */
+  override def killTask(taskId: String): Unit = {
+    Option(taskExecutions.get(taskId)).foreach(_.kill())
+    super.killTask(taskId)
+  }
+
+  /** Returns the live label list of this executor (not a copy). */
+  override def getExecutorLabels(): util.List[Label[_]] = {
+    executorLabels
+  }
+
+  /** Replaces the executor labels with the given ones; a null or empty list is ignored. */
+  override def setExecutorLabels(labels: util.List[Label[_]]): Unit = {
+    val hasLabels = !CollectionUtils.isEmpty(labels)
+    if (hasLabels) {
+      executorLabels.clear()
+      executorLabels.addAll(labels)
+    }
+  }
+
+  /** Always false for this executor. */
+  override def supportCallBackLogs(): Boolean = {
+    false
+  }
+
+  /** Not implemented for this executor: the argument is ignored and null is returned. */
+  override def requestExpectedResource(expectedResource: NodeResource): NodeResource = null
+
+  /**
+   * Builds a CommonNodeResource whose used resource is a LoadResource made of the process max
+   * memory and the constant 1 (second LoadResource argument).
+   */
+  override def getCurrentNodeResource(): NodeResource = {
+    val creationOptions = EngineConnObject.getEngineCreationContext.getOptions
+    NodeResourceUtils.appendMemoryUnitIfMissing(creationOptions)
+
+    val nodeResource = new CommonNodeResource
+    nodeResource.setUsedResource(new LoadResource(OverloadUtils.getProcessMaxMemory, 1))
+    nodeResource
+  }
+
+  /** Identifier of this executor: the service instance string followed by "_" and the id. */
+  override def getId(): String = {
+    val instance = Sender.getThisServiceInstance.getInstance
+    s"${instance}_$id"
+  }
+
+  /** Concurrency limit of this executor, read from ENGINE_CONCURRENT_LIMIT. */
+  override def getConcurrentLimit: Int = {
+    ENGINE_CONCURRENT_LIMIT.getValue
+  }
+
+  /**
+   * Streams an Impala result set into a table result-set writer and sends it to the execution
+   * context.
+   *
+   * Column metadata is written first (name only; the two remaining Column arguments are null),
+   * then rows are pulled with resultSet.next() until it returns null. A summary line with the
+   * column and row counts is logged and appended to the task's stdout.
+   *
+   * Rows are no longer echoed to the process stdout: the former debug println printed every
+   * fetched row and built a second TableRecord per row just for that purpose.
+   *
+   * @param taskId id of the task being served (currently unused, kept for the callers)
+   * @param engineExecutorContext context used to create the writer and to publish the result
+   * @param resultSet the Impala result set to drain
+   */
+  private def queryOutput(
+      taskId: String,
+      engineExecutorContext: EngineExecutionContext,
+      resultSet: ImpalaResultSet
+  ): Unit = {
+    var columnCount = 0
+    var rows = 0
+    val resultSetWriter = engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
+    Utils.tryCatch {
+      val columns = resultSet.getColumns.asScala
+        .map(column => Column(column.getName, null, null))
+        .toArray[Column]
+      columnCount = columns.length
+      resultSetWriter.addMetaData(new TableMetaData(columns))
+
+      var row: Row = resultSet.next()
+      while (row != null) {
+        val anys: ArrayBuffer[Any] = ArrayBuffer[Any]()
+        row.getValues.foreach(v => anys += v)
+        resultSetWriter.addRecord(new TableRecord(anys.toArray))
+        rows += 1
+
+        row = resultSet.next()
+      }
+    } { throwable =>
+      // Close the writer for every failure handed to this handler, not only for Exception
+      // subclasses, then let the original failure propagate unchanged.
+      IOUtils.closeQuietly(resultSetWriter)
+      throw throwable
+    }
+    logger.info(s"Fetched $columnCount col(s) : $rows row(s) in impala")
+    engineExecutorContext.appendStdout(
+      LogUtils.generateInfo(s"Fetched $columnCount col(s) : $rows row(s) in impala")
+    )
+    engineExecutorContext.sendResultSet(resultSetWriter)
+  }
+
+  /**
+   * Kills every registered execution, logging (not propagating) individual failures, then
+   * empties the registry.
+   */
+  override def killAll(): Unit = {
+    taskExecutions
+      .values()
+      .asScala
+      .foreach(taskExecution => Utils.tryAndWarn(taskExecution.kill()))
+    taskExecutions.clear()
+  }
+
+  /**
+   * Shuts the executor down: kills all running tasks, closes every held ImpalaClient (failures
+   * are logged, not propagated), clears the client map and finally closes the parent.
+   */
+  override def close(): Unit = {
+    killAll()
+    impalaClients
+      .values()
+      .asScala
+      .foreach(client => Utils.tryAndWarn(client.close()))
+    impalaClients.clear()
+    super.close()
+  }
+
+  /**
+   * ExecutionListener bound to a single task. It records the query id and the progress counters
+   * reported by the Impala client, pushes progress and messages to the engine execution context
+   * and keeps the response that executeLine finally returns.
+   */
+  case class TaskExecution(
+      taskId: String,
+      engineExecutionContext: EngineExecutionContext,
+      impalaClient: ImpalaClient
+  ) extends ExecutionListener {
+    // Query id reported through created(); empty until then.
+    var queryId: String = ""
+    // Counters start at -1 (total, running) or 0 (failed, succeeded) until progress is reported.
+    var totalTasks: Int = -1
+    var runningTasks: Int = -1
+    var failedTasks: Int = 0
+    var succeedTasks: Int = 0
+    // Stays a success response unless error() replaces it.
+    var response: CompletedExecuteResponse = SuccessExecuteResponse()
+
+    /** Remembers the query id so that the query can be cancelled later. */
+    override def created(queryId: String): Unit = {
+      this.queryId = queryId
+    }
+
+    /** Copies the reported counters and pushes the derived progress to the context. */
+    override def progress(progress: ExecProgress): Unit = {
+      runningTasks = progress.getRunningNodes
+      totalTasks = progress.getTotalScanRanges.toInt
+      succeedTasks = progress.getCompletedScanRanges.toInt
+
+      val ratio = progressFloat
+      val details = progressArray
+      engineExecutionContext.pushProgress(ratio, details)
+    }
+
+    /** Appends each message to the task's stdout, in order. */
+    override def message(messages: util.List[String]): Unit = {
+      messages.asScala.foreach(line => engineExecutionContext.appendStdout(line))
+    }
+
+    /** Writes the result set out through queryOutput. */
+    override def success(resultSet: ImpalaResultSet): Unit = {
+      queryOutput(taskId, engineExecutionContext, resultSet)
+    }
+
+    /** Replaces the response with an error built from the status message and status name. */
+    override def error(status: ExecStatus): Unit = {
+      val errorMessage = status.getErrorMessage
+      val cause = ImpalaEngineException.of(ImpalaErrorCodeSummary.ExecutionError, status.getName)
+      response = ErrorExecuteResponse(errorMessage, cause)
+    }
+
+    /** Single-element progress array describing the current counters. */
+    def progressArray: Array[JobProgressInfo] = {
+      val info = JobProgressInfo(queryId, totalTasks, runningTasks, failedTasks, succeedTasks)
+      Array(info)
+    }
+
+    /** Ratio of succeeded to total tasks, or 0 while the total is not positive. */
+    def progressFloat: Float = {
+      if (totalTasks <= 0) 0f else succeedTasks.toFloat / totalTasks
+    }
+
+    /** Cancels the query on the client once a query id is known; otherwise does nothing. */
+    def kill(): Unit = {
+      val hasQueryId = StringUtils.isNotBlank(queryId)
+      if (hasQueryId) {
+        impalaClient.cancel(queryId)
+      }
+    }
+
+  }
+
+  /**
+   * Returns the ImpalaClient for the effective configuration of the given execution context,
+   * creating and caching it on first use.
+   *
+   * The configuration is resolved through ImpalaEngineConfig when both a UserCreatorLabel and an
+   * EngineTypeLabel are present in the context; otherwise (or when that lookup fails) the
+   * configuration variables are read with a null map.
+   *
+   * Fixes over the previous version:
+   *   - the cache key is now a String and the new client is stored in impalaClients; before, the
+   *     map was queried with an Array and never populated, so every statement created a fresh
+   *     client that close() could never release;
+   *   - a missing label no longer throws NoSuchElementException ahead of the null check;
+   *   - the SASL callback handler no longer fails with a NullPointerException when no password
+   *     is configured, and rejects unknown callbacks with UnsupportedCallbackException instead
+   *     of a MatchError;
+   *   - empty segments in the comma-separated "key=value" option strings are skipped instead of
+   *     raising ArrayIndexOutOfBoundsException.
+   *
+   * NOTE(review): the cache key contains the configured passwords in plain text; it is only used
+   * as an in-memory map key and must not be logged.
+   *
+   * @param engineExecutionContext context whose labels select the configuration
+   * @return the cached or newly created client
+   */
+  private def getOrCreateImpalaClient(
+      engineExecutionContext: EngineExecutionContext
+  ): ImpalaClient = {
+    val labels = engineExecutionContext.getLabels
+    val userCreatorLabel = labels.collectFirst { case label: UserCreatorLabel => label }
+    val engineTypeLabel = labels.collectFirst { case label: EngineTypeLabel => label }
+    val configMap: util.Map[String, String] = (userCreatorLabel, engineTypeLabel) match {
+      case (Some(userCreator), Some(engineType)) =>
+        Utils.tryAndError(ImpalaEngineConfig.getCacheMap((userCreator, engineType)))
+      case _ => null
+    }
+
+    val impalaServers = IMPALA_SERVERS.getValue(configMap)
+    val impalaMaxConnections = IMPALA_MAX_CONNECTIONS.getValue(configMap)
+    val impalaSaslEnable = IMPALA_SASL_ENABLE.getValue(configMap)
+    val impalaSaslProperties = IMPALA_SASL_PROPERTIES.getValue(configMap)
+    val impalaSaslUsername = IMPALA_SASL_USERNAME.getValue(configMap)
+    val impalaSaslPassword = IMPALA_SASL_PASSWORD.getValue(configMap)
+    val impalaSaslPasswordCmd = IMPALA_SASL_PASSWORD_CMD.getValue(configMap)
+    val impalaSaslMechanism = IMPALA_SASL_MECHANISM.getValue(configMap)
+    val impalaSaslAuthorization = IMPALA_SASL_AUTHORIZATION_ID.getValue(configMap)
+    val impalaSaslProtocol = IMPALA_SASL_PROTOCOL.getValue(configMap)
+    val impalaHeartbeatSeconds = IMPALA_HEARTBEAT_SECONDS.getValue(configMap)
+    val impalaQueryTimeoutSeconds = IMPALA_QUERY_TIMEOUT_SECONDS.getValue(configMap)
+    val impalaQueryBatchSize = IMPALA_QUERY_BATCH_SIZE.getValue(configMap)
+    val impalaQueryOptions = IMPALA_QUERY_OPTIONS.getValue(configMap)
+
+    val impalaSslEnable = IMPALA_SSL_ENABLE.getValue(configMap)
+    val impalaSslKeystore = IMPALA_SSL_KEYSTORE.getValue(configMap)
+    val impalaSslKeystoreType = IMPALA_SSL_KEYSTORE_TYPE.getValue(configMap)
+    val impalaSslKeystorePassword = IMPALA_SSL_KEYSTORE_PASSWORD.getValue(configMap)
+    val impalaSslTruststore = IMPALA_SSL_TRUSTSTORE.getValue(configMap)
+    val impalaSslTruststoreType = IMPALA_SSL_TRUSTSTORE_TYPE.getValue(configMap)
+    val impalaSslTruststorePassword = IMPALA_SSL_TRUSTSTORE_PASSWORD.getValue(configMap)
+
+    // Every setting that influences the client takes part in the key, so that two contexts
+    // share a client only when their effective configuration is identical. The control
+    // character separator keeps neighbouring values from running into each other.
+    val impalaClientKey: String = Array[Any](
+      impalaServers,
+      impalaMaxConnections,
+      impalaSaslEnable,
+      impalaSaslProperties,
+      impalaSaslUsername,
+      impalaSaslPassword,
+      impalaSaslPasswordCmd,
+      impalaSaslMechanism,
+      impalaSaslAuthorization,
+      impalaSaslProtocol,
+      impalaHeartbeatSeconds,
+      impalaQueryTimeoutSeconds,
+      impalaQueryBatchSize,
+      impalaQueryOptions,
+      impalaSslEnable,
+      impalaSslKeystore,
+      impalaSslKeystoreType,
+      impalaSslKeystorePassword,
+      impalaSslTruststore,
+      impalaSslTruststoreType,
+      impalaSslTruststorePassword
+    ).mkString("\u0001")
+
+    // Parses "k1=v1,k2=v2" into pairs. A key without "=" gets an empty value; segments that
+    // contain no key at all (empty string, stray commas) are dropped.
+    def parseKeyValues(text: String): Array[(String, String)] =
+      Option(text)
+        .map(_.split(','))
+        .getOrElse(Array.empty[String])
+        .map(segment => StringUtils.split(segment, "=", 2))
+        .filter(kv => kv != null && kv.nonEmpty)
+        .map(kv => (kv(0), if (kv.length > 1) kv(1) else ""))
+
+    impalaClients.synchronized {
+      var client = impalaClients.get(impalaClientKey)
+      if (client == null) {
+        val socketFactory = createSocketFactory(
+          impalaSslEnable,
+          impalaSslKeystore,
+          impalaSslKeystoreType,
+          impalaSslKeystorePassword,
+          impalaSslTruststore,
+          impalaSslTruststoreType,
+          impalaSslTruststorePassword
+        )
+
+        val servers = impalaServers.split(',')
+        val maxConnections = impalaMaxConnections
+        val factory: ImpalaThriftSessionFactory = if (impalaSaslEnable) {
+          val saslProperties: util.Map[String, String] = new util.TreeMap()
+          parseKeyValues(impalaSaslProperties).foreach { case (key, value) =>
+            saslProperties.put(key, value)
+          }
+
+          // A password command takes precedence over a static password; null when neither is
+          // configured.
+          val passwordCallback: PasswordCallback =
+            if (StringUtils.isNotBlank(impalaSaslPasswordCmd)) {
+              new CommandPasswordCallback(impalaSaslPasswordCmd)
+            } else if (StringUtils.isNotBlank(impalaSaslPassword)) {
+              new StaticPasswordCallback(impalaSaslPassword)
+            } else {
+              null
+            }
+
+          val callbackHandler: CallbackHandler = new CallbackHandler() {
+            override def handle(callbacks: Array[Callback]): Unit = callbacks.foreach {
+              case callback: NameCallback => callback.setName(impalaSaslUsername)
+              case callback: PasswordCallback =>
+                // Without a configured password the callback is left untouched.
+                if (passwordCallback != null) {
+                  callback.setPassword(passwordCallback.getPassword)
+                }
+              case other =>
+                throw new javax.security.auth.callback.UnsupportedCallbackException(other)
+            }
+          }
+
+          new ImpalaThriftSessionFactory(
+            servers,
+            maxConnections,
+            socketFactory,
+            impalaSaslMechanism,
+            impalaSaslAuthorization,
+            impalaSaslProtocol,
+            saslProperties,
+            callbackHandler
+          )
+        } else {
+          new ImpalaThriftSessionFactory(servers, maxConnections, socketFactory)
+        }
+
+        val impalaClient = new ImpalaThriftClient(factory, impalaHeartbeatSeconds)
+        impalaClient.setQueryTimeoutInSeconds(impalaQueryTimeoutSeconds)
+        impalaClient.setBatchSize(impalaQueryBatchSize)
+        parseKeyValues(impalaQueryOptions).foreach { case (key, value) =>
+          impalaClient.setQueryOption(key, value)
+        }
+
+        // Register the client so that later statements reuse it and close() can release it.
+        impalaClients.put(impalaClientKey, impalaClient)
+        client = impalaClient
+      }
+      client
+    }
+  }
+
+  /**
+   * Builds the TLS socket factory for the Impala connection, or returns null when SSL is
+   * disabled.
+   *
+   * Key material is loaded from the keystore only when a keystore path is configured; without
+   * one no client key managers are installed (previously a missing path caused a
+   * NullPointerException). Trust material comes from the truststore when configured, otherwise
+   * from the keystore as before; when neither is configured the TrustManagerFactory is
+   * initialised with a null KeyStore, which selects the JDK default trust store.
+   *
+   * @param impalaSslEnable whether SSL is enabled
+   * @param impalaSslKeystore path of the keystore file, may be blank
+   * @param impalaSslKeystoreType keystore type passed to KeyStore.getInstance
+   * @param impalaSslKeystorePassword keystore password, may be null
+   * @param impalaSslTruststore path of the truststore file, may be blank
+   * @param impalaSslTruststoreType truststore type passed to KeyStore.getInstance
+   * @param impalaSslTruststorePassword truststore password, may be null
+   * @return the socket factory of a "TLS" SSLContext, or null when SSL is disabled
+   * @throws RuntimeException when the trust manager factory does not yield exactly one
+   *                          X509TrustManager
+   */
+  private def createSocketFactory(
+      impalaSslEnable: Boolean,
+      impalaSslKeystore: String,
+      impalaSslKeystoreType: String,
+      impalaSslKeystorePassword: String,
+      impalaSslTruststore: String,
+      impalaSslTruststoreType: String,
+      impalaSslTruststorePassword: String
+  ): SocketFactory = {
+
+    // Converts an optional password to the char array expected by the KeyStore API.
+    def toChars(password: String): Array[Char] = Option(password).map(_.toCharArray).orNull
+
+    // Loads a key store of the given type from a file, always closing the stream.
+    def loadStore(path: String, storeType: String, password: Array[Char]): KeyStore = {
+      val store = KeyStore.getInstance(storeType)
+      val in = new FileInputStream(path)
+      try {
+        store.load(in, password)
+      } finally {
+        in.close()
+      }
+      store
+    }
+
+    if (impalaSslEnable) {
+      val keyStorePassword: Array[Char] = toChars(impalaSslKeystorePassword)
+      val keyStore: KeyStore =
+        if (StringUtils.isNotBlank(impalaSslKeystore)) {
+          loadStore(impalaSslKeystore, impalaSslKeystoreType, keyStorePassword)
+        } else {
+          null
+        }
+
+      // Null key managers are accepted by SSLContext.init.
+      val keyManagers: Array[KeyManager] =
+        if (keyStore != null) {
+          val keyManagerFactory =
+            KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
+          keyManagerFactory.init(keyStore, keyStorePassword)
+          keyManagerFactory.getKeyManagers
+        } else {
+          null
+        }
+
+      val trustStore: KeyStore =
+        if (StringUtils.isNotBlank(impalaSslTruststore)) {
+          loadStore(
+            impalaSslTruststore,
+            impalaSslTruststoreType,
+            toChars(impalaSslTruststorePassword)
+          )
+        } else {
+          keyStore
+        }
+
+      // create TrustManagerFactory
+      val trustManagerFactory =
+        TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
+      trustManagerFactory.init(trustStore)
+
+      // get X509TrustManager
+      val trustManagers = trustManagerFactory.getTrustManagers
+      if (trustManagers.length != 1 || !trustManagers(0).isInstanceOf[X509TrustManager]) {
+        // mkString lists the managers; StringUtils.join(",", array) resolved to the varargs
+        // overload and printed the array reference instead.
+        throw new RuntimeException(
+          "Unexpected default trust managers:" + trustManagers.mkString(",")
+        )
+      }
+      val trustManager = trustManagers(0).asInstanceOf[X509TrustManager]
+
+      // create SSLContext
+      val sslContext = SSLContext.getInstance("TLS")
+      sslContext.init(keyManagers, Array[TrustManager](trustManager), null)
+
+      sslContext.getSocketFactory
+    } else {
+      null
+    }
+  }
+
+  /** Not implemented for this executor: all arguments are ignored and null is returned. */
+  override def executeCompletely(
+      engineExecutorContext: EngineExecutionContext,
+      code: String,
+      completedLine: String
+  ): ExecuteResponse = {
+    null
+  }
+
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/factory/ImpalaEngineConnFactory.scala b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/factory/ImpalaEngineConnFactory.scala
new file mode 100644
index 000000000..ee39a3845
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/factory/ImpalaEngineConnFactory.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.factory
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.common.engineconn.EngineConn
+import org.apache.linkis.engineconn.computation.executor.creation.ComputationSingleExecutorEngineConnFactory
+import org.apache.linkis.engineconn.executor.entity.LabelExecutor
+import org.apache.linkis.engineplugin.impala.conf.ImpalaConfiguration
+import org.apache.linkis.engineplugin.impala.executor.ImpalaEngineConnExecutor
+import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType}
+import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
+import org.apache.linkis.manager.label.entity.engine.RunType.RunType
+
+/** Engine-conn factory that creates ImpalaEngineConnExecutor instances. */
+class ImpalaEngineConnFactory extends ComputationSingleExecutorEngineConnFactory {
+
+  /** Engine type served by this factory. */
+  override protected def getEngineConnType: EngineType = {
+    EngineType.IMPALA
+  }
+
+  /** Run type served by this factory. */
+  override protected def getRunType: RunType = {
+    RunType.IMPALA_SQL
+  }
+
+  /**
+   * Creates an executor with the given id whose output print limit is
+   * ImpalaConfiguration.DEFAULT_LIMIT; the creation context and engine conn are not used.
+   */
+  override def newExecutor(
+      id: Int,
+      engineCreationContext: EngineCreationContext,
+      engineConn: EngineConn
+  ): LabelExecutor = {
+    val outputPrintLimit = ImpalaConfiguration.DEFAULT_LIMIT.getValue
+    new ImpalaEngineConnExecutor(outputPrintLimit, id)
+  }
+
+}
diff --git a/linkis-engineconn-plugins/impala/src/test/scala/org/apache/linkis/engineplugin/impala/executer/TestImpalaEngineConnExecutor.scala b/linkis-engineconn-plugins/impala/src/test/scala/org/apache/linkis/engineplugin/impala/executer/TestImpalaEngineConnExecutor.scala
new file mode 100644
index 000000000..d3bff1685
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/test/scala/org/apache/linkis/engineplugin/impala/executer/TestImpalaEngineConnExecutor.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.executer
+
+import org.apache.linkis.common.ServiceInstance
+import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.utils.Utils
+import org.apache.linkis.engineconn.common.creation.{
+  DefaultEngineCreationContext,
+  EngineCreationContext
+}
+import org.apache.linkis.engineconn.computation.executor.entity.CommonEngineConnTask
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
+import org.apache.linkis.engineconn.computation.executor.utlis.ComputationEngineConstant
+import org.apache.linkis.engineplugin.impala.executor.ImpalaEngineConnExecutor
+import org.apache.linkis.engineplugin.impala.factory.ImpalaEngineConnFactory
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
+import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
+import org.apache.linkis.governance.common.utils.EngineConnArgumentsParser
+import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
+import org.apache.linkis.manager.label.builder.factory.{
+  LabelBuilderFactory,
+  LabelBuilderFactoryContext
+}
+import org.apache.linkis.manager.label.entity.Label
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.junit.jupiter.api.Assertions
+
+/**
+ * Manual test for ImpalaEngineConnExecutor. The @Test annotation is commented out, so
+ * testExecuteLine is not picked up by the test runner; it targets the Impala server address
+ * hard-coded in the launch arguments below.
+ */
+class TestImpalaEngineConnExecutor {
+
+  private val engineCreationContext: EngineCreationContext = new DefaultEngineCreationContext
+
+  private val labelBuilderFactory: LabelBuilderFactory =
+    LabelBuilderFactoryContext.getLabelBuilderFactory
+
+  /** Executes "SHOW DATABASES" through a freshly created executor and expects a response. */
+//  @Test
+  def testExecuteLine: Unit = {
+    val engineconnConf = "--engineconn-conf"
+    val springConf = "--spring-conf"
+
+    // Each option is passed as "<flag> <key=value>", engine-conn options first.
+    val engineConnOptions = Seq(
+      "wds.linkis.rm.instance=10",
+      "label.userCreator=root-IDE",
+      "ticketId=037ab855-0c41-4323-970d-7f75e71883b6",
+      "label.engineType=impala",
+      "linkis.impala.servers=192.168.21.121:21050",
+      "linkis.impala.sasl.enable=true",
+      "linkis.impala.sasl.username=root",
+      "linkis.impala.sasl.password=12345678",
+      "linkis.impala.default.start.user=root"
+    )
+    val springOptions = Seq(
+      "eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/",
+      "logging.config=classpath:log4j2.xml",
+      "spring.profiles.active=engineconn",
+      "server.port=35655",
+      "spring.application.name=linkis-cg-engineconn"
+    )
+    val launchArgs: Array[String] =
+      (engineConnOptions.flatMap(option => Seq(engineconnConf, option)) ++
+        springOptions.flatMap(option => Seq(springConf, option))).toArray
+    this.init(launchArgs)
+
+    val sql = "SHOW DATABASES"
+    val taskId = "1"
+    val task = new CommonEngineConnTask(taskId, false)
+    task.setProperties(new util.HashMap[String, Object])
+    task.data(ComputationEngineConstant.LOCK_TYPE_NAME, "lock")
+    task.setStatus(ExecutionNodeStatus.Scheduled)
+
+    val engineFactory: ImpalaEngineConnFactory = new ImpalaEngineConnFactory
+    val engine = engineFactory.createEngineConn(engineCreationContext)
+    val impalaExecutor: ImpalaEngineConnExecutor = engineFactory
+      .newExecutor(1, engineCreationContext, engine)
+      .asInstanceOf[ImpalaEngineConnExecutor]
+
+    val engineExecutionContext = new EngineExecutionContext(impalaExecutor, Utils.getJvmUser)
+    engineExecutionContext.setJobId(taskId)
+    val contextLabels = engineCreationContext.getLabels().toArray().map(_.asInstanceOf[Label[_]])
+    engineExecutionContext.setLabels(contextLabels)
+    val testPath = this.getClass.getClassLoader.getResource("").getPath
+    engineExecutionContext.setStorePath(testPath)
+    engineCreationContext.getOptions.asScala.foreach { case (key, value) =>
+      engineExecutionContext.addProperty(key, value)
+    }
+
+    Assertions.assertNotNull(impalaExecutor.getProgressInfo(taskId))
+    val response = impalaExecutor.executeLine(engineExecutionContext, sql)
+    Assertions.assertNotNull(response)
+  }
+
+  /**
+   * Populates engineCreationContext (user, ticket id, EM instance, labels, options, args) from
+   * the parsed launch arguments and copies the engine-conn options into the system properties.
+   */
+  private def init(args: Array[String]): Unit = {
+    val parsedArguments = EngineConnArgumentsParser.getEngineConnArgumentsParser.parseToObj(args)
+    val engineConf = parsedArguments.getEngineConnConfMap
+    this.engineCreationContext.setUser(engineConf.getOrElse("user", Utils.getJvmUser))
+    this.engineCreationContext.setTicketId(engineConf.getOrElse("ticketId", ""))
+
+    val host = CommonVars(Environment.ECM_HOST.toString, "127.0.0.1").getValue
+    val port = CommonVars(Environment.ECM_PORT.toString, "80").getValue
+    val emInstance =
+      ServiceInstance(GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME.getValue, s"$host:$port")
+    this.engineCreationContext.setEMInstance(emInstance)
+
+    // Options prefixed with LABEL_PREFIX are turned into labels; labels are set only if any exist.
+    val labelArgs = engineConf.filter(_._1.startsWith(EngineConnArgumentsParser.LABEL_PREFIX))
+    if (labelArgs.nonEmpty) {
+      val labels = new ArrayBuffer[Label[_]]
+      labelArgs.foreach { case (key, value) =>
+        val labelKey = key.replace(EngineConnArgumentsParser.LABEL_PREFIX, "")
+        labels += labelBuilderFactory.createLabel[Label[_]](labelKey, value)
+      }
+      engineCreationContext.setLabels(labels.toList.asJava)
+    }
+
+    val jMap = new java.util.HashMap[String, String](engineConf.size)
+    jMap.putAll(engineConf.asJava)
+    this.engineCreationContext.setOptions(jMap)
+    this.engineCreationContext.setArgs(args)
+    sys.props.asJava.putAll(jMap)
+  }
+
+}
diff --git a/linkis-engineconn-plugins/pom.xml b/linkis-engineconn-plugins/pom.xml
index 53f9be07d..9fdb4b85b 100644
--- a/linkis-engineconn-plugins/pom.xml
+++ b/linkis-engineconn-plugins/pom.xml
@@ -41,6 +41,7 @@
     <module>trino</module>
     <module>elasticsearch</module>
     <module>seatunnel</module>
+    <module>impala</module>
   </modules>
 
 </project>
diff --git a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
index a3a0d0d90..c526d5690 100644
--- a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
@@ -20,6 +20,10 @@ package org.apache.linkis.engineplugin.trino.executor
 import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{OverloadUtils, Utils}
 import org.apache.linkis.engineconn.common.conf.{EngineConnConf, EngineConnConstant}
+import org.apache.linkis.engineconn.common.password.{
+  CommandPasswordCallback,
+  StaticPasswordCallback
+}
 import org.apache.linkis.engineconn.computation.executor.execute.{
   ConcurrentComputationExecutor,
   EngineExecutionContext
@@ -32,10 +36,6 @@ import org.apache.linkis.engineplugin.trino.exception.{
   TrinoStateInvalidException
 }
 import org.apache.linkis.engineplugin.trino.interceptor.PasswordInterceptor
-import org.apache.linkis.engineplugin.trino.password.{
-  CommandPasswordCallback,
-  StaticPasswordCallback
-}
 import org.apache.linkis.engineplugin.trino.socket.SocketChannelSocketFactory
 import org.apache.linkis.engineplugin.trino.utils.{TrinoCode, TrinoSQLHook}
 import org.apache.linkis.governance.common.paser.SQLCodeParser
@@ -44,7 +44,6 @@ import org.apache.linkis.manager.common.entity.resource.{
   LoadResource,
   NodeResource
 }
-import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
 import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
diff --git a/pom.xml b/pom.xml
index 103f4acac..8abd27a40 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,6 +110,7 @@
     <io_file.version>1.0</io_file.version>
     <jdbc.version>4</jdbc.version>
     <trino.version>371</trino.version>
+    <impala.version>3.4.0.7.2.15.0-147</impala.version>
     <openlookeng.version>1.5.0</openlookeng.version>
     <pipeline.version>1</pipeline.version>
     <presto.version>0.234</presto.version>
diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt
index 4a428dc18..9a1c6da19 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -234,6 +234,8 @@ httpcore-4.4.14.jar
 httpcore-nio-4.4.14.jar
 httpmime-4.5.13.jar
 hystrix-core-1.5.18.jar
+impala-frontend-3.4.0.7.2.15.0-147.jar
+impala-minimal-hive-exec-3.4.0.7.2.15.0-147.jar
 ini4j-0.5.4.jar
 ion-java-1.0.2.jar
 istack-commons-runtime-3.0.12.jar


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org