Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/18 08:21:32 UTC

[GitHub] [spark] zhangrenhua opened a new pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

zhangrenhua opened a new pull request #32222:
URL: https://github.com/apache/spark/pull/32222


   
   See [SPARK-35126](https://issues.apache.org/jira/browse/SPARK-35126)
   This change starts a job-interrupt listener thread before the SQL query is executed. If the job is interrupted while stmt.executeQuery() has not yet completed, the thread calls stmt.cancel() to tell the database to terminate the SQL statement that is still running. Once the query has finished, the listener thread stops.
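
The mechanism described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the pull request: `CancelWatchdogSketch`, `startWatchdog`, and all parameter names are hypothetical, and the task's interruption state and the statement's cancel call are passed in as plain functions so the sketch runs without Spark or a database.

```scala
import java.sql.SQLException
import java.util.concurrent.atomic.AtomicBoolean

object CancelWatchdogSketch {
  /**
   * Starts a daemon thread that polls until either the query has finished or
   * the task is reported as interrupted. If the interruption is seen while the
   * query is still running, `cancel` is invoked once.
   * All names and the default poll interval are assumptions for illustration.
   */
  def startWatchdog(
      isInterrupted: () => Boolean,
      isFinished: AtomicBoolean,
      cancel: () => Unit,
      pollMillis: Long = 100L): Thread = {
    val thread = new Thread(new Runnable {
      override def run(): Unit = {
        // Wait for either an interruption or the end of the query.
        while (!isInterrupted() && !isFinished.get()) {
          Thread.sleep(pollMillis)
        }
        // Cancel only if the query is still in flight.
        if (isInterrupted() && !isFinished.get()) {
          try {
            cancel()
          } catch {
            case e: SQLException =>
              System.err.println(s"Failed to cancel statement: ${e.getMessage}")
          }
        }
      }
    })
    thread.setDaemon(true) // do not keep the JVM alive because of the watchdog
    thread.setName("jdbc-cancel-watchdog")
    thread.start()
    thread
  }
}
```

In a real task the caller would presumably pass something like `() => context.isInterrupted()` and `() => stmt.cancel()`, and set `isFinished` in a `finally` block around `stmt.executeQuery()`, as the diff below does.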


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] maropu commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r615483668



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {

Review comment:
       I think we already respect the interruption via `InterruptibleIterator`:
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala#L308
   
   Why do we need to handle it in this way?






[GitHub] [spark] zhangrenhua commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r615486844



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {

Review comment:
       `InterruptibleIterator` only takes effect after the SQL has finished executing and the ResultSet object has been returned; it supports interruption while the rows are being iterated. While the SQL is still executing, the task cannot be interrupted.
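
The point above can be illustrated with a simplified stand-in. This is only a sketch under stated assumptions: `CheckedIterator`, `TaskKilledSketchException`, and `slowQuery` are hypothetical names, not Spark classes. The wrapper checks a kill flag each time `hasNext` is called, which is roughly how an interruptible iterator behaves, so a kill request that arrives during the blocking query is noticed only once iteration starts.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object InterruptCheckSketch {
  /** Hypothetical exception standing in for a task-killed signal. */
  final class TaskKilledSketchException extends RuntimeException("task was killed")

  /**
   * Simplified stand-in for an interruptible iterator: the kill flag is
   * consulted only when the consumer asks whether another row exists.
   */
  final class CheckedIterator[T](killed: AtomicBoolean, delegate: Iterator[T])
      extends Iterator[T] {
    override def hasNext: Boolean = {
      if (killed.get()) throw new TaskKilledSketchException
      delegate.hasNext
    }
    override def next(): T = delegate.next()
  }

  /**
   * Stand-in for a blocking executeQuery(). Nothing inspects the kill flag
   * while this runs, so the "query" completes even though the kill request
   * arrived in the middle of it.
   */
  def slowQuery(killed: AtomicBoolean): Iterator[Int] = {
    killed.set(true)  // simulate a kill request arriving mid-query
    Iterator(1, 2, 3) // the query still runs to completion
  }
}
```

Here the kill is acted on only at the first `hasNext` call after `slowQuery` returns, which is the gap the pull request is trying to close.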






[GitHub] [spark] maropu commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r625576857



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {

Review comment:
       I'm still not sure whether this is useful or not. If `fetchSize` is not enough, how about using `queryTimeout` together? 






[GitHub] [spark] zhangrenhua commented on pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on pull request #32222:
URL: https://github.com/apache/spark/pull/32222#issuecomment-822101328


   > Thank you for your first contribution, @zhangrenhua . Could you follow the PR template, first? https://github.com/apache/spark/blob/master/.github/PULL_REQUEST_TEMPLATE
   
   




[GitHub] [spark] zhangrenhua commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r615487137



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {

Review comment:
       Ok, I will adjust it later






[GitHub] [spark] zhangrenhua commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r625512554



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {

Review comment:
       This pull request is useful for long-running services that often deal with jdbc data sources.






[GitHub] [spark] zhangrenhua closed pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua closed pull request #32222:
URL: https://github.com/apache/spark/pull/32222


   




[GitHub] [spark] maropu commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r615483750



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {

Review comment:
       btw, please follow the style guide: https://github.com/databricks/scala-style-guide






[GitHub] [spark] zhangrenhua commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r615370452



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {
+    val thread = new Thread(() => {
+      val waitInterval = 100
+      // Always wait for job interruption or stmt.executeQuery() execution to complete
+      while (!context.isInterrupted() && !isFinished.get()) {
+        Thread.sleep(waitInterval)
+      }
+
+      // Is interrupted and stmt.executeQuery() is not completed, then initiate a cancel request
+      if (context.isInterrupted() && !isFinished.get()) {
+        logInfo(s"try to cancel query ${context.getKillReason()}")
+        try {
+          stmt.cancel()
+        } catch {
+          case e: Exception => logWarning("Exception cancel statement", e)

Review comment:
       Yes, we only need to handle SQLException.






[GitHub] [spark] maropu commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r615491216



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {

Review comment:
       How about setting `fetchSize`? If ResultSet is a cursor, it seems the process can be interrupted correctly.






[GitHub] [spark] github-actions[bot] closed pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #32222:
URL: https://github.com/apache/spark/pull/32222


   




[GitHub] [spark] zhangrenhua edited a comment on pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua edited a comment on pull request #32222:
URL: https://github.com/apache/spark/pull/32222#issuecomment-822101328


   > Thank you for your first contribution, @zhangrenhua . Could you follow the PR template, first? https://github.com/apache/spark/blob/master/.github/PULL_REQUEST_TEMPLATE
   
   Ok, I will add it tonight
   




[GitHub] [spark] yikf commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
yikf commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r615367709



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {
+    val thread = new Thread(() => {
+      val waitInterval = 100
+      // Always wait for job interruption or stmt.executeQuery() execution to complete
+      while (!context.isInterrupted() && !isFinished.get()) {
+        Thread.sleep(waitInterval)
+      }
+
+      // Is interrupted and stmt.executeQuery() is not completed, then initiate a cancel request
+      if (context.isInterrupted() && !isFinished.get()) {
+        logInfo(s"try to cancel query ${context.getKillReason()}")
+        try {
+          stmt.cancel()
+        } catch {
+          case e: Exception => logWarning("Exception cancel statement", e)

Review comment:
       If an error occurs during cancellation, `stmt.cancel()` will only throw SQLException, right?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)

Review comment:
       Could you add a test suite for this code?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {
+    val thread = new Thread(() => {
+      val waitInterval = 100

Review comment:
       Why is the period set to 100 milliseconds?






[GitHub] [spark] AmplabJenkins commented on pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32222:
URL: https://github.com/apache/spark/pull/32222#issuecomment-821957264


   Can one of the admins verify this patch?




[GitHub] [spark] maropu commented on pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
maropu commented on pull request #32222:
URL: https://github.com/apache/spark/pull/32222#issuecomment-822097262


   Thank you for your first contribution, @zhangrenhua . Could you follow the PR template, first? https://github.com/apache/spark/blob/master/.github/PULL_REQUEST_TEMPLATE




[GitHub] [spark] zhangrenhua commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r616002502



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {

Review comment:
       I found that setting `fetchSize` lets the query respond quickly and return the ResultSet object. For a MySQL database, you also need to add the useCursorFetch=true parameter to the JDBC URL. Thank you for your answer; I will close this pull request.
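
As a rough illustration of the configuration described above, the following sketch builds the options for a JDBC read. `JdbcFetchOptionsSketch`, `withUrlParam`, and `readOptions` are hypothetical helpers, and the host, database, and table names used with them are made up. The option keys `url`, `dbtable`, `fetchsize`, and `queryTimeout` are the ones documented for Spark's JDBC data source; the `queryTimeout` entry reflects the reviewer's earlier suggestion to combine the two settings.

```scala
object JdbcFetchOptionsSketch {
  /** Appends `key=value` to a JDBC URL, using `?` or `&` as appropriate. */
  def withUrlParam(url: String, key: String, value: String): String = {
    val separator = if (url.contains("?")) "&" else "?"
    s"$url$separator$key=$value"
  }

  /**
   * Builds an options map for a JDBC read against MySQL with cursor-based
   * fetching enabled. The values passed in are illustrative assumptions.
   */
  def readOptions(
      baseUrl: String,
      table: String,
      fetchSize: Int,
      queryTimeoutSeconds: Int): Map[String, String] =
    Map(
      "url" -> withUrlParam(baseUrl, "useCursorFetch", "true"),
      "dbtable" -> table,
      "fetchsize" -> fetchSize.toString,
      "queryTimeout" -> queryTimeoutSeconds.toString
    )
}
```

With a SparkSession available, the map could then be passed along the lines of `spark.read.format("jdbc").options(opts).load()`. Whether the driver actually streams rows through a cursor depends on the database and driver version, so this should be verified against the target system.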






[GitHub] [spark] zhangrenhua commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r615369897



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {
+    val thread = new Thread(() => {
+      val waitInterval = 100

Review comment:
       I have no way of knowing how long the SQL execution will take. It may be a few seconds or a few hours, so I poll every 100 milliseconds by default to check whether the job has been interrupted or the query has completed. Do you mean that this value should be made configurable?






[GitHub] [spark] zhangrenhua commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r617416902



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {

Review comment:
       Sorry for reopening this thread. I had a lot going on over the past two days and closed it in a hurry.
   Specifying the fetchsize parameter does not help for a more complex query either: the SQL processing itself takes a long time, so the ResultSet cursor is not returned immediately.






[GitHub] [spark] zhangrenhua commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r615370079



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)

Review comment:
       Okay, I will add the test suite later






[GitHub] [spark] zhangrenhua closed pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua closed pull request #32222:
URL: https://github.com/apache/spark/pull/32222


   




[GitHub] [spark] zhangrenhua commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r615377595



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)

Review comment:
       Test suite JdbcUtilsSuite has been added






[GitHub] [spark] zhangrenhua commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r625583748



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {

Review comment:
       In real scenarios, `query` is often a more complicated SQL statement, for example one that joins several tables or views. Setting `fetchsize` does not make the cursor object come back immediately, and setting `queryTimeout` does not meet the need either: the database request should stop as soon as the client stops the job, and an unsuitable `queryTimeout` value would instead cause the job to terminate abnormally.






[GitHub] [spark] maropu commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r620060860



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {

Review comment:
       Please describe the issue in the PR description first.






[GitHub] [spark] zhangrenhua commented on pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on pull request #32222:
URL: https://github.com/apache/spark/pull/32222#issuecomment-821957603


   > Can one of the admins verify this patch?
   
   I have completed the verification of the patch in the development environment. Do I need to provide a verification report?




[GitHub] [spark] yikf commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
yikf commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r615367709



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {
+    val thread = new Thread(() => {
+      val waitInterval = 100
+      // Always wait for job interruption or stmt.executeQuery() execution to complete
+      while (!context.isInterrupted() && !isFinished.get()) {
+        Thread.sleep(waitInterval)
+      }
+
+      // Is interrupted and stmt.executeQuery() is not completed, then initiate a cancel request
+      if (context.isInterrupted() && !isFinished.get()) {
+        logInfo(s"try to cancel query ${context.getKillReason()}")
+        try {
+          stmt.cancel()
+        } catch {
+          case e: Exception => logWarning("Exception cancel statement", e)

Review comment:
       If the cancellation fails, `stmt.cancel()` will only throw a `SQLException`, right?
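       To make the question concrete, here is a small sketch of a narrower handler. `java.sql.Statement.cancel()` is declared to throw `SQLException`, and a driver that does not support cancellation is expected to throw its subclass `SQLFeatureNotSupportedException`. The names `SafeCancel`, `CancelOutcome` and `tryCancel` are hypothetical, and the cancel action is passed in as a function so the sketch runs without a driver.

```scala
import java.sql.{SQLException, SQLFeatureNotSupportedException}

object SafeCancel {
  /** Hypothetical result type describing what happened to the cancel request. */
  sealed trait CancelOutcome
  case object Cancelled extends CancelOutcome
  case object Unsupported extends CancelOutcome
  final case class Failed(cause: SQLException) extends CancelOutcome

  /**
   * Runs `cancel` (a stand-in for stmt.cancel()) and maps the declared JDBC
   * exceptions to an outcome instead of catching every Exception.
   */
  def tryCancel(cancel: () => Unit): CancelOutcome =
    try {
      cancel()
      Cancelled
    } catch {
      // The more specific subclass must be matched first.
      case _: SQLFeatureNotSupportedException => Unsupported
      case e: SQLException => Failed(e)
    }
}
```

       A call site would be `SafeCancel.tryCancel(() => stmt.cancel())`. One caveat: this only covers the declared exceptions. If a driver throws an unchecked exception, it would propagate here, whereas the `case e: Exception` in the diff would also swallow it.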






[GitHub] [spark] zhangrenhua commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r625583986



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {

Review comment:
       Try writing a complex SQL statement, such as a Cartesian product, to test the effect of the JDBC `fetchsize`, or run a JDBC query against Hive on MapReduce. You will find that even with `fetchsize` set, you have to wait for the job to execute for a while before the cursor object is returned. During that time the client cannot terminate the running job.
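       The blocking behaviour described here can be imitated without a database. The sketch below is only a simulation under stated assumptions: `SlowStatement` is a hypothetical stand-in for a statement whose `executeQuery()` blocks while the server works, and `run` plays the role of a task that gets killed after `killAfterMs`. It shows that the calling thread only gets control back early when another thread calls `cancel()`.

```scala
import java.sql.SQLException
import java.util.concurrent.{CountDownLatch, TimeUnit}

object BlockingQueryDemo {
  /** Hypothetical stand-in for a statement whose executeQuery() blocks on the server. */
  final class SlowStatement(maxQueryMs: Long) {
    private val cancelRequested = new CountDownLatch(1)

    /** Blocks until the simulated query finishes or cancel() is called elsewhere. */
    def executeQuery(): String = {
      if (cancelRequested.await(maxQueryMs, TimeUnit.MILLISECONDS)) {
        throw new SQLException("query cancelled")
      }
      "cursor"
    }

    /** Safe to call from another thread, like Statement.cancel(). */
    def cancel(): Unit = cancelRequested.countDown()
  }

  /** Returns Right(cursor) if the query finished, Left(message) if it was cancelled. */
  def run(stmt: SlowStatement, killAfterMs: Long): Either[String, String] = {
    // Simulates the job being killed while executeQuery() is still blocked.
    val killer = new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(killAfterMs)
        stmt.cancel()
      }
    })
    killer.setDaemon(true)
    killer.start()
    try Right(stmt.executeQuery())
    catch { case e: SQLException => Left(e.getMessage) }
  }
}
```

       With a long simulated query and an early kill, `run` returns `Left("query cancelled")` shortly after the kill. Without the second thread, the caller would sit in `executeQuery()` for the full duration, which is the situation `fetchsize` cannot help with.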






[GitHub] [spark] zhangrenhua commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r615493254



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {

Review comment:
       Okay, I will test and verify it tonight, but I think it might not work. For SQL with complicated logic that takes several minutes or even hours to execute, the ResultSet object may not be returned in time.






[GitHub] [spark] zhangrenhua commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r615369554



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {
+    val thread = new Thread(() => {
+      val waitInterval = 100
+      // Always wait for job interruption or stmt.executeQuery() execution to complete
+      while (!context.isInterrupted() && !isFinished.get()) {
+        Thread.sleep(waitInterval)
+      }
+
+      // Is interrupted and stmt.executeQuery() is not completed, then initiate a cancel request
+      if (context.isInterrupted() && !isFinished.get()) {
+        logInfo(s"try to cancel query ${context.getKillReason()}")
+        try {
+          stmt.cancel()
+        } catch {
+          case e: Exception => logWarning("Exception cancel statement", e)

Review comment:
       Cancel is executed only when the job is interrupted and the query has not completed, so if the cancellation fails, the subsequent close method will still run.






[GitHub] [spark] zhangrenhua commented on a change in pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
zhangrenhua commented on a change in pull request #32222:
URL: https://github.com/apache/spark/pull/32222#discussion_r620092718



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -301,10 +306,44 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
-    rs = stmt.executeQuery()
+
+    val isFinished = new AtomicBoolean(false)
+    startTheJobInterruptListenerThread(context, stmt, isFinished)
+    try{
+      rs = stmt.executeQuery()
+    } finally {
+      isFinished.set(true)
+    }
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  /**
+   * Start the job interruption listener thread, if the job is interrupted and
+   * stmt.executeQuery() is not completed, initiate a stmt.cancel() request.
+   */
+  private def startTheJobInterruptListenerThread(context: TaskContext, stmt: PreparedStatement
+                                                 , isFinished: AtomicBoolean): Unit = {

Review comment:
       Already added, thank you!






[GitHub] [spark] github-actions[bot] commented on pull request #32222: [SPARK-35126][SQL] Execute jdbc cancellation method when jdbc load job is interrupted

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #32222:
URL: https://github.com/apache/spark/pull/32222#issuecomment-898056293


   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

