You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/04/25 04:19:03 UTC
[linkis] branch dev-1.4.0 updated: add impala engineconn-plugin (#4458)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new 265d5d3fe add impala engineconn-plugin (#4458)
265d5d3fe is described below
commit 265d5d3fe65f3533ed43c9dba67c5daa2cb210ea
Author: Knypys <si...@qq.com>
AuthorDate: Tue Apr 25 12:18:53 2023 +0800
add impala engineconn-plugin (#4458)
* add impala engine plugin
---
.../common}/password/CommandPasswordCallback.java | 4 +-
.../common}/password/StaticPasswordCallback.java | 2 +-
.../linkis/manager/am/conf/AMConfiguration.scala | 4 +-
.../manager/label/entity/engine/EngineType.scala | 2 +
.../manager/label/entity/engine/RunType.scala | 2 +-
linkis-engineconn-plugins/impala/pom.xml | 140 ++++++
.../impala/src/main/assembly/distribution.xml | 70 +++
.../impala/client/ExecutionListener.java} | 27 +-
.../engineplugin/impala/client/ImpalaClient.java | 77 +++
.../impala/client/ImpalaResultSet.java} | 43 +-
.../client/exception/ImpalaEngineException.java | 52 +++
.../client/exception/ImpalaErrorCodeSummary.java | 51 ++
.../impala/client/protocol/ExecHandler.java | 72 +++
.../impala/client/protocol/ExecProgress.java} | 33 +-
.../impala/client/protocol/ExecStatus.java | 72 +++
.../impala/client/protocol/ExecSummary.java} | 31 +-
.../impala/client/protocol/QueryColumn.java} | 27 +-
.../impala/client/thrift/ImpalaThriftClient.java | 278 +++++++++++
.../client/thrift/ImpalaThriftExecution.java | 186 ++++++++
.../client/thrift/ImpalaThriftResultSetV7.java | 361 +++++++++++++++
.../impala/client/thrift/ImpalaThriftSession.java | 104 +++++
.../client/thrift/ImpalaThriftSessionFactory.java | 196 ++++++++
.../impala/client/util/ThriftUtil.java | 65 +++
.../main/resources/linkis-engineconn.properties | 23 +
.../impala/src/main/resources/log4j2.xml | 89 ++++
.../impala/ImpalaEngineConnPlugin.scala | 66 +++
.../ImpalaProcessEngineConnLaunchBuilder.scala} | 29 +-
.../impala/conf/ImpalaConfiguration.scala | 68 +++
.../impala/conf/ImpalaEngineConfig.scala | 48 ++
.../impala/executor/ImpalaEngineConnExecutor.scala | 514 +++++++++++++++++++++
.../impala/factory/ImpalaEngineConnFactory.scala | 44 ++
.../executer/TestImpalaEngineConnExecutor.scala | 144 ++++++
linkis-engineconn-plugins/pom.xml | 1 +
.../trino/executor/TrinoEngineConnExecutor.scala | 9 +-
pom.xml | 1 +
tool/dependencies/known-dependencies.txt | 2 +
36 files changed, 2839 insertions(+), 98 deletions(-)
diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/CommandPasswordCallback.java b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/java/org/apache/linkis/engineconn/common/password/CommandPasswordCallback.java
similarity index 93%
rename from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/CommandPasswordCallback.java
rename to linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/java/org/apache/linkis/engineconn/common/password/CommandPasswordCallback.java
index c26e0790c..8956f9842 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/CommandPasswordCallback.java
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/java/org/apache/linkis/engineconn/common/password/CommandPasswordCallback.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineconn.common.password;
import org.apache.commons.io.IOUtils;
@@ -65,7 +65,7 @@ public class CommandPasswordCallback extends PasswordCallback {
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
- throw new RuntimeException("Failed to authenticate with command: " + prompt, e);
+ throw new RuntimeException("Failed to get password by command: " + prompt, e);
} finally {
if (process != null) {
process.destroy();
diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/java/org/apache/linkis/engineconn/common/password/StaticPasswordCallback.java
similarity index 95%
copy from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
copy to linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/java/org/apache/linkis/engineconn/common/password/StaticPasswordCallback.java
index 6d198bbae..f7b1cc4ba 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/java/org/apache/linkis/engineconn/common/password/StaticPasswordCallback.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineconn.common.password;
import javax.security.auth.callback.PasswordCallback;
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala
index c730edf0f..acc5afc25 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala
@@ -17,7 +17,7 @@
package org.apache.linkis.manager.am.conf
-import org.apache.linkis.common.conf.{CommonVars, Configuration, TimeType}
+import org.apache.linkis.common.conf.{CommonVars, TimeType}
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.manager.common.entity.enumeration.MaintainType
@@ -57,7 +57,7 @@ object AMConfiguration {
val MULTI_USER_ENGINE_TYPES = CommonVars(
"wds.linkis.multi.user.engine.types",
- "jdbc,es,presto,io_file,appconn,openlookeng,trino"
+ "jdbc,es,presto,io_file,appconn,openlookeng,trino,impala"
)
val ALLOW_BATCH_KILL_ENGINE_TYPES =
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
index 47cf78b7f..d47bb8ec3 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
@@ -61,6 +61,8 @@ object EngineType extends Enumeration with Logging {
val SEATUNNEL = Value("seatunnel")
+ val IMPALA = Value("impala")
+
def mapFsTypeToEngineType(fsType: String): String = {
fsType match {
case "file" =>
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
index f8ba133f3..0dee150d2 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
@@ -43,11 +43,11 @@ object RunType extends Enumeration {
val TRINO_SQL = Value("tsql")
-
val SEATUNNEL_FLINK_SQL = Value("sfsql")
val SEATUNNEL_FLINK = Value("sflink")
val SEATUNNEL_SPARK = Value("sspark")
val DATA_CALC = Value("data_calc") // spark datacalc (ETL)
+ val IMPALA_SQL = Value("isql")
}
diff --git a/linkis-engineconn-plugins/impala/pom.xml b/linkis-engineconn-plugins/impala/pom.xml
new file mode 100644
index 000000000..1557d3ef1
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/pom.xml
@@ -0,0 +1,140 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis</artifactId>
+ <version>${revision}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>linkis-engineplugin-impala</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-engineconn-plugin-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-computation-engineconn</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-storage</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-rpc</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.15.0</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- impala -->
+ <dependency>
+ <groupId>org.apache.impala</groupId>
+ <artifactId>impala-frontend</artifactId>
+ <version>${impala.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.impala</groupId>
+ <artifactId>impala-minimal-hive-exec</artifactId>
+ <version>${impala.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <repositories>
+ <repository>
+ <id>cloudera.releases.https</id>
+ <name>Cloudera Repository</name>
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+ </repository>
+ </repositories>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <inherited>false</inherited>
+ <configuration>
+ <skipAssembly>false</skipAssembly>
+ <finalName>out</finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ <attach>false</attach>
+ <descriptors>
+ <descriptor>src/main/assembly/distribution.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/distribution.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/linkis-engineconn-plugins/impala/src/main/assembly/distribution.xml b/linkis-engineconn-plugins/impala/src/main/assembly/distribution.xml
new file mode 100644
index 000000000..5462a670d
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/assembly/distribution.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.1.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.1.1 https://maven.apache.org/xsd/assembly-2.1.1.xsd">
+ <id>linkis-manager-enginePlugin-impala</id>
+ <formats>
+ <format>dir</format>
+ <format>zip</format>
+ </formats>
+ <includeBaseDirectory>true</includeBaseDirectory>
+ <baseDirectory>impala</baseDirectory>
+
+ <dependencySets>
+ <dependencySet>
+ <!-- Enable access to all projects in the current multimodule build! <useAllReactorProjects>true</useAllReactorProjects> -->
+ <!-- Now, select which projects to include in this module-set. -->
+ <outputDirectory>/dist/v${impala.version}/lib</outputDirectory>
+ <useProjectArtifact>true</useProjectArtifact>
+ <useTransitiveDependencies>true</useTransitiveDependencies>
+ <unpack>false</unpack>
+ <useStrictFiltering>false</useStrictFiltering>
+ <useTransitiveFiltering>true</useTransitiveFiltering>
+
+ </dependencySet>
+ </dependencySets>
+
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/src/main/resources</directory>
+ <includes>
+ <include>linkis-engineconn.properties</include>
+ <include>log4j2.xml</include>
+ </includes>
+ <fileMode>0777</fileMode>
+ <outputDirectory>/dist/v${impala.version}/conf</outputDirectory>
+ <lineEnding>unix</lineEnding>
+ </fileSet>
+
+ <fileSet>
+ <directory>${basedir}/target</directory>
+ <includes>
+ <include>*.jar</include>
+ </includes>
+ <excludes>
+ <exclude>*doc.jar</exclude>
+ </excludes>
+ <fileMode>0777</fileMode>
+ <outputDirectory>/plugin/${impala.version}</outputDirectory>
+ </fileSet>
+
+ </fileSets>
+
+</assembly>
+
diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ExecutionListener.java
similarity index 60%
copy from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
copy to linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ExecutionListener.java
index 6d198bbae..c34c6b092 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ExecutionListener.java
@@ -15,26 +15,21 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineplugin.impala.client;
-import javax.security.auth.callback.PasswordCallback;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecProgress;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecStatus;
-public class StaticPasswordCallback extends PasswordCallback {
+import java.util.List;
- private final char[] password;
+public interface ExecutionListener {
+ void created(String queryId);
- public StaticPasswordCallback(String prompt, boolean echoOn) {
- super(prompt, echoOn);
- this.password = prompt.toCharArray();
- }
+ void success(ImpalaResultSet resultSet);
- public StaticPasswordCallback(String prompt) {
- super(prompt, false);
- this.password = prompt.toCharArray();
- }
+ void error(ExecStatus status);
- @Override
- public char[] getPassword() {
- return password;
- }
+ void progress(ExecProgress progress);
+
+ void message(List<String> messages);
}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ImpalaClient.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ImpalaClient.java
new file mode 100644
index 000000000..1f7bc93e7
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ImpalaClient.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client;
+
+import org.apache.linkis.engineplugin.impala.client.exception.ImpalaEngineException;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecProgress;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecStatus;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecSummary;
+
+import java.util.Map;
+
+public interface ImpalaClient extends AutoCloseable {
+
+ /**
+ * execute async with listener
+ *
+ * @return queryId
+ */
+ default String executeAsync(String sql, ExecutionListener executionListener)
+ throws ImpalaEngineException {
+ return executeAsync(sql, executionListener, null);
+ }
+
+ /**
+ * execute async with listener and options
+ *
+ * @return queryId
+ */
+ String executeAsync(
+ String sql, ExecutionListener executionListener, Map<String, String> queryOptions)
+ throws ImpalaEngineException;
+
+ /** execute with listener */
+ default void execute(String sql, ExecutionListener executionListener)
+ throws ImpalaEngineException, InterruptedException {
+ execute(sql, executionListener, null);
+ }
+
+ /** execute with listener and options */
+ void execute(String sql, ExecutionListener executionListener, Map<String, String> queryOptions)
+ throws ImpalaEngineException, InterruptedException;
+
+ /** cancel execution */
+ void cancel(String queryId) throws ImpalaEngineException;
+
+ /** get execution summary */
+ ExecSummary getExecSummary(String queryId) throws ImpalaEngineException;
+
+ /** get execution summary progress */
+ ExecProgress getExecProgress(String queryId) throws ImpalaEngineException;
+
+ /** get execution status */
+ ExecStatus getExecStatus(String queryId) throws ImpalaEngineException;
+
+ void setQueryOption(String key, String value) throws ImpalaEngineException;
+
+ Map<String, String> getQueryOptions() throws ImpalaEngineException;
+
+ void unsetQueryOption(String key) throws ImpalaEngineException;
+
+ int getRunningExecutionCount();
+}
diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ImpalaResultSet.java
similarity index 55%
copy from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
copy to linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ImpalaResultSet.java
index 6d198bbae..9390b7481 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/ImpalaResultSet.java
@@ -15,26 +15,41 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineplugin.impala.client;
-import javax.security.auth.callback.PasswordCallback;
+import org.apache.hive.service.rpc.thrift.TTypeDesc;
-public class StaticPasswordCallback extends PasswordCallback {
+import java.util.List;
- private final char[] password;
+public interface ImpalaResultSet extends AutoCloseable {
- public StaticPasswordCallback(String prompt, boolean echoOn) {
- super(prompt, echoOn);
- this.password = prompt.toCharArray();
- }
+ Row next();
+
+ int getColumnSize();
+
+ List<? extends Column> getColumns();
+
+ interface Row {
+ Object[] getValues();
+
+ Object getObject(int columnIndex);
+
+ <T> T getObject(int columnIndex, Class<T> clasz);
- public StaticPasswordCallback(String prompt) {
- super(prompt, false);
- this.password = prompt.toCharArray();
+ String getString(int columnIndex);
+
+ Short getShort(int columnIndex);
+
+ Integer getInteger(int columnIndex);
+
+ Long getLong(int columnIndex);
}
- @Override
- public char[] getPassword() {
- return password;
+ interface Column {
+ int getIndex();
+
+ String getName();
+
+ TTypeDesc getType();
}
}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/exception/ImpalaEngineException.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/exception/ImpalaEngineException.java
new file mode 100644
index 000000000..f2ec47f7b
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/exception/ImpalaEngineException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.exception;
+
+public class ImpalaEngineException extends RuntimeException {
+
+ /** */
+ private static final long serialVersionUID = 5045965784519089392L;
+
+ public ImpalaEngineException(String message) {
+ super(message);
+ }
+
+ public ImpalaEngineException(Exception exception) {
+ super(exception);
+ }
+
+ public ImpalaEngineException(String message, Exception exception) {
+ super(message, exception);
+ }
+
+ public static ImpalaEngineException of(ImpalaErrorCodeSummary code) {
+ return new ImpalaEngineException(code.getErrorDesc());
+ }
+
+ public static ImpalaEngineException of(ImpalaErrorCodeSummary code, String massage) {
+ return new ImpalaEngineException(String.format("%s, %s", code.getErrorDesc(), massage));
+ }
+
+ public static ImpalaEngineException of(ImpalaErrorCodeSummary code, Exception exception) {
+ return new ImpalaEngineException(code.getErrorDesc(), exception);
+ }
+
+ public static ImpalaEngineException of(Exception exception) {
+ return new ImpalaEngineException(exception);
+ }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/exception/ImpalaErrorCodeSummary.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/exception/ImpalaErrorCodeSummary.java
new file mode 100644
index 000000000..1511130bb
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/exception/ImpalaErrorCodeSummary.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.exception;
+
+import org.apache.linkis.common.errorcode.ErrorCodeUtils;
+import org.apache.linkis.common.errorcode.LinkisErrorCode;
+
+public enum ImpalaErrorCodeSummary implements LinkisErrorCode {
+ ClosedError(29040, "session is closed(当前会话已断开)"),
+ ExecutionError(29041, "server report an error(执行失败)"),
+ RequestError(29042, "failed to send request to server(请求提交失败)"),
+ StillRunningError(29043, "target is still running(任务正在运行中)"),
+ InvalidHandleError(29044, "current handle is invalid(当前会话不存在)"),
+ ParallelLimitError(29045, "reach the parallel limit(当前已达到最大并行度配置)"),
+ LoginError(29046, "failed to login to target server(服务器认证失败)");
+
+ private final int errorCode;
+
+ private final String errorDesc;
+
+ ImpalaErrorCodeSummary(int errorCode, String errorDesc) {
+ ErrorCodeUtils.validateErrorCode(errorCode, 29000, 29999);
+ this.errorCode = errorCode;
+ this.errorDesc = errorDesc;
+ }
+
+ @Override
+ public int getErrorCode() {
+ return errorCode;
+ }
+
+ @Override
+ public String getErrorDesc() {
+ return errorDesc;
+ }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecHandler.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecHandler.java
new file mode 100644
index 000000000..430c3bc99
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.protocol;
+
+import org.apache.linkis.engineplugin.impala.client.ExecutionListener;
+
+public abstract class ExecHandler<T> implements AutoCloseable {
+ private String queryId;
+ private T handler;
+ private ExecutionListener executionListener = null;
+ private int errors;
+
+ private boolean isQueued;
+
+ public ExecHandler() {}
+
+ public ExecHandler(String queryId, T handler, ExecutionListener executionListener) {
+ this.queryId = queryId;
+ this.handler = handler;
+
+ if (executionListener != null) {
+ this.executionListener = executionListener;
+ executionListener.created(queryId);
+ }
+
+ this.errors = 0;
+ this.isQueued = false;
+ }
+
+ public int markError() {
+ return ++errors;
+ }
+
+ public String getQueryId() {
+ return queryId;
+ }
+
+ public T getHandler() {
+ return handler;
+ }
+
+ public ExecutionListener getResultListener() {
+ return executionListener;
+ }
+
+ public int getErrors() {
+ return errors;
+ }
+
+ public boolean isQueued() {
+ return isQueued;
+ }
+
+ public void setQueued(boolean queued) {
+ isQueued = queued;
+ }
+}
diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecProgress.java
similarity index 52%
copy from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
copy to linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecProgress.java
index 6d198bbae..078889624 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecProgress.java
@@ -15,26 +15,33 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineplugin.impala.client.protocol;
-import javax.security.auth.callback.PasswordCallback;
+public class ExecProgress {
+ public static final ExecProgress DEFAULT_PROGRESS = new ExecProgress(-1, 0, -1);
-public class StaticPasswordCallback extends PasswordCallback {
+ private long totalScanRanges;
+ private long completedScanRanges;
+ private int runningNodes;
- private final char[] password;
+ public ExecProgress() {}
- public StaticPasswordCallback(String prompt, boolean echoOn) {
- super(prompt, echoOn);
- this.password = prompt.toCharArray();
+ public ExecProgress(long totalScanRanges, long completedScanRanges, int runningNodes) {
+ super();
+ this.totalScanRanges = totalScanRanges;
+ this.completedScanRanges = completedScanRanges;
+ this.runningNodes = runningNodes;
}
- public StaticPasswordCallback(String prompt) {
- super(prompt, false);
- this.password = prompt.toCharArray();
+ public long getTotalScanRanges() {
+ return totalScanRanges;
}
- @Override
- public char[] getPassword() {
- return password;
+ public long getCompletedScanRanges() {
+ return completedScanRanges;
+ }
+
+ public int getRunningNodes() {
+ return runningNodes;
}
}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecStatus.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecStatus.java
new file mode 100644
index 000000000..26f6dc824
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecStatus.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.protocol;
+
+import org.apache.hive.service.rpc.thrift.TOperationState;
+
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+public class ExecStatus {
+ private static final Set<Integer> ACTIVE_STATE =
+ ImmutableSet.of(
+ TOperationState.INITIALIZED_STATE.getValue(),
+ TOperationState.RUNNING_STATE.getValue(),
+ TOperationState.PENDING_STATE.getValue());
+
+ private static final Set<Integer> ERROR_STATE =
+ ImmutableSet.of(
+ TOperationState.ERROR_STATE.getValue(),
+ TOperationState.CLOSED_STATE.getValue(),
+ TOperationState.CANCELED_STATE.getValue(),
+ TOperationState.UKNOWN_STATE.getValue());
+
+ private int code;
+ private String name;
+ private String errorMessage;
+
+ public ExecStatus() {}
+
+ public ExecStatus(int code, String name, String errorMessage) {
+ super();
+ this.code = code;
+ this.name = name;
+ this.errorMessage = errorMessage;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public boolean isActive() {
+ return ACTIVE_STATE.contains(code);
+ }
+
+ public boolean hasError() {
+ return ERROR_STATE.contains(code);
+ }
+}
diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecSummary.java
similarity index 60%
copy from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
copy to linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecSummary.java
index 6d198bbae..9ff786a5d 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/ExecSummary.java
@@ -15,26 +15,31 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineplugin.impala.client.protocol;
-import javax.security.auth.callback.PasswordCallback;
+public class ExecSummary {
+ private ExecStatus status;
+ private ExecProgress progress;
+ private int nodeNum;
-public class StaticPasswordCallback extends PasswordCallback {
+ public ExecSummary() {}
- private final char[] password;
+ public ExecSummary(ExecStatus status, ExecProgress progress, int nodeNum) {
+ super();
+ this.status = status;
+ this.progress = progress;
+ this.nodeNum = nodeNum;
+ }
- public StaticPasswordCallback(String prompt, boolean echoOn) {
- super(prompt, echoOn);
- this.password = prompt.toCharArray();
+ public ExecStatus getStatus() {
+ return status;
}
- public StaticPasswordCallback(String prompt) {
- super(prompt, false);
- this.password = prompt.toCharArray();
+ public ExecProgress getProgress() {
+ return progress;
}
- @Override
- public char[] getPassword() {
- return password;
+ public int getNodeNum() {
+ return nodeNum;
}
}
diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/QueryColumn.java
similarity index 61%
copy from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
copy to linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/QueryColumn.java
index 6d198bbae..9a0fc2231 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/protocol/QueryColumn.java
@@ -15,26 +15,25 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineplugin.impala.client.protocol;
-import javax.security.auth.callback.PasswordCallback;
+public class QueryColumn {
+ private String label;
+ private int index;
-public class StaticPasswordCallback extends PasswordCallback {
+ public QueryColumn() {}
- private final char[] password;
-
- public StaticPasswordCallback(String prompt, boolean echoOn) {
- super(prompt, echoOn);
- this.password = prompt.toCharArray();
+ public QueryColumn(String label, int index) {
+ super();
+ this.label = label;
+ this.index = index;
}
- public StaticPasswordCallback(String prompt) {
- super(prompt, false);
- this.password = prompt.toCharArray();
+ public String getLabel() {
+ return label;
}
- @Override
- public char[] getPassword() {
- return password;
+ public int getIndex() {
+ return index;
}
}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftClient.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftClient.java
new file mode 100644
index 000000000..63d363fa9
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftClient.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.thrift;
+
+import org.apache.linkis.engineplugin.impala.client.ExecutionListener;
+import org.apache.linkis.engineplugin.impala.client.ImpalaClient;
+import org.apache.linkis.engineplugin.impala.client.exception.ImpalaEngineException;
+import org.apache.linkis.engineplugin.impala.client.exception.ImpalaErrorCodeSummary;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecProgress;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecStatus;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecSummary;
+import org.apache.linkis.engineplugin.impala.client.util.ThriftUtil;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.apache.impala.thrift.ImpalaHiveServer2Service;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ImpalaThriftClient extends TimerTask implements ImpalaClient {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ImpalaThriftClient.class.getName());
+
+ private final Map<String, String> queryOptions;
+ private final Map<String, ImpalaThriftExecution> executions;
+ private final ImpalaThriftSessionFactory sessionFactory;
+ private final ScheduledExecutorService executorService;
+ private final long heartBeatsMillis;
+
+ private int batchSize = 1000;
+ private long queryTimeoutMillis = 3600 * 1000;
+ private volatile boolean closed;
+
+ public ImpalaThriftClient(ImpalaThriftSessionFactory sessionFactory, int heartBeatsInSeconds) {
+ this.sessionFactory = sessionFactory;
+ this.executions = new ConcurrentHashMap<>();
+ this.queryOptions = new ConcurrentHashMap<>();
+ this.closed = false;
+
+ this.executorService = Executors.newSingleThreadScheduledExecutor();
+
+ this.heartBeatsMillis = Math.max(heartBeatsInSeconds, 1);
+ executorService.schedule(this, heartBeatsInSeconds, TimeUnit.SECONDS);
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ public void setQueryTimeoutInSeconds(int queryTimeout) {
+ this.queryTimeoutMillis = 1000L * queryTimeout;
+ }
+
+ @Override
+ public synchronized void close() {
+ closed = true;
+ executorService.shutdownNow();
+ for (ImpalaThriftExecution execution : executions.values()) {
+ execution.cancel();
+ }
+ }
+
+ private ImpalaThriftExecution submit(
+ String sql,
+ Map<String, String> queryOptions,
+ ExecutionListener executionListener,
+ boolean sync)
+ throws TException, ImpalaEngineException, InterruptedException, IOException {
+ if (closed) {
+ throw ImpalaEngineException.of(ImpalaErrorCodeSummary.ClosedError);
+ }
+
+ ImpalaThriftSession impalaSession = sessionFactory.openSession();
+
+ TSessionHandle session = impalaSession.session();
+ ImpalaHiveServer2Service.Client client = impalaSession.client();
+
+ TExecuteStatementReq req = new TExecuteStatementReq(session, sql);
+ req.setRunAsync(false);
+
+ Map<String, String> options = new TreeMap<>(this.queryOptions);
+ if (queryOptions != null && queryOptions.size() > 0) {
+ options.putAll(queryOptions);
+ }
+ req.setConfOverlay(options);
+
+ TExecuteStatementResp res = client.ExecuteStatement(req);
+ ThriftUtil.checkStatus(res.getStatus(), executionListener);
+
+ TOperationHandle operation = res.getOperationHandle();
+ if (operation == null) {
+ throw ImpalaEngineException.of(ImpalaErrorCodeSummary.RequestError);
+ }
+
+ String queryId = ThriftUtil.convertUniqueId(operation.getOperationId().getGuid());
+ return new ImpalaThriftExecution(impalaSession, operation, queryId, executionListener, sync);
+ }
+
+ private boolean progress(ImpalaThriftExecution execution) {
+ try {
+ ExecSummary summary = execution.getExecSummary();
+
+ ExecutionListener listener = execution.getListener();
+ ExecStatus status = summary.getStatus();
+ if (status.isActive()) {
+ /* executing progress */
+ if (listener != null && summary.getProgress() != null) {
+ listener.progress(summary.getProgress());
+ }
+
+ /* check execution timeout */
+ return queryTimeoutMillis <= 0
+ || System.currentTimeMillis() - execution.getTimestamp() < queryTimeoutMillis;
+ }
+
+ if (status.hasError()) {
+ if (listener != null) {
+ listener.error(summary.getStatus());
+ }
+ } else {
+ execution.tryFetchResult(batchSize);
+ }
+ return false;
+ } catch (ImpalaEngineException | TException e) {
+ LOG.warn("Progress failed for: {}", execution.getQueryId(), e);
+ if (execution.errorIncrement() >= 3) {
+ return false;
+ }
+
+ /* check execution timeout */
+ return queryTimeoutMillis <= 0
+ || System.currentTimeMillis() - execution.getTimestamp() < queryTimeoutMillis;
+ }
+ }
+
+ @Override
+ public void execute(
+ String sql, ExecutionListener executionListener, Map<String, String> queryOptions)
+ throws ImpalaEngineException, InterruptedException {
+ try (ImpalaThriftExecution execution = submit(sql, queryOptions, executionListener, true)) {
+ executions.put(execution.getQueryId(), execution);
+ while (progress(execution)) {
+ Thread.sleep(heartBeatsMillis);
+ }
+ } catch (IOException | TException e) {
+ throw ImpalaEngineException.of(ImpalaErrorCodeSummary.RequestError, e);
+ }
+ }
+
+ @Override
+ public String executeAsync(
+ String sql, ExecutionListener executionListener, Map<String, String> queryOptions)
+ throws ImpalaEngineException {
+ try {
+ ImpalaThriftExecution execution = submit(sql, queryOptions, executionListener, false);
+ String queryId = execution.getQueryId();
+ executions.put(queryId, execution);
+ return queryId;
+ } catch (TException | InterruptedException | IOException e) {
+ throw ImpalaEngineException.of(ImpalaErrorCodeSummary.RequestError, e);
+ }
+ }
+
+ @Override
+ public void cancel(String queryId) {
+ cancel(executions.get(queryId));
+ }
+
+ private void cancel(ImpalaThriftExecution execution) {
+ if (execution != null) {
+ execution.cancel();
+ }
+ }
+
+ @Override
+ public ExecSummary getExecSummary(String queryId) throws ImpalaEngineException {
+ ImpalaThriftExecution execution = executions.get(queryId);
+ if (execution != null) {
+ try {
+ return execution.getExecSummary();
+ } catch (TException e) {
+ throw ImpalaEngineException.of(ImpalaErrorCodeSummary.RequestError, e);
+ }
+ }
+ throw new IllegalArgumentException("query not found: " + queryId);
+ }
+
+ @Override
+ public ExecProgress getExecProgress(String queryId) throws ImpalaEngineException {
+ ExecSummary summary = getExecSummary(queryId);
+ if (summary != null) {
+ return summary.getProgress();
+ }
+ return null;
+ }
+
+ @Override
+ public ExecStatus getExecStatus(String queryId) throws ImpalaEngineException {
+ ImpalaThriftExecution execution = executions.get(queryId);
+ if (execution != null) {
+ try {
+ return execution.getExecStatus();
+ } catch (TException e) {
+ e.printStackTrace();
+ }
+ }
+ throw new IllegalArgumentException("query not found: " + queryId);
+ }
+
+ @Override
+ public void setQueryOption(String key, String value) {
+ if (StringUtils.isNotBlank(value)) {
+ queryOptions.put(key, value);
+ }
+ }
+
+ @Override
+ public void unsetQueryOption(String key) {
+ queryOptions.remove(key);
+ }
+
+ @Override
+ public void run() {
+ final ArrayList<ImpalaThriftExecution> runningExecutions = new ArrayList<>(executions.values());
+ LOG.debug("Impala client heart beats, {} running executions", runningExecutions.size());
+ for (ImpalaThriftExecution execution : runningExecutions) {
+ /* sync execution, skipping */
+ if (execution.isSync()) {
+ continue;
+ }
+
+ if (execution.isClosed() || !progress(execution)) {
+ executions.remove(execution.getQueryId());
+ }
+ }
+ }
+
+ @Override
+ public int getRunningExecutionCount() {
+ return sessionFactory.openedSessionCount();
+ }
+
+ @Override
+ public Map<String, String> getQueryOptions() {
+ return ImmutableMap.copyOf(queryOptions);
+ }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftExecution.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftExecution.java
new file mode 100644
index 000000000..a8c3dde24
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftExecution.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.thrift;
+
+import org.apache.linkis.engineplugin.impala.client.ExecutionListener;
+import org.apache.linkis.engineplugin.impala.client.exception.ImpalaEngineException;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecProgress;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecStatus;
+import org.apache.linkis.engineplugin.impala.client.protocol.ExecSummary;
+import org.apache.linkis.engineplugin.impala.client.util.ThriftUtil;
+
+import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
+import org.apache.hive.service.rpc.thrift.TCloseOperationReq;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationState;
+import org.apache.impala.thrift.TExecProgress;
+import org.apache.impala.thrift.TExecSummary;
+import org.apache.impala.thrift.TGetExecSummaryReq;
+import org.apache.impala.thrift.TGetExecSummaryResp;
+import org.apache.thrift.TException;
+
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ImpalaThriftExecution implements AutoCloseable {
+ public static final Logger LOG = LoggerFactory.getLogger(ImpalaThriftExecution.class.getName());
+
+ private final ImpalaThriftSession impalaSession;
+ private final TOperationHandle operation;
+ private final String queryId;
+ private final ExecutionListener listener;
+ private final boolean sync;
+
+ private long timestamp;
+ private int errors;
+
+ private volatile boolean closed;
+
+ public ImpalaThriftExecution(
+ ImpalaThriftSession impalaSession,
+ TOperationHandle operation,
+ String queryId,
+ ExecutionListener listener,
+ boolean sync) {
+ this.impalaSession = impalaSession;
+ this.operation = operation;
+ this.queryId = queryId;
+ this.listener = listener;
+ this.sync = sync;
+
+ if (listener != null) {
+ listener.created(queryId);
+ }
+
+ this.timestamp = System.currentTimeMillis();
+ this.errors = 0;
+ }
+
+ public ExecutionListener getListener() {
+ return listener;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void tryFetchResult(int batchSize) {
+ if (operation.isHasResultSet()) {
+ try (ImpalaThriftResultSetV7 resultSetV7 =
+ new ImpalaThriftResultSetV7(impalaSession.client(), operation, batchSize)) {
+ if (listener != null) {
+ listener.success(resultSetV7);
+ }
+ }
+ }
+ }
+
+ public synchronized int errorIncrement() {
+ return ++errors;
+ }
+
+ public String getQueryId() {
+ return queryId;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+
+ try {
+ TCloseOperationReq closeReq = new TCloseOperationReq();
+ closeReq.setOperationHandle(operation);
+ impalaSession.client().CloseOperation(closeReq);
+ } catch (Exception e) {
+ LOG.error("Failed to close the operation", e);
+ } finally {
+ impalaSession.close();
+ }
+ }
+
+ public synchronized boolean cancel() {
+ if (closed) {
+ return false;
+ }
+
+ try {
+ TCancelOperationReq cancelReq = new TCancelOperationReq();
+ cancelReq.setOperationHandle(operation);
+ impalaSession.client().CancelOperation(cancelReq);
+ } catch (Exception e) {
+ LOG.error("Failed to safely cancel the query", e);
+ }
+ close();
+ return true;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public synchronized ExecStatus getExecStatus() throws TException, ImpalaEngineException {
+ TGetOperationStatusReq statusReq = new TGetOperationStatusReq(operation);
+ TGetOperationStatusResp statusResp = impalaSession.client().GetOperationStatus(statusReq);
+ ThriftUtil.checkStatus(statusResp.getStatus());
+
+ TOperationState operationState = statusResp.getOperationState();
+
+ return new ExecStatus(
+ operationState.getValue(), operationState.name(), statusResp.getErrorMessage());
+ }
+
+ public synchronized ExecSummary getExecSummary() throws TException, ImpalaEngineException {
+ ExecProgress progress = ExecProgress.DEFAULT_PROGRESS;
+ int nodeNum = -1;
+
+ ExecStatus status = getExecStatus();
+ if (status.isActive()) {
+ TGetExecSummaryReq getExecSummaryReq = new TGetExecSummaryReq();
+ getExecSummaryReq.setOperationHandle(operation);
+ getExecSummaryReq.setSessionHandle(impalaSession.session());
+ TGetExecSummaryResp execSummaryResp =
+ impalaSession.client().GetExecSummary(getExecSummaryReq);
+
+ ThriftUtil.checkStatus(execSummaryResp.getStatus());
+ TExecSummary summary = execSummaryResp.getSummary();
+ nodeNum = summary.getNodesSize();
+ if (summary.isIs_queued()) {
+ if (listener != null) {
+ listener.message(Lists.newArrayList(summary.getQueued_reason()));
+ }
+ } else {
+ TExecProgress p = summary.getProgress();
+ if (p != null) {
+ progress = new ExecProgress(p.total_scan_ranges, p.num_completed_scan_ranges, nodeNum);
+ }
+ }
+ }
+
+ return new ExecSummary(status, progress, nodeNum);
+ }
+
+ public boolean isSync() {
+ return sync;
+ }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftResultSetV7.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftResultSetV7.java
new file mode 100644
index 000000000..28751f5b4
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftResultSetV7.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.thrift;
+
+import org.apache.linkis.engineplugin.impala.client.ImpalaResultSet;
+import org.apache.linkis.engineplugin.impala.client.util.ThriftUtil;
+
+import org.apache.hive.service.rpc.thrift.TBinaryColumn;
+import org.apache.hive.service.rpc.thrift.TBoolColumn;
+import org.apache.hive.service.rpc.thrift.TByteColumn;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.hive.service.rpc.thrift.TColumnDesc;
+import org.apache.hive.service.rpc.thrift.TDoubleColumn;
+import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
+import org.apache.hive.service.rpc.thrift.TFetchResultsResp;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataReq;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataResp;
+import org.apache.hive.service.rpc.thrift.TI16Column;
+import org.apache.hive.service.rpc.thrift.TI32Column;
+import org.apache.hive.service.rpc.thrift.TI64Column;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TStatus;
+import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.hive.service.rpc.thrift.TStringColumn;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
+import org.apache.hive.service.rpc.thrift.TTypeDesc;
+import org.apache.impala.thrift.ImpalaHiveServer2Service;
+import org.apache.thrift.TException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public class ImpalaThriftResultSetV7 implements ImpalaResultSet {
+
+ /*
+ * bit mask, used to mask the byte value for in position bit
+ */
+ private static final byte[] bitMask = {
+ (byte) 0x01,
+ (byte) 0x02,
+ (byte) 0x04,
+ (byte) 0x08,
+ (byte) 0x10,
+ (byte) 0x20,
+ (byte) 0x40,
+ (byte) 0x80
+ };
+
+ private final ImpalaHiveServer2Service.Client client;
+ private final TOperationHandle operation;
+ private final TFetchResultsReq fetchReq;
+
+ private volatile int rowIndex;
+ private volatile int cols;
+ private volatile boolean closed;
+
+ private volatile boolean hasMoreRow;
+
+ private volatile Iterator<?>[] iterators = null;
+ private volatile byte[][] nulls = null;
+ private volatile int[] lengths = null;
+ private volatile List<Column> columns = null;
+
+ public ImpalaThriftResultSetV7(
+ ImpalaHiveServer2Service.Client client, TOperationHandle operation, int batchSize) {
+ this.client = client;
+ this.operation = operation;
+
+ this.rowIndex = -1;
+ this.cols = 0;
+ this.fetchReq = new TFetchResultsReq();
+ this.fetchReq.setMaxRows(batchSize);
+ this.fetchReq.setOperationHandle(operation);
+ this.hasMoreRow = true;
+ }
+
+ private void unpackColumn(TColumn column, int index) {
+ Iterator<?> iterator = null;
+ byte[] nulls = null;
+ int length = 0;
+ switch (column.getSetField()) {
+ case BINARY_VAL:
+ TBinaryColumn binVal = column.getBinaryVal();
+ iterator = binVal.getValuesIterator();
+ nulls = binVal.getNulls();
+ binVal.getValuesSize();
+ length = binVal.getValuesSize();
+ break;
+ case BOOL_VAL:
+ TBoolColumn boolVal = column.getBoolVal();
+ iterator = boolVal.getValuesIterator();
+ nulls = boolVal.getNulls();
+ length = boolVal.getValuesSize();
+ break;
+ case BYTE_VAL:
+ TByteColumn byteVal = column.getByteVal();
+ iterator = byteVal.getValuesIterator();
+ nulls = byteVal.getNulls();
+ length = byteVal.getValuesSize();
+ break;
+ case DOUBLE_VAL:
+ TDoubleColumn doubleVal = column.getDoubleVal();
+ iterator = doubleVal.getValuesIterator();
+ nulls = doubleVal.getNulls();
+ length = doubleVal.getValuesSize();
+ break;
+ case I16_VAL:
+ TI16Column i16Val = column.getI16Val();
+ iterator = i16Val.getValuesIterator();
+ nulls = i16Val.getNulls();
+ length = i16Val.getValuesSize();
+ break;
+ case I32_VAL:
+ TI32Column i32Val = column.getI32Val();
+ iterator = i32Val.getValuesIterator();
+ nulls = i32Val.getNulls();
+ length = i32Val.getValuesSize();
+ break;
+ case I64_VAL:
+ TI64Column i64Val = column.getI64Val();
+ iterator = i64Val.getValuesIterator();
+ nulls = i64Val.getNulls();
+ length = i64Val.getValuesSize();
+ break;
+ case STRING_VAL:
+ TStringColumn stringVal = column.getStringVal();
+ iterator = stringVal.getValuesIterator();
+ nulls = stringVal.getNulls();
+ length = stringVal.getValuesSize();
+ break;
+ }
+
+ this.iterators[index] = iterator;
+ this.nulls[index] = nulls;
+ this.lengths[index] = length;
+ }
+
+ /** fetch data from remote server */
+ private void fetch() {
+ try {
+ hasMoreRow = false;
+ iterators = null;
+ nulls = null;
+ lengths = null;
+
+ do {
+ /*
+ * request data
+ */
+ TFetchResultsResp resp = client.FetchResults(fetchReq);
+ TStatus status = resp.getStatus();
+
+ /*
+ * request error
+ */
+ if (status.getStatusCode().getValue() > TStatusCode.SUCCESS_WITH_INFO_STATUS.getValue()) {
+ throw new RuntimeException("Failed to fetch result(s). " + status);
+ }
+
+ hasMoreRow = resp.isHasMoreRows();
+
+ TRowSet result = resp.getResults();
+ List<TColumn> list = result.getColumns();
+ if (list != null) {
+ rowIndex = -1;
+
+ if (iterators == null) {
+ cols = list.size();
+ iterators = new Iterator[cols];
+ /*
+ * nulls matrix, indicate which [cols][row] is null value
+ */
+ nulls = new byte[cols][];
+ lengths = new int[cols];
+ }
+
+ /*
+ * 填充缓冲区
+ */
+ int i = 0;
+ for (TColumn item : list) {
+ unpackColumn(item, i);
+ ++i;
+ }
+ }
+
+ if (lengths != null && lengths[0] > 0) {
+ break;
+ }
+ } while (hasMoreRow);
+
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public synchronized Row next() {
+ if (closed) {
+ return null;
+ }
+
+ /*
+ * fetch remote data
+ */
+ if ((iterators == null || !iterators[0].hasNext()) && hasMoreRow) {
+ fetch();
+ }
+
+ /*
+ * build local row
+ */
+ if (iterators != null && iterators[0].hasNext()) {
+ ++rowIndex;
+ Object[] values = new Object[iterators.length];
+ for (int i = 0; i < iterators.length; ++i) {
+ values[i] = iterators[i].next();
+
+ /* use nulls matrix for recognize null value */
+ if ((nulls[i][rowIndex / 8] & bitMask[rowIndex % 8]) != 0) {
+ values[i] = null;
+ }
+ }
+ return new Row(values);
+ }
+
+ close();
+ return null;
+ }
+
+ @Override
+ public int getColumnSize() {
+ return getColumns().size();
+ }
+
+ @Override
+ public synchronized List<Column> getColumns() {
+ if (columns == null) {
+ try {
+ TGetResultSetMetadataReq metadataReq = new TGetResultSetMetadataReq(operation);
+ TGetResultSetMetadataResp metadataResp = client.GetResultSetMetadata(metadataReq);
+
+ ThriftUtil.checkStatus(metadataResp.getStatus());
+
+ TTableSchema tableSchema = metadataResp.getSchema();
+
+ if (tableSchema != null) {
+ List<TColumnDesc> columnList = tableSchema.getColumns();
+ List<Column> columns = new ArrayList<>(columnList.size());
+
+ int index = 0;
+ for (TColumnDesc tColumnDesc : columnList) {
+ columns.add(
+ new Column(++index, tColumnDesc.getColumnName(), tColumnDesc.getTypeDesc()));
+ }
+ this.columns = Collections.unmodifiableList(columns);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ if (columns == null) {
+ this.columns = Collections.emptyList();
+ }
+ }
+ return columns;
+ }
+
+ @Override
+ public void close() {
+ this.closed = true;
+ }
+
+ public static class Column implements ImpalaResultSet.Column {
+ private final int index;
+ private final String name;
+ private final TTypeDesc type;
+
+ protected Column(int index, String name, TTypeDesc type) {
+ this.index = index;
+ this.name = name;
+ this.type = type;
+ }
+
+ @Override
+ public int getIndex() {
+ return index;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public TTypeDesc getType() {
+ return type;
+ }
+ }
+
+ public static class Row implements ImpalaResultSet.Row {
+ private final Object[] values;
+
+ protected Row(Object[] values) {
+ this.values = values;
+ }
+
+ @Override
+ public Object[] getValues() {
+ return values;
+ }
+
+ @Override
+ public Object getObject(int columnIndex) {
+ return values[columnIndex - 1];
+ }
+
+ @Override
+ public <T> T getObject(int columnIndex, Class<T> type) {
+ return type.cast(values[columnIndex - 1]);
+ }
+
+ @Override
+ public String getString(int columnIndex) {
+ return (String) values[columnIndex - 1];
+ }
+
+ @Override
+ public Short getShort(int columnIndex) {
+ return (Short) values[columnIndex - 1];
+ }
+
+ @Override
+ public Integer getInteger(int columnIndex) {
+ return (Integer) values[columnIndex - 1];
+ }
+
+ @Override
+ public Long getLong(int columnIndex) {
+ return (Long) values[columnIndex - 1];
+ }
+ }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSession.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSession.java
new file mode 100644
index 000000000..e585b7f73
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSession.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.thrift;
+
+import org.apache.linkis.engineplugin.impala.client.exception.ImpalaEngineException;
+import org.apache.linkis.engineplugin.impala.client.util.ThriftUtil;
+
+import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
+import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
+import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
+import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.apache.impala.thrift.ImpalaHiveServer2Service;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TTransport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ImpalaThriftSession implements AutoCloseable {
+ public static final Logger LOG = LoggerFactory.getLogger(ImpalaThriftSession.class.getName());
+
+ private final String target;
+ private final TTransport transport;
+ private final Runnable closeCallback;
+ private final ImpalaHiveServer2Service.Client client;
+ private final TSessionHandle session;
+
+ private volatile boolean closed;
+
+ public ImpalaThriftSession(String target, TTransport transport, Runnable closeCallback)
+ throws TException {
+ this.target = target;
+ this.transport = transport;
+ this.closeCallback = closeCallback;
+ this.closed = false;
+
+ this.client = new ImpalaHiveServer2Service.Client(new TBinaryProtocol(transport));
+
+ /* open session */
+ TOpenSessionReq openReq = new TOpenSessionReq();
+ openReq.setClient_protocol(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7);
+ TOpenSessionResp openResp = client.OpenSession(openReq);
+ this.session = openResp.getSessionHandle();
+ }
+
+ public TSessionHandle session() {
+ return session;
+ }
+
+ public ImpalaHiveServer2Service.Client client() {
+ return client;
+ }
+
+ public String target() {
+ return target;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return this == obj;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+
+ try {
+ if (session.isSetSessionId()) {
+ try {
+ TCloseSessionReq req = new TCloseSessionReq(session);
+ TCloseSessionResp res = client.CloseSession(req);
+ ThriftUtil.checkStatus(res.getStatus());
+ } catch (TException | ImpalaEngineException e) {
+ LOG.error("Failed to safely close the session for {}", target, e);
+ } finally {
+ transport.close();
+ }
+ }
+ } finally {
+ closeCallback.run();
+ }
+ }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
new file mode 100644
index 000000000..fd6eb0e9f
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.thrift;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+
+import javax.net.SocketFactory;
+import javax.security.auth.callback.CallbackHandler;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ImpalaThriftSessionFactory {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(ImpalaThriftSessionFactory.class.getName());
+
+ protected final Map<String, SessionHolder> sessions;
+ protected final int maxConnections;
+ protected final Semaphore semaphore;
+ protected final SocketFactory socketFactory;
+ /** sasl properties */
+ protected final CallbackHandler saslCallbackHandler;
+
+ protected final String saslMechanism;
+ protected final String saslAuthorizationId;
+ protected final String saslProtocol;
+ protected final Map<String, String> saslProperties;
+ protected int connectionTimeout = 30;
+
+ public ImpalaThriftSessionFactory(
+ String[] hosts, int maxConnections, SocketFactory socketFactory) {
+ this(hosts, maxConnections, socketFactory, null, null, null, null, null);
+ }
+
+ public ImpalaThriftSessionFactory(
+ String[] hosts,
+ int maxConnections,
+ SocketFactory socketFactory,
+ String saslMechanism,
+ String saslAuthorizationId,
+ String saslProtocol,
+ Map<String, String> saslProperties,
+ CallbackHandler saslCallbackHandler) {
+ if (socketFactory == null) {
+ socketFactory = SocketFactory.getDefault();
+ }
+
+ if (maxConnections <= 0) {
+ LOG.warn("Invalid maxConnections value: {}, set to 1", maxConnections);
+ maxConnections = 1;
+ }
+
+ sessions = new TreeMap<>();
+ for (String host : hosts) {
+ String[] split = StringUtils.split(host, ":", 2);
+ if (split.length == 2
+ && StringUtils.isNotBlank(split[0])
+ && StringUtils.isNumeric(split[1])) {
+ SessionHolder holder = new SessionHolder(host, split[0], Integer.parseInt(split[1]));
+ sessions.put(host, holder);
+ }
+ }
+
+ if (sessions.isEmpty()) {
+ throw new IllegalArgumentException("invalid hosts: " + StringUtils.join(hosts, ','));
+ }
+
+ this.socketFactory = socketFactory;
+ this.maxConnections = maxConnections;
+ this.semaphore = new Semaphore(maxConnections);
+
+ this.saslMechanism = saslMechanism;
+ this.saslAuthorizationId = saslAuthorizationId;
+ this.saslProtocol = saslProtocol;
+ this.saslProperties = saslProperties;
+ this.saslCallbackHandler = saslCallbackHandler;
+ }
+
+ public int openedSessionCount() {
+ return maxConnections - semaphore.availablePermits();
+ }
+
+ public ImpalaThriftSession openSession() throws InterruptedException, IOException, TException {
+ if (!semaphore.tryAcquire(5, TimeUnit.MINUTES)) {
+ String format =
+ String.format(
+ "Connection pool for [%s] is exhausted, max allow: %d",
+ StringUtils.join(sessions.keySet(), ","), maxConnections);
+ LOG.error(format);
+ throw new IllegalStateException(format);
+ }
+
+ /*
+ * get random holder with minimal connections
+ */
+ SessionHolder holder =
+ sessions.values().stream()
+ .min(
+ Comparator.<SessionHolder>comparingInt(o -> o.connections)
+ .thenComparingInt(o -> o.randomInt))
+ .get();
+
+ /*
+ * create socket
+ */
+ Socket socket = socketFactory.createSocket(holder.host, holder.port);
+
+ /*
+ * create thrift socket
+ */
+ TSocket tSocket = new TSocket(socket);
+ tSocket.setTimeout(connectionTimeout * 1000);
+ socket.setSoTimeout(connectionTimeout * 2000);
+
+ TTransport tTransport;
+ if (saslCallbackHandler != null) {
+ tTransport =
+ new TSaslClientTransport(
+ saslMechanism,
+ saslAuthorizationId,
+ saslProtocol,
+ holder.name,
+ saslProperties,
+ saslCallbackHandler,
+ tSocket);
+
+ } else {
+ tTransport = tSocket;
+ }
+
+ if (!tTransport.isOpen()) {
+ tTransport.open();
+ }
+
+ ImpalaThriftSession impalaSession =
+ new ImpalaThriftSession(holder.name, tTransport, holder::release);
+ holder.acquire();
+ return impalaSession;
+ }
+
+ private class SessionHolder {
+ private final String name;
+ private final String host;
+ private final int port;
+ private final int randomInt = RandomUtils.nextInt();
+ private volatile int connections;
+
+ SessionHolder(String name, String host, int port) {
+ this.name = name;
+ this.host = host;
+ this.port = port;
+ }
+
+ void release() {
+ semaphore.release();
+
+ synchronized (this) {
+ --connections;
+ }
+ }
+
+ void acquire() {
+ synchronized (this) {
+ ++connections;
+ }
+ }
+ }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/util/ThriftUtil.java b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/util/ThriftUtil.java
new file mode 100644
index 000000000..667eca628
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/util/ThriftUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.client.util;
+
+import org.apache.linkis.engineplugin.impala.client.ExecutionListener;
+import org.apache.linkis.engineplugin.impala.client.exception.ImpalaEngineException;
+import org.apache.linkis.engineplugin.impala.client.exception.ImpalaErrorCodeSummary;
+
+import org.apache.hive.service.rpc.thrift.TStatus;
+
+public class ThriftUtil {
+ private static final char[] hexCode = "0123456789abcdef".toCharArray();
+
+ public static void checkStatus(TStatus status, ExecutionListener executionListener)
+ throws ImpalaEngineException {
+ switch (status.getStatusCode()) {
+ case STILL_EXECUTING_STATUS:
+ throw ImpalaEngineException.of(ImpalaErrorCodeSummary.StillRunningError);
+ case ERROR_STATUS:
+ throw ImpalaEngineException.of(
+ ImpalaErrorCodeSummary.ExecutionError, status.getErrorMessage());
+ case INVALID_HANDLE_STATUS:
+ throw ImpalaEngineException.of(ImpalaErrorCodeSummary.InvalidHandleError);
+ case SUCCESS_WITH_INFO_STATUS:
+ if (executionListener != null) {
+ executionListener.message(status.getInfoMessages());
+ }
+ break;
+ case SUCCESS_STATUS:
+ }
+ }
+
+ public static void checkStatus(TStatus status) throws ImpalaEngineException {
+ checkStatus(status, null);
+ }
+
+ /*
+ * impala unique id
+ */
+ public static String convertUniqueId(byte[] b) {
+ StringBuilder sb = new StringBuilder(":");
+ for (int i = 0; i < 8; ++i) {
+ sb.append(hexCode[(b[15 - i] >> 4) & 0xF]);
+ sb.append(hexCode[(b[15 - i] & 0xF)]);
+ sb.insert(0, hexCode[(b[i] & 0xF)]);
+ sb.insert(0, hexCode[(b[i] >> 4) & 0xF]);
+ }
+ return sb.toString();
+ }
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/impala/src/main/resources/linkis-engineconn.properties
new file mode 100644
index 000000000..18dacf8db
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/resources/linkis-engineconn.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+wds.linkis.server.version=v1
+#wds.linkis.engineconn.debug.enable=true
+#wds.linkis.keytab.enable=true
+wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.impala.ImpalaEngineConnPlugin
+wds.linkis.engineconn.support.parallelism=true
+wds.linkis.rpc.cache.expire.time=0
+
+linkis.impala.servers=127.0.0.1:21050
diff --git a/linkis-engineconn-plugins/impala/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/impala/src/main/resources/log4j2.xml
new file mode 100644
index 000000000..391418350
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/resources/log4j2.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration status="error" monitorInterval="30">
+ <appenders>
+ <RollingFile name="RollingFile" append="true" fileName="${env:LOG_DIRS:-logs}/stdout"
+ filePattern="${env:LOG_DIRS:-logs}/$${date:yyyy-MM}/linkis-log-%d{yyyy-MM-dd-hh}-%i.log">
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ <Policies>
+ <SizeBasedTriggeringPolicy size="100MB"/>
+ </Policies>
+ <DefaultRolloverStrategy max="10"/>
+ </RollingFile>
+
+ <File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS:-logs}/yarnApp.log">
+ <RegexFilter regex=".*application_.*" onMatch="ACCEPT" onMismatch="DENY"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ </File>
+
+ <Send name="Send" >
+ <Filters>
+ <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
+ </Filters>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ </Send>
+
+ <Send name="SendPackage" >
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ </Send>
+
+ <Console name="stderr" target="SYSTEM_ERR">
+ <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" />
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
+ </Console>
+ </appenders>
+
+ <loggers>
+ <root level="INFO">
+ <appender-ref ref="stderr"/>
+ <appender-ref ref="RollingFile"/>
+ <appender-ref ref="Send"/>
+ </root>
+ <logger name="org.springframework.boot.diagnostics.LoggingFailureAnalysisReporter " level="error" additivity="true">
+ <appender-ref ref="stderr"/>
+ </logger>
+ <logger name="com.netflix.discovery" level="warn" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.apache.hadoop.yarn" level="warn" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.springframework" level="warn" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.apache.linkis.server.security" level="warn" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.apache.hadoop.hdfs.KeyProviderCache" level="fatal" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.spark_project.jetty" level="ERROR" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.eclipse.jetty" level="ERROR" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.springframework" level="ERROR" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.reflections.Reflections" level="ERROR" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ </loggers>
+</configuration>
diff --git a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/ImpalaEngineConnPlugin.scala b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/ImpalaEngineConnPlugin.scala
new file mode 100644
index 000000000..c5462e566
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/ImpalaEngineConnPlugin.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala
+
+import org.apache.linkis.engineplugin.impala.builder.ImpalaProcessEngineConnLaunchBuilder
+import org.apache.linkis.engineplugin.impala.factory.ImpalaEngineConnFactory
+import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin
+import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory
+import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder
+import org.apache.linkis.manager.engineplugin.common.resource.{
+ EngineResourceFactory,
+ GenericEngineResourceFactory
+}
+import org.apache.linkis.manager.label.entity.Label
+
+import java.util
+
+class ImpalaEngineConnPlugin extends EngineConnPlugin {
+ private val resourceLocker = new Object()
+
+ private val engineFactoryLocker = new Object()
+
+ private var engineResourceFactory: EngineResourceFactory = _
+
+ private var engineFactory: EngineConnFactory = _
+
+ private val defaultLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]()
+
+ override def init(params: util.Map[String, AnyRef]): Unit = {}
+
+ override def getEngineResourceFactory: EngineResourceFactory = {
+ if (null == engineResourceFactory) resourceLocker synchronized {
+ engineResourceFactory = new GenericEngineResourceFactory
+ }
+ engineResourceFactory
+ }
+
+ override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
+ new ImpalaProcessEngineConnLaunchBuilder
+ }
+
+ override def getEngineConnFactory: EngineConnFactory = {
+ if (null == engineFactory) engineFactoryLocker synchronized {
+ engineFactory = new ImpalaEngineConnFactory
+ }
+ engineFactory
+ }
+
+ override def getDefaultLabels: util.List[Label[_]] = defaultLabels
+
+}
diff --git a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/builder/ImpalaProcessEngineConnLaunchBuilder.scala
similarity index 54%
rename from linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
rename to linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/builder/ImpalaProcessEngineConnLaunchBuilder.scala
index 6d198bbae..ba0218e33 100644
--- a/linkis-engineconn-plugins/trino/src/main/java/org/apache/linkis/engineplugin/trino/password/StaticPasswordCallback.java
+++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/builder/ImpalaProcessEngineConnLaunchBuilder.scala
@@ -15,26 +15,21 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.trino.password;
+package org.apache.linkis.engineplugin.impala.builder
-import javax.security.auth.callback.PasswordCallback;
+import org.apache.linkis.engineplugin.impala.conf.ImpalaConfiguration
+import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
-public class StaticPasswordCallback extends PasswordCallback {
+class ImpalaProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {
- private final char[] password;
-
- public StaticPasswordCallback(String prompt, boolean echoOn) {
- super(prompt, echoOn);
- this.password = prompt.toCharArray();
- }
-
- public StaticPasswordCallback(String prompt) {
- super(prompt, false);
- this.password = prompt.toCharArray();
+ override def getEngineStartUser(label: UserCreatorLabel): String = {
+ if (ImpalaConfiguration.IMPALA_USER_ISOLATION_MODE.getValue) {
+ /* using user label if user isolation is on */
+ label.getUser
+ } else {
+ ImpalaConfiguration.IMPALA_ENGINE_USER.getValue
+ }
}
- @Override
- public char[] getPassword() {
- return password;
- }
}
diff --git a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/conf/ImpalaConfiguration.scala b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/conf/ImpalaConfiguration.scala
new file mode 100644
index 000000000..d0eeaa886
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/conf/ImpalaConfiguration.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.conf
+
+import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.storage.utils.StorageConfiguration
+
+import java.security.KeyStore
+
+object ImpalaConfiguration {
+
+ val ENGINE_CONCURRENT_LIMIT = CommonVars[Int]("linkis.engineconn.concurrent.limit", 100)
+
+ val DEFAULT_LIMIT = CommonVars[Int]("linkis.impala.default.limit", 5000)
+
+ val IMPALA_USER_ISOLATION_MODE =
+ CommonVars[Boolean]("linkis.impala.user.isolation.mode", false)
+
+ val IMPALA_ENGINE_USER =
+ CommonVars("linkis.impala.engine.user", StorageConfiguration.HDFS_ROOT_USER.getValue)
+
+ val IMPALA_SERVERS = CommonVars[String]("linkis.impala.servers", "127.0.0.1:21050")
+ val IMPALA_MAX_CONNECTIONS = CommonVars[Int]("linkis.impala.maxConnections", 10)
+
+ val IMPALA_SSL_ENABLE = CommonVars[Boolean]("linkis.impala.ssl.enable", false)
+ val IMPALA_SSL_KEYSTORE = CommonVars[String]("linkis.impala.ssl.keystore", null)
+
+ val IMPALA_SSL_KEYSTORE_TYPE =
+ CommonVars[String]("linkis.impala.ssl.keystore.type", KeyStore.getDefaultType)
+
+ val IMPALA_SSL_KEYSTORE_PASSWORD = CommonVars[String]("linkis.impala.ssl.keystore.password", null)
+ val IMPALA_SSL_TRUSTSTORE = CommonVars[String]("linkis.impala.ssl.truststore", null)
+
+ val IMPALA_SSL_TRUSTSTORE_TYPE =
+ CommonVars[String]("linkis.impala.ssl.truststore.type", KeyStore.getDefaultType)
+
+ val IMPALA_SSL_TRUSTSTORE_PASSWORD =
+ CommonVars[String]("linkis.impala.ssl.truststore.password", null)
+
+ val IMPALA_SASL_ENABLE = CommonVars[Boolean]("linkis.impala.sasl.enable", false)
+ val IMPALA_SASL_MECHANISM = CommonVars[String]("linkis.impala.sasl.mechanism", "PLAIN")
+ val IMPALA_SASL_AUTHORIZATION_ID = CommonVars[String]("linkis.impala.sasl.authorizationId", null)
+ val IMPALA_SASL_PROTOCOL = CommonVars[String]("linkis.impala.sasl.protocol", "LDAP")
+ val IMPALA_SASL_PROPERTIES = CommonVars[String]("linkis.impala.sasl.properties", null)
+ val IMPALA_SASL_USERNAME = CommonVars("linkis.impala.sasl.username", IMPALA_ENGINE_USER.getValue)
+ val IMPALA_SASL_PASSWORD = CommonVars[String]("linkis.impala.sasl.password", null)
+ val IMPALA_SASL_PASSWORD_CMD = CommonVars[String]("linkis.impala.sasl.password.cmd", null)
+
+ val IMPALA_HEARTBEAT_SECONDS = CommonVars[Int]("linkis.impala.heartbeat.seconds", 1)
+ val IMPALA_QUERY_TIMEOUT_SECONDS = CommonVars[Int]("linkis.impala.query.timeout.seconds", 0)
+ val IMPALA_QUERY_BATCH_SIZE = CommonVars[Int]("linkis.impala.query.batchSize", 1000)
+ val IMPALA_QUERY_OPTIONS = CommonVars[String]("linkis.impala.query.options", null)
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/conf/ImpalaEngineConfig.scala b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/conf/ImpalaEngineConfig.scala
new file mode 100644
index 000000000..c069aa568
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/conf/ImpalaEngineConfig.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.conf
+
+import org.apache.linkis.common.conf.Configuration
+import org.apache.linkis.governance.common.protocol.conf.{
+ RequestQueryEngineConfigWithGlobalConfig,
+ ResponseQueryConfig
+}
+import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
+import org.apache.linkis.protocol.CacheableProtocol
+import org.apache.linkis.rpc.RPCMapCache
+
+import java.util
+
+object ImpalaEngineConfig
+ extends RPCMapCache[(UserCreatorLabel, EngineTypeLabel), String, String](
+ Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME.getValue
+ ) {
+
+ override protected def createRequest(
+ labelTuple: (UserCreatorLabel, EngineTypeLabel)
+ ): CacheableProtocol = {
+ RequestQueryEngineConfigWithGlobalConfig(labelTuple._1, labelTuple._2)
+ }
+
+ override protected def createMap(any: Any): util.Map[String, String] = any match {
+ case response: ResponseQueryConfig =>
+ response.getKeyAndValue
+ case _ => null
+ }
+
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
new file mode 100644
index 000000000..2a597dd11
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.executor
+
+import org.apache.linkis.common.log.LogUtils
+import org.apache.linkis.common.utils.{OverloadUtils, Utils}
+import org.apache.linkis.engineconn.common.password.{
+ CommandPasswordCallback,
+ StaticPasswordCallback
+}
+import org.apache.linkis.engineconn.computation.executor.execute.{
+ ConcurrentComputationExecutor,
+ EngineExecutionContext
+}
+import org.apache.linkis.engineconn.core.EngineConnObject
+import org.apache.linkis.engineplugin.impala.client.{
+ ExecutionListener,
+ ImpalaClient,
+ ImpalaResultSet
+}
+import org.apache.linkis.engineplugin.impala.client.ImpalaResultSet.Row
+import org.apache.linkis.engineplugin.impala.client.exception.{
+ ImpalaEngineException,
+ ImpalaErrorCodeSummary
+}
+import org.apache.linkis.engineplugin.impala.client.protocol.{ExecProgress, ExecStatus}
+import org.apache.linkis.engineplugin.impala.client.thrift.{
+ ImpalaThriftClient,
+ ImpalaThriftSessionFactory
+}
+import org.apache.linkis.engineplugin.impala.conf.ImpalaConfiguration._
+import org.apache.linkis.engineplugin.impala.conf.ImpalaEngineConfig
+import org.apache.linkis.governance.common.paser.SQLCodeParser
+import org.apache.linkis.manager.common.entity.resource.{
+ CommonNodeResource,
+ LoadResource,
+ NodeResource
+}
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
+import org.apache.linkis.protocol.engine.JobProgressInfo
+import org.apache.linkis.rpc.Sender
+import org.apache.linkis.scheduler.executer.{
+ CompletedExecuteResponse,
+ ErrorExecuteResponse,
+ ExecuteResponse,
+ SuccessExecuteResponse
+}
+import org.apache.linkis.storage.domain.Column
+import org.apache.linkis.storage.resultset.ResultSetFactory
+import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
+
+import org.apache.commons.io.IOUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
+
+import org.springframework.util.CollectionUtils
+
+import javax.net.SocketFactory
+import javax.net.ssl._
+import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback}
+
+import java.io.FileInputStream
+import java.security.KeyStore
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.Consumer
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+class ImpalaEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
+ extends ConcurrentComputationExecutor(outputPrintLimit) {
+
+ private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]](2)
+
+ private val impalaClients: util.Map[String, ImpalaClient] =
+ new ConcurrentHashMap[String, ImpalaClient]()
+
+ private val taskExecutions: util.Map[String, TaskExecution] =
+ new ConcurrentHashMap[String, TaskExecution]()
+
+ override def init: Unit = {
+ setCodeParser(new SQLCodeParser)
+ super.init
+ }
+
+ override def executeLine(
+ engineExecutionContext: EngineExecutionContext,
+ code: String
+ ): ExecuteResponse = {
+ val realCode = if (StringUtils.isBlank(code)) {
+ "SELECT 1"
+ } else {
+ code.trim
+ }
+
+ logger.info(s"impala client begins to run code:\n $realCode")
+ val taskId = engineExecutionContext.getJobId.get
+
+ val impalaClient = getOrCreateImpalaClient(engineExecutionContext)
+ val taskExecution: TaskExecution = TaskExecution(taskId, engineExecutionContext, impalaClient)
+ Utils.tryFinally {
+ taskExecutions.put(taskId, taskExecution)
+ Utils.tryCatch {
+ impalaClient.execute(realCode, taskExecution)
+ taskExecution.response
+ } { exception =>
+ engineExecutionContext.appendStdout(
+ LogUtils.generateERROR(ExceptionUtils.getStackTrace(exception))
+ )
+ ErrorExecuteResponse(ExceptionUtils.getMessage(exception), exception)
+ }
+ } {
+ taskExecutions.remove(taskId)
+ }
+ }
+
+ override def progress(taskId: String): Float = {
+ val taskExecution = taskExecutions.get(taskId)
+ if (taskExecution != null) {
+ taskExecution.progressFloat
+ } else {
+ 0
+ }
+ }
+
+ override def getProgressInfo(taskId: String): Array[JobProgressInfo] = {
+ val taskExecution = taskExecutions.get(taskId)
+ if (taskExecution != null) {
+ return taskExecution.progressArray
+ }
+ Array.empty[JobProgressInfo]
+ }
+
+ override def killTask(taskId: String): Unit = {
+ val taskExecution = taskExecutions.get(taskId)
+ if (taskExecution != null) {
+ taskExecution.kill()
+ }
+ super.killTask(taskId)
+ }
+
+ override def getExecutorLabels(): util.List[Label[_]] = executorLabels
+
+ override def setExecutorLabels(labels: util.List[Label[_]]): Unit = {
+ if (!CollectionUtils.isEmpty(labels)) {
+ executorLabels.clear()
+ executorLabels.addAll(labels)
+ }
+ }
+
+ override def supportCallBackLogs(): Boolean = false
+
+ override def requestExpectedResource(expectedResource: NodeResource): NodeResource = {
+ null
+ }
+
+ override def getCurrentNodeResource(): NodeResource = {
+ NodeResourceUtils.appendMemoryUnitIfMissing(
+ EngineConnObject.getEngineCreationContext.getOptions
+ )
+
+ val resource = new CommonNodeResource
+ val usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory, 1)
+ resource.setUsedResource(usedResource)
+ resource
+ }
+
+ override def getId(): String = Sender.getThisServiceInstance.getInstance + s"_$id"
+
+ override def getConcurrentLimit: Int = ENGINE_CONCURRENT_LIMIT.getValue
+
+ private def queryOutput(
+ taskId: String,
+ engineExecutorContext: EngineExecutionContext,
+ resultSet: ImpalaResultSet
+ ): Unit = {
+ var columnCount = 0
+ var rows = 0
+ val resultSetWriter = engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
+ Utils.tryCatch {
+ val columns = resultSet.getColumns.asScala
+ .map(column => Column(column.getName, null, null))
+ .toArray[Column]
+ columnCount = columns.length
+ resultSetWriter.addMetaData(new TableMetaData(columns))
+
+ var row: Row = resultSet.next()
+ while (row != null) {
+ val anys: ArrayBuffer[Any] = ArrayBuffer[Any]()
+ row.getValues.foreach(v => anys += v)
+ resultSetWriter.addRecord(new TableRecord(anys.toArray))
+ // scalastyle:off println
+ println(new TableRecord(anys.toArray).tableRecordToString().mkString(" "))
+ rows += 1
+
+ row = resultSet.next()
+ }
+ } { case e: Exception =>
+ IOUtils.closeQuietly(resultSetWriter)
+ throw e
+ }
+ logger.info(s"Fetched $columnCount col(s) : $rows row(s) in impala")
+ engineExecutorContext.appendStdout(
+ LogUtils.generateInfo(s"Fetched $columnCount col(s) : $rows row(s) in impala")
+ );
+ engineExecutorContext.sendResultSet(resultSetWriter)
+ }
+
+ override def killAll(): Unit = {
+ val iterator = taskExecutions.values().iterator()
+ while (iterator.hasNext) {
+ Utils.tryAndWarn(iterator.next().kill())
+ }
+ taskExecutions.clear()
+ }
+
+ override def close(): Unit = {
+ killAll()
+ val iterator = impalaClients.values().iterator()
+ while (iterator.hasNext) {
+ Utils.tryAndWarn(iterator.next().close())
+ }
+ impalaClients.clear()
+ super.close()
+ }
+
+ case class TaskExecution(
+ taskId: String,
+ engineExecutionContext: EngineExecutionContext,
+ impalaClient: ImpalaClient
+ ) extends ExecutionListener {
+ var queryId: String = ""
+ var totalTasks: Int = -1
+ var runningTasks: Int = -1
+ var failedTasks: Int = 0
+ var succeedTasks: Int = 0
+ var response: CompletedExecuteResponse = SuccessExecuteResponse()
+
+ override def error(status: ExecStatus): Unit = {
+ response = ErrorExecuteResponse(
+ status.getErrorMessage,
+ ImpalaEngineException.of(ImpalaErrorCodeSummary.ExecutionError, status.getName)
+ )
+ }
+
+ override def success(resultSet: ImpalaResultSet): Unit = {
+ queryOutput(taskId, engineExecutionContext, resultSet)
+ }
+
+ override def created(queryId: String): Unit = {
+ this.queryId = queryId
+ }
+
+ override def progress(progress: ExecProgress): Unit = {
+ runningTasks = progress.getRunningNodes
+ totalTasks = progress.getTotalScanRanges.toInt
+ succeedTasks = progress.getCompletedScanRanges.toInt
+
+ engineExecutionContext.pushProgress(progressFloat, progressArray)
+ }
+
+ override def message(messages: util.List[String]): Unit = {
+ messages.forEach(new Consumer[String]() {
+ override def accept(message: String): Unit = engineExecutionContext.appendStdout(message)
+ })
+ }
+
+ def progressArray: Array[JobProgressInfo] = {
+ Array(JobProgressInfo(queryId, totalTasks, runningTasks, failedTasks, succeedTasks))
+ }
+
+ def progressFloat: Float = {
+ if (totalTasks > 0) {
+ succeedTasks.toFloat / totalTasks
+ } else {
+ 0
+ }
+ }
+
+ def kill(): Unit = {
+ if (StringUtils.isNotBlank(queryId)) {
+ impalaClient.cancel(queryId)
+ }
+ }
+
+ }
+
+ private def getOrCreateImpalaClient(
+ engineExecutionContext: EngineExecutionContext
+ ): ImpalaClient = {
+ val userCreatorLabel =
+ engineExecutionContext.getLabels.find(_.isInstanceOf[UserCreatorLabel]).get
+ val engineTypeLabel =
+ engineExecutionContext.getLabels.find(_.isInstanceOf[EngineTypeLabel]).get
+ var configMap: util.Map[String, String] = null
+ if (userCreatorLabel != null && engineTypeLabel != null) {
+ configMap = Utils.tryAndError(
+ ImpalaEngineConfig.getCacheMap(
+ (
+ userCreatorLabel.asInstanceOf[UserCreatorLabel],
+ engineTypeLabel.asInstanceOf[EngineTypeLabel]
+ )
+ )
+ )
+ }
+
+ val impalaServers = IMPALA_SERVERS.getValue(configMap)
+ val impalaMaxConnections = IMPALA_MAX_CONNECTIONS.getValue(configMap)
+ val impalaSaslEnable = IMPALA_SASL_ENABLE.getValue(configMap)
+ val impalaSaslProperties = IMPALA_SASL_PROPERTIES.getValue(configMap)
+ val impalaSaslUsername = IMPALA_SASL_USERNAME.getValue(configMap)
+ val impalaSaslPassword = IMPALA_SASL_PASSWORD.getValue(configMap)
+ val impalaSaslPasswordCmd = IMPALA_SASL_PASSWORD_CMD.getValue(configMap)
+ val impalaSaslMechanism = IMPALA_SASL_MECHANISM.getValue(configMap)
+ val impalaSaslAuthorization = IMPALA_SASL_AUTHORIZATION_ID.getValue(configMap)
+ val impalaSaslProtocol = IMPALA_SASL_PROTOCOL.getValue(configMap)
+ val impalaHeartbeatSeconds = IMPALA_HEARTBEAT_SECONDS.getValue(configMap)
+ val impalaQueryTimeoutSeconds = IMPALA_QUERY_TIMEOUT_SECONDS.getValue(configMap)
+ val impalaQueryBatchSize = IMPALA_QUERY_BATCH_SIZE.getValue(configMap)
+ val impalaQueryOptions = IMPALA_QUERY_OPTIONS.getValue(configMap)
+
+ val impalaSslEnable = IMPALA_SSL_ENABLE.getValue(configMap)
+ val impalaSslKeystore = IMPALA_SSL_KEYSTORE.getValue(configMap)
+ val impalaSslKeystoreType = IMPALA_SSL_KEYSTORE_TYPE.getValue(configMap)
+ val impalaSslKeystorePassword = IMPALA_SSL_KEYSTORE_PASSWORD.getValue(configMap)
+ val impalaSslTruststore = IMPALA_SSL_TRUSTSTORE.getValue(configMap)
+ val impalaSslTruststoreType = IMPALA_SSL_TRUSTSTORE_TYPE.getValue(configMap)
+ val impalaSslTruststorePassword = IMPALA_SSL_TRUSTSTORE_PASSWORD.getValue(configMap)
+
+ val impalaClientKey = Array(
+ impalaServers,
+ impalaMaxConnections,
+ impalaSaslEnable,
+ impalaSaslProperties,
+ impalaSaslUsername,
+ impalaSaslPassword,
+ impalaSaslPasswordCmd,
+ impalaSaslMechanism,
+ impalaSaslAuthorization,
+ impalaSaslProtocol,
+ impalaHeartbeatSeconds,
+ impalaQueryTimeoutSeconds,
+ impalaQueryBatchSize,
+ impalaQueryOptions,
+ impalaSslEnable,
+ impalaSslKeystore,
+ impalaSslKeystoreType,
+ impalaSslKeystorePassword,
+ impalaSslTruststore,
+ impalaSslTruststoreType,
+ impalaSslTruststorePassword
+ )
+
+ impalaClients.synchronized {
+ var client = impalaClients.get(impalaClientKey)
+ if (client == null) {
+ val socketFactory = createSocketFactory(
+ impalaSslEnable,
+ impalaSslKeystore,
+ impalaSslKeystoreType,
+ impalaSslKeystorePassword,
+ impalaSslTruststore,
+ impalaSslTruststoreType,
+ impalaSslTruststorePassword
+ )
+
+ val servers = impalaServers.split(',')
+ val maxConnections = impalaMaxConnections
+ val factory: ImpalaThriftSessionFactory = if (impalaSaslEnable) {
+ val saslProperties: util.Map[String, String] = new util.TreeMap()
+ Option(impalaSaslProperties)
+ .map(_.split(','))
+ .getOrElse(Array[String]())
+ .foreach { str =>
+ val kv = StringUtils.split(str, "=", 2)
+ saslProperties.put(kv(0), if (kv.length > 1) kv(1) else "")
+ }
+
+ val password = impalaSaslPassword
+ val passwordCmd = impalaSaslPasswordCmd
+ var passwordCallback: PasswordCallback = null
+ if (StringUtils.isNotBlank(passwordCmd)) {
+ passwordCallback = new CommandPasswordCallback(passwordCmd);
+ } else if (StringUtils.isNotBlank(password)) {
+ passwordCallback = new StaticPasswordCallback(password);
+ }
+
+ val callbackHandler: CallbackHandler = new CallbackHandler() {
+ override def handle(callbacks: Array[Callback]): Unit = callbacks.foreach {
+ case callback: NameCallback => callback.setName(impalaSaslUsername)
+ case callback: PasswordCallback => callback.setPassword(passwordCallback.getPassword)
+ }
+ }
+
+ new ImpalaThriftSessionFactory(
+ servers,
+ maxConnections,
+ socketFactory,
+ impalaSaslMechanism,
+ impalaSaslAuthorization,
+ impalaSaslProtocol,
+ saslProperties,
+ callbackHandler
+ )
+ } else {
+ new ImpalaThriftSessionFactory(servers, maxConnections, socketFactory)
+ }
+
+ val impalaClient = new ImpalaThriftClient(factory, impalaHeartbeatSeconds)
+ impalaClient.setQueryTimeoutInSeconds(impalaQueryTimeoutSeconds)
+ impalaClient.setBatchSize(impalaQueryBatchSize)
+ Option(impalaQueryOptions)
+ .map(_.split(','))
+ .getOrElse(Array[String]())
+ .foreach { str =>
+ val kv = StringUtils.split(str, "=", 2)
+ impalaClient.setQueryOption(kv(0), if (kv.length > 1) kv(1) else "")
+ }
+
+ client = impalaClient
+ }
+ client
+ }
+ }
+
+ private def createSocketFactory(
+ impalaSslEnable: Boolean,
+ impalaSslKeystore: String,
+ impalaSslKeystoreType: String,
+ impalaSslKeystorePassword: String,
+ impalaSslTruststore: String,
+ impalaSslTruststoreType: String,
+ impalaSslTruststorePassword: String
+ ): SocketFactory = {
+
+ if (impalaSslEnable) {
+ val keyStore: KeyStore = KeyStore.getInstance(impalaSslKeystoreType)
+ val keyStorePassword: Array[Char] = Option(impalaSslKeystorePassword)
+ .map(_.toCharArray)
+ .orNull
+
+ val in = new FileInputStream(impalaSslKeystore)
+ try {
+ keyStore.load(in, keyStorePassword)
+ } finally {
+ if (in != null) in.close()
+ }
+ val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm);
+ keyManagerFactory.init(keyStore, keyStorePassword);
+ val keyManagers: Array[KeyManager] = keyManagerFactory.getKeyManagers;
+
+ var trustStore = keyStore
+ if (StringUtils.isNotBlank(impalaSslTruststore)) {
+ trustStore = KeyStore.getInstance(impalaSslTruststoreType)
+ val trustStorePassword: Array[Char] = Option(impalaSslTruststorePassword)
+ .map(_.toCharArray)
+ .orNull
+ val in = new FileInputStream(impalaSslTruststore)
+ try {
+ trustStore.load(in, trustStorePassword)
+ } finally {
+ if (in != null) in.close()
+ }
+ }
+
+ // create TrustManagerFactory
+ val trustManagerFactory =
+ TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
+ trustManagerFactory.init(trustStore)
+
+ // get X509TrustManager
+ val trustManagers = trustManagerFactory.getTrustManagers
+ if (trustManagers.length != 1 || !trustManagers(0).isInstanceOf[X509TrustManager]) {
+ throw new RuntimeException(
+ "Unexpected default trust managers:" + StringUtils.join(",", trustManagers)
+ )
+ }
+ val trustManager = trustManagers(0).asInstanceOf[X509TrustManager]
+
+ // create SSLContext
+ val sslContext = SSLContext.getInstance("TLS")
+ sslContext.init(keyManagers, Array[TrustManager](trustManager), null)
+
+ sslContext.getSocketFactory
+ } else {
+ null
+ }
+ }
+
+ override def executeCompletely(
+ engineExecutorContext: EngineExecutionContext,
+ code: String,
+ completedLine: String
+ ): ExecuteResponse = null
+
+}
diff --git a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/factory/ImpalaEngineConnFactory.scala b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/factory/ImpalaEngineConnFactory.scala
new file mode 100644
index 000000000..ee39a3845
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/factory/ImpalaEngineConnFactory.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.factory
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.common.engineconn.EngineConn
+import org.apache.linkis.engineconn.computation.executor.creation.ComputationSingleExecutorEngineConnFactory
+import org.apache.linkis.engineconn.executor.entity.LabelExecutor
+import org.apache.linkis.engineplugin.impala.conf.ImpalaConfiguration
+import org.apache.linkis.engineplugin.impala.executor.ImpalaEngineConnExecutor
+import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType}
+import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
+import org.apache.linkis.manager.label.entity.engine.RunType.RunType
+
+class ImpalaEngineConnFactory extends ComputationSingleExecutorEngineConnFactory {
+
+ override def newExecutor(
+ id: Int,
+ engineCreationContext: EngineCreationContext,
+ engineConn: EngineConn
+ ): LabelExecutor = {
+ new ImpalaEngineConnExecutor(ImpalaConfiguration.DEFAULT_LIMIT.getValue, id)
+ }
+
+ override protected def getEngineConnType: EngineType = EngineType.IMPALA
+
+ override protected def getRunType: RunType = RunType.IMPALA_SQL
+
+}
diff --git a/linkis-engineconn-plugins/impala/src/test/scala/org/apache/linkis/engineplugin/impala/executer/TestImpalaEngineConnExecutor.scala b/linkis-engineconn-plugins/impala/src/test/scala/org/apache/linkis/engineplugin/impala/executer/TestImpalaEngineConnExecutor.scala
new file mode 100644
index 000000000..d3bff1685
--- /dev/null
+++ b/linkis-engineconn-plugins/impala/src/test/scala/org/apache/linkis/engineplugin/impala/executer/TestImpalaEngineConnExecutor.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.impala.executer
+
+import org.apache.linkis.common.ServiceInstance
+import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.utils.Utils
+import org.apache.linkis.engineconn.common.creation.{
+ DefaultEngineCreationContext,
+ EngineCreationContext
+}
+import org.apache.linkis.engineconn.computation.executor.entity.CommonEngineConnTask
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
+import org.apache.linkis.engineconn.computation.executor.utlis.ComputationEngineConstant
+import org.apache.linkis.engineplugin.impala.executor.ImpalaEngineConnExecutor
+import org.apache.linkis.engineplugin.impala.factory.ImpalaEngineConnFactory
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
+import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
+import org.apache.linkis.governance.common.utils.EngineConnArgumentsParser
+import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
+import org.apache.linkis.manager.label.builder.factory.{
+ LabelBuilderFactory,
+ LabelBuilderFactoryContext
+}
+import org.apache.linkis.manager.label.entity.Label
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.junit.jupiter.api.Assertions
+
+class TestImpalaEngineConnExecutor {
+
+ private val engineCreationContext: EngineCreationContext = new DefaultEngineCreationContext
+
+ private val labelBuilderFactory: LabelBuilderFactory =
+ LabelBuilderFactoryContext.getLabelBuilderFactory
+
+// @Test
+ def testExecuteLine: Unit = {
+ val engineconnConf = "--engineconn-conf"
+ val springConf = "--spring-conf"
+ val array = Array(
+ engineconnConf,
+ "wds.linkis.rm.instance=10",
+ engineconnConf,
+ "label.userCreator=root-IDE",
+ engineconnConf,
+ "ticketId=037ab855-0c41-4323-970d-7f75e71883b6",
+ engineconnConf,
+ "label.engineType=impala",
+ engineconnConf,
+ "linkis.impala.servers=192.168.21.121:21050",
+ engineconnConf,
+ "linkis.impala.sasl.enable=true",
+ engineconnConf,
+ "linkis.impala.sasl.username=root",
+ engineconnConf,
+ "linkis.impala.sasl.password=12345678",
+ engineconnConf,
+ "linkis.impala.default.start.user=root",
+ springConf,
+ "eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/",
+ springConf,
+ "logging.config=classpath:log4j2.xml",
+ springConf,
+ "spring.profiles.active=engineconn",
+ springConf,
+ "server.port=35655",
+ springConf,
+ "spring.application.name=linkis-cg-engineconn"
+ )
+ this.init(array)
+ val cmd = "SHOW DATABASES"
+ val taskId = "1"
+ val task = new CommonEngineConnTask(taskId, false)
+ val properties = new util.HashMap[String, Object]
+ task.setProperties(properties)
+ task.data(ComputationEngineConstant.LOCK_TYPE_NAME, "lock")
+ task.setStatus(ExecutionNodeStatus.Scheduled)
+ val engineFactory: ImpalaEngineConnFactory = new ImpalaEngineConnFactory
+ val engine = engineFactory.createEngineConn(engineCreationContext)
+
+ val jdbcExecutor: ImpalaEngineConnExecutor = engineFactory
+ .newExecutor(1, engineCreationContext, engine)
+ .asInstanceOf[ImpalaEngineConnExecutor]
+ val engineExecutionContext = new EngineExecutionContext(jdbcExecutor, Utils.getJvmUser)
+ engineExecutionContext.setJobId(taskId)
+ val anyArray = engineCreationContext.getLabels().toArray()
+ engineExecutionContext.setLabels(anyArray.map(_.asInstanceOf[Label[_]]))
+ val testPath = this.getClass.getClassLoader.getResource("").getPath
+ engineExecutionContext.setStorePath(testPath)
+ engineCreationContext.getOptions.asScala.foreach({ case (key, value) =>
+ engineExecutionContext.addProperty(key, value)
+ })
+ Assertions.assertNotNull(jdbcExecutor.getProgressInfo(taskId))
+ val response = jdbcExecutor.executeLine(engineExecutionContext, cmd)
+ Assertions.assertNotNull(response)
+ }
+
+ private def init(args: Array[String]): Unit = {
+ val arguments = EngineConnArgumentsParser.getEngineConnArgumentsParser.parseToObj(args)
+ val engineConf = arguments.getEngineConnConfMap
+ this.engineCreationContext.setUser(engineConf.getOrElse("user", Utils.getJvmUser))
+ this.engineCreationContext.setTicketId(engineConf.getOrElse("ticketId", ""))
+ val host = CommonVars(Environment.ECM_HOST.toString, "127.0.0.1").getValue
+ val port = CommonVars(Environment.ECM_PORT.toString, "80").getValue
+ this.engineCreationContext.setEMInstance(
+ ServiceInstance(GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME.getValue, s"$host:$port")
+ )
+ val labels = new ArrayBuffer[Label[_]]
+ val labelArgs = engineConf.filter(_._1.startsWith(EngineConnArgumentsParser.LABEL_PREFIX))
+ if (labelArgs.nonEmpty) {
+ labelArgs.foreach { case (key, value) =>
+ labels += labelBuilderFactory
+ .createLabel[Label[_]](key.replace(EngineConnArgumentsParser.LABEL_PREFIX, ""), value)
+ }
+ engineCreationContext.setLabels(labels.toList.asJava)
+ }
+ val jMap = new java.util.HashMap[String, String](engineConf.size)
+ jMap.putAll(engineConf.asJava)
+ this.engineCreationContext.setOptions(jMap)
+ this.engineCreationContext.setArgs(args)
+ sys.props.asJava.putAll(jMap)
+ }
+
+}
diff --git a/linkis-engineconn-plugins/pom.xml b/linkis-engineconn-plugins/pom.xml
index 53f9be07d..9fdb4b85b 100644
--- a/linkis-engineconn-plugins/pom.xml
+++ b/linkis-engineconn-plugins/pom.xml
@@ -41,6 +41,7 @@
<module>trino</module>
<module>elasticsearch</module>
<module>seatunnel</module>
+ <module>impala</module>
</modules>
</project>
diff --git a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
index a3a0d0d90..c526d5690 100644
--- a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
@@ -20,6 +20,10 @@ package org.apache.linkis.engineplugin.trino.executor
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{OverloadUtils, Utils}
import org.apache.linkis.engineconn.common.conf.{EngineConnConf, EngineConnConstant}
+import org.apache.linkis.engineconn.common.password.{
+ CommandPasswordCallback,
+ StaticPasswordCallback
+}
import org.apache.linkis.engineconn.computation.executor.execute.{
ConcurrentComputationExecutor,
EngineExecutionContext
@@ -32,10 +36,6 @@ import org.apache.linkis.engineplugin.trino.exception.{
TrinoStateInvalidException
}
import org.apache.linkis.engineplugin.trino.interceptor.PasswordInterceptor
-import org.apache.linkis.engineplugin.trino.password.{
- CommandPasswordCallback,
- StaticPasswordCallback
-}
import org.apache.linkis.engineplugin.trino.socket.SocketChannelSocketFactory
import org.apache.linkis.engineplugin.trino.utils.{TrinoCode, TrinoSQLHook}
import org.apache.linkis.governance.common.paser.SQLCodeParser
@@ -44,7 +44,6 @@ import org.apache.linkis.manager.common.entity.resource.{
LoadResource,
NodeResource
}
-import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
diff --git a/pom.xml b/pom.xml
index 103f4acac..8abd27a40 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,6 +110,7 @@
<io_file.version>1.0</io_file.version>
<jdbc.version>4</jdbc.version>
<trino.version>371</trino.version>
+ <impala.version>3.4.0.7.2.15.0-147</impala.version>
<openlookeng.version>1.5.0</openlookeng.version>
<pipeline.version>1</pipeline.version>
<presto.version>0.234</presto.version>
diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt
index 4a428dc18..9a1c6da19 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -234,6 +234,8 @@ httpcore-4.4.14.jar
httpcore-nio-4.4.14.jar
httpmime-4.5.13.jar
hystrix-core-1.5.18.jar
+impala-frontend-3.4.0.7.2.15.0-147.jar
+impala-minimal-hive-exec-3.4.0.7.2.15.0-147.jar
ini4j-0.5.4.jar
ion-java-1.0.2.jar
istack-commons-runtime-3.0.12.jar
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org