You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/03/16 16:20:04 UTC

[GitHub] [spark] srowen commented on a change in pull request #27640: [SPARK-30667][CORE] Add all gather method to BarrierTaskContext

srowen commented on a change in pull request #27640: [SPARK-30667][CORE] Add all gather method to BarrierTaskContext
URL: https://github.com/apache/spark/pull/27640#discussion_r393146074
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
 ##########
 @@ -163,6 +155,73 @@ class BarrierTaskContext private[spark] (
       timerTask.cancel()
       timer.purge()
     }
+    json
+  }
+
+  /**
+   * :: Experimental ::
+   * Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to
+   * MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same
+   * stage have reached this routine.
+   *
+   * CAUTION! In a barrier stage, each task must have the same number of barrier() calls, in all
+   * possible code branches. Otherwise, you may get the job hanging or a SparkException after
+   * timeout. Some examples of '''misuses''' are listed below:
+   * 1. Only call barrier() function on a subset of all the tasks in the same barrier stage, it
+   * shall lead to timeout of the function call.
+   * {{{
+   *   rdd.barrier().mapPartitions { iter =>
+   *       val context = BarrierTaskContext.get()
+   *       if (context.partitionId() == 0) {
+   *           // Do nothing.
+   *       } else {
+   *           context.barrier()
+   *       }
+   *       iter
+   *   }
+   * }}}
+   *
+   * 2. Include barrier() function in a try-catch code block, this may lead to timeout of the
+   * second function call.
+   * {{{
+   *   rdd.barrier().mapPartitions { iter =>
+   *       val context = BarrierTaskContext.get()
+   *       try {
+   *           // Do something that might throw an Exception.
+   *           doSomething()
+   *           context.barrier()
+   *       } catch {
+   *           case e: Exception => logWarning("...", e)
+   *       }
+   *       context.barrier()
+   *       iter
+   *   }
+   * }}}
+   */
+  @Experimental
+  @Since("2.4.0")
+  def barrier(): Unit = {
+    runBarrier(RequestMethod.BARRIER)
+    ()
+  }
+
+  /**
+   * :: Experimental ::
+   * Blocks until all tasks in the same stage have reached this routine. Each task passes in
+   * a message and returns with a list of all the messages passed in by each of those tasks.
+   *
+   * CAUTION! The allGather method requires the same precautions as the barrier method
+   *
+   * The message is type String rather than Array[Byte] because it is more convenient for
+   * the user at the cost of worse performance.
+   */
+  @Experimental
+  @Since("3.0.0")
+  def allGather(message: String): ArrayBuffer[String] = {
 
 Review comment:
   OK, sure: IndexedSeq or Array is fine. Just something immutable.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org