Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/03/24 12:38:33 UTC

[GitHub] [spark] bozhang2820 opened a new pull request #35960: [SPARK-38461][CORE] Use error classes in org.apache.spark.broadcast

bozhang2820 opened a new pull request #35960:
URL: https://github.com/apache/spark/pull/35960


   ### What changes were proposed in this pull request?
   This change refactors the exceptions thrown in `org.apache.spark.broadcast` to use the error class framework.
   
   ### Why are the changes needed?
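   So that the exceptions thrown in `org.apache.spark.broadcast` follow the error class framework, with each exception carrying a named error class.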
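   For readers new to the framework, here is a minimal, self-contained sketch of the idea. It uses hypothetical names (`ErrorClassSketch`, `ClassifiedException`, and the in-memory `templates` map are assumptions, not Spark's `SparkException` or its error-class registry): each exception carries a stable error class, and its message is rendered from a per-class template. The template text mirrors the message prefixes asserted in the new `BroadcastSuite` tests.

   ```scala
   // Hypothetical sketch of an error-class-based exception; not Spark's implementation.
   object ErrorClassSketch {
     // Assumed registry: error class name -> message template with %s placeholders.
     val templates: Map[String, String] = Map(
       "STORE_BLOCK_ERROR" -> "Failed to store block: %s",
       "GET_LOCAL_BLOCK_ERROR" -> "Failed to get local block: %s",
       "GET_BLOCK_ERROR" -> "Failed to get block: %s",
       "CORRUPTED_REMOTE_BLOCK" -> "Corrupted remote block: %s")

     // An exception that carries a stable error class and a message rendered
     // from that class's template; `cause` may be null, as with java.lang.Exception.
     class ClassifiedException(
         val errorClass: String,
         val messageParameters: Seq[String],
         cause: Throwable)
       extends Exception(templates(errorClass).format(messageParameters: _*), cause)

     // Factory in the spirit of SparkCoreErrors.storeBlockError, simplified to a String id.
     def storeBlockError(blockId: String, cause: Throwable = null): ClassifiedException =
       new ClassifiedException("STORE_BLOCK_ERROR", Seq(blockId), cause)
   }
   ```

   With this shape, a caller or test can check `errorClass` rather than parsing the full message text.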
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. Because related exceptions are consolidated under shared error classes, the messages of some thrown exceptions change slightly.
   
   ### How was this patch tested?
   Added unit tests in `BroadcastSuite`.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] bozhang2820 commented on a change in pull request #35960: [SPARK-38461][CORE] Use error classes in org.apache.spark.broadcast

Posted by GitBox <gi...@apache.org>.
bozhang2820 commented on a change in pull request #35960:
URL: https://github.com/apache/spark/pull/35960#discussion_r835075441



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -169,7 +169,8 @@ object SparkEnv extends Logging {
       isLocal: Boolean,
       listenerBus: LiveListenerBus,
       numCores: Int,
-      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
+      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None,
+      blockManagerOption: Option[BlockManager] = None): SparkEnv = {

Review comment:
       Will do.






[GitHub] [spark] viirya commented on a change in pull request #35960: [SPARK-38461][CORE] Use error classes in org.apache.spark.broadcast

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35960:
URL: https://github.com/apache/spark/pull/35960#discussion_r834680842



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -169,7 +169,8 @@ object SparkEnv extends Logging {
       isLocal: Boolean,
       listenerBus: LiveListenerBus,
       numCores: Int,
-      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
+      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None,
+      blockManagerOption: Option[BlockManager] = None): SparkEnv = {

Review comment:
       ```suggestion
         mockBlockManager: Option[BlockManager] = None): SparkEnv = {
   ```
   
   to follow the naming of `mockOutputCommitCoordinator` above? It also makes it clearer that this is a mock used only in tests.
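   As a hedged illustration of the convention being suggested, the sketch below uses simplified stand-ins (`BlockStore`, `RealBlockStore`, `Env`, and `createEnv` are hypothetical, not Spark's `BlockManager` or `SparkEnv`): a test-only dependency is exposed as a `mock`-prefixed `Option` that defaults to `None`, so production callers never pass it and tests can inject a stub.

   ```scala
   // Hypothetical, simplified stand-ins; not Spark's BlockManager or SparkEnv.
   object MockInjectionSketch {
     trait BlockStore {
       def put(id: String): Boolean
     }

     // The implementation production code would normally construct.
     class RealBlockStore extends BlockStore {
       def put(id: String): Boolean = true
     }

     final case class Env(blockStore: BlockStore)

     // Test-only override named mock*, mirroring mockOutputCommitCoordinator;
     // None (the default) means "build the real dependency".
     def createEnv(mockBlockStore: Option[BlockStore] = None): Env = {
       val store = mockBlockStore.getOrElse(new RealBlockStore)
       Env(store)
     }
   }
   ```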






[GitHub] [spark] bozhang2820 commented on a change in pull request #35960: [SPARK-38461][CORE] Use error classes in org.apache.spark.broadcast

Posted by GitBox <gi...@apache.org>.
bozhang2820 commented on a change in pull request #35960:
URL: https://github.com/apache/spark/pull/35960#discussion_r835077079



##########
File path: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
##########
@@ -148,16 +149,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
         val pieceId = BroadcastBlockId(id, "piece" + i)
         val bytes = new ChunkedByteBuffer(block.duplicate())
         if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
-          throw new SparkException(s"Failed to store $pieceId of $broadcastId " +
-            s"in local BlockManager")
+          throw SparkCoreErrors.storeBlockError(pieceId)
         }
       }
       blocks.length
     } catch {
       case t: Throwable =>
         logError(s"Store broadcast $broadcastId fail, remove all pieces of the broadcast")
         blockManager.removeBroadcast(id, tellMaster = true)
-        throw t
+        throw SparkCoreErrors.storeBlockError(broadcastId, t)

Review comment:
       Will change back to throw the caught Throwable here.

##########
File path: core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
##########
@@ -304,6 +308,76 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
       assert(results.collect().toSet === (1 to partitions).map(x => (x, list.sum)).toSet)
     }
   }
+
+  test("Error in BlockManager.putSingle") {
+    val mockBlockManager = mock(classOf[BlockManager])
+    when(mockBlockManager.putSingle(any, any, any, any)(any)).thenReturn(false)
+    sc = sparkContextWithBlockManager(mockBlockManager)
+    val thrown = intercept[SparkException] { sc.broadcast(Array(1)) }
+    assert(thrown.getErrorClass == "STORE_BLOCK_ERROR")
+    assert(thrown.getMessage.startsWith("Failed to store block: "))
+  }
+
+  test("Error in BlockManager.putBytes") {
+    val mockBlockManager = mock(classOf[BlockManager])
+    when(mockBlockManager.putBytes(any, any, any, any)(any)).thenReturn(false)
+    sc = sparkContextWithBlockManager(mockBlockManager)
+    val thrown = intercept[SparkException] { sc.broadcast(Array(1)) }
+    assert(thrown.getErrorClass == "STORE_BLOCK_ERROR")
+    assert(thrown.getMessage.startsWith("Failed to store block: "))
+  }
+
+  test("Error when getting local blocks") {
+    val mockBlockManager = mock(classOf[BlockManager])
+    when(mockBlockManager.putSingle(any, any, any, any)(any)).thenReturn(true)
+    when(mockBlockManager.putBytes(any, any, any, any)(any)).thenReturn(true)
+    val mockBlockResult = mock(classOf[BlockResult])
+    when(mockBlockResult.data).thenReturn(Iterator.empty)
+    when(mockBlockManager.getLocalValues(any[BlockId])).thenReturn(Some(mockBlockResult))
+    sc = sparkContextWithBlockManager(mockBlockManager)
+    val thrown = intercept[SparkException] { sc.broadcast(Array(1)).value }
+    assert(thrown.getErrorClass == "GET_LOCAL_BLOCK_ERROR")
+    assert(thrown.getMessage.startsWith("Failed to get local block: "))
+  }
+
+  test("Error when getting remote blocks") {
+    val mockBlockManager = mock(classOf[BlockManager])
+    when(mockBlockManager.putSingle(any, any, any, any)(any)).thenReturn(true)
+    when(mockBlockManager.putBytes(any, any, any, any)(any)).thenReturn(true)
+    when(mockBlockManager.getLocalValues(any[BlockId])).thenReturn(None)
+    when(mockBlockManager.getLocalBytes(any[BlockId])).thenReturn(None)
+    when(mockBlockManager.getRemoteBytes(any[BlockId])).thenReturn(None)
+    sc = sparkContextWithBlockManager(mockBlockManager)
+    val thrown = intercept[SparkException] { sc.broadcast(Array(1)).value }
+    assert(thrown.getErrorClass == "GET_BLOCK_ERROR")
+    assert(thrown.getMessage.startsWith("Failed to get block: "))
+  }
+
+  test("Corrupted remote blocks") {
+    val mockBlockManager = mock(classOf[BlockManager])
+    when(mockBlockManager.putSingle(any, any, any, any)(any)).thenReturn(true)
+    when(mockBlockManager.putBytes(any, any, any, any)(any)).thenReturn(true)
+    when(mockBlockManager.getLocalValues(any[BlockId])).thenReturn(None)
+    when(mockBlockManager.getLocalBytes(any[BlockId])).thenReturn(None)
+    when(mockBlockManager.getRemoteBytes(any[BlockId])).thenReturn(
+      Some(new ChunkedByteBuffer(Array(ByteBuffer.allocate(0)))))
+    sc = sparkContextWithBlockManager(mockBlockManager)
+    val thrown = intercept[SparkException] { sc.broadcast(Array(1)).value }
+    assert(thrown.getErrorClass == "CORRUPTED_REMOTE_BLOCK")
+    assert(thrown.getMessage.startsWith("Corrupted remote block: "))
+  }
+
+  private def sparkContextWithBlockManager(blockManager: BlockManager): SparkContext = {
+    new SparkContext(new SparkConf().setAppName("test").setMaster("local")) {
+      override private[spark] def createSparkEnv(
+                                                  conf: SparkConf,
+                                                  isLocal: Boolean,
+                                                  listenerBus: LiveListenerBus) = {

Review comment:
       Will do.

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -325,4 +325,24 @@ object SparkCoreErrors {
     new SparkException(errorClass = "GRAPHITE_SINK_PROPERTY_MISSING",
       messageParameters = Array(missingProperty), cause = null)
   }
+
+  def storeBlockError(blockId: BlockId, cause: Throwable = null): Throwable = {
+    new SparkException(errorClass = "STORE_BLOCK_ERROR",
+      messageParameters = Array(blockId.toString), cause = cause)
+  }
+

Review comment:
       Not sure what you mean by "replacing a single invocation site". Could you elaborate?






[GitHub] [spark] mridulm commented on a change in pull request #35960: [SPARK-38461][CORE] Use error classes in org.apache.spark.broadcast

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #35960:
URL: https://github.com/apache/spark/pull/35960#discussion_r834947208



##########
File path: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
##########
@@ -148,16 +149,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
         val pieceId = BroadcastBlockId(id, "piece" + i)
         val bytes = new ChunkedByteBuffer(block.duplicate())
         if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
-          throw new SparkException(s"Failed to store $pieceId of $broadcastId " +
-            s"in local BlockManager")
+          throw SparkCoreErrors.storeBlockError(pieceId)
         }
       }
       blocks.length
     } catch {
       case t: Throwable =>
         logError(s"Store broadcast $broadcastId fail, remove all pieces of the broadcast")
         blockManager.removeBroadcast(id, tellMaster = true)
-        throw t
+        throw SparkCoreErrors.storeBlockError(broadcastId, t)

Review comment:
       Two issues with this:
   a) We are changing the exception being thrown (note: we are catching `Throwable`, so any error raised in the `try` block gets wrapped).
   b) The message of the exception thrown in L151 is replaced by the one constructed in L160.
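   A minimal sketch of the pattern point (a) argues for, under assumed names (`RethrowSketch`, `storeAll`, and the `put`/`cleanup` callbacks are hypothetical stand-ins for the `putBytes` loop and `blockManager.removeBroadcast`): clean up, then rethrow the caught `Throwable` unchanged, so the caller sees the original exception type and message.

   ```scala
   // Hypothetical sketch; put and cleanup stand in for the BlockManager calls.
   object RethrowSketch {
     def storeAll(pieces: Seq[String], put: String => Unit, cleanup: () => Unit): Int = {
       try {
         pieces.foreach(put)
         pieces.length
       } catch {
         case t: Throwable =>
           // Undo partial work first, whatever kind of error occurred.
           cleanup()
           // Rethrow the original Throwable: same type, same message.
           throw t
       }
     }
   }
   ```

   Wrapping `t` in a new exception at this point would instead change both the type and the message that the caller observes.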

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -325,4 +325,24 @@ object SparkCoreErrors {
     new SparkException(errorClass = "GRAPHITE_SINK_PROPERTY_MISSING",
       messageParameters = Array(missingProperty), cause = null)
   }
+
+  def storeBlockError(blockId: BlockId, cause: Throwable = null): Throwable = {
+    new SparkException(errorClass = "STORE_BLOCK_ERROR",
+      messageParameters = Array(blockId.toString), cause = cause)
+  }
+

Review comment:
       Let us remove `getBlockError`, `corruptedRemoteBlockError` and `getLocalBlockError` - they are replacing a single invocation site.

##########
File path: core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
##########
@@ -304,6 +308,76 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
       assert(results.collect().toSet === (1 to partitions).map(x => (x, list.sum)).toSet)
     }
   }
+  private def sparkContextWithBlockManager(blockManager: BlockManager): SparkContext = {
+    new SparkContext(new SparkConf().setAppName("test").setMaster("local")) {
+      override private[spark] def createSparkEnv(
+                                                  conf: SparkConf,
+                                                  isLocal: Boolean,
+                                                  listenerBus: LiveListenerBus) = {

Review comment:
       Fix indentation here.
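   Assuming the four-space indent for wrapped parameter lists that Spark's Scala style uses, the sketch below shows that layout in a self-contained version of the test helper's pattern (an anonymous subclass overriding a factory hook to inject a test double). `Context`, `Store`, `createStore`, and `contextWithStore` are hypothetical stand-ins, not `SparkContext` or `createSparkEnv`.

   ```scala
   // Hypothetical stand-ins for SparkContext/SparkEnv; not Spark's classes.
   object FactoryOverrideSketch {
     trait Store { def name: String }
     class RealStore extends Store { val name = "real" }

     class Context(appName: String) {
       // Factory hook that tests can override.
       protected def createStore(appName: String, isLocal: Boolean): Store = new RealStore
       // Lazy, so the hook runs after construction rather than inside the superclass constructor.
       lazy val store: Store = createStore(appName, isLocal = true)
     }

     // Test helper: an anonymous subclass injects the given store.
     // The wrapped parameters are indented four spaces past the `override` line.
     def contextWithStore(injected: Store): Context = {
       new Context("test") {
         override protected def createStore(
             appName: String,
             isLocal: Boolean): Store = {
           injected
         }
       }
     }
   }
   ```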



