You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/08/09 15:30:39 UTC
[incubator-linkis] branch dev-1.2.1 updated: add progress support for trino on jdbc-plugin (#2636)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.2.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.2.1 by this push:
new c34d22248 add progress support for trino on jdbc-plugin (#2636)
c34d22248 is described below
commit c34d22248cca6996731a76c144da21a4776096ca
Author: Knypys <si...@qq.com>
AuthorDate: Tue Aug 9 23:30:32 2022 +0800
add progress support for trino on jdbc-plugin (#2636)
Co-authored-by: Casion <ca...@gmail.com>
---
linkis-engineconn-plugins/jdbc/pom.xml | 8 +-
.../engineplugin/jdbc/ConnectionManager.java | 4 +-
.../engineplugin/jdbc/monitor/ProgressMonitor.java | 86 +++++++++++++++++++++
.../jdbc/monitor/impl/TrinoProgressMonitor.java | 88 ++++++++++++++++++++++
.../jdbc/executer/JDBCEngineConnExecutor.scala | 37 ++++++++-
.../engineplugin/jdbc/ConnectionManagerTest.java | 11 ++-
.../engineplugin/jdbc/ProgressMonitorTest.java | 74 ++++++++++++++++++
.../TestJDBCEngineConnExecutor.scala | 29 +++++++
pom.xml | 1 +
9 files changed, 330 insertions(+), 8 deletions(-)
diff --git a/linkis-engineconn-plugins/jdbc/pom.xml b/linkis-engineconn-plugins/jdbc/pom.xml
index 6a770383e..24df9d248 100644
--- a/linkis-engineconn-plugins/jdbc/pom.xml
+++ b/linkis-engineconn-plugins/jdbc/pom.xml
@@ -128,6 +128,13 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.trino</groupId>
+ <artifactId>trino-jdbc</artifactId>
+ <version>${trino.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.linkis</groupId>
@@ -196,7 +203,6 @@
</exclusion>
</exclusions>
</dependency>
-
</dependencies>
<build>
<plugins>
diff --git a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
index b904d74fc..59db62c84 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
+++ b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
@@ -113,8 +113,8 @@ public class ConnectionManager {
JDBCPropertiesParser.getString(properties, JDBCEngineConnConstant.JDBC_DRIVER, "");
if (StringUtils.isBlank(driverClassName)) {
- LOG.error("The driver class name is not empty.");
- throw new JDBCParamsIllegalException("The driver class name is not empty.");
+ LOG.error("The driver class name is required.");
+ throw new JDBCParamsIllegalException("The driver class name is required.");
}
String username =
diff --git a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/monitor/ProgressMonitor.java b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/monitor/ProgressMonitor.java
new file mode 100644
index 000000000..41b26ff1a
--- /dev/null
+++ b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/monitor/ProgressMonitor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.jdbc.monitor;
+
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.alibaba.druid.pool.DruidPooledStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Statement;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+public abstract class ProgressMonitor<T> implements Consumer<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(ProgressMonitor.class);
+ private static final Map<String, String> MONITORS = new ConcurrentHashMap<>();
+
+ static {
+ register(
+ "io.trino.jdbc.TrinoStatement",
+ "org.apache.linkis.manager.engineplugin.jdbc.monitor.impl.TrinoProgressMonitor");
+ }
+
+ public static void register(String statementClassName, String monitorClassName) {
+ MONITORS.put(statementClassName, monitorClassName);
+ }
+
+ public static ProgressMonitor<?> attachMonitor(Statement statement) {
+ ProgressMonitor<?> progressMonitor = null;
+ /* unwrap the druid statement */
+ if (statement instanceof DruidPooledStatement) {
+ statement = ((DruidPooledStatement) statement).getStatement();
+ }
+ try {
+ String monitorName = MONITORS.get(statement.getClass().getName());
+ if (StringUtils.isNotBlank(monitorName)) {
+ progressMonitor = (ProgressMonitor<?>) Class.forName(monitorName).newInstance();
+ }
+ if (progressMonitor != null) {
+ progressMonitor.attach(statement);
+ }
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to create monitor for statement: {}, exception: {} - {}",
+ statement,
+ e.getClass().getName(),
+ e.getMessage());
+ }
+ return progressMonitor;
+ }
+
+ public abstract void attach(Statement statement);
+
+ public abstract void callback(Runnable callback);
+
+ public abstract float getSqlProgress();
+
+ public abstract int getSucceedTasks();
+
+ public abstract int getTotalTasks();
+
+ public abstract int getRunningTasks();
+
+ public abstract int getFailedTasks();
+
+ public abstract JobProgressInfo jobProgressInfo(String id);
+}
diff --git a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/monitor/impl/TrinoProgressMonitor.java b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/monitor/impl/TrinoProgressMonitor.java
new file mode 100644
index 000000000..898567f7d
--- /dev/null
+++ b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/monitor/impl/TrinoProgressMonitor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.jdbc.monitor.impl;
+
+import org.apache.linkis.manager.engineplugin.jdbc.monitor.ProgressMonitor;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+
+import io.trino.jdbc.QueryStats;
+import io.trino.jdbc.TrinoStatement;
+
+import java.sql.Statement;
+
+public class TrinoProgressMonitor extends ProgressMonitor<QueryStats> {
+ private volatile Runnable callback;
+ private volatile double sqlProgress = 0.0;
+ private volatile int completedSplits = 0;
+ private volatile int totalSplits = 0;
+ private volatile int runningSplits = 0;
+
+ @Override
+ public void accept(QueryStats stats) {
+ sqlProgress = stats.getProgressPercentage().orElse(0.0) / 100;
+ completedSplits = stats.getCompletedSplits();
+ totalSplits = stats.getTotalSplits();
+ runningSplits = stats.getRunningSplits();
+
+ if (callback != null) {
+ callback.run();
+ }
+ }
+
+ @Override
+ public void attach(Statement statement) {
+ if (statement instanceof TrinoStatement) {
+ ((TrinoStatement) statement).setProgressMonitor(this);
+ }
+ }
+
+ @Override
+ public void callback(Runnable callback) {
+ this.callback = callback;
+ }
+
+ @Override
+ public float getSqlProgress() {
+ return Double.valueOf(sqlProgress).floatValue();
+ }
+
+ @Override
+ public int getSucceedTasks() {
+ return completedSplits;
+ }
+
+ @Override
+ public int getTotalTasks() {
+ return totalSplits;
+ }
+
+ @Override
+ public int getRunningTasks() {
+ return runningSplits;
+ }
+
+ @Override
+ public int getFailedTasks() {
+ return 0;
+ }
+
+ @Override
+ public JobProgressInfo jobProgressInfo(String id) {
+ return new JobProgressInfo(id, totalSplits, runningSplits, 0, completedSplits);
+ }
+}
diff --git a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
index 79bf23af4..142d8ec0c 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
@@ -42,9 +42,10 @@ import org.apache.linkis.protocol.CacheableProtocol
import org.springframework.util.CollectionUtils
import org.apache.linkis.governance.common.paser.SQLCodeParser
import org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConstant
+import org.apache.linkis.manager.engineplugin.jdbc.monitor.ProgressMonitor
+import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions._
-
import scala.collection.mutable.ArrayBuffer
class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) extends ConcurrentComputationExecutor(outputPrintLimit) {
@@ -52,6 +53,7 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
private val connectionManager = ConnectionManager.getInstance()
private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]](2)
+ private val progressMonitors: util.Map[String, ProgressMonitor[_]] = new ConcurrentHashMap[String, ProgressMonitor[_]]()
override def init(): Unit = {
@@ -119,10 +121,24 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
statement.setQueryTimeout(JDBCConfiguration.JDBC_QUERY_TIMEOUT.getValue)
statement.setFetchSize(outputPrintLimit)
statement.setMaxRows(outputPrintLimit)
+
+ val monitor = ProgressMonitor.attachMonitor(statement)
+ if (monitor != null) {
+ monitor.callback(new Runnable {
+ override def run(): Unit = {
+ engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId))
+ }
+ })
+ progressMonitors.put(taskId, monitor)
+ }
logger.info(s"create statement is: $statement")
connectionManager.saveStatement(taskId, statement)
val isResultSetAvailable = statement.execute(code)
logger.info(s"Is ResultSet available ? : $isResultSetAvailable")
+ if (monitor != null) {
+ /* refresh progress */
+ engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId))
+ }
try {
if (isResultSetAvailable) {
logger.info("ResultSet is available")
@@ -156,6 +172,7 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
}
}
connectionManager.removeStatement(taskId)
+ progressMonitors.remove(taskId)
}
SuccessExecuteResponse()
}
@@ -210,11 +227,25 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
updatedCount < 0 && columnCount <= 0
}
- override def getProgressInfo(taskID: String): Array[JobProgressInfo] = Array.empty[JobProgressInfo]
+ override def getProgressInfo(taskID: String): Array[JobProgressInfo] = {
+ val monitor = progressMonitors.get(taskID)
+ if (monitor != null) {
+ Array(monitor.jobProgressInfo(taskID))
+ } else {
+ Array.empty[JobProgressInfo]
+ }
+ }
override protected def callback(): Unit = {}
- override def progress(taskID: String): Float = 0
+ override def progress(taskID: String): Float = {
+ val monitor = progressMonitors.get(taskID)
+ if (monitor != null) {
+ monitor.getSqlProgress
+ } else {
+ 0
+ }
+ }
override def close(): Unit = {
logger.info("Start closing the jdbc engine.")
diff --git a/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
index bb448d853..ed2beb58a 100644
--- a/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
+++ b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
@@ -40,7 +40,6 @@ public class ConnectionManagerTest {
properties.put(
JDBCEngineConnConstant.JDBC_URL,
"jdbc:h2:mem:linkis_db;MODE=MySQL;DATABASE_TO_LOWER=TRUE");
- properties.put(JDBCEngineConnConstant.JDBC_DRIVER, "org.h2.Driver");
properties.put(JDBCEngineConnConstant.JDBC_USERNAME, "user");
properties.put(JDBCEngineConnConstant.JDBC_PASSWORD, "password");
properties.put(JDBCEngineConnConstant.JDBC_POOL_DEFAULT_VALIDATION_QUERY, "SELECT 1");
@@ -51,7 +50,15 @@ public class ConnectionManagerTest {
properties.put(JDBCEngineConnConstant.JDBC_PROXY_USER, "");
properties.put(JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER, "leo_jie");
ConnectionManager connectionManager = ConnectionManager.getInstance();
- Connection conn = connectionManager.getConnection("jdbc", properties);
+ Connection conn;
+ try {
+ conn = connectionManager.getConnection("jdbc", properties);
+ Assertions.fail("The driver class name is required");
+ } catch (JDBCParamsIllegalException e) {
+ properties.put(JDBCEngineConnConstant.JDBC_DRIVER, "org.h2.Driver");
+ conn = connectionManager.getConnection("jdbc", properties);
+ }
+
Statement statement = conn.createStatement();
ResultSet rs = statement.executeQuery("show databases;");
while (rs.next()) {
diff --git a/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ProgressMonitorTest.java b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ProgressMonitorTest.java
new file mode 100644
index 000000000..25d3fc16f
--- /dev/null
+++ b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ProgressMonitorTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.jdbc;
+
+import org.apache.linkis.manager.engineplugin.jdbc.monitor.ProgressMonitor;
+
+import io.trino.jdbc.QueryStats;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ProgressMonitorTest {
+ @Test
+ @DisplayName("testProgressMonitor")
+ public void testProgressMonitor() throws SQLException {
+ ProgressMonitor<?> monitor = ProgressMonitor.attachMonitor(null);
+ Assertions.assertNull(monitor);
+
+ String url = "jdbc:trino://127.0.0.1:8080/hive/test";
+ Properties properties = new Properties();
+ properties.setProperty("user", "test");
+ Connection connection = DriverManager.getConnection(url, properties);
+ monitor = ProgressMonitor.attachMonitor(connection.createStatement());
+ Assertions.assertNotNull(monitor);
+
+ AtomicBoolean callbackFlag = new AtomicBoolean(false);
+ monitor.callback(() -> callbackFlag.set(true));
+ Assertions.assertFalse(callbackFlag.get());
+
+ ProgressMonitor<QueryStats> trinoMonitor = (ProgressMonitor<QueryStats>) monitor;
+ trinoMonitor.accept(
+ new QueryStats(
+ "testId",
+ "testState",
+ false,
+ false,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ Optional.empty()));
+ Assertions.assertTrue(callbackFlag.get());
+ }
+}
diff --git a/linkis-engineconn-plugins/jdbc/src/test/scala/org.apache.linkis.manager.engineplugin.jdbc.executer/TestJDBCEngineConnExecutor.scala b/linkis-engineconn-plugins/jdbc/src/test/scala/org.apache.linkis.manager.engineplugin.jdbc.executer/TestJDBCEngineConnExecutor.scala
index 418966782..fe1affbef 100644
--- a/linkis-engineconn-plugins/jdbc/src/test/scala/org.apache.linkis.manager.engineplugin.jdbc.executer/TestJDBCEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/jdbc/src/test/scala/org.apache.linkis.manager.engineplugin.jdbc.executer/TestJDBCEngineConnExecutor.scala
@@ -27,11 +27,14 @@ import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
import org.apache.linkis.governance.common.utils.EngineConnArgumentsParser
import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
import org.apache.linkis.manager.engineplugin.jdbc.factory.JDBCEngineConnFactory
+import org.apache.linkis.manager.engineplugin.jdbc.monitor.ProgressMonitor
import org.apache.linkis.manager.label.builder.factory.{LabelBuilderFactory, LabelBuilderFactoryContext}
import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.protocol.engine.JobProgressInfo
import org.h2.tools.Server
import org.junit.jupiter.api.{Assertions, BeforeEach, Test}
+import java.sql.Statement
import java.util
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
@@ -97,6 +100,32 @@ class TestJDBCEngineConnExecutor {
engineExecutionContext.addProperty(key, value)
})
Assertions.assertNotNull(jdbcExecutor.getProgressInfo(taskId))
+
+ class TestMonitor extends ProgressMonitor[Any] {
+ override def accept(t: Any): Unit = {
+ }
+
+ override def attach(statement: Statement): Unit = {
+ }
+
+ override def callback(callback: Runnable): Unit = {
+ }
+
+ override def getSqlProgress: Float = 0.0f
+
+ override def getSucceedTasks: Int = 0
+
+ override def getTotalTasks: Int = 0
+
+ override def getRunningTasks: Int = 0
+
+ override def getFailedTasks: Int = 0
+
+ override def jobProgressInfo(id: String): JobProgressInfo = null
+ }
+ ProgressMonitor.register("com.mysql.cj.jdbc.JdbcStatement",
+ "org.apache.linkis.manager.engineplugin.jdbc.executer.TestJDBCEngineConnExecutor.TestMonitor")
+
val response = jdbcExecutor.executeLine(engineExecutionContext, cmd)
Assertions.assertNotNull(response)
}
diff --git a/pom.xml b/pom.xml
index bbe0f3e64..cbd11aa1b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,6 +158,7 @@
<jacoco.skip>false</jacoco.skip>
<spring.version>5.2.22.RELEASE</spring.version>
<knife4j.version>2.0.9</knife4j.version>
+ <trino.version>371</trino.version>
<maven-assembly-plugin.version>3.2.0</maven-assembly-plugin.version>
</properties>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org