Posted to issues@uniffle.apache.org by "zuston (via GitHub)" <gi...@apache.org> on 2023/03/20 10:02:22 UTC

[GitHub] [incubator-uniffle] zuston opened a new pull request, #744: [#736] feat(storage): best effort to write same hdfs file when no race condition

zuston opened a new pull request, #744:
URL: https://github.com/apache/incubator-uniffle/pull/744

   
   ### What changes were proposed in this pull request?
   
   Make a best effort to write to the same HDFS file when there is no race condition in `PooledHdfsShuffleWriteHandlerTest`.
   
   ### Why are the changes needed?
   
   1. Reduce the number of files per partition to lower the pressure on HDFS.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   1. UTs


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #744: [#736] feat(storage): best effort to write same hdfs file when no race condition

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on code in PR #744:
URL: https://github.com/apache/incubator-uniffle/pull/744#discussion_r1142839210


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java:
##########
@@ -80,13 +89,13 @@ public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws Exception
     if (queue.isEmpty()) {
       LOGGER.warn("No free hdfs writer handler, it will wait. storage path: {}", basePath);
     }
-    HdfsShuffleWriteHandler writeHandler = queue.take();
+    ShuffleWriteHandler writeHandler = queue.take();
     try {
       writeHandler.write(shuffleBlocks);
     } finally {
-      // Use add() here because we are sure the capacity will not be exceeded.
-      // Note: add() throws IllegalStateException when queue is full.
-      queue.add(writeHandler);
+      // Use addFirst() here because we are sure the capacity will not be exceeded.

Review Comment:
   Done. Added the javadoc for this class.





[GitHub] [incubator-uniffle] zuston merged pull request #744: [#736] feat(storage): best effort to write same hdfs file when no race condition

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston merged PR #744:
URL: https://github.com/apache/incubator-uniffle/pull/744




[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #744: [#736] feat(storage): best effort to write same hdfs file when no race condition

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on code in PR #744:
URL: https://github.com/apache/incubator-uniffle/pull/744#discussion_r1150426734


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java:
##########
@@ -29,13 +30,29 @@
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
+/**
+ * The {@link PooledHdfsShuffleWriteHandler} is a wrapper of underlying multiple
+ * {@link HdfsShuffleWriteHandler} to support concurrency control of writing single
+ * partition to multi files.
+ *
+ * By leveraging {@link LinkedBlockingDeque}, it will always write the same file when
+ * no race condition, which is good for reducing file numbers for HDFS.
+ */
 public class PooledHdfsShuffleWriteHandler implements ShuffleWriteHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(PooledHdfsShuffleWriteHandler.class);
 
-  private final BlockingQueue<HdfsShuffleWriteHandler> queue;
+  private final LinkedBlockingDeque<ShuffleWriteHandler> queue;
   private final int maxConcurrency;
   private final String basePath;
 
+  // Only for tests
+  @VisibleForTesting
+  public PooledHdfsShuffleWriteHandler(LinkedBlockingDeque<ShuffleWriteHandler> queue) {
+    this.queue = queue;
+    this.maxConcurrency =  queue.size();

Review Comment:
   Oh, sorry. I missed this thread.
   
   Got it. I will fix this in another PR.





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #744: [#736] feat(storage): best effort to write same hdfs file when no race condition

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #744:
URL: https://github.com/apache/incubator-uniffle/pull/744#discussion_r1142031620


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java:
##########
@@ -80,13 +89,13 @@ public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws Exception
     if (queue.isEmpty()) {
       LOGGER.warn("No free hdfs writer handler, it will wait. storage path: {}", basePath);
     }
-    HdfsShuffleWriteHandler writeHandler = queue.take();
+    ShuffleWriteHandler writeHandler = queue.take();
     try {
       writeHandler.write(shuffleBlocks);
     } finally {
-      // Use add() here because we are sure the capacity will not be exceeded.
-      // Note: add() throws IllegalStateException when queue is full.
-      queue.add(writeHandler);
+      // Use addFirst() here because we are sure the capacity will not be exceeded.

Review Comment:
   Could we add some comments to explain why we need to use method `addFirst`?
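
For readers following this thread, here is a minimal sketch of the behaviour the question is about. It is not the Uniffle code: the class `DequeReuseSketch`, the method `simulate`, and the `file-N` strings standing in for write handlers are all hypothetical. It only illustrates that, with sequential (uncontended) writers, `take()` followed by `addFirst()` keeps handing out the same element, while returning it to the tail rotates through every element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

public class DequeReuseSketch {
  // Simulates sequential writes with no contention. Each write takes the head
  // of the deque and then returns it to the head (addFirst) or tail (addLast).
  // The strings are hypothetical stand-ins for per-file write handlers.
  static List<String> simulate(boolean returnToHead, int writes) {
    LinkedBlockingDeque<String> pool = new LinkedBlockingDeque<>(3);
    pool.add("file-0");
    pool.add("file-1");
    pool.add("file-2");

    List<String> used = new ArrayList<>();
    for (int i = 0; i < writes; i++) {
      String handler;
      try {
        handler = pool.take(); // removes and returns the head, blocking if empty
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
      used.add(handler);
      if (returnToHead) {
        pool.addFirst(handler); // the next take() sees the same handler again
      } else {
        pool.addLast(handler);  // the next take() sees a different handler
      }
    }
    return used;
  }

  public static void main(String[] args) {
    System.out.println(simulate(true, 4));  // [file-0, file-0, file-0, file-0]
    System.out.println(simulate(false, 4)); // [file-0, file-1, file-2, file-0]
  }
}
```

Under concurrent writers the head may already be taken, so other elements get used as well; that is presumably why the PR title says "best effort".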





[GitHub] [incubator-uniffle] codecov-commenter commented on pull request #744: [#736] feat(storage): best effort to write same hdfs file when no race condition

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #744:
URL: https://github.com/apache/incubator-uniffle/pull/744#issuecomment-1477203049

   ## [Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/744?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#744](https://codecov.io/gh/apache/incubator-uniffle/pull/744?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (69d855f) into [master](https://codecov.io/gh/apache/incubator-uniffle/commit/d60d675d38c833b99b012a1f4c726a012ce93463?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d60d675) will **increase** coverage by `2.24%`.
   > The diff coverage is `87.50%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #744      +/-   ##
   ============================================
   + Coverage     60.63%   62.87%   +2.24%     
   - Complexity     1893     1901       +8     
   ============================================
     Files           238      224      -14     
     Lines         13000    11053    -1947     
     Branches       1090     1091       +1     
   ============================================
   - Hits           7883     6950     -933     
   + Misses         4679     3725     -954     
   + Partials        438      378      -60     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-uniffle/pull/744?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ge/handler/impl/PooledHdfsShuffleWriteHandler.java](https://codecov.io/gh/apache/incubator-uniffle/pull/744?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmFnZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvdW5pZmZsZS9zdG9yYWdlL2hhbmRsZXIvaW1wbC9Qb29sZWRIZGZzU2h1ZmZsZVdyaXRlSGFuZGxlci5qYXZh) | `43.47% <87.50%> (+43.47%)` | :arrow_up: |
   
   ... and [17 files with indirect coverage changes](https://codecov.io/gh/apache/incubator-uniffle/pull/744/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   




[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #744: [#736] feat(storage): best effort to write same hdfs file when no race condition

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #744:
URL: https://github.com/apache/incubator-uniffle/pull/744#discussion_r1142861169


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java:
##########
@@ -80,13 +89,13 @@ public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws Exception
     if (queue.isEmpty()) {
       LOGGER.warn("No free hdfs writer handler, it will wait. storage path: {}", basePath);
     }
-    HdfsShuffleWriteHandler writeHandler = queue.take();
+    ShuffleWriteHandler writeHandler = queue.take();
     try {
       writeHandler.write(shuffleBlocks);
     } finally {
-      // Use add() here because we are sure the capacity will not be exceeded.
-      // Note: add() throws IllegalStateException when queue is full.
-      queue.add(writeHandler);
+      // Use addFirst() here because we are sure the capacity will not be exceeded.

Review Comment:
   I think you didn't address @jerqi's comment. A javadoc for this class is good,
   but it still doesn't explain why we are using `addFirst` here.
   I also believe the comment here is out of date, since it was written for the `add()` method.
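
As a side note on the capacity remark in the inline comment: per the JDK documentation, `LinkedBlockingDeque.addFirst()` throws `IllegalStateException` on a full bounded deque, just as `add()` does, while `offerFirst()` returns `false` instead. The sketch below illustrates only that JDK behaviour; the class name `DequeCapacitySketch` and the `handler-N` strings are hypothetical and not taken from the PR.

```java
import java.util.concurrent.LinkedBlockingDeque;

public class DequeCapacitySketch {
  // Returns true if addFirst() on a full bounded deque throws IllegalStateException.
  static boolean addFirstThrowsWhenFull() {
    LinkedBlockingDeque<String> pool = new LinkedBlockingDeque<>(1);
    pool.addFirst("handler-0"); // the deque is now at capacity
    try {
      pool.addFirst("handler-1");
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  // offerFirst() reports a full deque through its return value instead of throwing.
  static boolean offerFirstAcceptedWhenFull() {
    LinkedBlockingDeque<String> pool = new LinkedBlockingDeque<>(1);
    pool.addFirst("handler-0");
    return pool.offerFirst("handler-1");
  }

  public static void main(String[] args) {
    System.out.println(addFirstThrowsWhenFull());     // true
    System.out.println(offerFirstAcceptedWhenFull()); // false
  }
}
```

So the "capacity will not be exceeded" reasoning can carry over from `add()` to `addFirst()`, provided each handler is returned only after it was taken; what it does not cover is the ordering reason for choosing the head of the deque.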



##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java:
##########
@@ -29,13 +30,29 @@
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
+/**
+ * The {@link PooledHdfsShuffleWriteHandler} is a wrapper of underlying multiple
+ * {@link HdfsShuffleWriteHandler} to support concurrency control of writing single
+ * partition to multi files.
+ *
+ * By leveraging {@link LinkedBlockingDeque}, it will always write the same file when
+ * no race condition, which is good for reducing file numbers for HDFS.
+ */
 public class PooledHdfsShuffleWriteHandler implements ShuffleWriteHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(PooledHdfsShuffleWriteHandler.class);
 
-  private final BlockingQueue<HdfsShuffleWriteHandler> queue;
+  private final LinkedBlockingDeque<ShuffleWriteHandler> queue;
   private final int maxConcurrency;
   private final String basePath;
 
+  // Only for tests
+  @VisibleForTesting
+  public PooledHdfsShuffleWriteHandler(LinkedBlockingDeque<ShuffleWriteHandler> queue) {
+    this.queue = queue;
+    this.maxConcurrency =  queue.size();

Review Comment:
   nit: extra space.
   
   ```suggestion
       this.maxConcurrency = queue.size();
   ```





[GitHub] [incubator-uniffle] jerqi commented on pull request #744: [#736] feat(storage): best effort to write same hdfs file when no race condition

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on PR #744:
URL: https://github.com/apache/incubator-uniffle/pull/744#issuecomment-1486116272

   cc @zuston It seems that we missed some comments from advancedxy.




[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #744: [#736] feat(storage): best effort to write same hdfs file when no race condition

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on code in PR #744:
URL: https://github.com/apache/incubator-uniffle/pull/744#discussion_r1150427931


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java:
##########
@@ -80,13 +89,13 @@ public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws Exception
     if (queue.isEmpty()) {
       LOGGER.warn("No free hdfs writer handler, it will wait. storage path: {}", basePath);
     }
-    HdfsShuffleWriteHandler writeHandler = queue.take();
+    ShuffleWriteHandler writeHandler = queue.take();
     try {
       writeHandler.write(shuffleBlocks);
     } finally {
-      // Use add() here because we are sure the capacity will not be exceeded.
-      // Note: add() throws IllegalStateException when queue is full.
-      queue.add(writeHandler);
+      // Use addFirst() here because we are sure the capacity will not be exceeded.

Review Comment:
   Hmm, yes, this inline comment was not written for `addFirst()`. But I still think the javadoc is enough.


