Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/24 01:05:29 UTC

[GitHub] [spark] rmcyang opened a new pull request, #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

rmcyang opened a new pull request, #37638:
URL: https://github.com/apache/spark/pull/37638

   ### What changes were proposed in this pull request?
   This is one of the patches for SPARK-33235: Push-based Shuffle Improvement Tasks.
   Added a class `PushMergeMetrics` to collect the following metrics on the shuffle server side for push-based shuffle:
   - no opportunity responses
   - too late responses
   - pushed bytes written
   - cached block bytes
   
   ### Why are the changes needed?
   These metrics help in understanding push-based shuffle behavior from the shuffle server side.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   Added a method `verifyMetrics` to verify those metrics in existing unit tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1060964263


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1951,52 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // blockAppendCollisions tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String BLOCK_APPEND_COLLISIONS_METRIC = "blockAppendCollisions";
+    // lateBlockPushes tracks how many times a shuffle block push request is too late
+    static final String LATE_BLOCK_PUSHES_METRIC = "lateBlockPushes";
+    // blockBytesWritten tracks the length of the pushed block data written to file in bytes
+    static final String BLOCK_BYTES_WRITTEN_METRIC = "blockBytesWritten";
+    // deferredBlockBytes tracks the size of the current deferred block parts buffered in memory.
+    static final String DEFERRED_BLOCK_BYTES_METRIC = "deferredBlockBytes";
+    // deferredBlocks tracks the number of deferred blocks got written to the merged shuffle file
+    static final String DEFERRED_BLOCKS_METRIC = "deferredBlocks";
+    // staleBlockPushes tracks how many times a shuffle block push request is stale
+    static final String STALE_BLOCK_PUSHES_METRIC = "staleBlockPushes";

Review Comment:
   Sounds good, I added a few lines describing them under the `shuffleService` section, PTAL.





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1055806229


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1197,15 +1230,15 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
             appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
         if (isStale(info, partitionInfo.appAttemptShuffleMergeId.shuffleMergeId) ||

Review Comment:
   Makes sense. Added `staleBlockPushes`, PTAL.
   Still working on adding a UT.





[GitHub] [spark] otterc commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
otterc commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1003646453


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1941,42 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // couldNotFindOpportunityResponses tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String NO_OPPORTUNITY_RESPONSES_METRIC = "couldNotFindOpportunityResponses";
+    // tooLateResponses tracks how many times a shuffle block push request is too late
+    static final String TOO_LATE_RESPONSES_METRIC = "tooLateResponses";
+    // pushedBytesWritten tracks the length of the pushed block data written to file in bytes
+    static final String PUSHED_BYTES_WRITTEN_METRIC = "pushedBytesWritten";
+    // cachedBlocksBytes tracks the size of the current deferred block parts buffered in memory.
+    static final String CACHED_BLOCKS_BYTES_METRIC = "cachedBlocksBytes";

Review Comment:
   The reason to track deferred bytes is that we want to know how much memory these blocks can use up. What insight can we derive from tracking the number of these blocks?





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r958962174


##########
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:
##########
@@ -296,10 +301,15 @@ protected void serviceInit(Configuration externalConf) throws Exception {
           DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME);
       YarnShuffleServiceMetrics serviceMetrics =
           new YarnShuffleServiceMetrics(metricsNamespace, blockHandler.getAllMetrics());
+      YarnShuffleServiceMetrics mergeManagerMetrics =
+          new YarnShuffleServiceMetrics("mergeManagerMetrics", blockPushResolver.getMetrics());

Review Comment:
   We should not be directly creating `RemoteBlockPushResolver` within service init - it is done via `newMergedShuffleFileManagerInstance`, so that it returns the right instance configured by the admin (disabled by default).
   
   We should be adding a `getMetrics` to `MergedShuffleFileManager` to retrieve the metrics we want to register.





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1066466470


##########
docs/monitoring.md:
##########
@@ -1421,6 +1421,20 @@ Note: applies to the shuffle service
 - shuffle-server.usedDirectMemory
 - shuffle-server.usedHeapMemory
 
+- Notes: below shuffle service server-side metrics are specific to the Push-Based Shuffle
+(with `spark.shuffle.push.enabled` set as true on the client side, and with the server side
+flag `spark.shuffle.push.server.mergedShuffleFileManagerImpl` set as the appropriate
+org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for the Push-Based
+Shuffle to be enabled)
+  - blockAppendCollisions - the number of shuffle push blocks collided in shuffle services
+    as another block for the same reduce partition were being written
+  - lateBlockPushes - the number of shuffle push blocks that are received in shuffle service
+    after the specific shuffle merge has been finalized
+  - blockBytesWritten - the size of the pushed block data written to file in bytes
+  - deferredBlockBytes - the size of the current deferred block parts buffered in memory
+  - deferredBlocks - the number of the current deferred block parts buffered in memory
+  - staleBlockPushes - the number of stale shuffle block push requests

Review Comment:
   Fixed.





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1066463478


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -257,6 +274,7 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
+    verifyMetrics(4, 0, 0, 0, 0, 0);

Review Comment:
   Sounds good, added a new metric `ignoredBlockBytes` to track the size of the blocks that are ignored. @mridulm @otterc





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1003635126


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1197,15 +1230,15 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
             appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
         if (isStale(info, partitionInfo.appAttemptShuffleMergeId.shuffleMergeId) ||
             isTooLate(info, partitionInfo.reduceId)) {

Review Comment:
   If too late, mark `lateBlockPushes` ?





[GitHub] [spark] zhouyejoe commented on pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on PR #37638:
URL: https://github.com/apache/spark/pull/37638#issuecomment-1230888830

   There were some test failures in YarnShuffleServiceSuite from some of the UTs added by me in the NodeManager work-preserving restart PR. I reran the PR test job to see whether they fail consistently. @rmcyang, are you able to reproduce the UT failures locally?




[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r981843162


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -593,6 +607,9 @@ public void onData(String streamId, ByteBuffer buf) {
 
         @Override
         public void onComplete(String streamId) {
+          if (isTooLate) {
+            pushMergeMetrics.tooLateResponses.mark();
+          }

Review Comment:
   Why are we doing this as part of `onComplete`, instead of when we detect it is too late ?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1941,42 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // couldNotFindOpportunityResponses tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String NO_OPPORTUNITY_RESPONSES_METRIC = "couldNotFindOpportunityResponses";
+    // tooLateResponses tracks how many times a shuffle block push request is too late
+    static final String TOO_LATE_RESPONSES_METRIC = "tooLateResponses";
+    // pushedBytesWritten tracks the length of the pushed block data written to file in bytes
+    static final String PUSHED_BYTES_WRITTEN_METRIC = "pushedBytesWritten";
+    // cachedBlocksBytes tracks the size of the current deferred block parts buffered in memory.
+    static final String CACHED_BLOCKS_BYTES_METRIC = "cachedBlocksBytes";

Review Comment:
   rename from cached to deferred ?
   `cachedBlocksBytes` -> `deferredBlockBytes`
   
   Additionally, do we want to track the number of blocks which are deferred ?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1124,9 +1143,23 @@ private boolean isDuplicateBlock() {
      * block parts buffered in memory.
      */
     private void writeDeferredBufs() throws IOException {
+      long totalSize = 0;
       for (ByteBuffer deferredBuf : deferredBufs) {
+        totalSize += deferredBuf.limit();
         writeBuf(deferredBuf);
       }
+      mergeManager.pushMergeMetrics.cachedBlockBytes.dec(totalSize);
+      deferredBufs = null;
+    }
+
+    private void freeDeferredBufs() {
+      if (deferredBufs != null && !deferredBufs.isEmpty()) {
+        long totalSize = 0;
+        for (ByteBuffer deferredBuf : deferredBufs) {
+          totalSize += deferredBuf.limit();
+        }
+        mergeManager.pushMergeMetrics.cachedBlockBytes.dec(totalSize);
+      }
       deferredBufs = null;

Review Comment:
   There are a bunch of other places where we simply drop `deferredBufs` entirely (`abortIfNecessary`, `onData`, etc).
   Since this is tracked as a counter and we have already called `inc` on it, we should call `dec` in those places as well.
   
   Essentially, replace all `deferredBufs = null` with `freeDeferredBufs`
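
   A minimal sketch of why every drop of `deferredBufs` should go through one helper. This is not the PR's code: `DeferredBufsSketch`, `defer` and `deferredBytes` are hypothetical names, and an `AtomicLong` stands in for the metrics counter so the example has no external dependencies.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the stream callback that buffers block parts.
class DeferredBufsSketch {
  // Assumption: an AtomicLong replaces the real counter behind deferredBlockBytes.
  private final AtomicLong deferredBlockBytes = new AtomicLong();
  private List<ByteBuffer> deferredBufs;

  // Buffer a block part in memory and add its size to the counter.
  void defer(ByteBuffer buf) {
    if (deferredBufs == null) {
      deferredBufs = new ArrayList<>();
    }
    deferredBufs.add(buf);
    deferredBlockBytes.addAndGet(buf.limit());
  }

  // The only place that drops the buffers, so the decrement is never skipped.
  void freeDeferredBufs() {
    if (deferredBufs != null && !deferredBufs.isEmpty()) {
      long totalSize = 0;
      for (ByteBuffer deferredBuf : deferredBufs) {
        totalSize += deferredBuf.limit();
      }
      deferredBlockBytes.addAndGet(-totalSize);
    }
    deferredBufs = null;
  }

  // Exit paths call the helper instead of assigning null directly.
  void abortIfNecessary() {
    freeDeferredBufs();
  }

  long deferredBytes() {
    return deferredBlockBytes.get();
  }
}
```

   With a bare `deferredBufs = null` on an abort path, the counter would keep the bytes that were added in `defer` and drift upward over time. Calling the helper twice is harmless because the second call sees a null list.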



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1941,42 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // couldNotFindOpportunityResponses tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String NO_OPPORTUNITY_RESPONSES_METRIC = "couldNotFindOpportunityResponses";

Review Comment:
   If this is a collision, rename it ?
   Something like `blockAppendCollisions` or some such ?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1941,42 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // couldNotFindOpportunityResponses tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String NO_OPPORTUNITY_RESPONSES_METRIC = "couldNotFindOpportunityResponses";
+    // tooLateResponses tracks how many times a shuffle block push request is too late
+    static final String TOO_LATE_RESPONSES_METRIC = "tooLateResponses";
+    // pushedBytesWritten tracks the length of the pushed block data written to file in bytes
+    static final String PUSHED_BYTES_WRITTEN_METRIC = "pushedBytesWritten";

Review Comment:
   `pushedBytesWritten` -> `blockBytesWritten` ?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1941,42 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // couldNotFindOpportunityResponses tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String NO_OPPORTUNITY_RESPONSES_METRIC = "couldNotFindOpportunityResponses";
+    // tooLateResponses tracks how many times a shuffle block push request is too late
+    static final String TOO_LATE_RESPONSES_METRIC = "tooLateResponses";

Review Comment:
   `lateBlockPushes` or some such.





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1038914434


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -593,6 +607,9 @@ public void onData(String streamId, ByteBuffer buf) {
 
         @Override
         public void onComplete(String streamId) {
+          if (isTooLate) {
+            pushMergeMetrics.tooLateResponses.mark();
+          }

Review Comment:
   Any comments on this @otterc, @zhouyejoe ?
   Essentially, we are not marking late block push in case of `onFailure` - any reason for it ?
   
   IMO we should do it for both - and if yes, we can move it out of the callback entirely and do it immediately before the callback construction.
   Pls let me know if I am missing something ! Thx
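
   A minimal sketch of the alternative described above, assuming a simplified callback shape. `LateBlockPushSketch`, `StreamCallback` and `newCallback` are hypothetical names, and an `AtomicLong` stands in for the meter; the real resolver code is more involved.

```java
import java.util.concurrent.atomic.AtomicLong;

class LateBlockPushSketch {
  // Hypothetical callback with a success path and a failure path.
  interface StreamCallback {
    void onComplete(String streamId);
    void onFailure(String streamId, Throwable cause);
  }

  // Assumption: an AtomicLong replaces the real lateBlockPushes meter.
  final AtomicLong lateBlockPushes = new AtomicLong();

  StreamCallback newCallback(boolean isTooLate) {
    // Mark at detection time, before the callback exists, so the count
    // does not depend on whether onComplete or onFailure fires later.
    if (isTooLate) {
      lateBlockPushes.incrementAndGet();
    }
    return new StreamCallback() {
      @Override
      public void onComplete(String streamId) {
        // No metric update needed here.
      }

      @Override
      public void onFailure(String streamId, Throwable cause) {
        // No metric update needed here either.
      }
    };
  }
}
```

   In this shape a late push whose stream ends in `onFailure` is still counted exactly once, which is the gap raised in the comment.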





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r997733472


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1124,9 +1143,23 @@ private boolean isDuplicateBlock() {
      * block parts buffered in memory.
      */
     private void writeDeferredBufs() throws IOException {
+      long totalSize = 0;
       for (ByteBuffer deferredBuf : deferredBufs) {
+        totalSize += deferredBuf.limit();
         writeBuf(deferredBuf);
       }
+      mergeManager.pushMergeMetrics.cachedBlockBytes.dec(totalSize);
+      deferredBufs = null;
+    }
+
+    private void freeDeferredBufs() {
+      if (deferredBufs != null && !deferredBufs.isEmpty()) {
+        long totalSize = 0;
+        for (ByteBuffer deferredBuf : deferredBufs) {
+          totalSize += deferredBuf.limit();
+        }
+        mergeManager.pushMergeMetrics.cachedBlockBytes.dec(totalSize);
+      }
       deferredBufs = null;

Review Comment:
   Makes sense. I checked all the places and made the replacement; it seems this applies only to `abortIfNecessary` and `onData`.





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1057885497


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -257,6 +274,7 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
+    verifyMetrics(4, 0, 0, 0, 0, 0);

Review Comment:
   We should be capturing this in metrics - essentially streams which were ignored due to a speculative task writing concurrently. Same for `testDuplicateBlocksAreIgnoredWhenPrevStreamHasCompleted`
   We can add it as a follow up though.





[GitHub] [spark] rmcyang commented on pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on PR #37638:
URL: https://github.com/apache/spark/pull/37638#issuecomment-1225047079

   cc @zhouyejoe @otterc @mridulm @xkrogen. Please take a look, thanks.




[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1034531438


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -1415,6 +1440,24 @@ private void pushBlockHelper(
     }
   }
 
+  private void verifyMetrics(

Review Comment:
   Sure, I enhanced the UTs `testDeferredBufsAreWrittenDuringOnData` and `testDeferredBufsAreWrittenDuringOnComplete` to cover the test for `expectedCachedBlocksBytes` since the UTs produced the cached block bytes. Thanks for pointing this out.





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r997736011


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1124,9 +1143,23 @@ private boolean isDuplicateBlock() {
      * block parts buffered in memory.
      */
     private void writeDeferredBufs() throws IOException {
+      long totalSize = 0;
       for (ByteBuffer deferredBuf : deferredBufs) {
+        totalSize += deferredBuf.limit();
         writeBuf(deferredBuf);
       }
+      mergeManager.pushMergeMetrics.cachedBlockBytes.dec(totalSize);
+      deferredBufs = null;
+    }
+
+    private void freeDeferredBufs() {
+      if (deferredBufs != null && !deferredBufs.isEmpty()) {
+        long totalSize = 0;
+        for (ByteBuffer deferredBuf : deferredBufs) {
+          totalSize += deferredBuf.limit();
+        }
+        mergeManager.pushMergeMetrics.cachedBlockBytes.dec(totalSize);
+      }
       deferredBufs = null;

Review Comment:
   Makes sense. I checked all the places and made the replacement; it looks like the only remaining places that need it are `abortIfNecessary` and `onData`.





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r997761687


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1941,42 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // couldNotFindOpportunityResponses tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String NO_OPPORTUNITY_RESPONSES_METRIC = "couldNotFindOpportunityResponses";
+    // tooLateResponses tracks how many times a shuffle block push request is too late
+    static final String TOO_LATE_RESPONSES_METRIC = "tooLateResponses";
+    // pushedBytesWritten tracks the length of the pushed block data written to file in bytes
+    static final String PUSHED_BYTES_WRITTEN_METRIC = "pushedBytesWritten";
+    // cachedBlocksBytes tracks the size of the current deferred block parts buffered in memory.
+    static final String CACHED_BLOCKS_BYTES_METRIC = "cachedBlocksBytes";

Review Comment:
   > Additionally, do we want to track the number of blocks which are deferred ?
   
   This might be a bit redundant, given that we already track the deferred blocks in bytes. @otterc, thoughts?





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r958962174


##########
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:
##########
@@ -296,10 +301,15 @@ protected void serviceInit(Configuration externalConf) throws Exception {
           DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME);
       YarnShuffleServiceMetrics serviceMetrics =
           new YarnShuffleServiceMetrics(metricsNamespace, blockHandler.getAllMetrics());
+      YarnShuffleServiceMetrics mergeManagerMetrics =
+          new YarnShuffleServiceMetrics("mergeManagerMetrics", blockPushResolver.getMetrics());

Review Comment:
   We should not create `RemoteBlockPushResolver` directly within service init. That is done via `newMergedShuffleFileManagerInstance`, so that it returns the right instance as configured by the admin (disabled by default).
   
   We should add a `getMetrics` method to `MergedShuffleFileManager` to retrieve the metrics we want to register.
   
   Something like this in `MergedShuffleFileManager`:
   ```
   
     default MetricSet getMetrics() {
       return Collections::emptyMap;
     }
   
   ```
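
For illustration, here is a minimal self-contained sketch of the suggestion above. It is not the actual Spark code: `Metric` and `MetricSet` are local stand-ins for the Dropwizard (`com.codahale.metrics`) types, and `FileManagerSketch`, `NoOpManager`, and `DefaultMetricsDemo` are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;

// Stand-ins for com.codahale.metrics.Metric and MetricSet (assumption:
// same shape as the real types), so the sketch compiles on its own.
interface Metric {}

interface MetricSet {
  Map<String, Metric> getMetrics();
}

// Hypothetical, trimmed-down version of MergedShuffleFileManager.
interface FileManagerSketch {
  // Implementations with nothing to report inherit an empty metric set.
  // MetricSet has a single abstract method, so a method reference is enough.
  default MetricSet getMetrics() {
    return Collections::emptyMap;
  }
}

// Hypothetical manager that does not override getMetrics().
class NoOpManager implements FileManagerSketch {}

class DefaultMetricsDemo {
  // Number of metrics the service would register for the given manager.
  static int registeredMetricCount(FileManagerSketch manager) {
    return manager.getMetrics().getMetrics().size();
  }

  public static void main(String[] args) {
    System.out.println(registeredMetricCount(new NoOpManager()));
  }
}
```

With a default like this, the service could call `getMetrics()` on whatever manager was configured without knowing its concrete class; only implementations that actually collect metrics would need to override it.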





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1055805408


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1941,42 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // couldNotFindOpportunityResponses tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String NO_OPPORTUNITY_RESPONSES_METRIC = "couldNotFindOpportunityResponses";
+    // tooLateResponses tracks how many times a shuffle block push request is too late
+    static final String TOO_LATE_RESPONSES_METRIC = "tooLateResponses";
+    // pushedBytesWritten tracks the length of the pushed block data written to file in bytes
+    static final String PUSHED_BYTES_WRITTEN_METRIC = "pushedBytesWritten";
+    // cachedBlocksBytes tracks the size of the current deferred block parts buffered in memory.
+    static final String CACHED_BLOCKS_BYTES_METRIC = "cachedBlocksBytes";

Review Comment:
   Thanks for the suggestion. Added `deferredBlocks`, PTAL.
   WIP on adding UT.





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1060968735


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -257,6 +274,7 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
+    verifyMetrics(4, 0, 0, 0, 0, 0);

Review Comment:
   > We should be capturing this in metrics
   
   I'm not sure I follow here; could you please elaborate? Thanks, @mridulm!





[GitHub] [spark] AmplabJenkins commented on pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #37638:
URL: https://github.com/apache/spark/pull/37638#issuecomment-1225785587

   Can one of the admins verify this patch?




[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1003656427


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -593,6 +607,9 @@ public void onData(String streamId, ByteBuffer buf) {
 
         @Override
         public void onComplete(String streamId) {
+          if (isTooLate) {
+            pushMergeMetrics.tooLateResponses.mark();
+          }

Review Comment:
   Yes, but is there a reason to defer updating the metric until later? It is already clear at callback construction time that the push is late.
   
   I am not sure we want to distinguish between `onComplete` and `onFailure` for late pushes.
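
A rough sketch of the alternative described above, under stated assumptions: `LatePushSketch` and its nested `StreamCallback` are hypothetical stand-ins for the resolver and its stream callback, and an `AtomicLong` takes the place of the metric object. The counter is updated once, when the callback is built, so the result does not depend on whether `onComplete` or `onFailure` fires later.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the resolver; not the real Spark class.
class LatePushSketch {
  // Stand-in for the late-push metric.
  final AtomicLong lateBlockPushes = new AtomicLong();

  // Simplified callback with only the two terminal events.
  interface StreamCallback {
    void onComplete();
    void onFailure();
  }

  StreamCallback newCallback(boolean isTooLate) {
    if (isTooLate) {
      // Lateness is known here, so count it at construction time.
      lateBlockPushes.incrementAndGet();
    }
    return new StreamCallback() {
      @Override
      public void onComplete() {
        // No metric update needed here.
      }

      @Override
      public void onFailure() {
        // No metric update needed here either.
      }
    };
  }
}
```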





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r997733472


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1124,9 +1143,23 @@ private boolean isDuplicateBlock() {
      * block parts buffered in memory.
      */
     private void writeDeferredBufs() throws IOException {
+      long totalSize = 0;
       for (ByteBuffer deferredBuf : deferredBufs) {
+        totalSize += deferredBuf.limit();
         writeBuf(deferredBuf);
       }
+      mergeManager.pushMergeMetrics.cachedBlockBytes.dec(totalSize);
+      deferredBufs = null;
+    }
+
+    private void freeDeferredBufs() {
+      if (deferredBufs != null && !deferredBufs.isEmpty()) {
+        long totalSize = 0;
+        for (ByteBuffer deferredBuf : deferredBufs) {
+          totalSize += deferredBuf.limit();
+        }
+        mergeManager.pushMergeMetrics.cachedBlockBytes.dec(totalSize);
+      }
       deferredBufs = null;

Review Comment:
   Makes sense. I have checked all the call sites and made the replacement; it seems to apply only to `abortIfNecessary` and `onData`.





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1057885497


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -257,6 +274,7 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
+    verifyMetrics(4, 0, 0, 0, 0, 0);

Review Comment:
   We should be capturing this in metrics - essentially streams which were ignored due to a speculative task writing concurrently.
   We can add it as a follow up though.





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1063902183


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -257,6 +274,7 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
+    verifyMetrics(4, 0, 0, 0, 0, 0);

Review Comment:
   Thanks @mridulm. So you are suggesting that we add a new metric to capture the size in bytes of pushed blocks that got ignored, right? If so, I'm trying to figure out what insights we would gain from the number/size of ignored blocks. Thoughts? ++ @otterc





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1067643931


##########
docs/monitoring.md:
##########
@@ -1421,6 +1421,21 @@ Note: applies to the shuffle service
 - shuffle-server.usedDirectMemory
 - shuffle-server.usedHeapMemory
 
+Note: applies to the shuffle service when the server side flag
+`spark.shuffle.push.server.mergedShuffleFileManagerImpl` set as the appropriate
+org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for the Push-Based Shuffle
+
+- blockAppendCollisions - the number of shuffle push blocks collided in shuffle services
+  as another block for the same reduce partition were being written
+- lateBlockPushes - the number of shuffle push blocks that are received in shuffle service
+  after the specific shuffle merge has been finalized
+- blockBytesWritten - the size of the pushed block data written to file in bytes
+- deferredBlockBytes - the size of the current deferred block parts buffered in memory
+- deferredBlocks - the number of the current deferred block parts buffered in memory
+- staleBlockPushes - the number of stale shuffle block push requests
+- ignoredBlockBytes - the size of the pushed block data that are ignored after the shuffle
+  file is finalized or when a request is for a duplicate block

Review Comment:
   Sounds good. Current preview:
   ![Screen Shot 2023-01-11 at 6 31 34 PM](https://user-images.githubusercontent.com/31781684/211961919-efa4426a-df6f-43b7-8e07-32a455b2bb5f.png)





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1066281891


##########
docs/monitoring.md:
##########
@@ -1403,6 +1403,15 @@ Note: applies to the shuffle service
 - shuffle-server.usedDirectMemory
 - shuffle-server.usedHeapMemory
 
+Below shuffle service server-side metrics are specific to the Push-Based Shuffle
+

Review Comment:
   Sure, below is the preview after the change, PTAL @zhouyejoe.
   ![Screen Shot 2023-01-10 at 12 15 47 PM](https://user-images.githubusercontent.com/31781684/211652893-14f01968-642c-4965-962b-af0c82d486db.png)
   





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1050427667


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -593,6 +607,9 @@ public void onData(String streamId, ByteBuffer buf) {
 
         @Override
         public void onComplete(String streamId) {
+          if (isTooLate) {
+            pushMergeMetrics.tooLateResponses.mark();
+          }

Review Comment:
   Can you make this change @rmcyang  ? Thx





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1003638095


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1197,15 +1230,15 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
             appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
         if (isStale(info, partitionInfo.appAttemptShuffleMergeId.shuffleMergeId) ||

Review Comment:
   QQ: Do we want to capture stale pushes ?





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1003562799


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -593,6 +607,9 @@ public void onData(String streamId, ByteBuffer buf) {
 
         @Override
         public void onComplete(String streamId) {
+          if (isTooLate) {
+            pushMergeMetrics.tooLateResponses.mark();
+          }

Review Comment:
   That looks like an implementation detail of the li-2.3.0 branch. Is there any reason not to update this sooner?





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1067641509


##########
docs/monitoring.md:
##########
@@ -1421,6 +1421,21 @@ Note: applies to the shuffle service
 - shuffle-server.usedDirectMemory
 - shuffle-server.usedHeapMemory
 
+Note: applies to the shuffle service when the server side flag
+`spark.shuffle.push.server.mergedShuffleFileManagerImpl` set as the appropriate
+org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for the Push-Based Shuffle
+
+- blockAppendCollisions - the number of shuffle push blocks collided in shuffle services
+  as another block for the same reduce partition were being written
+- lateBlockPushes - the number of shuffle push blocks that are received in shuffle service
+  after the specific shuffle merge has been finalized
+- blockBytesWritten - the size of the pushed block data written to file in bytes
+- deferredBlockBytes - the size of the current deferred block parts buffered in memory
+- deferredBlocks - the number of the current deferred block parts buffered in memory
+- staleBlockPushes - the number of stale shuffle block push requests
+- ignoredBlockBytes - the size of the pushed block data that are ignored after the shuffle
+  file is finalized or when a request is for a duplicate block

Review Comment:
   Can we add a blank line to separate this from the earlier list (before the `note: the metrics ...`)?
   All metrics following the note are related to push-based shuffle, so it would be good to group them together visually.





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1057887707


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1951,52 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // blockAppendCollisions tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String BLOCK_APPEND_COLLISIONS_METRIC = "blockAppendCollisions";
+    // lateBlockPushes tracks how many times a shuffle block push request is too late
+    static final String LATE_BLOCK_PUSHES_METRIC = "lateBlockPushes";
+    // blockBytesWritten tracks the length of the pushed block data written to file in bytes
+    static final String BLOCK_BYTES_WRITTEN_METRIC = "blockBytesWritten";
+    // deferredBlockBytes tracks the size of the current deferred block parts buffered in memory.
+    static final String DEFERRED_BLOCK_BYTES_METRIC = "deferredBlockBytes";
+    // deferredBlocks tracks the number of deferred blocks got written to the merged shuffle file
+    static final String DEFERRED_BLOCKS_METRIC = "deferredBlocks";
+    // staleBlockPushes tracks how many times a shuffle block push request it stale
+    static final String STALE_BLOCK_PUSHES_METRIC = "staleBlockPushes";

Review Comment:
   We should be documenting these in `monitoring.md`





[GitHub] [spark] zhouyejoe commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1065234001


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1950,52 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // blockAppendCollisions tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String BLOCK_APPEND_COLLISIONS_METRIC = "blockAppendCollisions";
+    // lateBlockPushes tracks how many times a shuffle block push request is too late

Review Comment:
   Nit: "how many times a shuffle block push request is too late" to "the number of shuffle push blocks that are received in shuffle service after the specific shuffle merge has been finalized". Please also update the wording in monitoring.md. The same applies to the other wording suggestions below.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1950,52 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // blockAppendCollisions tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written

Review Comment:
   Nit: "how many times a shuffle block collided because of another block for the same reduce partition was being written" to "the number of shuffle push blocks collided in shuffle services as blocks for the same reduce partitions were being written".



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1950,52 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // blockAppendCollisions tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String BLOCK_APPEND_COLLISIONS_METRIC = "blockAppendCollisions";
+    // lateBlockPushes tracks how many times a shuffle block push request is too late
+    static final String LATE_BLOCK_PUSHES_METRIC = "lateBlockPushes";
+    // blockBytesWritten tracks the length of the pushed block data written to file in bytes
+    static final String BLOCK_BYTES_WRITTEN_METRIC = "blockBytesWritten";
+    // deferredBlockBytes tracks the size of the current deferred block parts buffered in memory
+    static final String DEFERRED_BLOCK_BYTES_METRIC = "deferredBlockBytes";
+    // deferredBlocks tracks the number of the current deferred block parts buffered in memory
+    static final String DEFERRED_BLOCKS_METRIC = "deferredBlocks";
+    // staleBlockPushes tracks how many times a shuffle block push request it stale

Review Comment:
   Nit: "how many times a shuffle block push request it stale" to "the number of stale shuffle block push requests"



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1950,52 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // blockAppendCollisions tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String BLOCK_APPEND_COLLISIONS_METRIC = "blockAppendCollisions";
+    // lateBlockPushes tracks how many times a shuffle block push request is too late
+    static final String LATE_BLOCK_PUSHES_METRIC = "lateBlockPushes";
+    // blockBytesWritten tracks the length of the pushed block data written to file in bytes

Review Comment:
   Nit: "the length" to "the size"



##########
docs/monitoring.md:
##########
@@ -1403,6 +1403,15 @@ Note: applies to the shuffle service
 - shuffle-server.usedDirectMemory
 - shuffle-server.usedHeapMemory
 
+Below shuffle service server-side metrics are specific to the Push-Based Shuffle
+

Review Comment:
   The format here is problematic.
   If it is a note, we should use a single line starting with "- **notes:** ". We also need to mention the configuration needed to enable these metrics.
   ```suggestion
   - shuffle-server.usedDirectMemory
   - shuffle-server.usedHeapMemory
   - **notes:** 
     - The metrics below are specific to push-based shuffle and only emitted when 
     `spark.shuffle.push.server.mergedShuffleFileManagerImpl` is configured as 
     "org.apache.spark.network.shuffle.RemoteBlockPushResolver".
   - blockAppendCollisions ...
   ```





[GitHub] [spark] mridulm commented on pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #37638:
URL: https://github.com/apache/spark/pull/37638#issuecomment-1260268934

   +CC @otterc, @Ngone51 




[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1060951175


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1220,6 +1260,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
           try {
             if (deferredBufs != null && !deferredBufs.isEmpty()) {
               writeDeferredBufs();
+              mergeManager.pushMergeMetrics.deferredBlocks.mark();

Review Comment:
   Sure. In addition to the proposal here, to keep `deferredBlockBytes` and `deferredBlocks` fully in sync, I also went ahead and decrement `deferredBlocks` wherever there is a `deferredBlockBytes.dec()`, and updated the UTs accordingly. Please take a look.





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1046495068


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1197,15 +1230,15 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
             appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
         if (isStale(info, partitionInfo.appAttemptShuffleMergeId.shuffleMergeId) ||
             isTooLate(info, partitionInfo.reduceId)) {

Review Comment:
   Yes, we should mark `lateBlockPushes` here, thanks for the suggestion!





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1057886779


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -235,6 +251,7 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamHasCompleted() throws IOE
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
+    verifyMetrics(4, 0, 0, 0, 0, 0);

Review Comment:
   Will this change with [the comment](https://github.com/apache/spark/pull/37638#discussion_r1057874346) above?





[GitHub] [spark] mridulm commented on pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #37638:
URL: https://github.com/apache/spark/pull/37638#issuecomment-1373319429

   Can you fix the linter error @thejdeep ?
   ` src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:[1281] (sizes) LineLength: Line is longer than 100 characters (found 104).`




[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1066466341


##########
docs/monitoring.md:
##########
@@ -1421,6 +1421,20 @@ Note: applies to the shuffle service
 - shuffle-server.usedDirectMemory
 - shuffle-server.usedHeapMemory
 
+- Notes: below shuffle service server-side metrics are specific to the Push-Based Shuffle
+(with `spark.shuffle.push.enabled` set as true on the client side, and with the server side
+flag `spark.shuffle.push.server.mergedShuffleFileManagerImpl` set as the appropriate
+org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for the Push-Based
+Shuffle to be enabled)

Review Comment:
   I removed the client-side config from the description. As for the note format, I'm following the format at L1408, L1399, L1392, and L1384. This is the preview after the change:
   ![Screen Shot 2023-01-10 at 4 23 19 PM](https://user-images.githubusercontent.com/31781684/211689574-adf92b25-b0b7-406d-9dc4-847e64a09afd.png)
   Let me know if you have further input, thanks @zhouyejoe !





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1067505986


##########
docs/monitoring.md:
##########
@@ -1421,6 +1421,21 @@ Note: applies to the shuffle service
 - shuffle-server.usedDirectMemory
 - shuffle-server.usedHeapMemory
 
+Note: applies to the shuffle service when the server side flag
+`spark.shuffle.push.server.mergedShuffleFileManagerImpl` set as the appropriate
+org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for the Push-Based Shuffle
+
+- blockAppendCollisions - the number of shuffle push blocks collided in shuffle services
+  as another block for the same reduce partition were being written
+- lateBlockPushes - the number of shuffle push blocks that are received in shuffle service
+  after the specific shuffle merge has been finalized
+- blockBytesWritten - the size of the pushed block data written to file in bytes
+- deferredBlockBytes - the size of the current deferred block parts buffered in memory
+- deferredBlocks - the number of the current deferred block parts buffered in memory
+- staleBlockPushes - the number of stale shuffle block push requests
+- ignoredBlockBytes - the size of the pushed block data that are ignored after the shuffle
+  file is finalized or when a request is for a duplicate block

Review Comment:
   nit: 
   Can we reorder this so that we go from the common metrics to the more corner cases ?
   For example, something like:
   
   * blockBytesWritten
   * blockAppendCollisions
   * lateBlockPushes
   * staleBlockPushes
   * deferredBlocks
   * deferredBlockBytes
   * ignoredBlockBytes
   
   (If this is not the right order, please feel free to reorder them)





[GitHub] [spark] zhouyejoe commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1067596021


##########
docs/monitoring.md:
##########
@@ -1421,6 +1421,21 @@ Note: applies to the shuffle service
 - shuffle-server.usedDirectMemory
 - shuffle-server.usedHeapMemory
 
+Note: applies to the shuffle service when the server side flag
+`spark.shuffle.push.server.mergedShuffleFileManagerImpl` set as the appropriate
+org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for the Push-Based Shuffle
+
+- blockAppendCollisions - the number of shuffle push blocks collided in shuffle services
+  as another block for the same reduce partition were being written
+- lateBlockPushes - the number of shuffle push blocks that are received in shuffle service
+  after the specific shuffle merge has been finalized
+- blockBytesWritten - the size of the pushed block data written to file in bytes
+- deferredBlockBytes - the size of the current deferred block parts buffered in memory
+- deferredBlocks - the number of the current deferred block parts buffered in memory
+- staleBlockPushes - the number of stale shuffle block push requests
+- ignoredBlockBytes - the size of the pushed block data that are ignored after the shuffle
+  file is finalized or when a request is for a duplicate block

Review Comment:
   LGTM.





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1003564958


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1941,42 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // couldNotFindOpportunityResponses tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String NO_OPPORTUNITY_RESPONSES_METRIC = "couldNotFindOpportunityResponses";
+    // tooLateResponses tracks how many times a shuffle block push request is too late
+    static final String TOO_LATE_RESPONSES_METRIC = "tooLateResponses";
+    // pushedBytesWritten tracks the length of the pushed block data written to file in bytes
+    static final String PUSHED_BYTES_WRITTEN_METRIC = "pushedBytesWritten";
+    // cachedBlocksBytes tracks the size of the current deferred block parts buffered in memory.
+    static final String CACHED_BLOCKS_BYTES_METRIC = "cachedBlocksBytes";

Review Comment:
   There is a correlation between the number of deferred blocks and the number of bytes deferred, but they are not necessarily the same, and both are also influenced by the Spark application configuration.





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1038914958


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1197,15 +1230,15 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
             appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
         if (isStale(info, partitionInfo.appAttemptShuffleMergeId.shuffleMergeId) ||
             isTooLate(info, partitionInfo.reduceId)) {

Review Comment:
   Any thoughts ?





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1066281891


##########
docs/monitoring.md:
##########
@@ -1403,6 +1403,15 @@ Note: applies to the shuffle service
 - shuffle-server.usedDirectMemory
 - shuffle-server.usedHeapMemory
 
+Below shuffle service server-side metrics are specific to the Push-Based Shuffle
+

Review Comment:
   Sure, below is the preview after the change, PTAL.
   ![Screen Shot 2023-01-10 at 12 15 47 PM](https://user-images.githubusercontent.com/31781684/211652893-14f01968-642c-4965-962b-af0c82d486db.png)
   





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1067534717


##########
docs/monitoring.md:
##########
@@ -1421,6 +1421,21 @@ Note: applies to the shuffle service
 - shuffle-server.usedDirectMemory
 - shuffle-server.usedHeapMemory
 
+Note: applies to the shuffle service when the server side flag
+`spark.shuffle.push.server.mergedShuffleFileManagerImpl` set as the appropriate
+org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for the Push-Based Shuffle
+
+- blockAppendCollisions - the number of shuffle push blocks collided in shuffle services
+  as another block for the same reduce partition were being written
+- lateBlockPushes - the number of shuffle push blocks that are received in shuffle service
+  after the specific shuffle merge has been finalized
+- blockBytesWritten - the size of the pushed block data written to file in bytes
+- deferredBlockBytes - the size of the current deferred block parts buffered in memory
+- deferredBlocks - the number of the current deferred block parts buffered in memory
+- staleBlockPushes - the number of stale shuffle block push requests
+- ignoredBlockBytes - the size of the pushed block data that are ignored after the shuffle
+  file is finalized or when a request is for a duplicate block

Review Comment:
   Thanks for the suggestion. I additionally placed `staleBlockPushes` between `deferredBlockBytes` and `ignoredBlockBytes`, since a stale block push appears in the case of indeterminate stage retries, which I feel is less common than a deferred block. Let me know if that's not right. Thanks.





[GitHub] [spark] zhouyejoe commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1066439800


##########
docs/monitoring.md:
##########
@@ -1421,6 +1421,20 @@ Note: applies to the shuffle service
 - shuffle-server.usedDirectMemory
 - shuffle-server.usedHeapMemory
 
+- Notes: below shuffle service server-side metrics are specific to the Push-Based Shuffle
+(with `spark.shuffle.push.enabled` set as true on the client side, and with the server side
+flag `spark.shuffle.push.server.mergedShuffleFileManagerImpl` set as the appropriate
+org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for the Push-Based
+Shuffle to be enabled)

Review Comment:
   These metrics are not related to client-side configuration: if push is not enabled on the client side, these metrics will still show up, but all as 0. Once the server-side configuration is correctly set to a MergedShuffleFileManager implementation, these metrics will show up.
   
   ```suggestion
   - **notes:** The metrics below are specific to push-based shuffle and only emitted when 
     `spark.shuffle.push.server.mergedShuffleFileManagerImpl` is configured as 
     "org.apache.spark.network.shuffle.RemoteBlockPushResolver".
   ```
   Please refer to the format of the other notes in this file. You do need to follow the format there.
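
The point above, that the metrics exist (and read 0) as soon as the server side is configured, regardless of any client pushing blocks, can be illustrated with a minimal sketch. It assumes the metrics are created once when the merge manager is constructed. `PushMergeMetricsSketch`, `add`, and `snapshot` are hypothetical names, and `AtomicLong` stands in for the Dropwizard metrics behind the `MetricSet` in the diff; only the metric names are taken from the `docs/monitoring.md` change.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a server-side metric set. All metrics are created
// in the constructor, so they are reported (as 0) before any block is pushed.
class PushMergeMetricsSketch {
  private final Map<String, AtomicLong> metrics = new LinkedHashMap<>();

  PushMergeMetricsSketch() {
    String[] names = {
      "blockBytesWritten", "blockAppendCollisions", "lateBlockPushes",
      "deferredBlocks", "deferredBlockBytes", "staleBlockPushes", "ignoredBlockBytes"
    };
    for (String name : names) {
      metrics.put(name, new AtomicLong());
    }
  }

  // Adds delta to a known metric; called only when a push actually arrives.
  void add(String name, long delta) {
    metrics.get(name).addAndGet(delta);
  }

  // Returns the current value of every registered metric, in insertion order.
  Map<String, Long> snapshot() {
    Map<String, Long> out = new LinkedHashMap<>();
    metrics.forEach((name, value) -> out.put(name, value.get()));
    return out;
  }
}
```

With no client pushing, `snapshot()` returns all seven names mapped to 0, which matches the behaviour described in the comment.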





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1057878241


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1220,6 +1260,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
           try {
             if (deferredBufs != null && !deferredBufs.isEmpty()) {
               writeDeferredBufs();
+              mergeManager.pushMergeMetrics.deferredBlocks.mark();

Review Comment:
   This is when the deferred block is getting written out.
   Instead, we should be incrementing it when the deferred block is getting added.
   Move it to where we increment `deferredBlockBytes` below?
   This will keep the metrics in sync; otherwise we can have a large `deferredBlockBytes` but a very low `deferredBlocks`.
   
   An example of this is in the test case [here](https://github.com/apache/spark/pull/37638/files#diff-955b4eb8f495555d5e188ed93a4acf5393e1d9c7855dffddedb5e9201eebdef2R193) - the deferred blocks should be `1`, but is `0`.
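
A minimal sketch of the bookkeeping requested above, assuming the count and the byte total are both updated at the moment a part is buffered and both decremented when it is flushed. `DeferredBlockTracker`, `defer`, and `writeDeferred` are hypothetical names, and `AtomicLong` is used in place of the Dropwizard counters so the example depends only on the JDK; it is not the code in `RemoteBlockPushResolver`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the resolver's deferred-buffer bookkeeping.
class DeferredBlockTracker {
  final AtomicLong deferredBlocks = new AtomicLong();
  final AtomicLong deferredBlockBytes = new AtomicLong();
  private final Deque<byte[]> deferredBufs = new ArrayDeque<>();

  // A part that cannot be written yet is buffered; both metrics move together.
  void defer(byte[] part) {
    deferredBufs.add(part);
    deferredBlocks.incrementAndGet();
    deferredBlockBytes.addAndGet(part.length);
  }

  // Flushing the buffered parts decrements both metrics in the same place.
  // Returns the number of bytes that were flushed.
  long writeDeferred() {
    long written = 0;
    byte[] part;
    while ((part = deferredBufs.poll()) != null) {
      written += part.length;
      deferredBlocks.decrementAndGet();
      deferredBlockBytes.addAndGet(-part.length);
    }
    return written;
  }
}
```

Under this scheme, deferring a single 3-byte part leaves `deferredBlocks` at 1 and `deferredBlockBytes` at 3 until the flush, which is the state the referenced test case is expected to observe.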



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -184,6 +190,10 @@ public void testDeferredBufsAreWrittenDuringOnData() throws IOException {
         new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 1, 0, 0));
     // This should be deferred
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
+    verifyMetrics(2, 0, 0, 3, 0, 0);

Review Comment:
   I have commented about this earlier in `RemoteBlockPushResolver` - and this will change when it is addressed.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -568,6 +580,12 @@ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     // getting killed. When this happens, we need to distinguish the duplicate blocks as they
     // arrive. More details on this is explained in later comments.
 
+    // Track if the block is received after shuffle merge finalize. The block would be considered
+    // as too late if it received after shuffle merge finalize, and hence mark it as a late block
+    // push to the pushMergeMetrics
+    if (partitionInfoBeforeCheck == null) {
+      pushMergeMetrics.lateBlockPushes.mark();
+    }

Review Comment:
   Even if `partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex)`, it would be considered late.
   Move `pushMergeMetrics.lateBlockPushes.mark();` to the `else` block below (before `new StreamCallbackWithID`) ?





[GitHub] [spark] otterc commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
otterc commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1046545776


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -593,6 +607,9 @@ public void onData(String streamId, ByteBuffer buf) {
 
         @Override
         public void onComplete(String streamId) {
+          if (isTooLate) {
+            pushMergeMetrics.tooLateResponses.mark();
+          }

Review Comment:
   Actually, in our 2.3 version, we were incrementing this metric whenever the server responded to the client that the block is tooLate. In `onFailure` we don't respond to the client with `TooLate`. However, these metrics are the server's view, so I think you are right. We should increment outside the callback.





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1046602772


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1197,15 +1230,15 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
             appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
         if (isStale(info, partitionInfo.appAttemptShuffleMergeId.shuffleMergeId) ||

Review Comment:
   Thanks for the suggestion. It seems that a stale block push can appear during indeterminate stage retries ([reference](https://github.com/apache/spark/blob/05cd5f97c3dea25dacdbdb319243cdab9667c774/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java#L132)). I'm not sure how necessary capturing this would be, but it can be captured in the same way as `lateBlockPushes`, @mridulm.
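
One way to picture capturing stale pushes "in the same way" as late pushes is a check that marks exactly one of two counters when a push is rejected. The sketch below is an assumption-laden illustration: `PushClassifierSketch` and `accept` are hypothetical, the definitions of `isStale` (the push carries an older `shuffleMergeId` than the current one) and `isTooLate` (the merge is already finalized) are simplified guesses at the semantics discussed in this thread, and `AtomicLong` replaces the real metric types.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the server-side state for a single shuffle.
class PushClassifierSketch {
  final AtomicLong staleBlockPushes = new AtomicLong();
  final AtomicLong lateBlockPushes = new AtomicLong();

  private final int currentShuffleMergeId;
  private final boolean finalized;

  PushClassifierSketch(int currentShuffleMergeId, boolean finalized) {
    this.currentShuffleMergeId = currentShuffleMergeId;
    this.finalized = finalized;
  }

  // Assumed meaning: the push targets an older merge attempt than the current one.
  boolean isStale(int pushShuffleMergeId) {
    return pushShuffleMergeId < currentShuffleMergeId;
  }

  // Assumed meaning: the merge for the current attempt has already been finalized.
  boolean isTooLate() {
    return finalized;
  }

  // Returns true if the push is accepted; on rejection, marks exactly one metric.
  boolean accept(int pushShuffleMergeId) {
    if (isStale(pushShuffleMergeId)) {
      staleBlockPushes.incrementAndGet();
      return false;
    }
    if (isTooLate()) {
      lateBlockPushes.incrementAndGet();
      return false;
    }
    return true;
  }
}
```

Keeping the two counters separate lets an operator tell retries of indeterminate stages apart from pushes that simply arrived after finalization.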





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1038914754


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1941,42 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // couldNotFindOpportunityResponses tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String NO_OPPORTUNITY_RESPONSES_METRIC = "couldNotFindOpportunityResponses";
+    // tooLateResponses tracks how many times a shuffle block push request is too late
+    static final String TOO_LATE_RESPONSES_METRIC = "tooLateResponses";
+    // pushedBytesWritten tracks the length of the pushed block data written to file in bytes
+    static final String PUSHED_BYTES_WRITTEN_METRIC = "pushedBytesWritten";
+    // cachedBlocksBytes tracks the size of the current deferred block parts buffered in memory.
+    static final String CACHED_BLOCKS_BYTES_METRIC = "cachedBlocksBytes";

Review Comment:
   Can we add `deferredBlocks` as well, to track the number of deferred blocks (in addition to the size you have added support for here)?





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1038914901


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1197,15 +1230,15 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
             appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
         if (isStale(info, partitionInfo.appAttemptShuffleMergeId.shuffleMergeId) ||

Review Comment:
   Any thoughts ? Thx





[GitHub] [spark] otterc commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
otterc commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1066209617


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -257,6 +274,7 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
+    verifyMetrics(4, 0, 0, 0, 0, 0);

Review Comment:
   Capturing the size/bytes of push blocks that get ignored can help with any improvements that we make to the shuffle service, so I think it is a good idea to add a metric for that as well.
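
As a rough sketch of what such a metric could look like, the handler below counts the bytes of every buffer it discards. It assumes ignored data arrives as `ByteBuffer` chunks, as in the `onData` callbacks shown in the diffs; `IgnoredBytesSketch` and `onIgnoredData` are hypothetical names, and `AtomicLong` stands in for the real metric type.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical handler for data belonging to a late or duplicate block push.
class IgnoredBytesSketch {
  final AtomicLong ignoredBlockBytes = new AtomicLong();

  // Records how many bytes are being dropped, then consumes the buffer
  // without writing it anywhere.
  void onIgnoredData(ByteBuffer buf) {
    ignoredBlockBytes.addAndGet(buf.remaining());
    buf.position(buf.limit());
  }
}
```

Counting bytes rather than requests makes the cost of ignored pushes comparable with `blockBytesWritten`.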





[GitHub] [spark] zhouyejoe commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1066440115


##########
docs/monitoring.md:
##########
@@ -1421,6 +1421,20 @@ Note: applies to the shuffle service
 - shuffle-server.usedDirectMemory
 - shuffle-server.usedHeapMemory
 
+- Notes: below shuffle service server-side metrics are specific to the Push-Based Shuffle
+(with `spark.shuffle.push.enabled` set as true on the client side, and with the server side
+flag `spark.shuffle.push.server.mergedShuffleFileManagerImpl` set as the appropriate
+org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for the Push-Based
+Shuffle to be enabled)
+  - blockAppendCollisions - the number of shuffle push blocks collided in shuffle services
+    as another block for the same reduce partition were being written
+  - lateBlockPushes - the number of shuffle push blocks that are received in shuffle service
+    after the specific shuffle merge has been finalized
+  - blockBytesWritten - the size of the pushed block data written to file in bytes
+  - deferredBlockBytes - the size of the current deferred block parts buffered in memory
+  - deferredBlocks - the number of the current deferred block parts buffered in memory
+  - staleBlockPushes - the number of stale shuffle block push requests

Review Comment:
   Remove the indent. These metrics should be within the same level as other metrics in shuffle service.





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1063072468


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -257,6 +274,7 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
+    verifyMetrics(4, 0, 0, 0, 0, 0);

Review Comment:
   The specific testcase here is that duplicate blocks are being sent (via speculative execution, for example), so we want to ignore data from the "newer" push. For example, here `stream2` would be ignored. The metrics do not capture this, so we see the same metric values as for a successful push (what we would see for `testBasicBlockMerge`).





[GitHub] [spark] asfgit closed pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle
URL: https://github.com/apache/spark/pull/37638




[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1050430333


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1197,15 +1230,15 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
             appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
         if (isStale(info, partitionInfo.appAttemptShuffleMergeId.shuffleMergeId) ||

Review Comment:
   I would look at these metrics as a means of identifying how effective push-based shuffle is and what is causing its inefficiencies (how many stale blocks, how many late blocks, how that distribution changes over time, etc.).
   These will correlate in some way with the overall impact: when push-based shuffle was used, when normal shuffle was used, and what the network and task impact of each is.
   
   Do let me know if you have thoughts!





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1055804525


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -593,6 +607,9 @@ public void onData(String streamId, ByteBuffer buf) {
 
         @Override
         public void onComplete(String streamId) {
+          if (isTooLate) {
+            pushMergeMetrics.tooLateResponses.mark();
+          }

Review Comment:
   Thanks for the comments. Fixed.





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1055805408


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1941,42 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // couldNotFindOpportunityResponses tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String NO_OPPORTUNITY_RESPONSES_METRIC = "couldNotFindOpportunityResponses";
+    // tooLateResponses tracks how many times a shuffle block push request is too late
+    static final String TOO_LATE_RESPONSES_METRIC = "tooLateResponses";
+    // pushedBytesWritten tracks the length of the pushed block data written to file in bytes
+    static final String PUSHED_BYTES_WRITTEN_METRIC = "pushedBytesWritten";
+    // cachedBlocksBytes tracks the size of the current deferred block parts buffered in memory.
+    static final String CACHED_BLOCKS_BYTES_METRIC = "cachedBlocksBytes";

Review Comment:
   Thanks for the suggestion. Added `deferredBlocks`, PTAL.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1197,15 +1230,15 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
             appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
         if (isStale(info, partitionInfo.appAttemptShuffleMergeId.shuffleMergeId) ||

Review Comment:
   Makes sense. Added `staleBlockPushes`, PTAL.





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1003658008


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1941,42 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // couldNotFindOpportunityResponses tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String NO_OPPORTUNITY_RESPONSES_METRIC = "couldNotFindOpportunityResponses";
+    // tooLateResponses tracks how many times a shuffle block push request is too late
+    static final String TOO_LATE_RESPONSES_METRIC = "tooLateResponses";
+    // pushedBytesWritten tracks the length of the pushed block data written to file in bytes
+    static final String PUSHED_BYTES_WRITTEN_METRIC = "pushedBytesWritten";
+    // cachedBlocksBytes tracks the size of the current deferred block parts buffered in memory.
+    static final String CACHED_BLOCKS_BYTES_METRIC = "cachedBlocksBytes";

Review Comment:
   As I mentioned above, they are related but not the same - particularly when applications do not have homogeneous configurations.





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1067536500


##########
docs/monitoring.md:
##########
@@ -1421,6 +1421,21 @@ Note: applies to the shuffle service
 - shuffle-server.usedDirectMemory
 - shuffle-server.usedHeapMemory
 
+Note: applies to the shuffle service when the server side flag
+`spark.shuffle.push.server.mergedShuffleFileManagerImpl` set as the appropriate
+org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for the Push-Based Shuffle
+
+- blockAppendCollisions - the number of shuffle push blocks collided in shuffle services
+  as another block for the same reduce partition were being written
+- lateBlockPushes - the number of shuffle push blocks that are received in shuffle service
+  after the specific shuffle merge has been finalized
+- blockBytesWritten - the size of the pushed block data written to file in bytes
+- deferredBlockBytes - the size of the current deferred block parts buffered in memory
+- deferredBlocks - the number of the current deferred block parts buffered in memory
+- staleBlockPushes - the number of stale shuffle block push requests
+- ignoredBlockBytes - the size of the pushed block data that are ignored after the shuffle
+  file is finalized or when a request is for a duplicate block

Review Comment:
   Also adjusted this a bit following @zhouyejoe's suggestion. Preview of the corresponding section:
   ![Screen Shot 2023-01-11 at 2 50 26 PM](https://user-images.githubusercontent.com/31781684/211935150-cc71545b-41ad-4a0c-bc3c-901ef2dd8b5f.png)





[GitHub] [spark] mridulm commented on pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #37638:
URL: https://github.com/apache/spark/pull/37638#issuecomment-1379889561

   Merged to master.
   Thanks for working on this @rmcyang !
   Thanks for the reviews @zhouyejoe, @otterc :-)




[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1063072468


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -257,6 +274,7 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
+    verifyMetrics(4, 0, 0, 0, 0, 0);

Review Comment:
   The specific testcase here is that duplicate blocks are being sent (via speculative execution, for example), so we want to ignore data from one of the streams. For example, here `stream2` would be ignored. The metrics do not capture this, so we see the same metric values as for a successful push (what we would see for `testBasicBlockMerge`).





[GitHub] [spark] otterc commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
otterc commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1003631087


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -593,6 +607,9 @@ public void onData(String streamId, ByteBuffer buf) {
 
         @Override
         public void onComplete(String streamId) {
+          if (isTooLate) {
+            pushMergeMetrics.tooLateResponses.mark();
+          }

Review Comment:
   The comments above explain why we don't want to throw an exception and close the channel, but rather respond with a stream callback and respond with tooLate only once the complete block has been received.





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1003656427


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -593,6 +607,9 @@ public void onData(String streamId, ByteBuffer buf) {
 
         @Override
         public void onComplete(String streamId) {
+          if (isTooLate) {
+            pushMergeMetrics.tooLateResponses.mark();
+          }

Review Comment:
   Yes, but is there a reason to defer updating the metric until later? It is already clear at the time the callback is constructed that the push is late.
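
A minimal sketch of what this could look like, under stated assumptions: `PushCallback` and `LatePushSketch` are hypothetical simplifications and not the resolver's real callback types, the counter is a plain `LongAdder`, and the exception message is illustrative. The counter is updated when the callback is built, while the failure response is still deferred to `onComplete` so the block data can be drained first.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, simplified callback shape used only for this sketch.
interface PushCallback {
  void onData(byte[] chunk);

  void onComplete();
}

class LatePushSketch {
  final LongAdder lateBlockPushes = new LongAdder();

  // Builds the callback for one push request. Lateness is already known here,
  // so the counter is updated once, before any data arrives.
  PushCallback receive(String blockId, boolean mergeFinalized) {
    if (mergeFinalized) {
      lateBlockPushes.increment();
      return new PushCallback() {
        @Override
        public void onData(byte[] chunk) {
          // Drain and discard the data so the channel stays usable.
        }

        @Override
        public void onComplete() {
          // Only the failure response waits for the whole block to be received.
          throw new IllegalStateException("Block " + blockId + " arrived too late");
        }
      };
    }
    return new PushCallback() {
      @Override
      public void onData(byte[] chunk) {
        // A real implementation would append the chunk to the merged file here.
      }

      @Override
      public void onComplete() {
        // On-time push: nothing to report.
      }
    };
  }
}

class LatePushDemo {
  public static void main(String[] args) {
    LatePushSketch sketch = new LatePushSketch();
    sketch.receive("block-1", true); // counted immediately, even if never completed
    System.out.println("late pushes: " + sketch.lateBlockPushes.sum());
  }
}
```

One consequence of this placement is that a late push whose stream later fails is still counted, which `onComplete`-only marking would miss.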





[GitHub] [spark] zhouyejoe commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1012349885


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -413,6 +437,7 @@ public void testFailureInAStreamDoesNotInterfereWithStreamWhichIsWriting() throw
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[] {4}, new int[][] {{0}});
+    verifyMetrics(4, 1,0, 0);

Review Comment:
   Nit: missing one space.



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -1415,6 +1440,24 @@ private void pushBlockHelper(
     }
   }
 
+  private void verifyMetrics(

Review Comment:
   It looks like `expectedCachedBlocksBytes` is 0 in all the callers. Do we need to add any UTs for testing the CachedBytes metrics?





[GitHub] [spark] rmcyang commented on pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on PR #37638:
URL: https://github.com/apache/spark/pull/37638#issuecomment-1231893015

   @zhouyejoe Hm, not really. Running those UTs locally looks fine to me.
   ```
   minyang@minyang-mn3 spark % build/mvn -Dtest=none -Pyarn -Phadoop-2.10 -Dscala-2.12 -Phive -Phive-2.3 -DwildcardSuites=org.apache.spark.network.yarn.YarnShuffleServiceWithLevelDBBackendSuite test -pl 'resource-managers/yarn'
   Using `mvn` from path: /Users/minyang/project/OSS-linkedin/spark/build/apache-maven-3.8.6/bin/mvn
   ...
   [INFO] Skipping execution of surefire because it has already been run for this configuration
   [INFO] 
   [INFO] --- scalatest-maven-plugin:2.1.0:test (test) @ spark-yarn_2.12 ---
   [INFO] ScalaTest report directory: /Users/minyang/project/OSS-linkedin/spark/resource-managers/yarn/target/surefire-reports
   Discovery starting.
   Discovery completed in 670 milliseconds.
   Run starting. Expected test count is: 17
   YarnShuffleServiceWithLevelDBBackendSuite:
   - executor and merged shuffle state kept across NM restart
   - removed applications should not be in registered executor file and merged shuffle file
   - shuffle service should be robust to corrupt registered executor file
   - get correct recovery path
   - moving recovery file from NM local dir to recovery path
   - service throws error if cannot start
   - Consistency in AppPathInfo between in-memory hashmap and the DB
   - Finalized merged shuffle are written into DB and cleaned up after application stopped
   - Dangling finalized merged partition info in DB will be removed during restart
   - Dangling application path or shuffle information in DB will be removed during restart
   - Cleanup for former attempts local path info should be triggered in applicationRemoved
   - recovery db should not be created if NM recovery is not enabled
   - SPARK-31646: metrics should be registered into Node Manager's metrics system
   - SPARK-34828: metrics should be registered with configured name
   - create default merged shuffle file manager instance
   - create remote block push resolver instance
   - invalid class name of merge manager will use noop instance
   Run completed in 2 seconds, 944 milliseconds.
   Total number of tests run: 17
   Suites: completed 2, aborted 0
   Tests: succeeded 17, failed 0, canceled 0, ignored 0, pending 0
   All tests passed.
   [INFO] ------------------------------------------------------------------------
   [INFO] BUILD SUCCESS
   [INFO] ------------------------------------------------------------------------
   [INFO] Total time:  24.019 s
   [INFO] Finished at: 2022-08-30T02:58:38-07:00
   [INFO] ------------------------------------------------------------------------
   [WARNING] The requested profile "hadoop-2.10" could not be activated because it does not exist.
   [WARNING] The requested profile "hive" could not be activated because it does not exist.
   [WARNING] The requested profile "hive-2.3" could not be activated because it does not exist.
   ```




[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r959829122


##########
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:
##########
@@ -296,10 +301,15 @@ protected void serviceInit(Configuration externalConf) throws Exception {
           DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME);
       YarnShuffleServiceMetrics serviceMetrics =
           new YarnShuffleServiceMetrics(metricsNamespace, blockHandler.getAllMetrics());
+      YarnShuffleServiceMetrics mergeManagerMetrics =
+          new YarnShuffleServiceMetrics("mergeManagerMetrics", blockPushResolver.getMetrics());

Review Comment:
   Thanks, the UTs passed after the fix.





[GitHub] [spark] mridulm commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1038914754


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1904,4 +1941,42 @@ long getPos() {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // couldNotFindOpportunityResponses tracks how many times a shuffle block collided because
+    // of another block for the same reduce partition was being written
+    static final String NO_OPPORTUNITY_RESPONSES_METRIC = "couldNotFindOpportunityResponses";
+    // tooLateResponses tracks how many times a shuffle block push request is too late
+    static final String TOO_LATE_RESPONSES_METRIC = "tooLateResponses";
+    // pushedBytesWritten tracks the length of the pushed block data written to file in bytes
+    static final String PUSHED_BYTES_WRITTEN_METRIC = "pushedBytesWritten";
+    // cachedBlocksBytes tracks the size of the current deferred block parts buffered in memory.
+    static final String CACHED_BLOCKS_BYTES_METRIC = "cachedBlocksBytes";

Review Comment:
   Can we add `deferredBlocks` as well? It would track the number of deferred blocks, in addition to the size that you have already added support for here.
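
As a rough sketch of tracking both values together (assumptions: `DeferredBufferSketch` is a hypothetical class, the counters are plain `AtomicLong`s and not the PR's metric types, and `flush` stands in for whatever writes the buffered parts out), both numbers reflect the current state and drop back once the deferred parts are flushed:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; names and structure are illustrative only.
class DeferredBufferSketch {
  // Current (not cumulative) values: they shrink again when buffers are flushed.
  final AtomicLong deferredBlocks = new AtomicLong();
  final AtomicLong deferredBlockBytes = new AtomicLong();

  private final List<ByteBuffer> deferredBufs = new ArrayList<>();
  private long pendingBytes = 0;

  // Buffers a block part in memory and bumps both the count and the size.
  synchronized void defer(ByteBuffer buf) {
    int size = buf.remaining();
    deferredBufs.add(buf);
    pendingBytes += size;
    deferredBlocks.incrementAndGet();
    deferredBlockBytes.addAndGet(size);
  }

  // Releases everything buffered so far and takes it back out of both values.
  synchronized void flush() {
    deferredBlocks.addAndGet(-deferredBufs.size());
    deferredBlockBytes.addAndGet(-pendingBytes);
    deferredBufs.clear();
    pendingBytes = 0;
  }
}

class DeferredBufferDemo {
  public static void main(String[] args) {
    DeferredBufferSketch sketch = new DeferredBufferSketch();
    sketch.defer(ByteBuffer.wrap(new byte[4]));
    sketch.defer(ByteBuffer.wrap(new byte[6]));
    System.out.println(sketch.deferredBlocks.get() + " parts, "
        + sketch.deferredBlockBytes.get() + " bytes deferred");
    sketch.flush();
  }
}
```

Keeping the count next to the size makes it possible to tell many small deferred parts apart from a few large ones.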





[GitHub] [spark] rmcyang commented on a diff in pull request #37638: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r997751290


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -593,6 +607,9 @@ public void onData(String streamId, ByteBuffer buf) {
 
         @Override
         public void onComplete(String streamId) {
+          if (isTooLate) {
+            pushMergeMetrics.tooLateResponses.mark();
+          }

Review Comment:
   li-2.3.0 has the corresponding implementation as below:
   ```
   public void onComplete(String streamId) {
     if (isTooLate) {
       pushMergeMetrics.tooLateResponses.mark();
       // Throw an exception here so the block data is drained from channel and server
       // responds RpcFailure to the client.
       throw new RuntimeException(String.format("Block %s %s", msg.blockId,
         BlockPushException.TOO_LATE_MESSAGE_SUFFIX));
     }
     // For duplicate block that is received before the shuffle merge finalizes, the
     // server should respond success to the client.
   }
   ```
   I think the reason we are doing this only as part of `onComplete` is that we only mark tooLateResponse when all data from the stream has been received, and ignore those for the callbacks of `onData` and `onFailure`, correct? @otterc 


