Posted to commits@spark.apache.org by zs...@apache.org on 2017/03/04 01:10:14 UTC

spark git commit: [SPARK-19718][SS] Handle more interrupt cases properly for Hadoop

Repository: spark
Updated Branches:
  refs/heads/master f5fdbe043 -> a6a7a95e2


[SPARK-19718][SS] Handle more interrupt cases properly for Hadoop

## What changes were proposed in this pull request?

[SPARK-19617](https://issues.apache.org/jira/browse/SPARK-19617) changed `HDFSMetadataLog` to enable interrupts when using the local file system. However, now we hit [HADOOP-12074](https://issues.apache.org/jira/browse/HADOOP-12074): `Shell.runCommand` converts `InterruptedException` to `new IOException(ie.toString())` before Hadoop 2.8. This is the Hadoop patch that fixes HADOOP-12074: https://github.com/apache/hadoop/commit/95c73d49b1bb459b626a9ac52acadb8f5fa724de
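For illustration, the problematic pre-2.8 behavior boils down to the following pattern (a minimal sketch, not Hadoop's actual code; `runCommandSketch` and `waitForProcess` are made-up names):

```scala
import java.io.IOException

// A minimal sketch of the pre-Hadoop-2.8 behavior described above, not
// Hadoop's actual code: an InterruptedException raised while waiting for the
// forked process is wrapped in a plain IOException, so callers can no longer
// catch it as an interrupt.
def runCommandSketch(waitForProcess: () => Int): Int = {
  try {
    waitForProcess()
  } catch {
    case ie: InterruptedException =>
      // ie.toString is "java.lang.InterruptedException: <message>", which is
      // why the fix below matches on that message prefix.
      throw new IOException(ie.toString)
  }
}
```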

This PR adds new logic to handle the following `InterruptedException`-related cases (see the sketch after this list):
- Check whether the message of an `IOException` starts with `java.lang.InterruptedException`. If so, treat it as an `InterruptedException`. This covers Hadoop versions before 2.8.
- Treat `InterruptedIOException` as `InterruptedException`. This covers Hadoop 2.8+ and any other place that may throw `InterruptedIOException` when the thread is interrupted.
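Condensed from the `StreamExecution` change in the diff below, the classification amounts to this predicate (the helper name `isInterruptedByStop` is introduced here for illustration; the real code applies it inline and only when `state.get == TERMINATED`):

```scala
import java.io.{InterruptedIOException, IOException}

// Returns true if `e` looks like the interrupt triggered by stop(), in any of
// the shapes it can take across Hadoop versions.
def isInterruptedByStop(e: Throwable): Boolean = e match {
  // Direct interrupt, or the InterruptedIOException thrown by Hadoop 2.8+.
  case _: InterruptedException | _: InterruptedIOException => true
  // HADOOP-12074 workaround: before Hadoop 2.8, Shell.runCommand wraps the
  // interrupt as new IOException(ie.toString), so match on the message prefix
  // "java.lang.InterruptedException".
  case ioe: IOException =>
    ioe.getMessage != null &&
      ioe.getMessage.startsWith(classOf[InterruptedException].getName)
  case _ => false
}
```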

## How was this patch tested?

Two new unit tests in `StreamSuite`, which use fake sources to reproduce both exception shapes and verify that stopping the query does not surface an error.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #17044 from zsxwing/SPARK-19718.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6a7a95e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6a7a95e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6a7a95e

Branch: refs/heads/master
Commit: a6a7a95e2f3482d84fcd744713e43f80ea90e33a
Parents: f5fdbe0
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Fri Mar 3 17:10:11 2017 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Fri Mar 3 17:10:11 2017 -0800

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   |  20 +++-
 .../spark/sql/streaming/StreamSuite.scala       | 109 ++++++++++++++++++-
 2 files changed, 119 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6a7a95e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6e77f35..70912d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{InterruptedIOException, IOException}
 import java.util.UUID
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
@@ -37,6 +38,12 @@ import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
+/** States for [[StreamExecution]]'s lifecycle. */
+trait State
+case object INITIALIZING extends State
+case object ACTIVE extends State
+case object TERMINATED extends State
+
 /**
  * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
  * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
@@ -298,7 +305,14 @@ class StreamExecution(
         // `stop()` is already called. Let `finally` finish the cleanup.
       }
     } catch {
-      case _: InterruptedException if state.get == TERMINATED => // interrupted by stop()
+      case _: InterruptedException | _: InterruptedIOException if state.get == TERMINATED =>
+        // interrupted by stop()
+        updateStatusMessage("Stopped")
+      case e: IOException if e.getMessage != null
+        && e.getMessage.startsWith(classOf[InterruptedException].getName)
+        && state.get == TERMINATED =>
+        // This is a workaround for HADOOP-12074: `Shell.runCommand` converts `InterruptedException`
+        // to `new IOException(ie.toString())` before Hadoop 2.8.
         updateStatusMessage("Stopped")
       case e: Throwable =>
         streamDeathCause = new StreamingQueryException(
@@ -721,10 +735,6 @@ class StreamExecution(
     }
   }
 
-  trait State
-  case object INITIALIZING extends State
-  case object ACTIVE extends State
-  case object TERMINATED extends State
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6a7a95e/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index f44cfad..6dfcd8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.streaming
 
+import java.io.{InterruptedIOException, IOException}
+import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
+
 import scala.reflect.ClassTag
 import scala.util.control.ControlThrowable
 
@@ -350,13 +353,45 @@ class StreamSuite extends StreamTest {
       }
     }
   }
-}
 
-/**
- * A fake StreamSourceProvider thats creates a fake Source that cannot be reused.
- */
-class FakeDefaultSource extends StreamSourceProvider {
+  test("handle IOException when the streaming thread is interrupted (pre Hadoop 2.8)") {
+    // This test uses a fake source to throw the same IOException as pre Hadoop 2.8 when the
+    // streaming thread is interrupted. We should handle it properly by not failing the query.
+    ThrowingIOExceptionLikeHadoop12074.createSourceLatch = new CountDownLatch(1)
+    val query = spark
+      .readStream
+      .format(classOf[ThrowingIOExceptionLikeHadoop12074].getName)
+      .load()
+      .writeStream
+      .format("console")
+      .start()
+    assert(ThrowingIOExceptionLikeHadoop12074.createSourceLatch
+      .await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS),
+      "ThrowingIOExceptionLikeHadoop12074.createSource wasn't called before timeout")
+    query.stop()
+    assert(query.exception.isEmpty)
+  }
 
+  test("handle InterruptedIOException when the streaming thread is interrupted (Hadoop 2.8+)") {
+    // This test uses a fake source to throw the same InterruptedIOException as Hadoop 2.8+ when the
+    // streaming thread is interrupted. We should handle it properly by not failing the query.
+    ThrowingInterruptedIOException.createSourceLatch = new CountDownLatch(1)
+    val query = spark
+      .readStream
+      .format(classOf[ThrowingInterruptedIOException].getName)
+      .load()
+      .writeStream
+      .format("console")
+      .start()
+    assert(ThrowingInterruptedIOException.createSourceLatch
+      .await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS),
+      "ThrowingInterruptedIOException.createSource wasn't called before timeout")
+    query.stop()
+    assert(query.exception.isEmpty)
+  }
+}
+
+abstract class FakeSource extends StreamSourceProvider {
   private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
 
   override def sourceSchema(
@@ -364,6 +399,10 @@ class FakeDefaultSource extends StreamSourceProvider {
       schema: Option[StructType],
       providerName: String,
       parameters: Map[String, String]): (String, StructType) = ("fakeSource", fakeSchema)
+}
+
+/** A fake StreamSourceProvider that creates a fake Source that cannot be reused. */
+class FakeDefaultSource extends FakeSource {
 
   override def createSource(
       spark: SQLContext,
@@ -395,3 +434,63 @@ class FakeDefaultSource extends StreamSourceProvider {
     }
   }
 }
+
+/** A fake source that throws the same IOException like pre Hadoop 2.8 when it's interrupted. */
+class ThrowingIOExceptionLikeHadoop12074 extends FakeSource {
+  import ThrowingIOExceptionLikeHadoop12074._
+
+  override def createSource(
+      spark: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    createSourceLatch.countDown()
+    try {
+      Thread.sleep(30000)
+      throw new TimeoutException("sleep was not interrupted in 30 seconds")
+    } catch {
+      case ie: InterruptedException =>
+        throw new IOException(ie.toString)
+    }
+  }
+}
+
+object ThrowingIOExceptionLikeHadoop12074 {
+  /**
+   * A latch to allow the user to wait until [[ThrowingIOExceptionLikeHadoop12074.createSource]] is
+   * called.
+   */
+  @volatile var createSourceLatch: CountDownLatch = null
+}
+
+/** A fake source that throws InterruptedIOException like Hadoop 2.8+ when it's interrupted. */
+class ThrowingInterruptedIOException extends FakeSource {
+  import ThrowingInterruptedIOException._
+
+  override def createSource(
+      spark: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    createSourceLatch.countDown()
+    try {
+      Thread.sleep(30000)
+      throw new TimeoutException("sleep was not interrupted in 30 seconds")
+    } catch {
+      case ie: InterruptedException =>
+        val iie = new InterruptedIOException(ie.toString)
+        iie.initCause(ie)
+        throw iie
+    }
+  }
+}
+
+object ThrowingInterruptedIOException {
+  /**
+   * A latch to allow the user to wait until [[ThrowingInterruptedIOException.createSource]] is
+   * called.
+   */
+  @volatile var createSourceLatch: CountDownLatch = null
+}

