You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ji...@apache.org on 2020/06/01 21:34:56 UTC
[spark] branch branch-3.0 updated: [SPARK-31764][CORE][3.0]
JsonProtocol doesn't write RDDInfo#isBarrier
This is an automated email from the ASF dual-hosted git repository.
jiangxb1987 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b6c8366 [SPARK-31764][CORE][3.0] JsonProtocol doesn't write RDDInfo#isBarrier
b6c8366 is described below
commit b6c8366cc58b7d5e35ec2a31532ae7ee22275454
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Mon Jun 1 14:33:35 2020 -0700
[SPARK-31764][CORE][3.0] JsonProtocol doesn't write RDDInfo#isBarrier
### What changes were proposed in this pull request?
This PR backports the change of #28583 (SPARK-31764) to branch-3.0, which changes JsonProtocol to write RDDInfo#isBarrier.
### Why are the changes needed?
JsonProtocol reads RDDInfo#isBarrier but does not write it, so it's a bug.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
I added a test case.
Closes #28660 from sarutak/SPARK-31764-branch-3.0.
Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
Signed-off-by: Xingbo Jiang <xi...@databricks.com>
---
.../scala/org/apache/spark/util/JsonProtocol.scala | 1 +
.../scheduler/EventLoggingListenerSuite.scala | 44 ++++++++++++++++++++++
.../org/apache/spark/util/JsonProtocolSuite.scala | 11 ++++++
3 files changed, 56 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index aee9862..f445fd4 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -475,6 +475,7 @@ private[spark] object JsonProtocol {
("Callsite" -> rddInfo.callSite) ~
("Parent IDs" -> parentIds) ~
("Storage Level" -> storageLevel) ~
+ ("Barrier" -> rddInfo.isBarrier) ~
("Number of Partitions" -> rddInfo.numPartitions) ~
("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
("Memory Size" -> rddInfo.memSize) ~
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 2869240..046564d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.history.{EventLogFileReader, SingleEventLogFileWr
import org.apache.spark.deploy.history.EventLogTestHelper._
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{EVENT_LOG_DIR, EVENT_LOG_ENABLED}
import org.apache.spark.io._
import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem}
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -99,6 +100,49 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
testStageExecutorMetricsEventLogging()
}
+ test("SPARK-31764: isBarrier should be logged in event log") {
+ val conf = new SparkConf()
+ conf.set(EVENT_LOG_ENABLED, true)
+ conf.set(EVENT_LOG_DIR, testDirPath.toString)
+ val sc = new SparkContext("local", "test-SPARK-31764", conf)
+ val appId = sc.applicationId
+
+ sc.parallelize(1 to 10)
+ .barrier()
+ .mapPartitions(_.map(elem => (elem, elem)))
+ .filter(elem => elem._1 % 2 == 0)
+ .reduceByKey(_ + _)
+ .collect
+ sc.stop()
+
+ val eventLogStream = EventLogFileReader.openEventLog(new Path(testDirPath, appId), fileSystem)
+ val events = readLines(eventLogStream).map(line => JsonProtocol.sparkEventFromJson(parse(line)))
+ val jobStartEvents = events
+ .filter(event => event.isInstanceOf[SparkListenerJobStart])
+ .map(_.asInstanceOf[SparkListenerJobStart])
+
+ assert(jobStartEvents.size === 1)
+ val stageInfos = jobStartEvents.head.stageInfos
+ assert(stageInfos.size === 2)
+
+ val stage0 = stageInfos(0)
+ val rddInfosInStage0 = stage0.rddInfos
+ assert(rddInfosInStage0.size === 3)
+ val sortedRddInfosInStage0 = rddInfosInStage0.sortBy(_.scope.get.name)
+ assert(sortedRddInfosInStage0(0).scope.get.name === "filter")
+ assert(sortedRddInfosInStage0(0).isBarrier === true)
+ assert(sortedRddInfosInStage0(1).scope.get.name === "mapPartitions")
+ assert(sortedRddInfosInStage0(1).isBarrier === true)
+ assert(sortedRddInfosInStage0(2).scope.get.name === "parallelize")
+ assert(sortedRddInfosInStage0(2).isBarrier === false)
+
+ val stage1 = stageInfos(1)
+ val rddInfosInStage1 = stage1.rddInfos
+ assert(rddInfosInStage1.size === 1)
+ assert(rddInfosInStage1(0).scope.get.name === "reduceByKey")
+ assert(rddInfosInStage1(0).isBarrier === false) // reduceByKey
+ }
+
/* ----------------- *
* Actual test logic *
* ----------------- */
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index f2b9f97..d1f09d8 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -1063,6 +1063,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 201,
| "Number of Cached Partitions": 301,
| "Memory Size": 401,
@@ -1585,6 +1586,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 200,
| "Number of Cached Partitions": 300,
| "Memory Size": 400,
@@ -1629,6 +1631,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 400,
| "Number of Cached Partitions": 600,
| "Memory Size": 800,
@@ -1645,6 +1648,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 401,
| "Number of Cached Partitions": 601,
| "Memory Size": 801,
@@ -1689,6 +1693,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 600,
| "Number of Cached Partitions": 900,
| "Memory Size": 1200,
@@ -1705,6 +1710,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 601,
| "Number of Cached Partitions": 901,
| "Memory Size": 1201,
@@ -1721,6 +1727,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 602,
| "Number of Cached Partitions": 902,
| "Memory Size": 1202,
@@ -1765,6 +1772,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 800,
| "Number of Cached Partitions": 1200,
| "Memory Size": 1600,
@@ -1781,6 +1789,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 801,
| "Number of Cached Partitions": 1201,
| "Memory Size": 1601,
@@ -1797,6 +1806,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 802,
| "Number of Cached Partitions": 1202,
| "Memory Size": 1602,
@@ -1813,6 +1823,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 803,
| "Number of Cached Partitions": 1203,
| "Memory Size": 1603,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org