Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/08/30 11:29:28 UTC

[GitHub] [spark] xuanyuanking commented on a change in pull request #25341: [SPARK-28607][CORE][SHUFFLE] Don't store partition lengths twice.

URL: https://github.com/apache/spark/pull/25341#discussion_r319471170
 
 

 ##########
 File path: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 ##########
 @@ -195,40 +191,36 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
    */
   private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
     // Track location of the partition starts in the output file
-    final long[] lengths = new long[numPartitions];
-    if (partitionWriters == null) {
-      // We were passed an empty iterator
-      return lengths;
-    }
-    final long writeStartTime = System.nanoTime();
-    try {
-      for (int i = 0; i < numPartitions; i++) {
-        final File file = partitionWriterSegments[i].file();
-        ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
-        if (file.exists()) {
-          if (transferToEnabled) {
-            // Using WritableByteChannelWrapper to make resource closing consistent between
-            // this implementation and UnsafeShuffleWriter.
-            Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
-            if (maybeOutputChannel.isPresent()) {
-              writePartitionedDataWithChannel(file, maybeOutputChannel.get());
+    if (partitionWriters != null) {
+      final long writeStartTime = System.nanoTime();
+      try {
+        for (int i = 0; i < numPartitions; i++) {
+          final File file = partitionWriterSegments[i].file();
+          ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
+          if (file.exists()) {
+            if (transferToEnabled) {
+              // Using WritableByteChannelWrapper to make resource closing consistent between
+              // this implementation and UnsafeShuffleWriter.
+              Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
+              if (maybeOutputChannel.isPresent()) {
+                writePartitionedDataWithChannel(file, maybeOutputChannel.get());
+              } else {
+                writePartitionedDataWithStream(file, writer);
+              }
             } else {
               writePartitionedDataWithStream(file, writer);
             }
-          } else {
-            writePartitionedDataWithStream(file, writer);
-          }
-          if (!file.delete()) {
-            logger.error("Unable to delete file for partition {}", i);
+            if (!file.delete()) {
+              logger.error("Unable to delete file for partition {}", i);
+            }
           }
         }
-        lengths[i] = writer.getNumBytesWritten();
 
 Review comment:
   Just a quick question here: after this change, is there no place left that calls `ShufflePartitionWriter.getNumBytesWritten()`?
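
For context, here is a self-contained sketch of the pattern the PR title ("Don't store partition lengths twice") points at: the map output writer keeps the per-partition byte counts itself and hands them back in one place when the output is committed, so the bypass writer's loop no longer reads a length from each partition writer. The names below mirror the Spark plugin interfaces (`ShufflePartitionWriter`, `ShuffleMapOutputWriter`) but the classes are illustrative stand-ins, not the actual Spark code; in particular, the assumption that the lengths come back from `commitAllPartitions()` is inferred from the PR title and is not shown in the diff above.

    import java.io.IOException;
    import java.util.Arrays;

    // Illustrative stand-ins for the Spark shuffle plugin interfaces;
    // not the actual org.apache.spark.shuffle.api classes.
    public class LengthsFromCommitSketch {

      interface PartitionWriter {
        void write(byte[] chunk) throws IOException;
        // Still on the interface, but the caller below never reads it:
        // length bookkeeping has moved into the map output writer.
        long getNumBytesWritten();
      }

      interface MapOutputWriter {
        PartitionWriter getPartitionWriter(int partitionId) throws IOException;
        // Assumed single place the partition lengths are reported from;
        // the diff above does not show this method.
        long[] commitAllPartitions() throws IOException;
      }

      static class InMemoryMapOutputWriter implements MapOutputWriter {
        private final long[] lengths;

        InMemoryMapOutputWriter(int numPartitions) {
          this.lengths = new long[numPartitions];
        }

        @Override
        public PartitionWriter getPartitionWriter(int partitionId) {
          return new PartitionWriter() {
            @Override
            public void write(byte[] chunk) {
              // The writer tracks the count once, as it writes.
              lengths[partitionId] += chunk.length;
            }

            @Override
            public long getNumBytesWritten() {
              return lengths[partitionId];
            }
          };
        }

        @Override
        public long[] commitAllPartitions() {
          // Hand back the lengths tracked during writing; the caller
          // does not keep a second copy of its own.
          return lengths.clone();
        }
      }

      public static void main(String[] args) throws IOException {
        MapOutputWriter out = new InMemoryMapOutputWriter(3);
        out.getPartitionWriter(0).write(new byte[10]);
        out.getPartitionWriter(2).write(new byte[5]);
        long[] lengths = out.commitAllPartitions();
        System.out.println(Arrays.toString(lengths)); // prints [10, 0, 5]
      }
    }

On that reading, the question above concerns this code path only: `getNumBytesWritten()` can remain on the `ShufflePartitionWriter` interface for other implementations, while `BypassMergeSortShuffleWriter.writePartitionedData` would take its `long[]` from the commit call rather than querying each partition writer.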
