Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/11 09:17:45 UTC

[GitHub] [beam] lukemin89 opened a new pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

lukemin89 opened a new pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397
 
 
   The `ReadableByteChannel`/`WritableByteChannel` contracts allow a single read/write call to transfer anywhere from 0 to n bytes, where n is `buffer.remaining()`, even if more content or space is available in the channel. 
   
   I recently encountered this issue when trying to switch from GZIP compression to ZSTD compression.
   
   Make TFRecordCodec keep reading/writing until the header, footer, and data are complete. 
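
A minimal sketch of the behavior described above, not the code from this pull request. `OneByteChannel` is a hypothetical wrapper, introduced only for illustration, that transfers at most one byte per call, which the channel contract permits. A single `read` then fills only part of the buffer, while a loop recovers everything.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

final class ShortReadDemo {
  /** Hypothetical wrapper that transfers at most one byte per read call. */
  static final class OneByteChannel implements ReadableByteChannel {
    private final ReadableByteChannel delegate;

    OneByteChannel(ReadableByteChannel delegate) {
      this.delegate = delegate;
    }

    @Override
    public int read(ByteBuffer dst) throws IOException {
      if (!dst.hasRemaining()) {
        return 0;
      }
      // Expose only a one-byte window of dst to the delegate.
      ByteBuffer window = dst.slice();
      window.limit(1);
      int n = delegate.read(window);
      if (n > 0) {
        dst.position(dst.position() + n);
      }
      return n;
    }

    @Override
    public boolean isOpen() {
      return delegate.isOpen();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }
  }

  /** Loops until dst is full; treats end-of-stream before that as an error. */
  static void readFully(ReadableByteChannel in, ByteBuffer dst) throws IOException {
    while (dst.hasRemaining()) {
      if (in.read(dst) < 0) {
        throw new EOFException("stream ended with " + dst.remaining() + " bytes missing");
      }
    }
  }

  private static ReadableByteChannel oneByteAtATime(byte[] data) {
    return new OneByteChannel(Channels.newChannel(new ByteArrayInputStream(data)));
  }

  /** Bytes transferred by a single read call into a buffer sized for all of data. */
  static int bytesFromSingleRead(byte[] data) throws IOException {
    return oneByteAtATime(data).read(ByteBuffer.allocate(data.length));
  }

  /** Bytes recovered by looping over the same kind of channel. */
  static byte[] readAll(byte[] data) throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(data.length);
    readFully(oneByteAtATime(data), buffer);
    return buffer.array();
  }
}
```

Code that calls `read` once and assumes the buffer is full would see only the first byte here, which is the failure mode this change guards against.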
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ R: @lukecwik ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukemin89 commented on issue #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukemin89 commented on issue #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#issuecomment-613152729
 
 
   R: @lukecwik 


[GitHub] [beam] lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#discussion_r409952123
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
 ##########
 @@ -440,4 +456,115 @@ public void processElement(ProcessContext c) {
       c.output(c.element().getBytes(Charsets.UTF_8));
     }
   }
+
+  static boolean maybeThisTime() {
+    return ThreadLocalRandom.current().nextBoolean();
+  }
+
+  static class PickyReadChannel extends FilterInputStream implements ReadableByteChannel {
+    protected PickyReadChannel(InputStream in) {
+      super(in);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+      if (!maybeThisTime() || !dst.hasRemaining()) {
+        return 0;
+      }
+      int n = read();
+      if (n == -1) {
+        return -1;
+      }
+      dst.put((byte) n);
+      return 1;
+    }
+
+    @Override
+    public boolean isOpen() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  static class PickyWriteChannel extends FilterOutputStream implements WritableByteChannel {
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public PickyWriteChannel(OutputStream out) {
+      super(out);
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      if (!maybeThisTime() || !src.hasRemaining()) {
+        return 0;
+      }
+      write(src.get());
+      return 1;
+    }
+
+    @Override
+    public boolean isOpen() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Test
+  public void testReadFully() throws IOException {
+    byte[] data = "Hello World".getBytes(StandardCharsets.UTF_8);
+    ReadableByteChannel chan = new PickyReadChannel(new ByteArrayInputStream(data));
+
+    ByteBuffer buffer = ByteBuffer.allocate(data.length);
+    TFRecordCodec.readFully(chan, buffer);
+
+    assertArrayEquals(data, buffer.array());
+  }
+
+  @Test(expected = IOException.class)
 
 Review comment:
   done! changed to use `ExpectedException`


[GitHub] [beam] lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#discussion_r407041130
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
 ##########
 @@ -669,34 +670,31 @@ public int recordLength(byte[] data) {
 
     public @Nullable byte[] read(ReadableByteChannel inChannel) throws IOException {
       header.clear();
-      int headerBytes = inChannel.read(header);
-      if (headerBytes <= 0) {
+      int firstRead = read(inChannel, header);
+      if (firstRead == 0) {
         return null;
       }
-      checkState(headerBytes == HEADER_LEN, "Not a valid TFRecord. Fewer than 12 bytes.");
 
       header.rewind();
-      long length = header.getLong();
-      long lengthHash = hashLong(length);
+      long length64 = header.getLong();
+      long lengthHash = hashLong(length64);
       int maskedCrc32OfLength = header.getInt();
       if (lengthHash != maskedCrc32OfLength) {
         throw new IOException(
             String.format(
                 "Mismatch of length mask when reading a record. Expected %d but received %d.",
                 maskedCrc32OfLength, lengthHash));
       }
-
-      ByteBuffer data = ByteBuffer.allocate((int) length);
-      while (data.hasRemaining() && inChannel.read(data) >= 0) {}
-      if (data.hasRemaining()) {
-        throw new IOException(
-            String.format(
-                "EOF while reading record of length %d. Read only %d bytes. Input might be truncated.",
-                length, data.position()));
+      int length = (int) length64;
+      if (length != length64) {
+        throw new IOException(String.format("length overflow %d", length64));
       }
 
 Review comment:
   Minor, but necessary: a check for integer overflow. I'm not sure whether this can actually occur, but if I remember correctly, other languages can write records larger than 2GB. (Maybe Java can do it with a direct buffer?)
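
A hedged sketch of the narrowing check discussed here, written as a standalone helper rather than as the codec's actual code. The class and method names (`RecordLength`, `checkedLength`) are hypothetical. It also rejects negative lengths, which the cast-and-compare in the diff lets through; that extra check is an assumption of this sketch, not part of the pull request.

```java
import java.io.IOException;

final class RecordLength {
  /**
   * Narrows the 64-bit length field of a record header to an int.
   * Rejects values that cannot be used as a Java array or buffer size.
   */
  static int checkedLength(long length64) throws IOException {
    if (length64 < 0 || length64 > Integer.MAX_VALUE) {
      throw new IOException(
          String.format("record length %d does not fit in a Java buffer", length64));
    }
    return (int) length64;
  }
}
```

`Math.toIntExact(long)` performs a similar range check but throws `ArithmeticException` instead of `IOException`. On the direct-buffer question: both `ByteBuffer.allocate` and `ByteBuffer.allocateDirect` take an `int` capacity, so a single buffer cannot exceed `Integer.MAX_VALUE` bytes either way.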


[GitHub] [beam] lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#discussion_r407042280
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
 ##########
 @@ -717,14 +715,38 @@ public void write(WritableByteChannel outChannel, byte[] data) throws IOExceptio
       header.clear();
       header.putLong(data.length).putInt(maskedCrc32OfLength);
       header.rewind();
-      outChannel.write(header);
+      writeFully(outChannel, header);
 
-      outChannel.write(ByteBuffer.wrap(data));
+      writeFully(outChannel, ByteBuffer.wrap(data));
 
       footer.clear();
       footer.putInt(maskedCrc32OfData);
       footer.rewind();
-      outChannel.write(footer);
+      writeFully(outChannel, footer);
+    }
+
+    @VisibleForTesting
+    static void readFully(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int expected = bb.remaining();
+      int actual = read(in, bb);
+      if (expected != actual) {
+        throw new IOException(String.format("expected %d, but got %d", expected, actual));
+      }
+    }
+
+    private static int read(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int n, read = 0;
+      while (bb.hasRemaining() && (n = in.read(bb)) >= 0) {
+        read += n;
+      }
+      return read;
+    }
+
+    @VisibleForTesting
+    static void writeFully(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
+      while (buffer.hasRemaining()) {
+        channel.write(buffer);
+      }
 
 Review comment:
   Or I can make the loop condition stricter, since I am fairly sure that all channels within Beam read/write at least 1 byte per call.


[GitHub] [beam] lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#discussion_r407041552
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
 ##########
 @@ -717,14 +715,38 @@ public void write(WritableByteChannel outChannel, byte[] data) throws IOExceptio
       header.clear();
       header.putLong(data.length).putInt(maskedCrc32OfLength);
       header.rewind();
-      outChannel.write(header);
+      writeFully(outChannel, header);
 
-      outChannel.write(ByteBuffer.wrap(data));
+      writeFully(outChannel, ByteBuffer.wrap(data));
 
       footer.clear();
       footer.putInt(maskedCrc32OfData);
       footer.rewind();
-      outChannel.write(footer);
+      writeFully(outChannel, footer);
+    }
+
+    @VisibleForTesting
+    static void readFully(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int expected = bb.remaining();
+      int actual = read(in, bb);
+      if (expected != actual) {
+        throw new IOException(String.format("expected %d, but got %d", expected, actual));
+      }
+    }
+
+    private static int read(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int n, read = 0;
+      while (bb.hasRemaining() && (n = in.read(bb)) >= 0) {
+        read += n;
+      }
+      return read;
+    }
+
+    @VisibleForTesting
+    static void writeFully(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
+      while (buffer.hasRemaining()) {
+        channel.write(buffer);
+      }
 
 Review comment:
   I'm not sure whether I can or should make these more robust.
   If the channel follows the Javadoc contract and keeps returning 0 without throwing,
   this loop could spin forever.
   That would arguably be the channel's problem, but I'm not sure whether I should add something like a retry count or a hard limit.
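
One possible shape for the hard limit mentioned in this comment, as a sketch only; nothing here is taken from the pull request. The `maxZeroWrites` parameter and the `StallingChannel` test double are hypothetical names. The loop tolerates a bounded run of consecutive zero-byte writes and fails once the run exceeds the limit.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

final class BoundedWrite {
  /** Writes all of src, failing after more than maxZeroWrites consecutive zero-byte writes. */
  static void writeFully(WritableByteChannel out, ByteBuffer src, int maxZeroWrites)
      throws IOException {
    int zeroWrites = 0;
    while (src.hasRemaining()) {
      int n = out.write(src);
      if (n > 0) {
        zeroWrites = 0; // progress was made, so reset the counter
      } else if (++zeroWrites > maxZeroWrites) {
        throw new IOException(
            String.format(
                "no progress after %d writes; %d bytes left", zeroWrites, src.remaining()));
      }
    }
  }

  /** Hypothetical test double: accepts one byte after `stalls` zero-byte writes. */
  static final class StallingChannel implements WritableByteChannel {
    private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
    private final int stalls;
    private int pending;

    StallingChannel(int stalls) {
      this.stalls = stalls;
      this.pending = stalls;
    }

    @Override
    public int write(ByteBuffer src) {
      if (!src.hasRemaining()) {
        return 0;
      }
      if (pending > 0) {
        pending--;
        return 0;
      }
      sink.write(src.get());
      pending = stalls;
      return 1;
    }

    byte[] written() {
      return sink.toByteArray();
    }

    @Override
    public boolean isOpen() {
      return true;
    }

    @Override
    public void close() {}
  }
}
```

Whether a fixed limit is appropriate depends on the channel: a non-blocking channel may legitimately return 0 many times, so the limit trades a possible hang for a possible spurious failure.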


[GitHub] [beam] lukemin89 opened a new pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukemin89 opened a new pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397
 
 
   The `ReadableByteChannel`/`WritableByteChannel` contracts allow a single read/write call to transfer anywhere from 0 to n bytes, where n is `buffer.remaining()`, even if more content or space is available in the channel. 
   
   A similar issue was reported and fixed once before: https://issues.apache.org/jira/browse/BEAM-5412?jql=text%20~%20%22tfrecord%22
   
   But the same issue can happen for `header` and `footer` as well, and when writing data.
   
   I recently encountered this issue when trying to switch from GZIP compression to ZSTD compression.
   
   Make TFRecordCodec keep reading/writing until the header, footer, and data are complete. 
   


[GitHub] [beam] lukecwik commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#discussion_r409002145
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
 ##########
 @@ -717,14 +716,38 @@ public void write(WritableByteChannel outChannel, byte[] data) throws IOExceptio
       header.clear();
       header.putLong(data.length).putInt(maskedCrc32OfLength);
       header.rewind();
-      outChannel.write(header);
+      writeFully(outChannel, header);
 
-      outChannel.write(ByteBuffer.wrap(data));
+      writeFully(outChannel, ByteBuffer.wrap(data));
 
       footer.clear();
       footer.putInt(maskedCrc32OfData);
       footer.rewind();
-      outChannel.write(footer);
+      writeFully(outChannel, footer);
+    }
+
+    @VisibleForTesting
+    static void readFully(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int expected = bb.remaining();
+      int actual = read(in, bb);
+      if (expected != actual) {
+        throw new IOException(String.format("expected %d, but got %d", expected, actual));
+      }
+    }
+
+    private static int read(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int n, read = 0;
+      while (bb.hasRemaining() && (n = in.read(bb)) >= 0) {
+        read += n;
+      }
+      return read;
+    }
 
 Review comment:
   ```suggestion
       private static int read(ReadableByteChannel in, ByteBuffer bb) throws IOException {
         int expected = bb.remaining();
         while (bb.hasRemaining() && in.read(bb) >= 0) {
         }
         return expected - bb.remaining();
       }
   ```
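
A small standalone sketch of the suggestion above, showing how deriving the count from the buffer distinguishes a full read, a truncated read, and a clean end-of-stream. The `CountingRead` class and the `readFrom` helper are hypothetical and exist only to exercise the loop; the in-memory channel comes from `Channels.newChannel`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

final class CountingRead {
  /** Reads until bb is full or the channel ends; returns the bytes transferred. */
  static int read(ReadableByteChannel in, ByteBuffer bb) throws IOException {
    int expected = bb.remaining();
    while (bb.hasRemaining() && in.read(bb) >= 0) {
      // Keep reading; the count is derived from the buffer afterwards.
    }
    return expected - bb.remaining();
  }

  /** Convenience for experiments: reads from an in-memory source into a fresh buffer. */
  static int readFrom(byte[] source, int capacity) throws IOException {
    ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(source));
    return read(channel, ByteBuffer.allocate(capacity));
  }
}
```

With a 12-byte buffer, a 12-byte source yields 12, a 5-byte source yields 5, and an empty source yields 0, so a caller can tell "no more records" apart from "truncated header" by comparing the result with 0 and with the buffer size.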


[GitHub] [beam] lukecwik commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#discussion_r408996520
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
 ##########
 @@ -440,4 +456,115 @@ public void processElement(ProcessContext c) {
       c.output(c.element().getBytes(Charsets.UTF_8));
     }
   }
+
+  static boolean maybeThisTime() {
+    return ThreadLocalRandom.current().nextBoolean();
+  }
+
+  static class PickyReadChannel extends FilterInputStream implements ReadableByteChannel {
+    protected PickyReadChannel(InputStream in) {
+      super(in);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+      if (!maybeThisTime() || !dst.hasRemaining()) {
+        return 0;
+      }
+      int n = read();
+      if (n == -1) {
+        return -1;
+      }
+      dst.put((byte) n);
+      return 1;
+    }
+
+    @Override
+    public boolean isOpen() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  static class PickyWriteChannel extends FilterOutputStream implements WritableByteChannel {
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public PickyWriteChannel(OutputStream out) {
+      super(out);
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      if (!maybeThisTime() || !src.hasRemaining()) {
+        return 0;
+      }
+      write(src.get());
+      return 1;
+    }
+
+    @Override
+    public boolean isOpen() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Test
+  public void testReadFully() throws IOException {
+    byte[] data = "Hello World".getBytes(StandardCharsets.UTF_8);
+    ReadableByteChannel chan = new PickyReadChannel(new ByteArrayInputStream(data));
+
+    ByteBuffer buffer = ByteBuffer.allocate(data.length);
+    TFRecordCodec.readFully(chan, buffer);
+
+    assertArrayEquals(data, buffer.array());
+  }
+
+  @Test(expected = IOException.class)
 
 Review comment:
   To make sure the exception really comes from a truncated read, check that its message contains the parts you expect, such as `expected` and `but got`. See https://junit.org/junit4/javadoc/4.12/org/junit/rules/ExpectedException.html or any similar mechanism.
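A rough sketch of that check, written in plain Java so it runs without JUnit. `TruncatedReadCheck` and `truncatedReadMessage` are hypothetical names, and `readFully` here is a local copy modeled on the diff, not the Beam method itself. In a JUnit 4 test the same idea would typically be expressed with the `ExpectedException` rule's `expect` and `expectMessage` methods.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

final class TruncatedReadCheck {
  /** Local stand-in for the method under test, modeled on the diff in this thread. */
  static void readFully(ReadableByteChannel in, ByteBuffer bb) throws IOException {
    int expected = bb.remaining();
    while (bb.hasRemaining() && in.read(bb) >= 0) {
      // loop until the buffer is full or the channel reports end of stream
    }
    int actual = expected - bb.remaining();
    if (expected != actual) {
      throw new IOException(String.format("expected %d, but got %d", expected, actual));
    }
  }

  /** Returns the message of the IOException raised by the read, or null if none was raised. */
  static String truncatedReadMessage(byte[] data, int wanted) {
    ReadableByteChannel chan = Channels.newChannel(new ByteArrayInputStream(data));
    try {
      readFully(chan, ByteBuffer.allocate(wanted));
      return null;
    } catch (IOException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    String message = truncatedReadMessage(new byte[5], 8);
    // Checking the message, not just the exception type, ties the failure to the truncation.
    if (message == null || !message.contains("expected") || !message.contains("but got")) {
      throw new AssertionError("unexpected result: " + message);
    }
    System.out.println(message);
  }
}
```

Asserting only on `IOException.class` would also pass if the read failed for an unrelated reason, which is the gap the review comment points at.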


[GitHub] [beam] lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#discussion_r407042280
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
 ##########
 @@ -717,14 +715,38 @@ public void write(WritableByteChannel outChannel, byte[] data) throws IOExceptio
       header.clear();
       header.putLong(data.length).putInt(maskedCrc32OfLength);
       header.rewind();
-      outChannel.write(header);
+      writeFully(outChannel, header);
 
-      outChannel.write(ByteBuffer.wrap(data));
+      writeFully(outChannel, ByteBuffer.wrap(data));
 
       footer.clear();
       footer.putInt(maskedCrc32OfData);
       footer.rewind();
-      outChannel.write(footer);
+      writeFully(outChannel, footer);
+    }
+
+    @VisibleForTesting
+    static void readFully(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int expected = bb.remaining();
+      int actual = read(in, bb);
+      if (expected != actual) {
+        throw new IOException(String.format("expected %d, but got %d", expected, actual));
+      }
+    }
+
+    private static int read(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int n, read = 0;
+      while (bb.hasRemaining() && (n = in.read(bb)) >= 0) {
+        read += n;
+      }
+      return read;
+    }
+
+    @VisibleForTesting
+    static void writeFully(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
+      while (buffer.hasRemaining()) {
+        channel.write(buffer);
+      }
 
 Review comment:
   Alternatively, I can make the loop condition stricter, since I am fairly sure that the channels used within Beam will read/write at least 1 byte per call.


[GitHub] [beam] lukecwik commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#discussion_r408989532
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
 ##########
 @@ -717,14 +715,38 @@ public void write(WritableByteChannel outChannel, byte[] data) throws IOExceptio
       header.clear();
       header.putLong(data.length).putInt(maskedCrc32OfLength);
       header.rewind();
-      outChannel.write(header);
+      writeFully(outChannel, header);
 
-      outChannel.write(ByteBuffer.wrap(data));
+      writeFully(outChannel, ByteBuffer.wrap(data));
 
       footer.clear();
       footer.putInt(maskedCrc32OfData);
       footer.rewind();
-      outChannel.write(footer);
+      writeFully(outChannel, footer);
+    }
+
+    @VisibleForTesting
+    static void readFully(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int expected = bb.remaining();
+      int actual = read(in, bb);
+      if (expected != actual) {
+        throw new IOException(String.format("expected %d, but got %d", expected, actual));
+      }
+    }
+
+    private static int read(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int n, read = 0;
+      while (bb.hasRemaining() && (n = in.read(bb)) >= 0) {
+        read += n;
+      }
+      return read;
+    }
+
+    @VisibleForTesting
+    static void writeFully(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
+      while (buffer.hasRemaining()) {
+        channel.write(buffer);
+      }
 
 Review comment:
   This will only become a busy loop on async channels but will still make progress successfully. If this becomes an issue, the code can be expanded to correctly handle the busy loop.
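One way the loop might be expanded if busy-waiting ever became a problem is to pause briefly whenever a write makes no progress. This is only a sketch under assumptions: the class name, the `EveryOtherCallChannel` helper and the pause bounds (1 microsecond doubling up to 1 millisecond) are all made up for illustration and are not part of the PR.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.locks.LockSupport;

final class BackoffWriteSketch {
  /** Hypothetical channel: accepts one byte on every other call and nothing in between. */
  static final class EveryOtherCallChannel implements WritableByteChannel {
    final ByteArrayOutputStream sink = new ByteArrayOutputStream();
    private boolean accept = false;

    @Override
    public int write(ByteBuffer src) {
      accept = !accept;
      if (!accept || !src.hasRemaining()) {
        return 0; // no progress on this call
      }
      sink.write(src.get());
      return 1;
    }

    @Override
    public boolean isOpen() {
      return true;
    }

    @Override
    public void close() {}
  }

  /** Writes the whole buffer, pausing with a growing delay after each zero-byte write. */
  static void writeFully(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
    long pauseNanos = 1_000L; // assumed starting pause
    while (buffer.hasRemaining()) {
      if (channel.write(buffer) > 0) {
        pauseNanos = 1_000L; // progress was made, so reset the pause
      } else {
        LockSupport.parkNanos(pauseNanos);
        pauseNanos = Math.min(pauseNanos * 2, 1_000_000L); // assumed upper bound
      }
    }
  }

  /** Pushes the data through the stubborn channel and returns what arrived in the sink. */
  static byte[] demo(byte[] data) {
    EveryOtherCallChannel channel = new EveryOtherCallChannel();
    try {
      writeFully(channel, ByteBuffer.wrap(data));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return channel.sink.toByteArray();
  }

  public static void main(String[] args) {
    System.out.println(demo(new byte[] {1, 2, 3}).length);
  }
}
```

The loop still terminates only when the channel eventually accepts the bytes; the pause merely keeps it from spinning at full speed in the meantime.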


[GitHub] [beam] lukemin89 closed pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukemin89 closed pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397
 
 
   


[GitHub] [beam] lukecwik commented on issue #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#issuecomment-615005288
 
 
   retest this please


[GitHub] [beam] lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#discussion_r409952021
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
 ##########
 @@ -717,14 +716,38 @@ public void write(WritableByteChannel outChannel, byte[] data) throws IOExceptio
       header.clear();
       header.putLong(data.length).putInt(maskedCrc32OfLength);
       header.rewind();
-      outChannel.write(header);
+      writeFully(outChannel, header);
 
-      outChannel.write(ByteBuffer.wrap(data));
+      writeFully(outChannel, ByteBuffer.wrap(data));
 
       footer.clear();
       footer.putInt(maskedCrc32OfData);
       footer.rewind();
-      outChannel.write(footer);
+      writeFully(outChannel, footer);
+    }
+
+    @VisibleForTesting
+    static void readFully(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int expected = bb.remaining();
+      int actual = read(in, bb);
+      if (expected != actual) {
+        throw new IOException(String.format("expected %d, but got %d", expected, actual));
+      }
+    }
+
+    private static int read(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int n, read = 0;
+      while (bb.hasRemaining() && (n = in.read(bb)) >= 0) {
+        read += n;
+      }
+      return read;
+    }
 
 Review comment:
   Done! (`spotlessApply` forced me to change the bracket location.)


[GitHub] [beam] lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#discussion_r407041552
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
 ##########
 @@ -717,14 +715,38 @@ public void write(WritableByteChannel outChannel, byte[] data) throws IOExceptio
       header.clear();
       header.putLong(data.length).putInt(maskedCrc32OfLength);
       header.rewind();
-      outChannel.write(header);
+      writeFully(outChannel, header);
 
-      outChannel.write(ByteBuffer.wrap(data));
+      writeFully(outChannel, ByteBuffer.wrap(data));
 
       footer.clear();
       footer.putInt(maskedCrc32OfData);
       footer.rewind();
-      outChannel.write(footer);
+      writeFully(outChannel, footer);
+    }
+
+    @VisibleForTesting
+    static void readFully(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int expected = bb.remaining();
+      int actual = read(in, bb);
+      if (expected != actual) {
+        throw new IOException(String.format("expected %d, but got %d", expected, actual));
+      }
+    }
+
+    private static int read(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int n, read = 0;
+      while (bb.hasRemaining() && (n = in.read(bb)) >= 0) {
+        read += n;
+      }
+      return read;
+    }
+
+    @VisibleForTesting
+    static void writeFully(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
+      while (buffer.hasRemaining()) {
+        channel.write(buffer);
+      }
 
 Review comment:
   I'm not sure whether I can or should make these more robust.
   If the channel does not follow the Javadoc description and keeps returning 0 without throwing,
   this loop could run forever.
   That may be the channel's problem, but I'm not sure whether I should add something like a hard limit on the number of retries.
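For illustration, a hard limit of the kind mentioned here could count consecutive zero-byte writes and give up once a threshold is reached. This is a sketch, not what the PR does: the class name, the `MAX_STALLED_WRITES` constant and its value, and the error message are all assumptions.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

final class BoundedRetryWriteSketch {
  /** Hypothetical limit on consecutive writes that make no progress. */
  static final int MAX_STALLED_WRITES = 1_000;

  static void writeFully(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
    int stalled = 0;
    while (buffer.hasRemaining()) {
      if (channel.write(buffer) > 0) {
        stalled = 0; // any progress resets the counter
      } else if (++stalled >= MAX_STALLED_WRITES) {
        throw new IOException(
            String.format(
                "no progress after %d writes, %d bytes left", stalled, buffer.remaining()));
      }
    }
  }

  /** Returns true if writing to a channel that never accepts bytes ends in an IOException. */
  static boolean stuckChannelFails(byte[] data) {
    WritableByteChannel stuck =
        new WritableByteChannel() {
          @Override
          public int write(ByteBuffer src) {
            return 0; // never makes progress
          }

          @Override
          public boolean isOpen() {
            return true;
          }

          @Override
          public void close() {}
        };
    try {
      writeFully(stuck, ByteBuffer.wrap(data));
      return false;
    } catch (IOException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(stuckChannelFails(new byte[] {1, 2, 3}));
  }
}
```

The trade-off is that a slow but healthy channel could also hit the limit, so any real threshold would need to be chosen with the expected channels in mind.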


[GitHub] [beam] lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#discussion_r407041130
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
 ##########
 @@ -669,34 +670,31 @@ public int recordLength(byte[] data) {
 
     public @Nullable byte[] read(ReadableByteChannel inChannel) throws IOException {
       header.clear();
-      int headerBytes = inChannel.read(header);
-      if (headerBytes <= 0) {
+      int firstRead = read(inChannel, header);
+      if (firstRead == 0) {
         return null;
       }
-      checkState(headerBytes == HEADER_LEN, "Not a valid TFRecord. Fewer than 12 bytes.");
 
       header.rewind();
-      long length = header.getLong();
-      long lengthHash = hashLong(length);
+      long length64 = header.getLong();
+      long lengthHash = hashLong(length64);
       int maskedCrc32OfLength = header.getInt();
       if (lengthHash != maskedCrc32OfLength) {
         throw new IOException(
             String.format(
                 "Mismatch of length mask when reading a record. Expected %d but received %d.",
                 maskedCrc32OfLength, lengthHash));
       }
-
-      ByteBuffer data = ByteBuffer.allocate((int) length);
-      while (data.hasRemaining() && inChannel.read(data) >= 0) {}
-      if (data.hasRemaining()) {
-        throw new IOException(
-            String.format(
-                "EOF while reading record of length %d. Read only %d bytes. Input might be truncated.",
-                length, data.position()));
+      int length = (int) length64;
+      if (length != length64) {
+        throw new IOException(String.format("length overflow %d", length64));
       }
 
 Review comment:
   A minor but required check for integer overflow. I'm not sure whether this can actually occur, but if I remember correctly, other languages can write records larger than 2 GB. (Maybe Java can do it with a direct buffer?)
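A small sketch of the narrowing check in isolation. `LengthOverflowSketch`, `checkedLength` and `fitsInInt` are hypothetical names; the body of `checkedLength` mirrors the lines added in the diff. On the direct-buffer question: `ByteBuffer.allocateDirect` also takes an `int` capacity, so a single buffer of either kind cannot hold more than `Integer.MAX_VALUE` bytes.

```java
import java.io.IOException;

final class LengthOverflowSketch {
  /** Narrows the 64-bit length from the header to an int, rejecting values that do not fit. */
  static int checkedLength(long length64) throws IOException {
    int length = (int) length64;
    if (length != length64) {
      // The cast dropped high bits, so the record is too large for a single ByteBuffer.
      throw new IOException(String.format("length overflow %d", length64));
    }
    return length;
  }

  /** Convenience wrapper for the demo: true if the length survives the narrowing check. */
  static boolean fitsInInt(long length64) {
    try {
      checkedLength(length64);
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(fitsInInt(12L));
    System.out.println(fitsInInt(1L << 31));
  }
}
```

Note that a negative length such as -1 passes this particular check, because the cast round-trips; `ByteBuffer.allocate` would then reject it with an `IllegalArgumentException`.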


[GitHub] [beam] lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#discussion_r407041552
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
 ##########
 @@ -717,14 +715,38 @@ public void write(WritableByteChannel outChannel, byte[] data) throws IOExceptio
       header.clear();
       header.putLong(data.length).putInt(maskedCrc32OfLength);
       header.rewind();
-      outChannel.write(header);
+      writeFully(outChannel, header);
 
-      outChannel.write(ByteBuffer.wrap(data));
+      writeFully(outChannel, ByteBuffer.wrap(data));
 
       footer.clear();
       footer.putInt(maskedCrc32OfData);
       footer.rewind();
-      outChannel.write(footer);
+      writeFully(outChannel, footer);
+    }
+
+    @VisibleForTesting
+    static void readFully(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int expected = bb.remaining();
+      int actual = read(in, bb);
+      if (expected != actual) {
+        throw new IOException(String.format("expected %d, but got %d", expected, actual));
+      }
+    }
+
+    private static int read(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int n, read = 0;
+      while (bb.hasRemaining() && (n = in.read(bb)) >= 0) {
+        read += n;
+      }
+      return read;
+    }
+
+    @VisibleForTesting
+    static void writeFully(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
+      while (buffer.hasRemaining()) {
+        channel.write(buffer);
+      }
 
 Review comment:
   I'm not sure whether I can or should make these more robust.
   If the channel does follow the Javadoc description and keeps returning 0 without throwing,
   this loop could run forever.
   That may be the channel's problem, but I'm not sure whether I should add something like a hard limit on the number of retries.


[GitHub] [beam] lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukemin89 commented on a change in pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#discussion_r409952691
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
 ##########
 @@ -717,14 +715,38 @@ public void write(WritableByteChannel outChannel, byte[] data) throws IOExceptio
       header.clear();
       header.putLong(data.length).putInt(maskedCrc32OfLength);
       header.rewind();
-      outChannel.write(header);
+      writeFully(outChannel, header);
 
-      outChannel.write(ByteBuffer.wrap(data));
+      writeFully(outChannel, ByteBuffer.wrap(data));
 
       footer.clear();
       footer.putInt(maskedCrc32OfData);
       footer.rewind();
-      outChannel.write(footer);
+      writeFully(outChannel, footer);
+    }
+
+    @VisibleForTesting
+    static void readFully(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int expected = bb.remaining();
+      int actual = read(in, bb);
+      if (expected != actual) {
+        throw new IOException(String.format("expected %d, but got %d", expected, actual));
+      }
+    }
+
+    private static int read(ReadableByteChannel in, ByteBuffer bb) throws IOException {
+      int n, read = 0;
+      while (bb.hasRemaining() && (n = in.read(bb)) >= 0) {
+        read += n;
+      }
+      return read;
+    }
+
+    @VisibleForTesting
+    static void writeFully(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
+      while (buffer.hasRemaining()) {
+        channel.write(buffer);
+      }
 
 Review comment:
   Thanks for the confirmation :)


[GitHub] [beam] lukecwik merged pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397
 
 
   

