You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/07/31 18:13:27 UTC

[GitHub] [spark] mccheah commented on a change in pull request #25304: [SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API.

mccheah commented on a change in pull request #25304: [SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API.
URL: https://github.com/apache/spark/pull/25304#discussion_r309361928
 
 

 ##########
 File path: core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
 ##########
 @@ -430,54 +415,49 @@ void forceSorterToSpill() throws IOException {
    * This is only safe when the IO compression codec and serializer support concatenation of
    * serialized streams.
    *
+   * @param spills the spills to merge.
+   * @param mapWriter the map output writer to use for output.
    * @return the partition lengths in the merged file.
    */
-  private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {
-    assert (spills.length >= 2);
+  private long[] mergeSpillsWithTransferTo(
+      SpillInfo[] spills,
+      ShuffleMapOutputWriter mapWriter) throws IOException {
     final int numPartitions = partitioner.numPartitions();
     final long[] partitionLengths = new long[numPartitions];
     final FileChannel[] spillInputChannels = new FileChannel[spills.length];
     final long[] spillInputChannelPositions = new long[spills.length];
-    FileChannel mergedFileOutputChannel = null;
 
     boolean threwException = true;
     try {
       for (int i = 0; i < spills.length; i++) {
         spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
       }
-      // This file needs to opened in append mode in order to work around a Linux kernel bug that
-      // affects transferTo; see SPARK-3948 for more details.
-      mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();
-
-      long bytesWrittenToMergedFile = 0;
       for (int partition = 0; partition < numPartitions; partition++) {
-        for (int i = 0; i < spills.length; i++) {
-          final long partitionLengthInSpill = spills[i].partitionLengths[partition];
-          final FileChannel spillInputChannel = spillInputChannels[i];
-          final long writeStartTime = System.nanoTime();
-          Utils.copyFileStreamNIO(
-            spillInputChannel,
-            mergedFileOutputChannel,
-            spillInputChannelPositions[i],
-            partitionLengthInSpill);
-          spillInputChannelPositions[i] += partitionLengthInSpill;
-          writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
-          bytesWrittenToMergedFile += partitionLengthInSpill;
-          partitionLengths[partition] += partitionLengthInSpill;
+        boolean copyThrewExecption = true;
+        ShufflePartitionWriter writer = mapWriter.getPartitionWriter(partition);
+        WritableByteChannelWrapper resolvedChannel = writer.openChannelWrapper()
+            .orElseGet(() -> new StreamFallbackChannelWrapper(openStreamUnchecked(writer)));
+        try {
+          for (int i = 0; i < spills.length; i++) {
+            long partitionLengthInSpill = 0L;
+            partitionLengthInSpill += spills[i].partitionLengths[partition];
+            final FileChannel spillInputChannel = spillInputChannels[i];
+            final long writeStartTime = System.nanoTime();
+            Utils.copyFileStreamNIO(
+                    spillInputChannel,
+                    resolvedChannel.channel(),
+                    spillInputChannelPositions[i],
+                    partitionLengthInSpill);
+            copyThrewExecption = false;
+            spillInputChannelPositions[i] += partitionLengthInSpill;
+            writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
+          }
+        } finally {
+          Closeables.close(resolvedChannel, copyThrewExecption);
         }
-      }
-      // Check the position after transferTo loop to see if it is in the right position and raise an
-      // exception if it is incorrect. The position will not be increased to the expected length
-      // after calling transferTo in kernel version 2.6.32. This issue is described at
-      // https://bugs.openjdk.java.net/browse/JDK-7052359 and SPARK-3948.
-      if (mergedFileOutputChannel.position() != bytesWrittenToMergedFile) {
 
 Review comment:
   Nice catch — I moved it accordingly.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org