You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/08/09 15:30:39 UTC

[incubator-linkis] branch dev-1.2.1 updated: add progress support for trino on jdbc-plugin (#2636)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.2.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.2.1 by this push:
     new c34d22248 add progress support for trino on jdbc-plugin (#2636)
c34d22248 is described below

commit c34d22248cca6996731a76c144da21a4776096ca
Author: Knypys <si...@qq.com>
AuthorDate: Tue Aug 9 23:30:32 2022 +0800

    add progress support for trino on jdbc-plugin (#2636)
    
    Co-authored-by: Casion <ca...@gmail.com>
---
 linkis-engineconn-plugins/jdbc/pom.xml             |  8 +-
 .../engineplugin/jdbc/ConnectionManager.java       |  4 +-
 .../engineplugin/jdbc/monitor/ProgressMonitor.java | 86 +++++++++++++++++++++
 .../jdbc/monitor/impl/TrinoProgressMonitor.java    | 88 ++++++++++++++++++++++
 .../jdbc/executer/JDBCEngineConnExecutor.scala     | 37 ++++++++-
 .../engineplugin/jdbc/ConnectionManagerTest.java   | 11 ++-
 .../engineplugin/jdbc/ProgressMonitorTest.java     | 74 ++++++++++++++++++
 .../TestJDBCEngineConnExecutor.scala               | 29 +++++++
 pom.xml                                            |  1 +
 9 files changed, 330 insertions(+), 8 deletions(-)

diff --git a/linkis-engineconn-plugins/jdbc/pom.xml b/linkis-engineconn-plugins/jdbc/pom.xml
index 6a770383e..24df9d248 100644
--- a/linkis-engineconn-plugins/jdbc/pom.xml
+++ b/linkis-engineconn-plugins/jdbc/pom.xml
@@ -128,6 +128,13 @@
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-jdbc</artifactId>
+            <version>${trino.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
 
         <dependency>
             <groupId>org.apache.linkis</groupId>
@@ -196,7 +203,6 @@
                 </exclusion>
             </exclusions>
         </dependency>
-
     </dependencies>
     <build>
         <plugins>
diff --git a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
index b904d74fc..59db62c84 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
+++ b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
@@ -113,8 +113,8 @@ public class ConnectionManager {
                 JDBCPropertiesParser.getString(properties, JDBCEngineConnConstant.JDBC_DRIVER, "");
 
         if (StringUtils.isBlank(driverClassName)) {
-            LOG.error("The driver class name is not empty.");
-            throw new JDBCParamsIllegalException("The driver class name is not empty.");
+            LOG.error("The driver class name is required.");
+            throw new JDBCParamsIllegalException("The driver class name is required.");
         }
 
         String username =
diff --git a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/monitor/ProgressMonitor.java b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/monitor/ProgressMonitor.java
new file mode 100644
index 000000000..41b26ff1a
--- /dev/null
+++ b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/monitor/ProgressMonitor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.jdbc.monitor;
+
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.alibaba.druid.pool.DruidPooledStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Statement;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+public abstract class ProgressMonitor<T> implements Consumer<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(ProgressMonitor.class);
+    private static final Map<String, String> MONITORS = new ConcurrentHashMap<>();
+
+    static {
+        register(
+                "io.trino.jdbc.TrinoStatement",
+                "org.apache.linkis.manager.engineplugin.jdbc.monitor.impl.TrinoProgressMonitor");
+    }
+
+    public static void register(String statementClassName, String monitorClassName) {
+        MONITORS.put(statementClassName, monitorClassName);
+    }
+
+    public static ProgressMonitor<?> attachMonitor(Statement statement) {
+        ProgressMonitor<?> progressMonitor = null;
+        /* unwrap the druid statement */
+        if (statement instanceof DruidPooledStatement) {
+            statement = ((DruidPooledStatement) statement).getStatement();
+        }
+        try {
+            String monitorName = MONITORS.get(statement.getClass().getName());
+            if (StringUtils.isNotBlank(monitorName)) {
+                progressMonitor = (ProgressMonitor<?>) Class.forName(monitorName).newInstance();
+            }
+            if (progressMonitor != null) {
+                progressMonitor.attach(statement);
+            }
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to create monitor for statement: {}, exception: {} - {}",
+                    statement,
+                    e.getClass().getName(),
+                    e.getMessage());
+        }
+        return progressMonitor;
+    }
+
+    public abstract void attach(Statement statement);
+
+    public abstract void callback(Runnable callback);
+
+    public abstract float getSqlProgress();
+
+    public abstract int getSucceedTasks();
+
+    public abstract int getTotalTasks();
+
+    public abstract int getRunningTasks();
+
+    public abstract int getFailedTasks();
+
+    public abstract JobProgressInfo jobProgressInfo(String id);
+}
diff --git a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/monitor/impl/TrinoProgressMonitor.java b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/monitor/impl/TrinoProgressMonitor.java
new file mode 100644
index 000000000..898567f7d
--- /dev/null
+++ b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/monitor/impl/TrinoProgressMonitor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.jdbc.monitor.impl;
+
+import org.apache.linkis.manager.engineplugin.jdbc.monitor.ProgressMonitor;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+
+import io.trino.jdbc.QueryStats;
+import io.trino.jdbc.TrinoStatement;
+
+import java.sql.Statement;
+
+public class TrinoProgressMonitor extends ProgressMonitor<QueryStats> {
+    private volatile Runnable callback;
+    private volatile double sqlProgress = 0.0;
+    private volatile int completedSplits = 0;
+    private volatile int totalSplits = 0;
+    private volatile int runningSplits = 0;
+
+    @Override
+    public void accept(QueryStats stats) {
+        sqlProgress = stats.getProgressPercentage().orElse(0.0) / 100;
+        completedSplits = stats.getCompletedSplits();
+        totalSplits = stats.getTotalSplits();
+        runningSplits = stats.getRunningSplits();
+
+        if (callback != null) {
+            callback.run();
+        }
+    }
+
+    @Override
+    public void attach(Statement statement) {
+        if (statement instanceof TrinoStatement) {
+            ((TrinoStatement) statement).setProgressMonitor(this);
+        }
+    }
+
+    @Override
+    public void callback(Runnable callback) {
+        this.callback = callback;
+    }
+
+    @Override
+    public float getSqlProgress() {
+        return Double.valueOf(sqlProgress).floatValue();
+    }
+
+    @Override
+    public int getSucceedTasks() {
+        return completedSplits;
+    }
+
+    @Override
+    public int getTotalTasks() {
+        return totalSplits;
+    }
+
+    @Override
+    public int getRunningTasks() {
+        return runningSplits;
+    }
+
+    @Override
+    public int getFailedTasks() {
+        return 0;
+    }
+
+    @Override
+    public JobProgressInfo jobProgressInfo(String id) {
+        return new JobProgressInfo(id, totalSplits, runningSplits, 0, completedSplits);
+    }
+}
diff --git a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
index 79bf23af4..142d8ec0c 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
@@ -42,9 +42,10 @@ import org.apache.linkis.protocol.CacheableProtocol
 import org.springframework.util.CollectionUtils
 import org.apache.linkis.governance.common.paser.SQLCodeParser
 import org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConstant
+import org.apache.linkis.manager.engineplugin.jdbc.monitor.ProgressMonitor
 
+import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConversions._
-
 import scala.collection.mutable.ArrayBuffer
 
 class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) extends ConcurrentComputationExecutor(outputPrintLimit) {
@@ -52,6 +53,7 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
 
   private val connectionManager = ConnectionManager.getInstance()
   private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]](2)
+  private val progressMonitors: util.Map[String, ProgressMonitor[_]] = new ConcurrentHashMap[String, ProgressMonitor[_]]()
 
 
   override def init(): Unit = {
@@ -119,10 +121,24 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
       statement.setQueryTimeout(JDBCConfiguration.JDBC_QUERY_TIMEOUT.getValue)
       statement.setFetchSize(outputPrintLimit)
       statement.setMaxRows(outputPrintLimit)
+
+      val monitor = ProgressMonitor.attachMonitor(statement)
+      if (monitor != null) {
+        monitor.callback(new Runnable {
+          override def run(): Unit = {
+            engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId))
+          }
+        })
+        progressMonitors.put(taskId, monitor)
+      }
       logger.info(s"create statement is:  $statement")
       connectionManager.saveStatement(taskId, statement)
       val isResultSetAvailable = statement.execute(code)
       logger.info(s"Is ResultSet available ? : $isResultSetAvailable")
