Posted to commits@spark.apache.org by ji...@apache.org on 2020/06/01 21:34:56 UTC

[spark] branch branch-3.0 updated: [SPARK-31764][CORE][3.0] JsonProtocol doesn't write RDDInfo#isBarrier

This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b6c8366  [SPARK-31764][CORE][3.0] JsonProtocol doesn't write RDDInfo#isBarrier
b6c8366 is described below

commit b6c8366cc58b7d5e35ec2a31532ae7ee22275454
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Mon Jun 1 14:33:35 2020 -0700

    [SPARK-31764][CORE][3.0] JsonProtocol doesn't write RDDInfo#isBarrier
    
    ### What changes were proposed in this pull request?
    
    This PR backports the change from #28583 (SPARK-31764) to branch-3.0, changing JsonProtocol so that it writes RDDInfo#isBarrier.
    
    ### Why are the changes needed?
    
    JsonProtocol reads RDDInfo#isBarrier but never writes it, so the flag is silently lost when an event log is written and read back; this is a bug.
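    
    A minimal sketch of the round trip that loses the flag (assuming branch-3.0's `RDDInfo` constructor; `JsonProtocol` is `private[spark]`, so this only compiles inside Spark's own sources):
    
    ```scala
    import org.apache.spark.storage.{RDDInfo, StorageLevel}
    import org.apache.spark.util.JsonProtocol
    
    // An RDDInfo for an RDD computed in a barrier stage.
    val info = new RDDInfo(
      id = 1,
      name = "mapPartitions",
      numPartitions = 4,
      storageLevel = StorageLevel.NONE,
      isBarrier = true,
      parentIds = Seq(0))
    
    // Round-trip through the event log JSON representation.
    val restored = JsonProtocol.rddInfoFromJson(JsonProtocol.rddInfoToJson(info))
    
    // Before this fix, "Barrier" was never written, so rddInfoFromJson fell
    // back to its default and restored.isBarrier was false.
    assert(restored.isBarrier)
    ```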
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    I added a test case.
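    
    The affected suites can be run with the usual sbt invocation, e.g.:
    
    ```
    build/sbt "core/testOnly *JsonProtocolSuite *EventLoggingListenerSuite"
    ```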
    
    Closes #28660 from sarutak/SPARK-31764-branch-3.0.
    
    Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
    Signed-off-by: Xingbo Jiang <xi...@databricks.com>
---
 .../scala/org/apache/spark/util/JsonProtocol.scala |  1 +
 .../scheduler/EventLoggingListenerSuite.scala      | 44 ++++++++++++++++++++++
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 11 ++++++
 3 files changed, 56 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index aee9862..f445fd4 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -475,6 +475,7 @@ private[spark] object JsonProtocol {
     ("Callsite" -> rddInfo.callSite) ~
     ("Parent IDs" -> parentIds) ~
     ("Storage Level" -> storageLevel) ~
+    ("Barrier" -> rddInfo.isBarrier) ~
     ("Number of Partitions" -> rddInfo.numPartitions) ~
     ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
     ("Memory Size" -> rddInfo.memSize) ~
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 2869240..046564d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.history.{EventLogFileReader, SingleEventLogFileWr
 import org.apache.spark.deploy.history.EventLogTestHelper._
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{EVENT_LOG_DIR, EVENT_LOG_ENABLED}
 import org.apache.spark.io._
 import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -99,6 +100,49 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     testStageExecutorMetricsEventLogging()
   }
 
+  test("SPARK-31764: isBarrier should be logged in event log") {
+    val conf = new SparkConf()
+    conf.set(EVENT_LOG_ENABLED, true)
+    conf.set(EVENT_LOG_DIR, testDirPath.toString)
+    val sc = new SparkContext("local", "test-SPARK-31764", conf)
+    val appId = sc.applicationId
+
+    sc.parallelize(1 to 10)
+      .barrier()
+      .mapPartitions(_.map(elem => (elem, elem)))
+      .filter(elem => elem._1 % 2 == 0)
+      .reduceByKey(_ + _)
+      .collect
+    sc.stop()
+
+    val eventLogStream = EventLogFileReader.openEventLog(new Path(testDirPath, appId), fileSystem)
+    val events = readLines(eventLogStream).map(line => JsonProtocol.sparkEventFromJson(parse(line)))
+    val jobStartEvents = events
+      .filter(event => event.isInstanceOf[SparkListenerJobStart])
+      .map(_.asInstanceOf[SparkListenerJobStart])
+
+    assert(jobStartEvents.size === 1)
+    val stageInfos = jobStartEvents.head.stageInfos
+    assert(stageInfos.size === 2)
+
+    val stage0 = stageInfos(0)
+    val rddInfosInStage0 = stage0.rddInfos
+    assert(rddInfosInStage0.size === 3)
+    val sortedRddInfosInStage0 = rddInfosInStage0.sortBy(_.scope.get.name)
+    assert(sortedRddInfosInStage0(0).scope.get.name === "filter")
+    assert(sortedRddInfosInStage0(0).isBarrier === true)
+    assert(sortedRddInfosInStage0(1).scope.get.name === "mapPartitions")
+    assert(sortedRddInfosInStage0(1).isBarrier === true)
+    assert(sortedRddInfosInStage0(2).scope.get.name === "parallelize")
+    assert(sortedRddInfosInStage0(2).isBarrier === false)
+
+    val stage1 = stageInfos(1)
+    val rddInfosInStage1 = stage1.rddInfos
+    assert(rddInfosInStage1.size === 1)
+    assert(rddInfosInStage1(0).scope.get.name === "reduceByKey")
+    assert(rddInfosInStage1(0).isBarrier === false) // reduceByKey
+  }
+
   /* ----------------- *
    * Actual test logic *
    * ----------------- */
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index f2b9f97..d1f09d8 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -1063,6 +1063,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |          "Deserialized": true,
       |          "Replication": 1
       |        },
+      |        "Barrier" : false,
       |        "Number of Partitions": 201,
       |        "Number of Cached Partitions": 301,
       |        "Memory Size": 401,
@@ -1585,6 +1586,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
+      |          "Barrier" : false,
       |          "Number of Partitions": 200,
       |          "Number of Cached Partitions": 300,
       |          "Memory Size": 400,
@@ -1629,6 +1631,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
+      |          "Barrier" : false,
       |          "Number of Partitions": 400,
       |          "Number of Cached Partitions": 600,
       |          "Memory Size": 800,
@@ -1645,6 +1648,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
+      |          "Barrier" : false,
       |          "Number of Partitions": 401,
       |          "Number of Cached Partitions": 601,
       |          "Memory Size": 801,
@@ -1689,6 +1693,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
+      |          "Barrier" : false,
       |          "Number of Partitions": 600,
       |          "Number of Cached Partitions": 900,
       |          "Memory Size": 1200,
@@ -1705,6 +1710,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
+      |          "Barrier" : false,
       |          "Number of Partitions": 601,
       |          "Number of Cached Partitions": 901,
       |          "Memory Size": 1201,
@@ -1721,6 +1727,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
+      |          "Barrier" : false,
       |          "Number of Partitions": 602,
       |          "Number of Cached Partitions": 902,
       |          "Memory Size": 1202,
@@ -1765,6 +1772,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
+      |          "Barrier" : false,
       |          "Number of Partitions": 800,
       |          "Number of Cached Partitions": 1200,
       |          "Memory Size": 1600,
@@ -1781,6 +1789,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
+      |          "Barrier" : false,
       |          "Number of Partitions": 801,
       |          "Number of Cached Partitions": 1201,
       |          "Memory Size": 1601,
@@ -1797,6 +1806,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
+      |          "Barrier" : false,
       |          "Number of Partitions": 802,
       |          "Number of Cached Partitions": 1202,
       |          "Memory Size": 1602,
@@ -1813,6 +1823,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
+      |          "Barrier" : false,
       |          "Number of Partitions": 803,
       |          "Number of Cached Partitions": 1203,
       |          "Memory Size": 1603,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org