You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/07/04 01:51:21 UTC

[GitHub] [spark] squito commented on a change in pull request #25007: [SPARK-28209][CORE][SHUFFLE] Proposed new shuffle writer API

squito commented on a change in pull request #25007: [SPARK-28209][CORE][SHUFFLE] Proposed new shuffle writer API 
URL: https://github.com/apache/spark/pull/25007#discussion_r300161025
 
 

 ##########
 File path: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 ##########
 @@ -99,74 +106,83 @@
 
   BypassMergeSortShuffleWriter(
       BlockManager blockManager,
-      IndexShuffleBlockResolver shuffleBlockResolver,
       BypassMergeSortShuffleHandle<K, V> handle,
       int mapId,
+      long mapTaskAttemptId,
       SparkConf conf,
-      ShuffleWriteMetricsReporter writeMetrics) {
+      ShuffleWriteMetricsReporter writeMetrics,
+      ShuffleWriteSupport shuffleWriteSupport) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
     this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.mapId = mapId;
+    this.mapTaskAttemptId = mapTaskAttemptId;
     this.shuffleId = dep.shuffleId();
     this.partitioner = dep.partitioner();
     this.numPartitions = partitioner.numPartitions();
     this.writeMetrics = writeMetrics;
     this.serializer = dep.serializer();
-    this.shuffleBlockResolver = shuffleBlockResolver;
+    this.shuffleWriteSupport = shuffleWriteSupport;
   }
 
   @Override
   public void write(Iterator<Product2<K, V>> records) throws IOException {
     assert (partitionWriters == null);
-    if (!records.hasNext()) {
-      partitionLengths = new long[numPartitions];
-      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
-      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
-      return;
-    }
-    final SerializerInstance serInstance = serializer.newInstance();
-    final long openStartTime = System.nanoTime();
-    partitionWriters = new DiskBlockObjectWriter[numPartitions];
-    partitionWriterSegments = new FileSegment[numPartitions];
-    for (int i = 0; i < numPartitions; i++) {
-      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
-        blockManager.diskBlockManager().createTempShuffleBlock();
-      final File file = tempShuffleBlockIdPlusFile._2();
-      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
-      partitionWriters[i] =
-        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
-    }
-    // Creating the file to write to and creating a disk writer both involve interacting with
-    // the disk, and can take a long time in aggregate when we open many files, so should be
-    // included in the shuffle write time.
-    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
-
-    while (records.hasNext()) {
-      final Product2<K, V> record = records.next();
-      final K key = record._1();
-      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
-    }
+    ShuffleMapOutputWriter mapOutputWriter = shuffleWriteSupport
+        .createMapOutputWriter(shuffleId, mapId, mapTaskAttemptId, numPartitions);
+    try {
+      if (!records.hasNext()) {
+        partitionLengths = new long[numPartitions];
+        mapOutputWriter.commitAllPartitions();
+        mapStatus = MapStatus$.MODULE$.apply(
+            blockManager.shuffleServerId(),
+            partitionLengths);
+        return;
+      }
+      final SerializerInstance serInstance = serializer.newInstance();
+      final long openStartTime = System.nanoTime();
+      partitionWriters = new DiskBlockObjectWriter[numPartitions];
+      partitionWriterSegments = new FileSegment[numPartitions];
+      for (int i = 0; i < numPartitions; i++) {
+        final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
+            blockManager.diskBlockManager().createTempShuffleBlock();
+        final File file = tempShuffleBlockIdPlusFile._2();
+        final BlockId blockId = tempShuffleBlockIdPlusFile._1();
+        partitionWriters[i] =
+            blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
+      }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, and can take a long time in aggregate when we open many files, so should be
+      // included in the shuffle write time.
+      writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
 
-    for (int i = 0; i < numPartitions; i++) {
-      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
-        partitionWriterSegments[i] = writer.commitAndGet();
+      while (records.hasNext()) {
+        final Product2<K, V> record = records.next();
+        final K key = record._1();
+        partitionWriters[partitioner.getPartition(key)].write(key, record._2());
       }
-    }
 
-    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
-    File tmp = Utils.tempFileWith(output);
-    try {
-      partitionLengths = writePartitionedFile(tmp);
-      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
-    } finally {
-      if (tmp.exists() && !tmp.delete()) {
-        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
+      for (int i = 0; i < numPartitions; i++) {
+        try (DiskBlockObjectWriter writer = partitionWriters[i]) {
+          partitionWriterSegments[i] = writer.commitAndGet();
+        }
       }
+
+      partitionLengths = writePartitionedData(mapOutputWriter);
+      mapOutputWriter.commitAllPartitions();
+      mapStatus = MapStatus$.MODULE$.apply(
+          blockManager.shuffleServerId(),
+          partitionLengths);
+    } catch (Exception e) {
+      try {
+        mapOutputWriter.abort(e);
+      } catch (Exception e2) {
+        logger.error("Failed to abort the writer after failing to write map output.", e2);
 
 Review comment:
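   Consider also calling `e.addSuppressed(e2)` here, so that the failure from `abort` is attached to the original exception `e` instead of only being logged.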

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org