You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ji...@apache.org on 2020/05/27 21:37:58 UTC
[spark] branch master updated: [SPARK-31764][CORE] JsonProtocol
doesn't write RDDInfo#isBarrier
This is an automated email from the ASF dual-hosted git repository.
jiangxb1987 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d19b173 [SPARK-31764][CORE] JsonProtocol doesn't write RDDInfo#isBarrier
d19b173 is described below
commit d19b173b47af04fe6f03e2b21b60eb317aeaae4f
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Wed May 27 14:36:12 2020 -0700
[SPARK-31764][CORE] JsonProtocol doesn't write RDDInfo#isBarrier
### What changes were proposed in this pull request?
This PR changes JsonProtocol to write RDDInfo#isBarrier.
### Why are the changes needed?
JsonProtocol reads RDDInfo#isBarrier but does not write it, so it's a bug.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
I added a test case.
Closes #28583 from sarutak/SPARK-31764.
Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
Signed-off-by: Xingbo Jiang <xi...@databricks.com>
---
.../scala/org/apache/spark/util/JsonProtocol.scala | 1 +
.../scheduler/EventLoggingListenerSuite.scala | 44 ++++++++++++++++++++++
.../org/apache/spark/util/JsonProtocolSuite.scala | 11 ++++++
3 files changed, 56 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 26bbff5..844d9b7 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -487,6 +487,7 @@ private[spark] object JsonProtocol {
("Callsite" -> rddInfo.callSite) ~
("Parent IDs" -> parentIds) ~
("Storage Level" -> storageLevel) ~
+ ("Barrier" -> rddInfo.isBarrier) ~
("Number of Partitions" -> rddInfo.numPartitions) ~
("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
("Memory Size" -> rddInfo.memSize) ~
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 61ea21f..7c23e44 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.history.{EventLogFileReader, SingleEventLogFileWr
import org.apache.spark.deploy.history.EventLogTestHelper._
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{EVENT_LOG_DIR, EVENT_LOG_ENABLED}
import org.apache.spark.io._
import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem}
import org.apache.spark.resource.ResourceProfile
@@ -100,6 +101,49 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
testStageExecutorMetricsEventLogging()
}
+ test("SPARK-31764: isBarrier should be logged in event log") {
+ val conf = new SparkConf()
+ conf.set(EVENT_LOG_ENABLED, true)
+ conf.set(EVENT_LOG_DIR, testDirPath.toString)
+ val sc = new SparkContext("local", "test-SPARK-31764", conf)
+ val appId = sc.applicationId
+
+ sc.parallelize(1 to 10)
+ .barrier()
+ .mapPartitions(_.map(elem => (elem, elem)))
+ .filter(elem => elem._1 % 2 == 0)
+ .reduceByKey(_ + _)
+ .collect
+ sc.stop()
+
+ val eventLogStream = EventLogFileReader.openEventLog(new Path(testDirPath, appId), fileSystem)
+ val events = readLines(eventLogStream).map(line => JsonProtocol.sparkEventFromJson(parse(line)))
+ val jobStartEvents = events
+ .filter(event => event.isInstanceOf[SparkListenerJobStart])
+ .map(_.asInstanceOf[SparkListenerJobStart])
+
+ assert(jobStartEvents.size === 1)
+ val stageInfos = jobStartEvents.head.stageInfos
+ assert(stageInfos.size === 2)
+
+ val stage0 = stageInfos(0)
+ val rddInfosInStage0 = stage0.rddInfos
+ assert(rddInfosInStage0.size === 3)
+ val sortedRddInfosInStage0 = rddInfosInStage0.sortBy(_.scope.get.name)
+ assert(sortedRddInfosInStage0(0).scope.get.name === "filter")
+ assert(sortedRddInfosInStage0(0).isBarrier === true)
+ assert(sortedRddInfosInStage0(1).scope.get.name === "mapPartitions")
+ assert(sortedRddInfosInStage0(1).isBarrier === true)
+ assert(sortedRddInfosInStage0(2).scope.get.name === "parallelize")
+ assert(sortedRddInfosInStage0(2).isBarrier === false)
+
+ val stage1 = stageInfos(1)
+ val rddInfosInStage1 = stage1.rddInfos
+ assert(rddInfosInStage1.size === 1)
+ assert(rddInfosInStage1(0).scope.get.name === "reduceByKey")
+ assert(rddInfosInStage1(0).isBarrier === false) // reduceByKey
+ }
+
/* ----------------- *
* Actual test logic *
* ----------------- */
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index bc7f8b5..248142a 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -1100,6 +1100,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 201,
| "Number of Cached Partitions": 301,
| "Memory Size": 401,
@@ -1623,6 +1624,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 200,
| "Number of Cached Partitions": 300,
| "Memory Size": 400,
@@ -1668,6 +1670,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 400,
| "Number of Cached Partitions": 600,
| "Memory Size": 800,
@@ -1684,6 +1687,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 401,
| "Number of Cached Partitions": 601,
| "Memory Size": 801,
@@ -1729,6 +1733,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 600,
| "Number of Cached Partitions": 900,
| "Memory Size": 1200,
@@ -1745,6 +1750,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 601,
| "Number of Cached Partitions": 901,
| "Memory Size": 1201,
@@ -1761,6 +1767,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 602,
| "Number of Cached Partitions": 902,
| "Memory Size": 1202,
@@ -1806,6 +1813,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 800,
| "Number of Cached Partitions": 1200,
| "Memory Size": 1600,
@@ -1822,6 +1830,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 801,
| "Number of Cached Partitions": 1201,
| "Memory Size": 1601,
@@ -1838,6 +1847,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 802,
| "Number of Cached Partitions": 1202,
| "Memory Size": 1602,
@@ -1854,6 +1864,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
+ | "Barrier" : false,
| "Number of Partitions": 803,
| "Number of Cached Partitions": 1203,
| "Memory Size": 1603,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org