You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2019/12/28 02:28:55 UTC

[arrow] branch master updated: ARROW-7437: [Java] ReadChannel#readFully does not set writer index correctly

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 81f0d42  ARROW-7437: [Java] ReadChannel#readFully does not set writer index correctly
81f0d42 is described below

commit 81f0d4228e594b10434021c5d9f8187a4e56fc6a
Author: liyafan82 <fa...@foxmail.com>
AuthorDate: Fri Dec 27 21:28:29 2019 -0500

    ARROW-7437: [Java] ReadChannel#readFully does not set writer index correctly
    
    1. The writer index should be incremented by the amount of data actually read.
    2. When EOS is encounterned, the number of bytes read should be incremented before returning.
    
    Closes #6064 from liyafan82/fly_1219_idx and squashes the following commits:
    
    e268b0d31 <liyafan82>  Resolve comments
    baab80532 <liyafan82>  ReadChannel#readFully does not set writer index correctly
    
    Authored-by: liyafan82 <fa...@foxmail.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 .../org/apache/arrow/vector/ipc/ReadChannel.java   |  9 +++--
 .../arrow/vector/ipc/TestArrowReaderWriter.java    | 42 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java
index 3c3069e..02061c7 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java
@@ -53,11 +53,14 @@ public class ReadChannel implements AutoCloseable {
    * @throws IOException if nit enough bytes left to read
    */
   public int readFully(ByteBuffer buffer) throws IOException {
-    LOGGER.debug("Reading buffer with size: {}", buffer.remaining());
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Reading buffer with size: {}", buffer.remaining());
+    }
     int totalRead = 0;
     while (buffer.remaining() != 0) {
       int read = in.read(buffer);
-      if (read < 0) {
+      if (read == -1) {
+        this.bytesRead += totalRead;
         return totalRead;
       }
       totalRead += read;
@@ -79,7 +82,7 @@ public class ReadChannel implements AutoCloseable {
    */
   public int readFully(ArrowBuf buffer, int l) throws IOException {
     int n = readFully(buffer.nioBuffer(buffer.writerIndex(), l));
-    buffer.writerIndex(n);
+    buffer.writerIndex(buffer.writerIndex() + n);
     return n;
   }
 
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
index dda402d..4d6da12 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
@@ -29,6 +29,8 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.Channels;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -525,4 +527,44 @@ public class TestArrowReaderWriter {
     batch.close();
     vector.close();
   }
+
+  @Test
+  public void testChannelReadFully() throws IOException {
+    final ByteBuffer buf = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+    buf.putInt(200);
+    buf.rewind();
+
+    try (ReadChannel channel = new ReadChannel(Channels.newChannel(new ByteArrayInputStream(buf.array())));
+         ArrowBuf arrBuf = allocator.buffer(8)) {
+      arrBuf.setInt(0, 100);
+      arrBuf.writerIndex(4);
+      assertEquals(4, arrBuf.writerIndex());
+
+      int n = channel.readFully(arrBuf, 4);
+      assertEquals(4, n);
+      assertEquals(8, arrBuf.writerIndex());
+
+      assertEquals(100, arrBuf.getInt(0));
+      assertEquals(200, arrBuf.getInt(4));
+    }
+  }
+
+  @Test
+  public void testChannelReadFullyEos() throws IOException {
+    final ByteBuffer buf = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+    buf.putInt(10);
+    buf.rewind();
+
+    try (ReadChannel channel = new ReadChannel(Channels.newChannel(new ByteArrayInputStream(buf.array())));
+         ArrowBuf arrBuf = allocator.buffer(8)) {
+      int n = channel.readFully(arrBuf.nioBuffer(0, 8));
+      assertEquals(4, n);
+
+      // the input has only 4 bytes, so the number of bytes read should be 4
+      assertEquals(4, channel.bytesRead());
+
+      // the first 4 bytes have been read successfully.
+      assertEquals(10, arrBuf.getInt(0));
+    }
+  }
 }