You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/08/07 09:05:02 UTC
spark git commit: [SPARK-21621][CORE] Reset numRecordsWritten after
DiskBlockObjectWriter.commitAndGet called
Repository: spark
Updated Branches:
refs/heads/master 39e044e3d -> 534a063f7
[SPARK-21621][CORE] Reset numRecordsWritten after DiskBlockObjectWriter.commitAndGet called
## What changes were proposed in this pull request?
We should reset numRecordsWritten to zero after DiskBlockObjectWriter.commitAndGet is called.
This is because when `revertPartialWritesAndClose` is called, we decrease the written records in `ShuffleWriteMetrics`. However, we currently decrease the written records all the way to zero, which is wrong: we should only decrease by the number of records written after the last `commitAndGet` call.
## How was this patch tested?
Modified existing test.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Xianyang Liu <xi...@intel.com>
Closes #18830 from ConeyLiu/DiskBlockObjectWriter.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/534a063f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/534a063f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/534a063f
Branch: refs/heads/master
Commit: 534a063f7c693158437d13224f50d4ae789ff6fb
Parents: 39e044e
Author: Xianyang Liu <xi...@intel.com>
Authored: Mon Aug 7 17:04:53 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Aug 7 17:04:53 2017 +0800
----------------------------------------------------------------------
.../scala/org/apache/spark/storage/DiskBlockObjectWriter.scala | 2 ++
.../org/apache/spark/storage/DiskBlockObjectWriterSuite.scala | 1 +
2 files changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/534a063f/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index eb3ff92..a024c83 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -95,6 +95,7 @@ private[spark] class DiskBlockObjectWriter(
/**
* Keep track of number of records written and also use this to periodically
* output bytes written since the latter is expensive to do for each record.
+ * And we reset it after every commitAndGet called.
*/
private var numRecordsWritten = 0
@@ -185,6 +186,7 @@ private[spark] class DiskBlockObjectWriter(
// In certain compression codecs, more bytes are written after streams are closed
writeMetrics.incBytesWritten(committedPosition - reportedPosition)
reportedPosition = committedPosition
+ numRecordsWritten = 0
fileSegment
} else {
new FileSegment(file, committedPosition, 0)
http://git-wip-us.apache.org/repos/asf/spark/blob/534a063f/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index bfb3ac4..cea5501 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -116,6 +116,7 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
writer.revertPartialWritesAndClose()
assert(firstSegment.length === file.length())
assert(writeMetrics.bytesWritten === file.length())
+ assert(writeMetrics.recordsWritten == 1)
}
test("calling revertPartialWritesAndClose() after commit() should have no effect") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org