You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@uniffle.apache.org by "advancedxy (via GitHub)" <gi...@apache.org> on 2023/03/19 13:29:44 UTC

[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #717: [#674] improvement(server): use ByteString#asReadOnlyByteBuffer to reduce the memory allocation

advancedxy commented on code in PR #717:
URL: https://github.com/apache/incubator-uniffle/pull/717#discussion_r1141353067


##########
common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java:
##########
@@ -35,20 +36,26 @@ public ShuffleDataResult(byte[] data) {
   }
 
   public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
-    this.data = data;
+    this(bufferSegments, data == null ? null : ByteBuffer.wrap(data));
+  }
+
+  public ShuffleDataResult(List<BufferSegment> bufferSegments, ByteBuffer data) {
+    this.bufferData = data;
     this.bufferSegments = bufferSegments;

Review Comment:
   ```suggestion
       this.bufferSegments = bufferSegments;
       this.bufferData = data;
   ```



##########
common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java:
##########
@@ -17,31 +17,46 @@
 
 package org.apache.uniffle.common;
 
-import java.util.Arrays;
+import java.nio.ByteBuffer;
 import java.util.Objects;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class ShufflePartitionedBlock {
 
   private int length;
   private long crc;
   private long blockId;
   private int uncompressLength;
-  private byte[] data;
   private long taskAttemptId;
+  // Read only byte buffer
+  private ByteBuffer bufferData;
 
   public ShufflePartitionedBlock(
       int length,
       int uncompressLength,
       long crc,
       long blockId,
-      long taskAttemptId,
-      byte[] data) {
+      ByteBuffer data,
+      long taskAttemptId) {

Review Comment:
   ```suggestion
         long taskAttemptId,
         ByteBuffer data) {
   ```



##########
common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java:
##########
@@ -62,12 +77,12 @@ public boolean equals(Object o) {
     return length == that.length
         && crc == that.crc
         && blockId == that.blockId
-        && Arrays.equals(data, that.data);
+        && bufferData.equals(that.bufferData);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(length, crc, blockId, Arrays.hashCode(data));
+    return Objects.hash(length, crc, blockId, bufferData.hashCode());

Review Comment:
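   Ditto: please handle a null bufferData here as well (e.g. pass bufferData directly to Objects.hash, which tolerates null, instead of calling bufferData.hashCode()).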



##########
common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java:
##########
@@ -35,20 +36,26 @@ public ShuffleDataResult(byte[] data) {
   }
 
   public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
-    this.data = data;
+    this(bufferSegments, data == null ? null : ByteBuffer.wrap(data));
+  }
+
+  public ShuffleDataResult(List<BufferSegment> bufferSegments, ByteBuffer data) {
+    this.bufferData = data;
     this.bufferSegments = bufferSegments;
   }
 
   public byte[] getData() {
-    return data;
+    return bufferData.array();

Review Comment:
   Please add a null check here, since bufferData can be null (the byte[] constructor passes null through when data is null).



##########
common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java:
##########
@@ -62,12 +77,12 @@ public boolean equals(Object o) {
     return length == that.length
         && crc == that.crc
         && blockId == that.blockId
-        && Arrays.equals(data, that.data);
+        && bufferData.equals(that.bufferData);

Review Comment:
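   Null check: bufferData may be null, in which case bufferData.equals(...) throws a NullPointerException. Consider Objects.equals(bufferData, that.bufferData).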



##########
common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java:
##########
@@ -95,11 +110,7 @@ public void setBlockId(long blockId) {
   }
 
   public byte[] getData() {
-    return data;
-  }
-
-  public void setData(byte[] data) {
-    this.data = data;
+    return bufferData.array();

Review Comment:
   Ditto: please add a null check here too, since bufferData could be null.



##########
integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageHdfsFallbackTest.java:
##########
@@ -51,8 +49,6 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
 
   @Override
   public void makeChaos() {
-    assertEquals(1, cluster.getDataNodes().size());
-    cluster.stopDataNode(0);
-    assertEquals(0, cluster.getDataNodes().size());
+    cluster.shutdown();

Review Comment:
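   Why this change? Previously the test stopped only the single DataNode to simulate the failure; it now shuts down the whole cluster, which changes what the test exercises.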



##########
server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java:
##########
@@ -155,17 +156,18 @@ public synchronized ShuffleDataResult getShuffleData(
   // todo: if block was flushed, it's possible to get duplicated data
   public synchronized ShuffleDataResult getShuffleData(
       long lastBlockId, int readBufferSize, Roaring64NavigableMap expectedTaskIds) {
+    LOG.info("Last block id: {}", lastBlockId);

Review Comment:
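   Is this needed? Logging at INFO level on every getShuffleData call is likely to be noisy; please remove it or lower it to DEBUG.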



##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriter.java:
##########
@@ -31,15 +33,26 @@ public class LocalFileWriter implements FileWriter, Closeable {
 
   private DataOutputStream dataOutputStream;
   private FileOutputStream fileOutputStream;
+  private FileChannel fileChannel;
   private long nextOffset;
 
   public LocalFileWriter(File file) throws IOException {
     fileOutputStream = new FileOutputStream(file, true);
+    fileChannel = fileOutputStream.getChannel();
     // init fsDataOutputStream
     dataOutputStream = new DataOutputStream(new BufferedOutputStream(fileOutputStream));

Review Comment:
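   I'm worried about this, especially since dataOutputStream is buffered.
   
   I took a quick look at FileChannel, and I'm not sure we can safely mix writes to fileChannel with writes to a buffered dataOutputStream. Bytes still held in the BufferedOutputStream's buffer could reach the file after data written directly through the channel, so the file contents could end up out of order.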



##########
server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java:
##########
@@ -238,19 +240,12 @@ private int calculateDataLength(List<BufferSegment> bufferSegments) {
     return bufferSegment.getOffset() + bufferSegment.getLength();
   }
 
-  private void updateShuffleData(List<ShufflePartitionedBlock> readBlocks, byte[] data) {
-    int offset = 0;
+  private void updateShuffleData(List<ShufflePartitionedBlock> readBlocks, ByteBuffer bufferData) {
     for (ShufflePartitionedBlock block : readBlocks) {
-      // fill shuffle data
-      try {
-        System.arraycopy(block.getData(), 0, data, offset, block.getLength());
-      } catch (Exception e) {
-        LOG.error("Unexpected exception for System.arraycopy, length["
-            + block.getLength() + "], offset["
-            + offset + "], dataLength[" + data.length + "]", e);
-        throw e;
-      }
-      offset += block.getLength();
+      ByteBuffer tmp = block.getBufferData().duplicate();
+      tmp.clear();
+      bufferData.put(tmp);
+      tmp.clear();

Review Comment:
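   Double clear()? This looks like a bug. clear() resets position to 0 and limit to capacity, so put(tmp) copies the buffer from its start up to its capacity, which is not necessarily block.getLength() bytes. The second clear() after put() has no useful effect.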



##########
server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java:
##########
@@ -125,10 +127,12 @@ public class LocalStorageManager extends SingleStorageManager {
               .localStorageMedia(storageType)
               .build();
           successCount.incrementAndGet();
+          LOG.info("Initialized path: {}", storagePath);
         } catch (Exception e) {
           LOG.error("LocalStorage init failed!", e);
         } finally {
           countDownLatch.countDown();
+          LOG.info("Finished. {}", storagePath);

Review Comment:
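   Is this related to this PR? These log lines look like leftover debugging; please remove them or move them to a separate change.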



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org