You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sis.apache.org by de...@apache.org on 2022/12/24 17:56:36 UTC

[sis] branch geoapi-4.0 updated: Bug fixes (EOFException and bad content caused by bad position). Also fix a bug that prevented `HttpByteChannel` from effectively using "HTTP Range".

This is an automated email from the ASF dual-hosted git repository.

desruisseaux pushed a commit to branch geoapi-4.0
in repository https://gitbox.apache.org/repos/asf/sis.git


The following commit(s) were added to refs/heads/geoapi-4.0 by this push:
     new 4ff2a5e381 Bug fixes (EOFException and bad content caused by bad position). Also fix a bug that prevented `HttpByteChannel` from effectively using "HTTP Range".
4ff2a5e381 is described below

commit 4ff2a5e381af7166520fdbe4c36bf6696b68b830
Author: Martin Desruisseaux <ma...@geomatys.com>
AuthorDate: Sat Dec 24 15:32:15 2022 +0100

    Bug fixes (EOFException and bad content caused by bad position).
    Also fix a bug that prevented `HttpByteChannel` from effectively using "HTTP Range".
---
 .../apache/sis/cloud/aws/s3/CachedByteChannel.java |  26 ++--
 .../internal/storage/io/FileCacheByteChannel.java  | 142 +++++++++++++--------
 .../sis/internal/storage/io/HttpByteChannel.java   |  21 ++-
 .../storage/io/FileCacheByteChannelTest.java       |  61 +++++++--
 4 files changed, 169 insertions(+), 81 deletions(-)