+      if (monitor != null) {
+        /* refresh progress */
+        engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId))
+      }
       try {
         if (isResultSetAvailable) {
           logger.info("ResultSet is available")
@@ -156,6 +172,7 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
         }
       }
       connectionManager.removeStatement(taskId)
+      progressMonitors.remove(taskId)
     }
     SuccessExecuteResponse()
   }
@@ -210,11 +227,25 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
     updatedCount < 0 && columnCount <= 0
   }
 
-  override def getProgressInfo(taskID: String): Array[JobProgressInfo] = Array.empty[JobProgressInfo]
+  override def getProgressInfo(taskID: String): Array[JobProgressInfo] = {
+    val monitor = progressMonitors.get(taskID)
+    if (monitor != null) {
+      Array(monitor.jobProgressInfo(taskID))
+    } else {
+      Array.empty[JobProgressInfo]
+    }
+  }
 
   override protected def callback(): Unit = {}
 
-  override def progress(taskID: String): Float = 0
+  override def progress(taskID: String): Float = {
+    val monitor = progressMonitors.get(taskID)
+    if (monitor != null) {
+      monitor.getSqlProgress
+    } else {
+      0
+    }
+  }
 
   override def close(): Unit = {
     logger.info("Start closing the jdbc engine.")
diff --git a/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
index bb448d853..ed2beb58a 100644
--- a/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
+++ b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
@@ -40,7 +40,6 @@ public class ConnectionManagerTest {
         properties.put(
                 JDBCEngineConnConstant.JDBC_URL,
                 "jdbc:h2:mem:linkis_db;MODE=MySQL;DATABASE_TO_LOWER=TRUE");
-        properties.put(JDBCEngineConnConstant.JDBC_DRIVER, "org.h2.Driver");
         properties.put(JDBCEngineConnConstant.JDBC_USERNAME, "user");
         properties.put(JDBCEngineConnConstant.JDBC_PASSWORD, "password");
         properties.put(JDBCEngineConnConstant.JDBC_POOL_DEFAULT_VALIDATION_QUERY, "SELECT 1");
@@ -51,7 +50,15 @@ public class ConnectionManagerTest {
         properties.put(JDBCEngineConnConstant.JDBC_PROXY_USER, "");
         properties.put(JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER, "leo_jie");
         ConnectionManager connectionManager = ConnectionManager.getInstance();
-        Connection conn = connectionManager.getConnection("jdbc", properties);
+        Connection conn;
+        try {
+            conn = connectionManager.getConnection("jdbc", properties);
+            Assertions.fail("The driver class name is required");
+        } catch (JDBCParamsIllegalException e) {
+            properties.put(JDBCEngineConnConstant.JDBC_DRIVER, "org.h2.Driver");
+            conn = connectionManager.getConnection("jdbc", properties);
+        }
+
         Statement statement = conn.createStatement();
         ResultSet rs = statement.executeQuery("show databases;");
         while (rs.next()) {
diff --git a/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ProgressMonitorTest.java b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ProgressMonitorTest.java
new file mode 100644
index 000000000..25d3fc16f
--- /dev/null
+++ b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ProgressMonitorTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.jdbc;
+
+import org.apache.linkis.manager.engineplugin.jdbc.monitor.ProgressMonitor;
+
+import io.trino.jdbc.QueryStats;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ProgressMonitorTest {
+    @Test
+    @DisplayName("testProgressMonitor")
+    public void testProgressMonitor() throws SQLException {
+        ProgressMonitor<?> monitor = ProgressMonitor.attachMonitor(null);
+        Assertions.assertNull(monitor);
+
+        String url = "jdbc:trino://127.0.0.1:8080/hive/test";
+        Properties properties = new Properties();
+        properties.setProperty("user", "test");
+        Connection connection = DriverManager.getConnection(url, properties);
+        monitor = ProgressMonitor.attachMonitor(connection.createStatement());
+        Assertions.assertNotNull(monitor);
+
+        AtomicBoolean callbackFlag = new AtomicBoolean(false);
+        monitor.callback(() -> callbackFlag.set(true));
+        Assertions.assertFalse(callbackFlag.get());
+
+        ProgressMonitor<QueryStats> trinoMonitor = (ProgressMonitor<QueryStats>) monitor;
+        trinoMonitor.accept(
+                new QueryStats(
+                        "testId",
+                        "testState",
+                        false,
+                        false,
+                        0,
+                        0,
+                        0,
+                        0,
+                        0,
+                        0,
+                        0,
+                        0,
+                        0,
+                        0,
+                        0,
+                        0,
+                        Optional.empty()));
+        Assertions.assertTrue(callbackFlag.get());
+    }
+}
diff --git a/linkis-engineconn-plugins/jdbc/src/test/scala/org.apache.linkis.manager.engineplugin.jdbc.executer/TestJDBCEngineConnExecutor.scala b/linkis-engineconn-plugins/jdbc/src/test/scala/org.apache.linkis.manager.engineplugin.jdbc.executer/TestJDBCEngineConnExecutor.scala
index 418966782..fe1affbef 100644
--- a/linkis-engineconn-plugins/jdbc/src/test/scala/org.apache.linkis.manager.engineplugin.jdbc.executer/TestJDBCEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/jdbc/src/test/scala/org.apache.linkis.manager.engineplugin.jdbc.executer/TestJDBCEngineConnExecutor.scala
@@ -27,11 +27,14 @@ import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
 import org.apache.linkis.governance.common.utils.EngineConnArgumentsParser
 import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
 import org.apache.linkis.manager.engineplugin.jdbc.factory.JDBCEngineConnFactory
+import org.apache.linkis.manager.engineplugin.jdbc.monitor.ProgressMonitor
 import org.apache.linkis.manager.label.builder.factory.{LabelBuilderFactory, LabelBuilderFactoryContext}
 import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.protocol.engine.JobProgressInfo
 import org.h2.tools.Server
 import org.junit.jupiter.api.{Assertions, BeforeEach, Test}
 
+import java.sql.Statement
 import java.util
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
@@ -97,6 +100,32 @@ class TestJDBCEngineConnExecutor {
         engineExecutionContext.addProperty(key, value)
     })
     Assertions.assertNotNull(jdbcExecutor.getProgressInfo(taskId))
+
+    class TestMonitor extends ProgressMonitor[Any] {
+      override def accept(t: Any): Unit = {
+      }
+
+      override def attach(statement: Statement): Unit = {
+      }
+
+      override def callback(callback: Runnable): Unit = {
+      }
+
+      override def getSqlProgress: Float = 0.0f
+
+      override def getSucceedTasks: Int = 0
+
+      override def getTotalTasks: Int = 0
+
+      override def getRunningTasks: Int = 0
+
+      override def getFailedTasks: Int = 0
+
+      override def jobProgressInfo(id: String): JobProgressInfo = null
+    }
+    ProgressMonitor.register("com.mysql.cj.jdbc.JdbcStatement",
+      "org.apache.linkis.manager.engineplugin.jdbc.executer.TestJDBCEngineConnExecutor.TestMonitor")
+
     val response = jdbcExecutor.executeLine(engineExecutionContext, cmd)
     Assertions.assertNotNull(response)
   }
diff --git a/pom.xml b/pom.xml
index bbe0f3e64..cbd11aa1b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,6 +158,7 @@
         <jacoco.skip>false</jacoco.skip>
         <spring.version>5.2.22.RELEASE</spring.version>
         <knife4j.version>2.0.9</knife4j.version>
+        <trino.version>371</trino.version>
         <maven-assembly-plugin.version>3.2.0</maven-assembly-plugin.version>
     </properties>
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org