Posted to issues@spark.apache.org by "zhangrenhua (Jira)" <ji...@apache.org> on 2021/04/18 07:52:00 UTC

[jira] [Created] (SPARK-35126) Execute jdbc cancellation method when jdbc load job is interrupted

zhangrenhua created SPARK-35126:
-----------------------------------

             Summary: Execute jdbc cancellation method when jdbc load job is interrupted
                 Key: SPARK-35126
                 URL: https://issues.apache.org/jira/browse/SPARK-35126
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.1.1
         Environment: Environment version:
 * spark3.1.1
 * jdk1.8.201
 * scala2.12
 * mysql5.7.31
 * mysql-connector-java-5.1.32.jar / mysql-connector-java-8.0.32.jar
            Reporter: zhangrenhua


I have a long-running Spark service that continuously receives and runs Spark programs submitted by clients. One of these programs loads a JDBC table. The query SQL is very complicated, and each execution takes a lot of time and resources. A client may interrupt such a job at any time. After the job was interrupted, I found that the database SELECT process was still executing and had not been killed.
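
For reference, plain JDBC already exposes the mechanism this improvement asks for: java.sql.Statement.cancel() may be called from a second thread to abort a query that is blocking in executeQuery(). A minimal standalone sketch (connection details copied from the demonstration below; the exact error raised on cancellation is driver-dependent):
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class StatementCancelDemo {

    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.10.226:32320/test", "root", "123456");
             Statement stmt = conn.createStatement()) {

            // Cancel the running statement from another thread after 10 seconds
            new Thread(() -> {
                try {
                    Thread.sleep(10_000);
                    stmt.cancel(); // asks MySQL to kill the server-side query
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();

            // A deliberately slow cross join, like the one used below
            try (ResultSet rs = stmt.executeQuery(
                    "select t1.* from SPARK_TEST1 t1 left join SPARK_TEST2 t2 on 1=1")) {
                while (rs.next()) {
                    // consume rows until the query is cancelled
                }
            } catch (SQLException e) {
                // MySQL typically reports "Query execution was interrupted"
                System.out.println("query cancelled: " + e.getMessage());
            }
        }
    }
}
{code}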

 

*Scene demonstration:*

1. Prepare two tables, SPARK_TEST1 and SPARK_TEST2, each of which has 1000 records.

2. Test code
{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.concurrent.TimeUnit;

/**
 * jdbc load cancel test
 *
 * @author gavin
 * @create 2021/4/18 10:58
 */
public class JdbcLoadCancelTest {

    public static void main(String[] args) throws Exception {
        final SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("jdbc load test");
        sparkConf.setMaster("local[*]");
        final SparkContext sparkContext = new SparkContext(sparkConf);
        final SparkSession sparkSession = new SparkSession(sparkContext);

        // A query that takes about a minute to execute (the cross joins blow up the row count)
        String querySql = "select t1.*\n" +
                "from SPARK_TEST1 t1\n" +
                "left join SPARK_TEST2 t2 on 1=1\n" +
                "left join (select aa from SPARK_TEST1 limit 3) t3 on 1=1";

        // Specify job information
        final String jobGroup = "test";
        sparkContext.clearJobGroup();
        sparkContext.setJobGroup(jobGroup, "test", true);

        // Start the independent thread to start the jdbc load test logic
        new Thread(() -> {
            final Dataset<Row> table = sparkSession.read()
                    .format("org.apache.spark.sql.execution.datasources.jdbc3")
                    .option("url", "jdbc:mysql://192.168.10.226:32320/test?useUnicode=true&characterEncoding=utf-8&useSSL=false")
                    .option("user", "root")
                    .option("password", "123456")
                    .option("query", querySql)
                    .load();

            // Print the first row (an action, so it triggers the jdbc load job)
            System.out.println(table.limit(1).first());
        }).start();

        // Wait for the jdbc load job to start
        TimeUnit.SECONDS.sleep(10);

        // Cancel the job just now
        sparkContext.cancelJobGroup(jobGroup);

        // Simulate a long-running service: keep the driver process alive as if waiting for new jobs
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }
}
{code}
 

3. View the MySQL process list
{code:java}
select * from information_schema.`PROCESSLIST` where info is not null;{code}
Ten seconds after the program started, when the job was interrupted, the database query process could still be seen running; it was never killed.
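
The likely cause: cancelling the job group interrupts the task thread, but that thread is blocked inside the JDBC driver's executeQuery() call, so the interrupt never reaches the database, and the statement is only closed after the query eventually returns. A possible shape of the improvement is to call Statement.cancel() from a separate watcher thread once the task is marked interrupted. A minimal sketch, assuming it would be wired in just before executeQuery() (the class and method names here are hypothetical, not Spark's actual code):
{code:java}
import java.sql.Statement;

import org.apache.spark.TaskContext;

/**
 * Hypothetical helper: best-effort cancellation of a JDBC statement when the
 * Spark task that issued it is interrupted (e.g. via cancelJobGroup).
 */
public final class JdbcTaskCancellation {

    public static void cancelOnTaskInterrupt(Statement stmt, TaskContext ctx) {
        Thread watcher = new Thread(() -> {
            try {
                // Poll until the task either completes normally or is interrupted
                while (!ctx.isCompleted() && !ctx.isInterrupted()) {
                    Thread.sleep(100);
                }
                if (ctx.isInterrupted()) {
                    // Statement.cancel() may be called from another thread and
                    // asks the database to abort the running query
                    stmt.cancel();
                }
            } catch (Exception ignored) {
                // Best effort: the statement may already be closed
            }
        }, "jdbc-cancel-watcher");
        watcher.setDaemon(true);
        watcher.start();
    }
}
{code}
With such a watcher started before executeQuery(), cancelling the job group would propagate to the database, and the SELECT would disappear from the process list instead of running to completion.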

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org