diff --git a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
index 4bb3ae9cf1..b364cb92f1 100644
--- a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
+++ b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
@@ -72,8 +72,6 @@ final class CachedByteChannel extends FileCacheByteChannel {
     @Override
     protected Connection openConnection(final long start, final long end) throws IOException {
         final ResponseInputStream<GetObjectResponse> stream;
-        final String contentRange, acceptRanges;
-        final Long contentLength;
         try {
             GetObjectRequest.Builder builder = GetObjectRequest.builder().bucket(path.bucket).key(path.key);
             final String range = Connection.formatRange(start, end);
@@ -82,19 +80,23 @@ final class CachedByteChannel extends FileCacheByteChannel {
             }
             stream = path.fs.client().getObject(builder.build());
             final GetObjectResponse response = stream.response();
-            contentLength = response.contentLength();
-            contentRange  = response.contentRange();
-            acceptRanges  = response.acceptRanges();
+            final String contentRange = response.contentRange();
+            final String acceptRanges = response.acceptRanges();
+            final List<String> rangeUnits = (acceptRanges != null) ? List.of(acceptRanges) : List.of();
+            try {
+                if (contentRange == null) {
+                    final Long contentLength = response.contentLength();
+                    final long length = (contentLength != null) ? contentLength : -1;
+                    return new Connection(stream, length, rangeUnits);
+                } else {
+                    return new Connection(stream, contentRange, rangeUnits);
+                }
+            } catch (IllegalArgumentException e) {
+                throw new IOException(e);
+            }
         } catch (SdkException e) {
             throw FileService.failure(path, e);
         }
-        final List<String> rangeUnits = (acceptRanges != null) ? List.of(acceptRanges) : List.of();
-        final long length = (contentLength != null) ? contentLength : -1;
-        try {
-            return new Connection(stream, contentRange, length, rangeUnits);
-        } catch (IllegalArgumentException e) {
-            throw new IOException(e);
-        }
     }
 
     /**
diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
index 092fd52bc0..51c0ddccca 100644
--- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
+++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
@@ -95,7 +95,7 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
         /** Position of the last byte read by the input stream (inclusive). */
         final long end;
 
-        /** Total length of the stream, or -1 is unknown. */
+        /** Number of bytes in the full stream, or -1 if unknown. */
         final long length;
 
         /** Whether connection can be created for ranges of bytes. */
@@ -104,56 +104,71 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
         /**
          * Creates information about a connection.
          *
-         * @param input          the input stream for reading the bytes.
-         * @param start          position of the first byte read by the input stream (inclusive).
-         * @param end            position of the last byte read by the input stream (inclusive).
-         * @param contentLength  total length of the stream, or -1 if unknown.
-         * @param acceptRanges   whether connection can be created for ranges of bytes.
+         * @param input         the input stream for reading the bytes.
+         * @param start         position of the first byte read by the input stream (inclusive).
+         * @param end           position of the last byte read by the input stream (inclusive).
+         * @param length        length of the full stream (not the content length), or -1 if unknown.
+         * @param acceptRanges  whether connection can be created for ranges of bytes.
          *
          * @see #openConnection(long, long)
          */
-        public Connection(final InputStream input, final long start, final long end, final long contentLength, final boolean acceptRanges) {
+        public Connection(final InputStream input, final long start, final long end, final long length, final boolean acceptRanges) {
             this.input  = input;
             this.start  = start;
             this.end    = end;
-            this.length = contentLength;
+            this.length = length;
             this.acceptRanges = acceptRanges;
         }
 
         /**
-         * Creates information about a connection by parsing HTTP header.
-         * Example: "Content-Range: bytes 25000-75000/100000".
+         * Creates information about a connection by parsing HTTP header without content range.
+         * The "Content-Length" header value is useful to this class only if the connection was
+         * opened for the full file.
+         *
+         * @param  input          the input stream for reading the bytes.
+         * @param  contentLength  length of the response content, or -1 if unknown.
+         * @param  acceptRanges   value of "Accept-Ranges" in HTTP header.
+         * @throws IllegalArgumentException if the start, end or length cannot be parsed.
+         */
+        public Connection(final InputStream input, final long contentLength, final Iterable<String> acceptRanges) {
+            this.input  = input;
+            this.start  = 0;
+            this.end    = (contentLength > 0) ? contentLength - 1 : Long.MAX_VALUE;
+            this.length = contentLength;
+            this.acceptRanges = acceptRanges(acceptRanges);
+        }
+
+        /**
+         * Creates information about a connection by parsing HTTP header with content range.
+         * Note that the "Content-Length" header value is not useful when a range is specified
+         * because the content length is not the full length of the file.
+         *
+         * <p>Example of content range value: {@code "Content-Range: bytes 25000-75000/100000"}.</p>
          *
          * @param  input          the input stream for reading the bytes.
          * @param  contentRange   value of "Content-Range" in HTTP header, or {@code null} if none.
          * @param  acceptRanges   value of "Accept-Ranges" in HTTP header.
-         * @param  contentLength  total length of the stream, or -1 if unknown.
          * @throws IllegalArgumentException if the start, end or length cannot be parsed.
          */
-        public Connection(final InputStream input, String contentRange, long contentLength, final Iterable<String> acceptRanges) {
+        public Connection(final InputStream input, String contentRange, final Iterable<String> acceptRanges) {
             this.input = input;
-            if (contentRange == null) {
-                start  = 0;
-                end    = (contentLength > 0) ? contentLength - 1 : Long.MAX_VALUE;
-                length = contentLength;
-            } else {
-                contentRange = contentRange.trim();
-                int s = contentRange.indexOf(' ');
-                if (s >= 0 && (s != RANGES_UNIT.length() || !contentRange.regionMatches(true, 0, RANGES_UNIT, 0, s))) {
-                    throw new IllegalArgumentException(Errors.format(Errors.Keys.UnsupportedArgumentValue_1, contentRange));
-                }
-                int rs = contentRange.indexOf('-', ++s);                    // Index of range separator.
-                int ls = contentRange.indexOf('/', Math.max(s, rs+1));      // Index of length separator.
-                if (contentLength < 0 && ls >= 0) {
-                    final String t = contentRange.substring(ls+1).trim();
-                    if (!t.equals("*")) contentLength = Long.parseLong(t);
-                }
-                length = contentLength;
-                if (ls < 0) ls = contentRange.length();
-                if (rs < 0) rs = ls;
-                start = Long.parseLong(contentRange.substring(s, rs).trim());
-                end = (rs < ls) ? Long.parseLong(contentRange.substring(rs+1, ls).trim()) : length;
+            long contentLength = -1;
+            contentRange = contentRange.trim();
+            int s = contentRange.indexOf(' ');
+            if (s >= 0 && (s != RANGES_UNIT.length() || !contentRange.regionMatches(true, 0, RANGES_UNIT, 0, s))) {
+                throw new IllegalArgumentException(Errors.format(Errors.Keys.UnsupportedArgumentValue_1, contentRange));
             }
+            int rs = contentRange.indexOf('-', ++s);                    // Index of range separator.
+            int ls = contentRange.indexOf('/', Math.max(s, rs+1));      // Index of length separator.
+            if (ls >= 0) {
+                final String t = contentRange.substring(ls+1).trim();
+                if (!t.equals("*")) contentLength = Long.parseLong(t);
+            }
+            length = contentLength;
+            if (ls < 0) ls = contentRange.length();
+            if (rs < 0) rs = ls;
+            start = Long.parseLong(contentRange.substring(s, rs).trim());
+            end = (rs < ls) ? Long.parseLong(contentRange.substring(rs+1, ls).trim()) : length;
             this.acceptRanges = acceptRanges(acceptRanges);
         }
 
@@ -260,9 +275,11 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
     private final RangeSet<Long> rangesOfAvailableBytes;
 
     /**
-     * Number of bytes in the full stream, or 0 if not yet computed.
+     * Number of bytes in the full stream, or -1 if not yet computed.
+     * It will be set to {@link Connection#length} when a connection is established,
+     * and updated for every new connection in case the value changes.
      */
-    private long length;
+    private long length = -1;
 
     /**
      * Creates a new channel which will cache bytes in a temporary file.
@@ -333,6 +350,14 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
      */
     @Override
     public synchronized long size() throws IOException {
+        if (length < 0) {
+            if (connection == null) {
+                openConnection();
+            }
+            if (length < 0) {
+                throw new IOException(Errors.format(Errors.Keys.Uninitialized_1, "size"));
+            }
+        }
         return length;
     }
 
@@ -356,7 +381,11 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
      */
     @Override
     public synchronized SeekableByteChannel position(final long newPosition) throws IOException {
-        ArgumentChecks.ensurePositive("newPosition", newPosition);
+        if (length > 0) {
+            ArgumentChecks.ensureBetween("newPosition", 0, length-1, newPosition);
+        } else {
+            ArgumentChecks.ensurePositive("newPosition", newPosition);
+        }
         position = newPosition;
         if (endOfInterest - newPosition < SKIP_THRESHOLD) {
             endOfInterest = 0;      // Read until end of stream.
@@ -414,6 +443,9 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
      * or when the number of bytes to skip is too small for being worth to create a new connection.
      * This method may skip less bytes than requested. The skipped bytes are saved in the cache.
      *
+     * <p>The {@link #position} field (the channel position) is not modified by this method.
+     * This method is invoked when input position needs to become equal to the channel position.</p>
+     *
      * @param  count  number of bytes to skip.
      * @return remaining number of bytes to skip after this method execution.
      * @throws IOException if an I/O error occurred.
@@ -434,9 +466,10 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
                     if (n != 0 || (n = input.read()) < 0) {     // Block until we get one byte.
                         break;                                  // End of stream, but maybe it was a sub-range.
                     }
-                    buffer.put((byte) n);
+                    buffer.put(0, (byte) n);                    // Do not increment buffer position.
                     n = 1;
                 }
+                assert buffer.position() == 0;
                 cache(buffer.limit(n));
                 count -= n;
             } while (count > 0);
@@ -470,10 +503,10 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
          */
         Connection c = connection;
         long offset = position - file.position();
-        if (offset != 0 && c != null) {
+        if (c != null) {
             if ((offset < 0 || (c.acceptRanges && (offset >= SKIP_THRESHOLD || position > c.end)))) {
                 offset -= drainAndAbort();
-                c = connection;
+                c = connection;                 // May become null as a result of `drainAndAbort()`.
             }
         }
         /*
@@ -488,13 +521,14 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
         }
         offset = skipInInput(offset);
         if (offset != 0) {
-            count = readFromCache(dst);
+            count = readFromCache(dst);         // In case `skipInInput(…)` has read more bytes than desired.
             usedConnection();
             if (count >= 0) {
                 return count;
             }
             throw new EOFException(Errors.format(Errors.Keys.ValueOutOfRange_4, "position", 0, length, position));
         }
+        assert file.position() == position;
         /*
          * Get a buffer that we can use with `InputStream.read(byte[])`.
          * It must be a buffer backed by a Java array.
@@ -507,29 +541,28 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
             buffer.clear().limit(dst.remaining());
         }
         /*
-         * Transfer bytes from the input stream to the buffer.
-         * The bytes are also copied to the temporary file.
+         * Transfer bytes from the input stream to the buffer. The bytes are also copied to the temporary file.
+         * We try to use `dst` instead of `buffer` in call to `cache(…)` because the former may be a direct buffer.
          */
-        final int limit = buffer.limit();
-        final int start = buffer.position();
-        count = c.input.read(buffer.array(), Math.addExact(buffer.arrayOffset(), start), buffer.remaining());
+        final ByteBuffer slice = dst.slice();
+        count = c.input.read(buffer.array(), Math.addExact(buffer.arrayOffset(), buffer.position()), buffer.remaining());
         if (count > 0) {
-            try {
-                cache(buffer.limit(start + count));
-            } finally {
-                buffer.limit(limit);
-            }
+            position += count;
             if (buffer != dst) {
-                dst.put(buffer.flip());             // Transfer from temporary buffer to destination buffer.
+                dst.put(buffer.limit(count));               // Transfer from temporary buffer to destination buffer.
+            } else {
+                dst.position(dst.position() + count);
             }
-            position += count;
+            cache(slice.limit(count));
         }
         usedConnection();
         return count;
     }
 
     /**
-     * Attempts to read up to bytes from the cache.
+     * Attempts to read up to <i>r</i> bytes from the cache.
+     * This method does not use the connection (it may be null).
+     * The {@link #position} field is updated by the amount of bytes read.
      *
      * @param  dst  the buffer where to store the bytes that are read.
      * @return number of bytes read, or -1 if the cache does not contain the requested range of bytes.
@@ -547,10 +580,11 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
         final int count;
         try {
             count = file.read(dst.limit(end), position);
-            position += count;
+            if (count >= 0) position += count;
         } finally {
             dst.limit(limit);
         }
+        assert dst.position() == start + count;
         return count;
     }
 
diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java
index fad086563a..3329a1865e 100644
--- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java
+++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java
@@ -121,10 +121,27 @@ final class HttpByteChannel extends FileCacheByteChannel {
         range = headers.firstValue("Content-Range").orElse(null);
         final List<String> rangeUnits = headers.allValues("Accept-Ranges");
         try {
-            final long length = headers.firstValueAsLong("Content-Length").orElse(-1);
-            return new Connection(stream, range, length, rangeUnits);
+            if (range == null) {
+                final long length = headers.firstValueAsLong("Content-Length").orElse(-1);
+                return new Connection(stream, length, rangeUnits);
+            } else {
+                return new Connection(stream, range, rangeUnits);
+            }
         } catch (IllegalArgumentException e) {
             throw new IOException(e);
         }
     }
+
+    /**
+     * Invoked when this channel is no longer interested in reading bytes from the specified stream.
+     *
+     * @param  input  the input stream to eventually close.
+     * @return whether the given input stream has been closed by this method.
+     * @throws IOException if an error occurred while closing the stream or preparing for next read operations.
+     */
+    @Override
+    protected boolean abort(final InputStream input) throws IOException {
+        input.close();
+        return true;
+    }
 }
diff --git a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java
index 6c633f5768..ca4e4bf0b1 100644
--- a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java
+++ b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java
@@ -16,12 +16,13 @@
  */
 package org.apache.sis.internal.storage.io;
 
+import java.util.List;
 import java.util.Random;
 import java.util.OptionalLong;
+import java.util.function.IntFunction;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.util.List;
 import org.apache.sis.test.TestCase;
 import org.apache.sis.test.TestUtilities;
 import org.junit.Test;
@@ -92,11 +93,14 @@ public final strictfp class FileCacheByteChannelTest extends TestCase {
         @Override
         protected Connection openConnection(long start, long end) {
             assertTrue(end >= 0);
-            if (end >= length) end = length - 1;
-            start = Math.max(start - random.nextInt(40), 0);
-            end = Math.min(end + random.nextInt(40), length - 1);       // Inclusive.
+            if (end < length) end++;    // Exclusive (temporarily).
+            else end = length;          // Replace Long.MAX_VALUE.
+            do {
+                start = Math.max(start - random.nextInt(40), 0);
+                end = Math.min(end + random.nextInt(40), length);
+            } while (start >= end);
             var input = new ComputedInputStream(Math.toIntExact(start), Math.toIntExact(end), random);
-            return new Connection(input, start, end, length, true);
+            return new Connection(input, start, end-1, length, true);
         }
 
         /**
@@ -140,14 +144,36 @@ public final strictfp class FileCacheByteChannelTest extends TestCase {
      */
     @Test
     public void testRandomOperations() throws IOException {
+        testRandomOperations(ByteBuffer::allocate);
+    }
+
+    /**
+     * Tests random operations on a stream of computed values using a direct buffer.
+     * The code paths are slightly different compared to {@link #testRandomOperations()}.
+     *
+     * @throws IOException if an error occurred when reading or writing to the temporary file.
+     */
+    @Test
+    public void testWithDirectBuffer() throws IOException {
+        testRandomOperations(ByteBuffer::allocateDirect);
+    }
+
+    /**
+     * Implementation of {@link #testRandomOperations()} and {@link #testWithDirectBuffer()}.
+     *
+     * @param  allocator  the function to invoke for allocating a byte buffer.
+     * @throws IOException if an error occurred when reading or writing to the temporary file.
+     */
+    private void testRandomOperations(final IntFunction<ByteBuffer> allocator) throws IOException {
         final Random random = TestUtilities.createRandomNumberGenerator();
         final Implementation channel = new Implementation("test", random);
-        final ByteBuffer buffer = ByteBuffer.allocate(random.nextInt(1000) + 1000);
+        final ByteBuffer buffer = allocator.apply(random.nextInt(1000) + 1000);
         int position = 0;
-        for (int i=0; i<10000; i++) {
+        for (int i=0; i<5000; i++) {
             assertTrue(channel.isOpen());
             assertEquals(position, channel.position());
-            if (random.nextInt(4) == 0) {
+            final boolean seek = random.nextInt(4) == 0;
+            if (seek) {
                 position = random.nextInt(channel.length - 1);
                 int end  = random.nextInt(channel.length - 1);
                 if (position > end) {
@@ -160,7 +186,16 @@ public final strictfp class FileCacheByteChannelTest extends TestCase {
             }
             channel.readInRandomRegion(buffer);
             while (buffer.hasRemaining()) {
-                assertEquals(ComputedInputStream.valueAt(position++), buffer.get());
+                final byte expected = ComputedInputStream.valueAt(position++);
+                final byte actual = buffer.get();
+                if (expected != actual) {
+                    final var b = new StringBuilder(100).append("During iteration ").append(i)
+                            .append(": Wrong byte value at position ").append(position);
+                    if (seek) {
+                        b.append(" (after seek)");
+                    }
+                    fail(b.append(". Expected ").append(expected).append(" but got ").append(actual).append('.').toString());
+                }
             }
         }
         assertEquals(position, channel.position());
@@ -177,23 +212,23 @@ public final strictfp class FileCacheByteChannelTest extends TestCase {
     public void testParseRange() {
         final List<String> rangesUnit = List.of("bytes");
         FileCacheByteChannel.Connection c;
-        c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000/100000", -1, rangesUnit);
+        c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000/100000", rangesUnit);
         assertEquals( 25000, c.start);
         assertEquals( 75000, c.end);
         assertEquals(100000, c.length);
 
-        c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000", -1, rangesUnit);
+        c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000", rangesUnit);
         assertEquals( 25000, c.start);
         assertEquals( 75000, c.end);
         assertEquals(    -1, c.length);
 
-        c = new FileCacheByteChannel.Connection(null, "bytes 25000/100000", -1, rangesUnit);
+        c = new FileCacheByteChannel.Connection(null, "bytes 25000/100000", rangesUnit);
         assertEquals( 25000, c.start);
         assertEquals(100000, c.end);
         assertEquals(100000, c.length);
 
         // Not legal, but we test robustness.
-        c = new FileCacheByteChannel.Connection(null, "25000", -1, rangesUnit);
+        c = new FileCacheByteChannel.Connection(null, "25000", rangesUnit);
         assertEquals( 25000, c.start);
         assertEquals(    -1, c.end);
         assertEquals(    -1, c.length);