Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/11 14:24:31 UTC

[GitHub] [arrow] lidavidm commented on a change in pull request #9147: ARROW-11177: [Java] ArrowMessage failed to parse compressed grpc stream

lidavidm commented on a change in pull request #9147:
URL: https://github.com/apache/arrow/pull/9147#discussion_r555078736



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -332,6 +333,23 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
 
   }
 
+  /**
+   * Get first byte with EOF check, it is especially needed when using grpc compression.
+   * InflaterInputStream need another read to change reachEOF after all bytes has been read.
+   *
+   * @param is InputStream
+   * @return -1 if stream is not available, otherwise it will return the actual value.
+   * @throws IOException Read first byte failed.
+   */
+  private static int readRawVarint32WithEOFCheck(InputStream is) throws IOException {
+    int firstByte = is.read();

Review comment:
       Why can't we check `firstByte < 0` for EOF?
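
       For context on the question above, here is a minimal sketch (not taken from the PR) of the `InflaterInputStream` behavior that the Javadoc in the diff describes: `available()` keeps returning 1 after the last payload byte has been read, and only drops to 0 once a further `read()` has returned -1. The class name `InflaterEofDemo` and the helpers `deflate` and `probe` are hypothetical names used for illustration; only `java.util.zip` and `java.io` classes are involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical demo class; not part of Arrow or of the PR under review.
class InflaterEofDemo {
  /** Deflate-compresses the given bytes in memory. */
  static byte[] deflate(byte[] raw) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (DeflaterOutputStream out = new DeflaterOutputStream(sink)) {
      out.write(raw);
    }
    return sink.toByteArray();
  }

  /**
   * Reads every payload byte, then returns three values:
   * available() before the extra read, the result of the extra read,
   * and available() after the extra read.
   */
  static int[] probe(byte[] raw) {
    try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(deflate(raw)))) {
      for (int i = 0; i < raw.length; i++) {
        in.read(); // consume the payload one byte at a time
      }
      int availableBefore = in.available();
      int extraRead = in.read(); // the read that actually reports EOF
      int availableAfter = in.available();
      return new int[] {availableBefore, extraRead, availableAfter};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    int[] r = probe(new byte[] {1, 2, 3});
    System.out.println("available() after last payload byte: " + r[0]);
    System.out.println("extra read() result: " + r[1]);
    System.out.println("available() after extra read: " + r[2]);
  }
}
```

       Under these assumptions, the return value of `read()` is what signals EOF on a compressed stream, which is why checking `firstByte < 0` looks sufficient.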

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -259,7 +259,8 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
       ArrowBuf body = null;
       ArrowBuf appMetadata = null;
       while (stream.available() > 0) {
-        int tag = readRawVarint32(stream);
+        int tag = readRawVarint32WithEOFCheck(stream);
+
         switch (tag) {

Review comment:
       Can we explicitly handle -1 (EOF) here?

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -332,6 +333,23 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
 
   }
 
+  /**
+   * Get first byte with EOF check, it is especially needed when using grpc compression.
+   * InflaterInputStream need another read to change reachEOF after all bytes has been read.
+   *
+   * @param is InputStream
+   * @return -1 if stream is not available, otherwise it will return the actual value.
+   * @throws IOException Read first byte failed.
+   */
+  private static int readRawVarint32WithEOFCheck(InputStream is) throws IOException {
+    int firstByte = is.read();
+    if (is.available() <= 0) {
+      return -1;
+    } else {
+      return CodedInputStream.readRawVarint32(firstByte, is);
+    }
+  }
+
   private static int readRawVarint32(InputStream is) throws IOException {
     int firstByte = is.read();

Review comment:
       It seems we should be checking `firstByte < 0` here, since `CodedInputStream.readRawVarint32` expects the caller to do the EOF check. So maybe we don't need a new method, and we should instead fix this for all callers of this method?
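
       A rough sketch of what that could look like, assuming -1 is acceptable as an EOF sentinel for tag values. This is not the code that was merged. `VarintEofSketch`, `EOF`, `decodeVarint32`, and `readAllTags` are hypothetical names, and `decodeVarint32` is a simplified hand-rolled stand-in for `CodedInputStream.readRawVarint32(firstByte, is)` so that the example runs without the protobuf dependency.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch class; not part of Arrow or of the PR under review.
class VarintEofSketch {
  /** Sentinel returned when the stream ends before a new varint starts (an assumption of this sketch). */
  static final int EOF = -1;

  /** Reads one varint32, returning EOF if the very first read hits end of stream. */
  static int readRawVarint32(InputStream is) throws IOException {
    int firstByte = is.read();
    if (firstByte < 0) {
      return EOF; // EOF check done here, once, for every caller
    }
    return decodeVarint32(firstByte, is);
  }

  /** Simplified stand-in for protobuf's varint decoding; rejects encodings longer than 5 bytes. */
  static int decodeVarint32(int firstByte, InputStream is) throws IOException {
    int result = firstByte & 0x7f;
    int shift = 7;
    int b = firstByte;
    while ((b & 0x80) != 0) { // continuation bit set: another byte follows
      if (shift >= 35) {
        throw new IOException("malformed varint32");
      }
      b = is.read();
      if (b < 0) {
        throw new EOFException("stream ended in the middle of a varint");
      }
      result |= (b & 0x7f) << shift;
      shift += 7;
    }
    return result;
  }

  /** Parse loop that handles EOF explicitly instead of relying on available(). */
  static List<Integer> readAllTags(byte[] encoded) {
    InputStream is = new ByteArrayInputStream(encoded);
    List<Integer> tags = new ArrayList<>();
    try {
      while (true) {
        int tag = readRawVarint32(is);
        if (tag == EOF) {
          break; // clean end of stream
        }
        tags.add(tag);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return tags;
  }

  public static void main(String[] args) {
    // 0x08 encodes 8; 0xac 0x02 encodes 300.
    byte[] encoded = {0x08, (byte) 0xac, 0x02};
    System.out.println(readAllTags(encoded));
  }
}
```

       With the check inside `readRawVarint32`, the `frame` loop can branch on the sentinel directly rather than depending on `stream.available()`.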

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -332,6 +333,23 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
 
   }
 
+  /**
+   * Get first byte with EOF check, it is especially needed when using grpc compression.
+   * InflaterInputStream need another read to change reachEOF after all bytes has been read.
+   *
+   * @param is InputStream
+   * @return -1 if stream is not available, otherwise it will return the actual value.
+   * @throws IOException Read first byte failed.
+   */

Review comment:
       ```suggestion
     /**
      * Read a varint32 from the stream, checking for EOF.
      *
      * <p>When using gRPC compression, EOF may not be reported by the InflaterInputStream until another read has
      * been performed. This method checks {@link InputStream#available()} after reading the first byte in order
      * to handle this case.
      *
      * @param is InputStream
      * @return -1 if EOF reached, else the varint32 value.
      * @throws IOException if an error occurred while reading the stream.
      */
   ```



