Posted to commits@spark.apache.org by ir...@apache.org on 2018/02/02 15:11:00 UTC

spark git commit: [SPARK-23253][CORE][SHUFFLE] Only write shuffle temporary index file when there is not an existing one

Repository: spark
Updated Branches:
  refs/heads/master b9503fcbb -> dd52681bf


[SPARK-23253][CORE][SHUFFLE] Only write shuffle temporary index file when there is not an existing one

## What changes were proposed in this pull request?

The shuffle index temporary file is used to create the shuffle index file atomically. It is not needed when the index file already exists, i.e. when another attempt of the same task has already committed it.
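
For illustration, here is a minimal sketch of the check-then-write pattern this change introduces. The object and method names are hypothetical and the validation is simplified (the real code cross-checks the index against the data file via `checkIndexAndDataFile`); this is not the actual Spark implementation:

```scala
import java.io.{BufferedOutputStream, DataOutputStream, File, FileOutputStream, IOException}

object IndexCommitSketch {
  def commitIndex(indexFile: File, indexTmp: File, lengths: Array[Long]): Unit = synchronized {
    // Simplified check: a committed index file for N blocks holds N + 1 longs.
    // The real resolver also validates consistency with the data file.
    if (indexFile.exists() && indexFile.length() == (lengths.length + 1) * 8L) {
      // Another attempt of the same task already committed a valid index file;
      // reuse it and never write the temporary file at all.
      indexTmp.delete()
      return
    }
    // First successful attempt: convert block lengths to cumulative offsets
    // and write them to the temporary file.
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
    try {
      var offset = 0L
      out.writeLong(offset)
      for (length <- lengths) {
        offset += length
        out.writeLong(offset)
      }
    } finally {
      out.close()
    }
    // Rename the temporary file into place atomically.
    if (indexFile.exists()) {
      indexFile.delete()
    }
    if (!indexTmp.renameTo(indexFile)) {
      throw new IOException(s"fail to rename file $indexTmp to $indexFile")
    }
  }
}
```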

## How was this patch tested?

Existing unit tests.

cc squito

Author: Kent Yao <ya...@hotmail.com>

Closes #20422 from yaooqinn/SPARK-23253.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd52681b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd52681b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd52681b

Branch: refs/heads/master
Commit: dd52681bf542386711609cb037a55b3d264eddef
Parents: b9503fc
Author: Kent Yao <ya...@hotmail.com>
Authored: Fri Feb 2 09:10:50 2018 -0600
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Fri Feb 2 09:10:50 2018 -0600

----------------------------------------------------------------------
 .../shuffle/IndexShuffleBlockResolver.scala     | 27 +++++----
 .../sort/IndexShuffleBlockResolverSuite.scala   | 59 ++++++++++++++------
 2 files changed, 56 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dd52681b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 266ee42..c5f3f6e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -141,19 +141,6 @@ private[spark] class IndexShuffleBlockResolver(
     val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
     try {
-      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
-      Utils.tryWithSafeFinally {
-        // We take in lengths of each block, need to convert it to offsets.
-        var offset = 0L
-        out.writeLong(offset)
-        for (length <- lengths) {
-          offset += length
-          out.writeLong(offset)
-        }
-      } {
-        out.close()
-      }
-
       val dataFile = getDataFile(shuffleId, mapId)
       // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
       // the following check and rename are atomic.
@@ -166,10 +153,22 @@ private[spark] class IndexShuffleBlockResolver(
           if (dataTmp != null && dataTmp.exists()) {
             dataTmp.delete()
           }
-          indexTmp.delete()
         } else {
           // This is the first successful attempt in writing the map outputs for this task,
           // so override any existing index and data files with the ones we wrote.
+          val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
+          Utils.tryWithSafeFinally {
+            // We take in lengths of each block, need to convert it to offsets.
+            var offset = 0L
+            out.writeLong(offset)
+            for (length <- lengths) {
+              offset += length
+              out.writeLong(offset)
+            }
+          } {
+            out.close()
+          }
+
           if (indexFile.exists()) {
             indexFile.delete()
           }
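
Note on the restructuring in the hunk above: before this change, the cumulative offsets were written to indexTmp unconditionally, and the temporary file was simply deleted when the synchronized check found that a previous attempt had already committed matching index and data files. Moving the write into the else branch means a losing attempt never creates the temporary index file at all, saving one file write per redundant task attempt while keeping the check-and-rename atomic under the resolver-wide lock.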

http://git-wip-us.apache.org/repos/asf/spark/blob/dd52681b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
index d21ce73..4ce379b 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.shuffle.sort
 
-import java.io.{File, FileInputStream, FileOutputStream}
+import java.io.{DataInputStream, File, FileInputStream, FileOutputStream}
 
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.Answers.RETURNS_SMART_NULLS
@@ -64,6 +64,9 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
   }
 
   test("commit shuffle files multiple times") {
+    val shuffleId = 1
+    val mapId = 2
+    val idxName = s"shuffle_${shuffleId}_${mapId}_0.index"
     val resolver = new IndexShuffleBlockResolver(conf, blockManager)
     val lengths = Array[Long](10, 0, 20)
     val dataTmp = File.createTempFile("shuffle", null, tempDir)
@@ -73,9 +76,13 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
     } {
       out.close()
     }
-    resolver.writeIndexFileAndCommit(1, 2, lengths, dataTmp)
+    resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths, dataTmp)
 
-    val dataFile = resolver.getDataFile(1, 2)
+    val indexFile = new File(tempDir.getAbsolutePath, idxName)
+    val dataFile = resolver.getDataFile(shuffleId, mapId)
+
+    assert(indexFile.exists())
+    assert(indexFile.length() === (lengths.length + 1) * 8)
     assert(dataFile.exists())
     assert(dataFile.length() === 30)
     assert(!dataTmp.exists())
@@ -89,7 +96,9 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
     } {
       out2.close()
     }
-    resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2)
+    resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths2, dataTmp2)
+
+    assert(indexFile.length() === (lengths.length + 1) * 8)
     assert(lengths2.toSeq === lengths.toSeq)
     assert(dataFile.exists())
     assert(dataFile.length() === 30)
@@ -97,18 +106,27 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
 
     // The dataFile should be the previous one
     val firstByte = new Array[Byte](1)
-    val in = new FileInputStream(dataFile)
+    val dataIn = new FileInputStream(dataFile)
     Utils.tryWithSafeFinally {
-      in.read(firstByte)
+      dataIn.read(firstByte)
     } {
-      in.close()
+      dataIn.close()
     }
     assert(firstByte(0) === 0)
 
+    // The index file should not change
+    val indexIn = new DataInputStream(new FileInputStream(indexFile))
+    Utils.tryWithSafeFinally {
+      indexIn.readLong() // the first offset is always 0
+      assert(indexIn.readLong() === 10, "The index file should not change")
+    } {
+      indexIn.close()
+    }
+
     // remove data file
     dataFile.delete()
 
-    val lengths3 = Array[Long](10, 10, 15)
+    val lengths3 = Array[Long](7, 10, 15, 3)
     val dataTmp3 = File.createTempFile("shuffle", null, tempDir)
     val out3 = new FileOutputStream(dataTmp3)
     Utils.tryWithSafeFinally {
@@ -117,20 +135,29 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
     } {
       out3.close()
     }
-    resolver.writeIndexFileAndCommit(1, 2, lengths3, dataTmp3)
+    resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths3, dataTmp3)
+    assert(indexFile.length() === (lengths3.length + 1) * 8)
     assert(lengths3.toSeq != lengths.toSeq)
     assert(dataFile.exists())
     assert(dataFile.length() === 35)
-    assert(!dataTmp2.exists())
+    assert(!dataTmp3.exists())
 
-    // The dataFile should be the previous one
-    val firstByte2 = new Array[Byte](1)
-    val in2 = new FileInputStream(dataFile)
+    // The dataFile should be the new one, since we deleted the dataFile from the first attempt
+    val dataIn2 = new FileInputStream(dataFile)
+    Utils.tryWithSafeFinally {
+      dataIn2.read(firstByte)
+    } {
+      dataIn2.close()
+    }
+    assert(firstByte(0) === 2)
+
+    // The index file should be updated, since we deleted the dataFile from the first attempt
+    val indexIn2 = new DataInputStream(new FileInputStream(indexFile))
     Utils.tryWithSafeFinally {
-      in2.read(firstByte2)
+      indexIn2.readLong() // the first offset is always 0
+      assert(indexIn2.readLong() === 7, "The index file should be updated")
     } {
-      in2.close()
+      indexIn2.close()
     }
-    assert(firstByte2(0) === 2)
   }
 }
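
As a companion to the test changes above: an index file for N blocks holds N + 1 longs of cumulative offsets starting at 0, which is why the test asserts indexFile.length() === (lengths.length + 1) * 8 and reads the second long to recover the first block's length. A hedged sketch of that mapping (hypothetical names, not a Spark API):

```scala
import java.io.{DataInputStream, File, FileInputStream}

object IndexReadSketch {
  // Reconstruct per-block lengths from the cumulative offsets stored in a
  // committed shuffle index file; the first offset is always 0.
  def readBlockLengths(indexFile: File): Array[Long] = {
    val numOffsets = (indexFile.length() / 8).toInt
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      val offsets = Array.fill(numOffsets)(in.readLong())
      // lengths(i) = offsets(i + 1) - offsets(i)
      offsets.sliding(2).map { case Array(a, b) => b - a }.toArray
    } finally {
      in.close()
    }
  }
}
```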

