Posted to commits@spark.apache.org by we...@apache.org on 2017/08/07 09:05:02 UTC

spark git commit: [SPARK-21621][CORE] Reset numRecordsWritten after DiskBlockObjectWriter.commitAndGet called

Repository: spark
Updated Branches:
  refs/heads/master 39e044e3d -> 534a063f7


[SPARK-21621][CORE] Reset numRecordsWritten after DiskBlockObjectWriter.commitAndGet called

## What changes were proposed in this pull request?

We should reset `numRecordsWritten` to zero after `DiskBlockObjectWriter.commitAndGet` is called.
When `revertPartialWritesAndClose` is called, we decrease the written records in `ShuffleWriteMetrics`. Without the reset, the revert decreases the written records all the way to zero, which is wrong: it should only subtract the records written since the last `commitAndGet` call.
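
The accounting problem can be illustrated with a small, self-contained model. This is only a sketch under simplifying assumptions, not Spark's actual implementation: `SketchMetrics`, `SketchWriter`, `SketchDemo`, and the `resetOnCommit` flag are hypothetical names introduced here, and all file and stream handling is left out so that only the record counter remains.

```scala
// Hypothetical, simplified model of the record accounting.
// It is NOT Spark's DiskBlockObjectWriter; it keeps only the counters.
final class SketchMetrics {
  private var records = 0L
  def recordsWritten: Long = records
  def incRecordsWritten(n: Long): Unit = records += n
  def decRecordsWritten(n: Long): Unit = records -= n
}

final class SketchWriter(metrics: SketchMetrics, resetOnCommit: Boolean) {
  // With resetOnCommit = true this counts records since the last commit;
  // with resetOnCommit = false it counts every record ever written.
  private var numRecordsWritten = 0

  def write(): Unit = {
    numRecordsWritten += 1
    metrics.incRecordsWritten(1)
  }

  def commitAndGet(): Unit = {
    // The fix modeled here: committed records must not be reverted later.
    if (resetOnCommit) numRecordsWritten = 0
  }

  def revertPartialWritesAndClose(): Unit = {
    // Subtract whatever the writer believes is still uncommitted.
    metrics.decRecordsWritten(numRecordsWritten)
  }
}

object SketchDemo {
  // Write one record, commit it, write a second record, then revert.
  def run(resetOnCommit: Boolean): Long = {
    val metrics = new SketchMetrics
    val writer = new SketchWriter(metrics, resetOnCommit)
    writer.write()
    writer.commitAndGet()
    writer.write()
    writer.revertPartialWritesAndClose()
    metrics.recordsWritten
  }
}
```

In this model, `SketchDemo.run(resetOnCommit = false)` returns 0 because the revert also subtracts the committed record, while `SketchDemo.run(resetOnCommit = true)` returns 1, which is the value the modified test asserts for `writeMetrics.recordsWritten`.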

## How was this patch tested?
Modified existing test.


Author: Xianyang Liu <xi...@intel.com>

Closes #18830 from ConeyLiu/DiskBlockObjectWriter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/534a063f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/534a063f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/534a063f

Branch: refs/heads/master
Commit: 534a063f7c693158437d13224f50d4ae789ff6fb
Parents: 39e044e
Author: Xianyang Liu <xi...@intel.com>
Authored: Mon Aug 7 17:04:53 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Aug 7 17:04:53 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/storage/DiskBlockObjectWriter.scala     | 2 ++
 .../org/apache/spark/storage/DiskBlockObjectWriterSuite.scala      | 1 +
 2 files changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/534a063f/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index eb3ff92..a024c83 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -95,6 +95,7 @@ private[spark] class DiskBlockObjectWriter(
   /**
    * Keep track of number of records written and also use this to periodically
    * output bytes written since the latter is expensive to do for each record.
+   * And we reset it after every commitAndGet called.
    */
   private var numRecordsWritten = 0
 
@@ -185,6 +186,7 @@ private[spark] class DiskBlockObjectWriter(
       // In certain compression codecs, more bytes are written after streams are closed
       writeMetrics.incBytesWritten(committedPosition - reportedPosition)
       reportedPosition = committedPosition
+      numRecordsWritten = 0
       fileSegment
     } else {
       new FileSegment(file, committedPosition, 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/534a063f/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index bfb3ac4..cea5501 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -116,6 +116,7 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     writer.revertPartialWritesAndClose()
     assert(firstSegment.length === file.length())
     assert(writeMetrics.bytesWritten === file.length())
+    assert(writeMetrics.recordsWritten == 1)
   }
 
   test("calling revertPartialWritesAndClose() after commit() should have no effect") {

