You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/04/30 18:39:47 UTC

[GitHub] [spark] vanzin commented on a change in pull request #24499: [SPARK-25888][Core] Serve local disk persisted blocks by the external service after releasing executor by dynamic allocation

vanzin commented on a change in pull request #24499: [SPARK-25888][Core] Serve local disk persisted blocks by the external service after releasing executor by dynamic allocation
URL: https://github.com/apache/spark/pull/24499#discussion_r279872120
 
 

 ##########
 File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
 ##########
 @@ -213,21 +214,42 @@ public ShuffleMetrics() {
   private class ManagedBufferIterator implements Iterator<ManagedBuffer> {
 
     private int index = 0;
-    private final String appId;
-    private final String execId;
-    private final int shuffleId;
-    // An array containing mapId and reduceId pairs.
-    private final int[] mapIdAndReduceIds;
-
-    ManagedBufferIterator(String appId, String execId, String[] blockIds) {
-      this.appId = appId;
-      this.execId = execId;
+    private final Function<Integer, ManagedBuffer> blockDataForIndexFn;
+    private final int size;
+
+    ManagedBufferIterator(final String appId, final String execId, String[] blockIds) {
       String[] blockId0Parts = blockIds[0].split("_");
-      if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
-        throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
+      if (blockId0Parts.length == 4 && blockId0Parts[0].equals("shuffle")) {
+        final int shuffleId = Integer.parseInt(blockId0Parts[1]);
+        final int[] mapIdAndReduceIds = shuffleMapIdAndReduceIds(blockIds, shuffleId);
+        size = mapIdAndReduceIds.length;
+        blockDataForIndexFn = index -> blockManager.getBlockData(appId, execId, shuffleId,
+          mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]);
+      } else if(blockId0Parts.length == 3 && blockId0Parts[0].equals("rdd")) {
 
 Review comment:
   space after if

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org