Posted to dev@uniffle.apache.org by GitBox <gi...@apache.org> on 2022/10/10 08:09:35 UTC

[GitHub] [incubator-uniffle] fpkgithub opened a new issue, #257: [Bug] No data can be read from ShuffleServer when the reduce task number exceeds 1024

fpkgithub opened a new issue, #257:
URL: https://github.com/apache/incubator-uniffle/issues/257

   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   
   
   ### Search before asking
   
   - [X] I have searched in the [issues](https://github.com/apache/incubator-uniffle/issues?q=is%3Aissue) and found no similar issues.
   
   
   ### Describe the bug
   
   When I run some Hive SQL queries with more than 1024 reduce tasks, the reduce tasks with an ID of 1024 or greater read no data from the ShuffleServer, yet the query state is still SUCCEEDED. In addition, the result count obtained with the original shuffle differs from the count obtained with Uniffle. The relevant logs and code are below.
   
   ## Hive SQL
   
   
   ![image](https://user-images.githubusercontent.com/22984063/194813378-acaeb7da-9500-4676-8159-36b1666e1702.png)
   
   ## Reduce Task 
   Reduce task ID < 1024:
   ```
   2022-10-09 21:22:25,121 INFO [main] org.apache.hadoop.mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.RssShuffle@533377b
   2022-10-09 21:22:28,447 INFO [main] org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient: GetInMemoryShuffleData from 10.156.174.33:19999 for appId[appattempt_1665305651155_5705_000001], shuffleId[0], partitionId[19] cost 329 ms
   ...
   2022-10-09 21:22:28,876 INFO [main] org.apache.uniffle.client.impl.ShuffleReadClientImpl: blockIdBitmap.Size=2293, pendingBlockIds.Size=0, bufferSegmentQueue.Size=0
   2022-10-09 21:22:28,878 INFO [main] org.apache.uniffle.client.impl.ShuffleReadClientImpl: Metrics for shuffleId[0], partitionId[19], read data cost 380 ms, copy data cost 0 ms, crc check cost 7 ms
   2022-10-09 21:22:28,878 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 2293 blocks [ hot:2293 warm:0 cold:0 frozen:0 ]
   2022-10-09 21:22:28,878 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 971918 bytes [ hot:971918 warm:0 cold:0 frozen:0 ]
   2022-10-09 21:22:28,878 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 1372670 uncompressed bytes [ hot:1372670 warm:0 cold:0 frozen:0 ]
   
   ```
   
   Reduce task ID >= 1024:
   ```
   2022-09-28 11:46:04,379 INFO [main] org.apache.hadoop.mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.RssShuffle@10fde30a
   2022-09-28 11:46:05,060 INFO [main] org.apache.uniffle.client.impl.ShuffleReadClientImpl: Metrics for shuffleId[0], partitionId[2003], read data cost 0 ms, copy data cost 0 ms, crc check cost 0 ms
   2022-09-28 11:46:05,061 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 0 blocks [ hot:0 warm:0 cold:0 frozen:0 ]
   2022-09-28 11:46:05,061 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 0 bytes [ hot:0 warm:0 cold:0 frozen:0 ]
   2022-09-28 11:46:05,061 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 0 uncompressed bytes [ hot:0 warm:0 cold:0 frozen:0 ]
   ```
   
   ## Client Code
   In `ShuffleReadClientImpl#readShuffleBlockData`, `blockIdBitmap` is empty when the reduce task ID is 1024 or greater, so the method returns `null` immediately and the reducer reads no data.
   
   ### org.apache.uniffle.client.impl.ShuffleReadClientImpl#readShuffleBlockData
   ```java
     @Override
     public CompressedShuffleBlock readShuffleBlockData() {
   
       LOG.info("blockIdBitmap.Size=" + blockIdBitmap.getLongCardinality() + ", pendingBlockIds.Size=" + pendingBlockIds.getLongCardinality()
               + ", bufferSegmentQueue.Size=" + bufferSegmentQueue.size());
   
       // empty data expected, just return null
       if (blockIdBitmap.isEmpty()) {
         LOG.info("blockIdBitmap empty , so return null ...");
         return null;
       }
   
       // All blocks are processed, so just return
       if (pendingBlockIds.isEmpty()) {
         return null;
       }
   
       // if need request new data from shuffle server
       if (bufferSegmentQueue.isEmpty()) {
         if (read() <= 0) {
           return null;
         }
       }
   
       // get next buffer segment
       BufferSegment bs = null;
   
       // blocks in bufferSegmentQueue may be from different partition in range partition mode,
       // or may be from speculation task, filter them and just read the necessary block
      
       ...
   
       // current segment hasn't data, try next segment
       return readShuffleBlockData();
     }
   ```
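   A plausible reason the query still reports SUCCEEDED is that an empty `blockIdBitmap` is treated as "no data expected", and any completeness check that compares processed blocks against that same empty set has nothing to miss. The sketch below models this under that assumption; `EmptyBitmapDemo`, `readAll` and `allExpectedProcessed` are hypothetical names, not Uniffle's reader API.

   ```java
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical model of a shuffle reader; not Uniffle's ShuffleReadClientImpl.
   public class EmptyBitmapDemo {

     /** Reads every expected block the server holds and returns how many were read. */
     static int readAll(Set<Long> expectedBlockIds, Map<Long, byte[]> serverBlocks,
         Set<Long> processed) {
       // Same idea as the early return in readShuffleBlockData(): nothing expected, nothing read.
       if (expectedBlockIds.isEmpty()) {
         return 0;
       }
       int read = 0;
       for (long blockId : expectedBlockIds) {
         if (serverBlocks.containsKey(blockId)) {
           processed.add(blockId);
           read++;
         }
       }
       return read;
     }

     /** Completeness check: passes when every expected block was processed. */
     static boolean allExpectedProcessed(Set<Long> expectedBlockIds, Set<Long> processed) {
       return processed.containsAll(expectedBlockIds);
     }

     public static void main(String[] args) {
       Map<Long, byte[]> serverBlocks = Map.of(1L, new byte[] {1}, 2L, new byte[] {2});
       Set<Long> processed = new HashSet<>();
       // The server-side filter returned no block IDs, so the expected set is empty.
       Set<Long> expected = Collections.emptySet();
       int read = readAll(expected, serverBlocks, processed);
       System.out.println("blocks read = " + read
           + ", check passes = " + allExpectedProcessed(expected, processed));
     }
   }
   ```

   Running `main` prints `blocks read = 0, check passes = true`: nothing fails, so the missing partition goes unnoticed even though the server holds data.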
   
   ### Log
   ```
   2022-10-09 21:23:21,530 INFO [org.apache.hadoop.mapreduce.TaskPauseMonitor$Monitor@376a312c] org.apache.hadoop.mapreduce.TaskPauseMonitor: Starting JVM pause monitor
   2022-10-09 21:23:21,539 INFO [main] org.apache.hadoop.mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.RssShuffle@3383649e
   2022-10-09 21:23:22,065 INFO [main] org.apache.uniffle.client.impl.ShuffleWriteClientImpl: getShuffleResult#appId=appattempt_1665305651155_5705_000001, shuffleId=0, partitionId=2003, shuffleServerInfoSet.Size=1
   2022-10-09 21:23:22,065 INFO [main] org.apache.uniffle.client.impl.ShuffleWriteClientImpl: getShuffleResult#star ss getBlockIdBitmap=10.156.175.32
   2022-10-09 21:23:23,799 INFO [main] org.apache.uniffle.client.impl.ShuffleWriteClientImpl: getShuffleResult#10.156.175.32:ss get blockIdBitmapOfServer.Size=0
   2022-10-09 21:23:23,799 INFO [main] org.apache.uniffle.client.impl.ShuffleWriteClientImpl: getShuffleResult# break, ss=10.156.175.32
   2022-10-09 21:23:23,799 INFO [main] org.apache.uniffle.client.impl.ShuffleWriteClientImpl: getShuffleResult#blockIdBitmap.Size=0
   2022-10-09 21:23:23,915 INFO [main] org.apache.uniffle.client.impl.ShuffleReadClientImpl: blockIdBitmap.Size=0, pendingBlockIds.Size=0, bufferSegmentQueue.Size=0
   2022-10-09 21:23:23,916 INFO [main] org.apache.uniffle.client.impl.ShuffleReadClientImpl: blockIdBitmap empty , so return null ...
   2022-10-09 21:23:23,916 INFO [main] org.apache.uniffle.client.impl.ShuffleReadClientImpl: Metrics for shuffleId[0], partitionId[2003], read data cost 0 ms, copy data cost 0 ms, crc check cost 0 ms
   2022-10-09 21:23:23,916 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 0 blocks [ hot:0 warm:0 cold:0 frozen:0 ]
   2022-10-09 21:23:23,916 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 0 bytes [ hot:0 warm:0 cold:0 frozen:0 ]
   2022-10-09 21:23:23,916 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 0 uncompressed bytes [ hot:0 warm:0 cold:0 frozen:0 ]
   ```
   
   
   
   ## ShuffleServer
   By adding logging to `ShuffleTaskManager#getBlockIdsByPartitionId` and analyzing the code, I found that:
   - when the reduce task ID (partition ID) is less than 1024, the blocks of the requested partition are found;
   - when the reduce task ID (partition ID) is 1024 or greater, no blocks are found for the requested partition.
   
   ### org.apache.uniffle.server.ShuffleTaskManager#getBlockIdsByPartitionId
   ```java
     // Filter the block IDs of the requested partitions from bitmap into resultBitmap
     protected Roaring64NavigableMap getBlockIdsByPartitionId(Set<Integer> requestPartitions,
         Roaring64NavigableMap bitmap, Roaring64NavigableMap resultBitmap) {
       LOG.info("getBlockIdsByPartitionId# start requestPartitions=" + requestPartitions
           + ", bitmap.size=" + bitmap.getLongCardinality());
       int findPartitionIdOKCount = 0;
       int findPartitionIdFailCount = 0;
       LongIterator iter = bitmap.getLongIterator();
       // Keeps the PARTITION_ID_MAX_LENGTH bits that sit just above the task attempt ID bits
       long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
       while (iter.hasNext()) {
         long blockId = iter.next();
         // Shift out the task attempt ID bits, then mask off any higher bits
         int partitionId = Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
         if (requestPartitions.contains(partitionId)) {
           resultBitmap.addLong(blockId);
           findPartitionIdOKCount++;
         } else {
           findPartitionIdFailCount++;
         }
       }
       LOG.info("getBlockIdsByPartitionId# end requestPartitions=" + requestPartitions
           + ", resultBitmap.size=" + resultBitmap.getLongCardinality()
           + ", findPartitionIdOKCount=" + findPartitionIdOKCount
           + ", findPartitionIdFailCount=" + findPartitionIdFailCount);
       return resultBitmap;
     }
   
   ```
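   The shift-and-mask above only finds a partition if the block IDs were built with the same bit layout the server decodes. The following is a minimal sketch, assuming the layout `[sequenceNo | partitionId | taskAttemptId]` from high to low bits with widths of 18, 24 and 21 bits; those widths and the names `BlockIdLayout`, `encode`, `partitionIdOf` and `filter` are assumptions for illustration, not values or methods taken from Uniffle. With a consistent layout, partition 2003 round-trips exactly like partition 74, so an empty result for 2003 suggests checking whether its block IDs were built with the layout the server expects.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Set;

   // Hypothetical sketch: the bit widths are assumptions, not values read from Uniffle's Constants.
   public class BlockIdLayout {
     static final int SEQ_BITS = 18;          // assumed width of the sequence number field
     static final int PARTITION_BITS = 24;    // assumed stand-in for PARTITION_ID_MAX_LENGTH
     static final int TASK_ATTEMPT_BITS = 21; // assumed stand-in for TASK_ATTEMPT_ID_MAX_LENGTH

     /** Packs [seqNo | partitionId | taskAttemptId] into one long, from high bits to low bits. */
     static long encode(long seqNo, long partitionId, long taskAttemptId) {
       if (!fits(seqNo, SEQ_BITS) || !fits(partitionId, PARTITION_BITS)
           || !fits(taskAttemptId, TASK_ATTEMPT_BITS)) {
         throw new IllegalArgumentException("a field does not fit in its bit range");
       }
       return (seqNo << (PARTITION_BITS + TASK_ATTEMPT_BITS))
           | (partitionId << TASK_ATTEMPT_BITS)
           | taskAttemptId;
     }

     private static boolean fits(long value, int bits) {
       return value >= 0 && value < (1L << bits);
     }

     /** Decodes the partition ID with the same shift-and-mask as getBlockIdsByPartitionId. */
     static int partitionIdOf(long blockId) {
       long mask = (1L << PARTITION_BITS) - 1;
       return Math.toIntExact((blockId >> TASK_ATTEMPT_BITS) & mask);
     }

     /** Returns the block IDs whose decoded partition ID is in requestPartitions. */
     static List<Long> filter(Set<Integer> requestPartitions, long[] blockIds) {
       List<Long> result = new ArrayList<>();
       for (long blockId : blockIds) {
         if (requestPartitions.contains(partitionIdOf(blockId))) {
           result.add(blockId);
         }
       }
       return result;
     }

     public static void main(String[] args) {
       long[] blockIds = {encode(0, 74, 5), encode(1, 2003, 7), encode(2, 2003, 9)};
       System.out.println(filter(Set.of(74), blockIds).size());
       System.out.println(filter(Set.of(2003), blockIds).size());
     }
   }
   ```

   Running `main` prints `1` and then `2`: both partitions are found when encoding and decoding agree.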
   
   ### Log
   
   - `requestPartitions=[74]`: the block IDs are found
   ```
   ## getShuffleResult
   [INFO] 2022-10-09 21:22:26,035 Grpc-512 ShuffleTaskManager getBlockIdsByPartitionId - getBlockIdsByPartitionId# start requestPartitions=[74], bitmap.size=1546249
   [INFO] 2022-10-09 21:22:26,063 Grpc-512 ShuffleTaskManager getBlockIdsByPartitionId - getBlockIdsByPartitionId# end requestPartitions=[74], resultBitmap.size=2292, findPartitionIdOKCount=2292, findPartitionIdFailCount=1543957
   
   ## getMemoryShuffleData
   [INFO] 2022-10-09 21:22:26,465 Grpc-464 ShuffleServerGrpcService getMemoryShuffleData - Successfully getInMemoryShuffleData cost 1 ms with 962828 bytes shuffle data for appId[appattempt_1665305651155_5705_000001], shuffleId[0], partitionId[74]
   ```
   
   - `requestPartitions=[2003]`: no block IDs are found
   ```
   [INFO] 2022-10-09 21:23:23,728 Grpc-1478 ShuffleTaskManager getBlockIdsByPartitionId - getBlockIdsByPartitionId# start requestPartitions=[2003], bitmap.size=1546249
   [INFO] 2022-10-09 21:23:23,785 Grpc-1478 ShuffleTaskManager getBlockIdsByPartitionId - getBlockIdsByPartitionId# end requestPartitions=[2003], resultBitmap.size=0, findPartitionIdOKCount=0, findPartitionIdFailCount=1546249
   ```
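   The log above shows that all 1546249 block IDs were rejected for partition 2003. One way to see where those blocks ended up is to decode every block ID and count them per partition ID. `PartitionHistogram` below is a hypothetical diagnostic helper, not part of Uniffle; the field widths are parameters because the layout is exactly what is in question, and the 20-, 21- and 24-bit values in `main` are arbitrary examples.

   ```java
   import java.util.Map;
   import java.util.TreeMap;

   // Hypothetical diagnostic helper, not part of Uniffle.
   public class PartitionHistogram {

     /** Decodes each block ID with the given field widths and counts blocks per partition ID. */
     static Map<Integer, Integer> countByPartition(long[] blockIds, int taskAttemptBits,
         int partitionBits) {
       long mask = (1L << partitionBits) - 1;
       Map<Integer, Integer> counts = new TreeMap<>();
       for (long blockId : blockIds) {
         int partitionId = Math.toIntExact((blockId >> taskAttemptBits) & mask);
         counts.merge(partitionId, 1, Integer::sum);
       }
       return counts;
     }

     public static void main(String[] args) {
       // Two block IDs built for partition 2003 with an example 21-bit task attempt field.
       long[] blockIds = {(2003L << 21) | 1, (2003L << 21) | 2};
       System.out.println(countByPartition(blockIds, 21, 24)); // {2003=2}
       System.out.println(countByPartition(blockIds, 20, 24)); // {4006=2}
     }
   }
   ```

   With the matching 21-bit shift both blocks are counted under 2003; decoding the same IDs with a 20-bit shift attributes them to 4006 instead, so a request for 2003 would find nothing, which is the pattern the log shows.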
   
   
   
   
   
   ### Affects Version(s)
   
   0.7.0-snapshot
   
   ### Uniffle Server Log Output
   
   _No response_
   
   ### Uniffle Engine Log Output
   
   _No response_
   
   ### Uniffle Server Configurations
   
   ```yaml
   ## shuffle server config
   rss.rpc.server.port 19999
   rss.jetty.http.port 19998
   rss.rpc.executor.size 2000
   # it should be configured the same as in the coordinator
   rss.storage.type MEMORY_LOCALFILE_HDFS
    
   rss.coordinator.quorum coordinator-01:19999,coordinator-02:19999,coordinator-03:19999
    
   # local storage path for shuffle server
   rss.storage.basePath /mnt/storage00/hadoop/uniffle-ss/rss_data,/mnt/storage01/hadoop/uniffle-ss/rss_data,/mnt/storage02/hadoop/uniffle-ss/rss_data,/mnt/storage03/hadoop/uniffle-ss/rss_data
   # it's better to configure the thread count according to the number of local disks
   rss.server.flush.thread.alive 20
   rss.server.flush.threadPool.size 20
   rss.server.buffer.capacity 40gb
   rss.server.read.buffer.capacity 20gb
   rss.server.heartbeat.timeout 60000
   rss.server.heartbeat.interval 10000
   rss.rpc.message.max.size 1073741824
   rss.server.preAllocation.expired 120000
   rss.server.commit.timeout 600000
   rss.server.app.expired.withoutHeartbeat 120000
   # note: the default value of rss.server.flush.cold.storage.threshold.size is 64m
   # no data will be written to DFS if it is set to 100g, even with rss.storage.type=MEMORY_LOCALFILE_HDFS
   # please set a proper value if DFS is used, e.g. 64m or 128m.
   rss.server.flush.cold.storage.threshold.size 64m
    
   rss.server.health.check.enable true
   rss.server.disk.capacity 400gb
   ```
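   The comments above describe a size threshold that decides whether a flush goes to DFS. A simplified sketch of that rule, assuming a flush event is written to HDFS only when it is larger than `rss.server.flush.cold.storage.threshold.size` and that sizes such as `64m` use 1024-based units; `ColdStorageRouting`, `parseSize` and `tierFor` are hypothetical names, and the sketch ignores any other factors the server may consider.

   ```java
   import java.util.Locale;

   // Simplified, hypothetical model of how a flush event might pick a storage tier.
   public class ColdStorageRouting {

     /** Parses sizes such as "64m", "100g" or "40gb" into bytes, assuming 1024-based units. */
     static long parseSize(String value) {
       String s = value.trim().toLowerCase(Locale.ROOT);
       if (s.endsWith("b")) {
         s = s.substring(0, s.length() - 1);
       }
       long multiplier;
       switch (s.charAt(s.length() - 1)) {
         case 'k': multiplier = 1L << 10; break;
         case 'm': multiplier = 1L << 20; break;
         case 'g': multiplier = 1L << 30; break;
         default: return Long.parseLong(s); // plain byte count
       }
       return Long.parseLong(s.substring(0, s.length() - 1)) * multiplier;
     }

     /** Assumed rule: events larger than the threshold go to HDFS, others to local disk. */
     static String tierFor(long eventBytes, long thresholdBytes) {
       return eventBytes > thresholdBytes ? "HDFS" : "LOCALFILE";
     }

     public static void main(String[] args) {
       long event = parseSize("200m");
       System.out.println(tierFor(event, parseSize("64m")));  // HDFS
       System.out.println(tierFor(event, parseSize("100g"))); // LOCALFILE
     }
   }
   ```

   With a 64m threshold a 200m flush goes to HDFS; with 100g it stays on local disk, which is why such a large threshold effectively disables DFS writes.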
   
   
   ### Uniffle Engine Configurations
   
   _No response_
   
   ### Additional context
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-uniffle] jerqi closed issue #257: [Bug] No data can be read from ShuffleServer when the reduce task number exceeds 1024

Posted by GitBox <gi...@apache.org>.
jerqi closed issue #257: [Bug] No data can be read from ShuffleServer when the reduce task number exceeds 1024
URL: https://github.com/apache/incubator-uniffle/issues/257




[GitHub] [incubator-uniffle] leixm commented on issue #257: [Bug] No data can be read from ShuffleServer when the reduce task number exceeds 1024

leixm commented on issue #257:
URL: https://github.com/apache/incubator-uniffle/issues/257#issuecomment-1272980545

   I will follow up on this. @jerqi 




[GitHub] [incubator-uniffle] jerqi commented on issue #257: [Bug] No data can be read from ShuffleServer when the reduce task number exceeds 1024

jerqi commented on issue #257:
URL: https://github.com/apache/incubator-uniffle/issues/257#issuecomment-1272983644

   > I will follow up on this. @jerqi
   
   It seems that @fpkgithub has already found a solution to this issue. If you have time, could you help me review his PR?




[GitHub] [incubator-uniffle] jerqi commented on issue #257: [Bug] No data can be read from ShuffleServer when the reduce task number exceeds 1024

jerqi commented on issue #257:
URL: https://github.com/apache/incubator-uniffle/issues/257#issuecomment-1272973816

   cc @leixm Could you help me take a look at this issue? PR https://github.com/apache/incubator-uniffle/pull/190 modified this logic.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-uniffle] fpkgithub commented on issue #257: [Bug] No data can be read from ShuffleServer when the reduce task number exceeds 1024

fpkgithub commented on issue #257:
URL: https://github.com/apache/incubator-uniffle/issues/257#issuecomment-1273003837

   > > I will follow up on this. @jerqi
   > 
   > @fpkgithub seems that he has found solution to fix this issue. If you have time, you can help me review his pr.
   
   Sure, I've been looking at this logic recently
   
   

