You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sis.apache.org by de...@apache.org on 2022/12/24 17:56:36 UTC
[sis] branch geoapi-4.0 updated: Bug fixes (EOFException and bad content caused by bad position). Also fix a bug that prevented `HttpByteChannel` from effectively using "HTTP Range".
This is an automated email from the ASF dual-hosted git repository.
desruisseaux pushed a commit to branch geoapi-4.0
in repository https://gitbox.apache.org/repos/asf/sis.git
The following commit(s) were added to refs/heads/geoapi-4.0 by this push:
new 4ff2a5e381 Bug fixes (EOFException and bad content caused by bad position). Also fix a bug that prevented `HttpByteChannel` from effectively using "HTTP Range".
4ff2a5e381 is described below
commit 4ff2a5e381af7166520fdbe4c36bf6696b68b830
Author: Martin Desruisseaux <ma...@geomatys.com>
AuthorDate: Sat Dec 24 15:32:15 2022 +0100
Bug fixes (EOFException and bad content caused by bad position).
Also fix a bug that prevented `HttpByteChannel` from effectively using "HTTP Range".
---
.../apache/sis/cloud/aws/s3/CachedByteChannel.java | 26 ++--
.../internal/storage/io/FileCacheByteChannel.java | 142 +++++++++++++--------
.../sis/internal/storage/io/HttpByteChannel.java | 21 ++-
.../storage/io/FileCacheByteChannelTest.java | 61 +++++++--
4 files changed, 169 insertions(+), 81 deletions(-)
diff --git a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
index 4bb3ae9cf1..b364cb92f1 100644
--- a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
+++ b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
@@ -72,8 +72,6 @@ final class CachedByteChannel extends FileCacheByteChannel {
@Override
protected Connection openConnection(final long start, final long end) throws IOException {
final ResponseInputStream<GetObjectResponse> stream;
- final String contentRange, acceptRanges;
- final Long contentLength;
try {
GetObjectRequest.Builder builder = GetObjectRequest.builder().bucket(path.bucket).key(path.key);
final String range = Connection.formatRange(start, end);
@@ -82,19 +80,23 @@ final class CachedByteChannel extends FileCacheByteChannel {
}
stream = path.fs.client().getObject(builder.build());
final GetObjectResponse response = stream.response();
- contentLength = response.contentLength();
- contentRange = response.contentRange();
- acceptRanges = response.acceptRanges();
+ final String contentRange = response.contentRange();
+ final String acceptRanges = response.acceptRanges();
+ final List<String> rangeUnits = (acceptRanges != null) ? List.of(acceptRanges) : List.of();
+ try {
+ if (contentRange == null) {
+ final Long contentLength = response.contentLength();
+ final long length = (contentLength != null) ? contentLength : -1;
+ return new Connection(stream, length, rangeUnits);
+ } else {
+ return new Connection(stream, contentRange, rangeUnits);
+ }
+ } catch (IllegalArgumentException e) {
+ throw new IOException(e);
+ }
} catch (SdkException e) {
throw FileService.failure(path, e);
}
- final List<String> rangeUnits = (acceptRanges != null) ? List.of(acceptRanges) : List.of();
- final long length = (contentLength != null) ? contentLength : -1;
- try {
- return new Connection(stream, contentRange, length, rangeUnits);
- } catch (IllegalArgumentException e) {
- throw new IOException(e);
- }
}
/**
diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
index 092fd52bc0..51c0ddccca 100644
--- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
+++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
@@ -95,7 +95,7 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
/** Position of the last byte read by the input stream (inclusive). */
final long end;
- /** Total length of the stream, or -1 is unknown. */
+ /** Number of bytes in the full stream, or -1 if unknown. */
final long length;
/** Whether connection can be created for ranges of bytes. */
@@ -104,56 +104,71 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
/**
* Creates information about a connection.
*
- * @param input the input stream for reading the bytes.
- * @param start position of the first byte read by the input stream (inclusive).
- * @param end position of the last byte read by the input stream (inclusive).
- * @param contentLength total length of the stream, or -1 if unknown.
- * @param acceptRanges whether connection can be created for ranges of bytes.
+ * @param input the input stream for reading the bytes.
+ * @param start position of the first byte read by the input stream (inclusive).
+ * @param end position of the last byte read by the input stream (inclusive).
+ * @param length length of the full stream (not the content length), or -1 if unknown.
+ * @param acceptRanges whether connection can be created for ranges of bytes.
*
* @see #openConnection(long, long)
*/
- public Connection(final InputStream input, final long start, final long end, final long contentLength, final boolean acceptRanges) {
+ public Connection(final InputStream input, final long start, final long end, final long length, final boolean acceptRanges) {
this.input = input;
this.start = start;
this.end = end;
- this.length = contentLength;
+ this.length = length;
this.acceptRanges = acceptRanges;
}
/**
- * Creates information about a connection by parsing HTTP header.
- * Example: "Content-Range: bytes 25000-75000/100000".
+ * Creates information about a connection by parsing HTTP header without content range.
+ * The "Content-Length" header value is useful to this class only if the connection was
+ * opened for the full file.
+ *
+ * @param input the input stream for reading the bytes.
+ * @param contentLength length of the response content, or -1 if unknown.
+ * @param acceptRanges value of "Accept-Ranges" in HTTP header.
+ * @throws IllegalArgumentException if the start, end or length cannot be parsed.
+ */
+ public Connection(final InputStream input, final long contentLength, final Iterable<String> acceptRanges) {
+ this.input = input;
+ this.start = 0;
+ this.end = (contentLength > 0) ? contentLength - 1 : Long.MAX_VALUE;
+ this.length = contentLength;
+ this.acceptRanges = acceptRanges(acceptRanges);
+ }
+
+ /**
+ * Creates information about a connection by parsing HTTP header with content range.
+ * Note that the "Content-Length" header value is not useful when a range is specified
+ * because the content length is not the full length of the file.
+ *
+ * <p>Example of content range value: {@code "Content-Range: bytes 25000-75000/100000"}.</p>
*
* @param input the input stream for reading the bytes.
* @param contentRange value of "Content-Range" in HTTP header, or {@code null} if none.
* @param acceptRanges value of "Accept-Ranges" in HTTP header.
- * @param contentLength total length of the stream, or -1 if unknown.
* @throws IllegalArgumentException if the start, end or length cannot be parsed.
*/
- public Connection(final InputStream input, String contentRange, long contentLength, final Iterable<String> acceptRanges) {
+ public Connection(final InputStream input, String contentRange, final Iterable<String> acceptRanges) {
this.input = input;
- if (contentRange == null) {
- start = 0;
- end = (contentLength > 0) ? contentLength - 1 : Long.MAX_VALUE;
- length = contentLength;
- } else {
- contentRange = contentRange.trim();
- int s = contentRange.indexOf(' ');
- if (s >= 0 && (s != RANGES_UNIT.length() || !contentRange.regionMatches(true, 0, RANGES_UNIT, 0, s))) {
- throw new IllegalArgumentException(Errors.format(Errors.Keys.UnsupportedArgumentValue_1, contentRange));
- }
- int rs = contentRange.indexOf('-', ++s); // Index of range separator.
- int ls = contentRange.indexOf('/', Math.max(s, rs+1)); // Index of length separator.
- if (contentLength < 0 && ls >= 0) {
- final String t = contentRange.substring(ls+1).trim();
- if (!t.equals("*")) contentLength = Long.parseLong(t);
- }
- length = contentLength;
- if (ls < 0) ls = contentRange.length();
- if (rs < 0) rs = ls;
- start = Long.parseLong(contentRange.substring(s, rs).trim());
- end = (rs < ls) ? Long.parseLong(contentRange.substring(rs+1, ls).trim()) : length;
+ long contentLength = -1;
+ contentRange = contentRange.trim();
+ int s = contentRange.indexOf(' ');
+ if (s >= 0 && (s != RANGES_UNIT.length() || !contentRange.regionMatches(true, 0, RANGES_UNIT, 0, s))) {
+ throw new IllegalArgumentException(Errors.format(Errors.Keys.UnsupportedArgumentValue_1, contentRange));
}
+ int rs = contentRange.indexOf('-', ++s); // Index of range separator.
+ int ls = contentRange.indexOf('/', Math.max(s, rs+1)); // Index of length separator.
+ if (ls >= 0) {
+ final String t = contentRange.substring(ls+1).trim();
+ if (!t.equals("*")) contentLength = Long.parseLong(t);
+ }
+ length = contentLength;
+ if (ls < 0) ls = contentRange.length();
+ if (rs < 0) rs = ls;
+ start = Long.parseLong(contentRange.substring(s, rs).trim());
+ end = (rs < ls) ? Long.parseLong(contentRange.substring(rs+1, ls).trim()) : length;
this.acceptRanges = acceptRanges(acceptRanges);
}
@@ -260,9 +275,11 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
private final RangeSet<Long> rangesOfAvailableBytes;
/**
- * Number of bytes in the full stream, or 0 if not yet computed.
+ * Number of bytes in the full stream, or -1 if not yet computed.
+ * It will be set to {@link Connection#length} when a connection is established,
+ * and updated for every new connection in case the value changes.
*/
- private long length;
+ private long length = -1;
/**
* Creates a new channel which will cache bytes in a temporary file.
@@ -333,6 +350,14 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
*/
@Override
public synchronized long size() throws IOException {
+ if (length < 0) {
+ if (connection == null) {
+ openConnection();
+ }
+ if (length < 0) {
+ throw new IOException(Errors.format(Errors.Keys.Uninitialized_1, "size"));
+ }
+ }
return length;
}
@@ -356,7 +381,11 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
*/
@Override
public synchronized SeekableByteChannel position(final long newPosition) throws IOException {
- ArgumentChecks.ensurePositive("newPosition", newPosition);
+ if (length > 0) {
+ ArgumentChecks.ensureBetween("newPosition", 0, length-1, newPosition);
+ } else {
+ ArgumentChecks.ensurePositive("newPosition", newPosition);
+ }
position = newPosition;
if (endOfInterest - newPosition < SKIP_THRESHOLD) {
endOfInterest = 0; // Read until end of stream.
@@ -414,6 +443,9 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
* or when the number of bytes to skip is too small for being worth to create a new connection.
* This method may skip less bytes than requested. The skipped bytes are saved in the cache.
*
+ * <p>The {@link #position} field (the channel position) is not modified by this method.
+ * This method is invoked when input position needs to become equal to the channel position.</p>
+ *
* @param count number of bytes to skip.
* @return remaining number of bytes to skip after this method execution.
* @throws IOException if an I/O error occurred.
@@ -434,9 +466,10 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
if (n != 0 || (n = input.read()) < 0) { // Block until we get one byte.
break; // End of stream, but maybe it was a sub-range.
}
- buffer.put((byte) n);
+ buffer.put(0, (byte) n); // Do not increment buffer position.
n = 1;
}
+ assert buffer.position() == 0;
cache(buffer.limit(n));
count -= n;
} while (count > 0);
@@ -470,10 +503,10 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
*/
Connection c = connection;
long offset = position - file.position();
- if (offset != 0 && c != null) {
+ if (c != null) {
if ((offset < 0 || (c.acceptRanges && (offset >= SKIP_THRESHOLD || position > c.end)))) {
offset -= drainAndAbort();
- c = connection;
+ c = connection; // May become null as a result of `drainAndAbort()`.
}
}
/*
@@ -488,13 +521,14 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
}
offset = skipInInput(offset);
if (offset != 0) {
- count = readFromCache(dst);
+ count = readFromCache(dst); // In case `skipInInput(…)` has read more bytes than desired.
usedConnection();
if (count >= 0) {
return count;
}
throw new EOFException(Errors.format(Errors.Keys.ValueOutOfRange_4, "position", 0, length, position));
}
+ assert file.position() == position;
/*
* Get a buffer that we can use with `InputStream.read(byte[])`.
* It must be a buffer backed by a Java array.
@@ -507,29 +541,28 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
buffer.clear().limit(dst.remaining());
}
/*
- * Transfer bytes from the input stream to the buffer.
- * The bytes are also copied to the temporary file.
+ * Transfer bytes from the input stream to the buffer. The bytes are also copied to the temporary file.
+ * We try to use `dst` instead of `buffer` in call to `cache(…)` because the former may be a direct buffer.
*/
- final int limit = buffer.limit();
- final int start = buffer.position();
- count = c.input.read(buffer.array(), Math.addExact(buffer.arrayOffset(), start), buffer.remaining());
+ final ByteBuffer slice = dst.slice();
+ count = c.input.read(buffer.array(), Math.addExact(buffer.arrayOffset(), buffer.position()), buffer.remaining());
if (count > 0) {
- try {
- cache(buffer.limit(start + count));
- } finally {
- buffer.limit(limit);
- }
+ position += count;
if (buffer != dst) {
- dst.put(buffer.flip()); // Transfer from temporary buffer to destination buffer.
+ dst.put(buffer.limit(count)); // Transfer from temporary buffer to destination buffer.
+ } else {
+ dst.position(dst.position() + count);
}
- position += count;
+ cache(slice.limit(count));
}
usedConnection();
return count;
}
/**
- * Attempts to read up to bytes from the cache.
+ * Attempts to read up to <i>r</i> bytes from the cache.
+ * This method does not use the connection (it may be null).
+ * The {@link #position} field is updated by the amount of bytes read.
*
* @param dst the buffer where to store the bytes that are read.
* @return number of bytes read, or -1 if the cache does not contain the requested range of bytes.
@@ -547,10 +580,11 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
final int count;
try {
count = file.read(dst.limit(end), position);
- position += count;
+ if (count >= 0) position += count;
} finally {
dst.limit(limit);
}
+ assert dst.position() == start + count;
return count;
}
diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java
index fad086563a..3329a1865e 100644
--- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java
+++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java
@@ -121,10 +121,27 @@ final class HttpByteChannel extends FileCacheByteChannel {
range = headers.firstValue("Content-Range").orElse(null);
final List<String> rangeUnits = headers.allValues("Accept-Ranges");
try {
- final long length = headers.firstValueAsLong("Content-Length").orElse(-1);
- return new Connection(stream, range, length, rangeUnits);
+ if (range == null) {
+ final long length = headers.firstValueAsLong("Content-Length").orElse(-1);
+ return new Connection(stream, length, rangeUnits);
+ } else {
+ return new Connection(stream, range, rangeUnits);
+ }
} catch (IllegalArgumentException e) {
throw new IOException(e);
}
}
+
+ /**
+ * Invoked when this channel is no longer interested in reading bytes from the specified stream.
+ *
+ * @param input the input stream to possibly close.
+ * @return whether the given input stream has been closed by this method.
+ * @throws IOException if an error occurred while closing the stream or preparing for next read operations.
+ */
+ @Override
+ protected boolean abort(final InputStream input) throws IOException {
+ input.close();
+ return true;
+ }
}
diff --git a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java
index 6c633f5768..ca4e4bf0b1 100644
--- a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java
+++ b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java
@@ -16,12 +16,13 @@
*/
package org.apache.sis.internal.storage.io;
+import java.util.List;
import java.util.Random;
import java.util.OptionalLong;
+import java.util.function.IntFunction;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.util.List;
import org.apache.sis.test.TestCase;
import org.apache.sis.test.TestUtilities;
import org.junit.Test;
@@ -92,11 +93,14 @@ public final strictfp class FileCacheByteChannelTest extends TestCase {
@Override
protected Connection openConnection(long start, long end) {
assertTrue(end >= 0);
- if (end >= length) end = length - 1;
- start = Math.max(start - random.nextInt(40), 0);
- end = Math.min(end + random.nextInt(40), length - 1); // Inclusive.
+ if (end < length) end++; // Exclusive (temporarily).
+ else end = length; // Replace Long.MAX_VALUE.
+ do {
+ start = Math.max(start - random.nextInt(40), 0);
+ end = Math.min(end + random.nextInt(40), length);
+ } while (start >= end);
var input = new ComputedInputStream(Math.toIntExact(start), Math.toIntExact(end), random);
- return new Connection(input, start, end, length, true);
+ return new Connection(input, start, end-1, length, true);
}
/**
@@ -140,14 +144,36 @@ public final strictfp class FileCacheByteChannelTest extends TestCase {
*/
@Test
public void testRandomOperations() throws IOException {
+ testRandomOperations(ByteBuffer::allocate);
+ }
+
+ /**
+ * Tests random operations on a stream of computed values using a direct buffer.
+ * The code paths are slightly different compared to {@link #testRandomOperations()}.
+ *
+ * @throws IOException if an error occurred when reading or writing to the temporary file.
+ */
+ @Test
+ public void testWithDirectBuffer() throws IOException {
+ testRandomOperations(ByteBuffer::allocateDirect);
+ }
+
+ /**
+ * Implementation of {@link #testRandomOperations()} and {@link #testWithDirectBuffer()}.
+ *
+ * @param allocator the function to invoke for allocating a byte buffer.
+ * @throws IOException if an error occurred when reading or writing to the temporary file.
+ */
+ private void testRandomOperations(final IntFunction<ByteBuffer> allocator) throws IOException {
final Random random = TestUtilities.createRandomNumberGenerator();
final Implementation channel = new Implementation("test", random);
- final ByteBuffer buffer = ByteBuffer.allocate(random.nextInt(1000) + 1000);
+ final ByteBuffer buffer = allocator.apply(random.nextInt(1000) + 1000);
int position = 0;
- for (int i=0; i<10000; i++) {
+ for (int i=0; i<5000; i++) {
assertTrue(channel.isOpen());
assertEquals(position, channel.position());
- if (random.nextInt(4) == 0) {
+ final boolean seek = random.nextInt(4) == 0;
+ if (seek) {
position = random.nextInt(channel.length - 1);
int end = random.nextInt(channel.length - 1);
if (position > end) {
@@ -160,7 +186,16 @@ public final strictfp class FileCacheByteChannelTest extends TestCase {
}
channel.readInRandomRegion(buffer);
while (buffer.hasRemaining()) {
- assertEquals(ComputedInputStream.valueAt(position++), buffer.get());
+ final byte expected = ComputedInputStream.valueAt(position++);
+ final byte actual = buffer.get();
+ if (expected != actual) {
+ final var b = new StringBuilder(100).append("During iteration ").append(i)
+ .append(": Wrong byte value at position ").append(position);
+ if (seek) {
+ b.append(" (after seek)");
+ }
+ fail(b.append(". Expected ").append(expected).append(" but got ").append(actual).append('.').toString());
+ }
}
}
assertEquals(position, channel.position());
@@ -177,23 +212,23 @@ public final strictfp class FileCacheByteChannelTest extends TestCase {
public void testParseRange() {
final List<String> rangesUnit = List.of("bytes");
FileCacheByteChannel.Connection c;
- c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000/100000", -1, rangesUnit);
+ c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000/100000", rangesUnit);
assertEquals( 25000, c.start);
assertEquals( 75000, c.end);
assertEquals(100000, c.length);
- c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000", -1, rangesUnit);
+ c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000", rangesUnit);
assertEquals( 25000, c.start);
assertEquals( 75000, c.end);
assertEquals( -1, c.length);
- c = new FileCacheByteChannel.Connection(null, "bytes 25000/100000", -1, rangesUnit);
+ c = new FileCacheByteChannel.Connection(null, "bytes 25000/100000", rangesUnit);
assertEquals( 25000, c.start);
assertEquals(100000, c.end);
assertEquals(100000, c.length);
// Not legal, but we test robustness.
- c = new FileCacheByteChannel.Connection(null, "25000", -1, rangesUnit);
+ c = new FileCacheByteChannel.Connection(null, "25000", rangesUnit);
assertEquals( 25000, c.start);
assertEquals( -1, c.end);
assertEquals( -1, c.length);