Posted to issues@spark.apache.org by "zhangrenhua (Jira)" <ji...@apache.org> on 2021/04/18 07:52:00 UTC
[jira] [Created] (SPARK-35126) Execute jdbc cancellation method when jdbc load job is interrupted
zhangrenhua created SPARK-35126:
-----------------------------------
Summary: Execute jdbc cancellation method when jdbc load job is interrupted
Key: SPARK-35126
URL: https://issues.apache.org/jira/browse/SPARK-35126
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.1.1
Environment: Environment version:
* spark3.1.1
* jdk1.8.201
* scala2.12
* mysql5.7.31
* mysql-connector-java-5.1.32.jar /mysql-connector-java-8.0.32.jar
Reporter: zhangrenhua
I have a long-running Spark service that continuously receives and runs Spark programs submitted by clients. One of these programs loads a table over JDBC. The query SQL is very complicated, and each execution takes a lot of time and resources. A client that submits such a request may interrupt the job at any time. When that happens, I found that the database SELECT process keeps executing after the job has been interrupted; it is not killed.
*Scene demonstration:*
1. Prepare two tables, SPARK_TEST1 and SPARK_TEST2, each of which has 1000 records
2. Test code
{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.concurrent.TimeUnit;
/**
 * jdbc load cancel test
 *
 * @author gavin
 * @create 2021/4/18 10:58
 */
public class JdbcLoadCancelTest {
    public static void main(String[] args) throws Exception {
        final SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("jdbc load test");
        sparkConf.setMaster("local[*]");
        final SparkContext sparkContext = new SparkContext(sparkConf);
        final SparkSession sparkSession = new SparkSession(sparkContext);

        // This is a sql that takes about a minute to execute
        String querySql = "select t1.*\n" +
                "from SPARK_TEST1 t1\n" +
                "left join SPARK_TEST1 t2 on 1=1\n" +
                "left join (select aa from SPARK_TEST1 limit 3) t3 on 1=1";

        // Specify job information
        final String jobGroup = "test";
        sparkContext.clearJobGroup();
        sparkContext.setJobGroup(jobGroup, "test", true);

        // Start the independent thread to start the jdbc load test logic
        new Thread(() -> {
            final Dataset<Row> table = sparkSession.read()
                    .format("org.apache.spark.sql.execution.datasources.jdbc3")
                    .option("url", "jdbc:mysql://192.168.10.226:32320/test?useUnicode=true&characterEncoding=utf-8&useSSL=false")
                    .option("user", "root")
                    .option("password", "123456")
                    .option("query", querySql)
                    .load();

            // Print the first data
            System.out.println(table.limit(1).first());
        }).start();

        // Wait for the jdbc load job to start
        TimeUnit.SECONDS.sleep(10);

        // Cancel the job just now
        sparkContext.cancelJobGroup(jobGroup);

        // Simulate a long-running service without stopping the driver process, which is used to wait for new jobs to be received
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }
}
{code}
3. View the mysql process
{code:sql}
select * from information_schema.`PROCESSLIST` where info is not null;
{code}
Ten seconds after the program starts, it cancels the job. At that point the database query process is still running; it has not been killed.
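To illustrate the requested behaviour at the plain JDBC level, here is a minimal sketch. It is not Spark's implementation and not a proposed patch; the class name JdbcCancelSketch and the method cancelHook are hypothetical names chosen for this example. The only real API it relies on is java.sql.Statement.cancel(), which JDBC allows one thread to call in order to cancel a statement that another thread is executing, provided the driver and the database support it.

```java
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper for illustration; not part of Spark or of any JDBC driver. */
public final class JdbcCancelSketch {

    private JdbcCancelSketch() {
    }

    /**
     * Returns a hook that asks the driver to cancel {@code stmt}.
     * The hook may be called from a thread other than the one executing the query,
     * and it forwards at most one cancel request to the driver.
     */
    public static Runnable cancelHook(Statement stmt) {
        final AtomicBoolean fired = new AtomicBoolean(false);
        return () -> {
            if (!fired.compareAndSet(false, true)) {
                return; // a cancel request was already sent
            }
            try {
                // Standard JDBC call; it only has an effect if both the driver and
                // the database support aborting a running statement.
                stmt.cancel();
            } catch (SQLException e) {
                // Best effort: the job is being cancelled anyway, so only report the failure.
                System.err.println("Statement.cancel() failed: " + e.getMessage());
            }
        };
    }
}
```

The idea is that whatever component reacts to the job interruption would run the hook for the statement that is still executing, so the SELECT on the database side is asked to stop instead of running to completion. Where such a hook would be wired into the JDBC data source is left open here.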