You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sis.apache.org by de...@apache.org on 2023/02/10 22:32:31 UTC
[sis] 01/03: Show which data are cached when a connection does not support HTTP range. This is for avoiding the impression that the application is blocked.
This is an automated email from the ASF dual-hosted git repository.
desruisseaux pushed a commit to branch geoapi-4.0
in repository https://gitbox.apache.org/repos/asf/sis.git
commit 67914142d82d61a67bd95779bac1767ae67de5d9
Author: Martin Desruisseaux <ma...@geomatys.com>
AuthorDate: Fri Feb 10 17:22:42 2023 +0100
Show which data are cached when a connection does not support HTTP range.
This is for avoiding the impression that the application is blocked.
---
.../apache/sis/internal/gui/io/FileAccessItem.java | 348 +++++++++++++++++----
.../apache/sis/internal/gui/io/FileAccessView.java | 41 ++-
.../apache/sis/internal/gui/io/package-info.java | 2 +-
.../apache/sis/cloud/aws/s3/CachedByteChannel.java | 20 +-
.../sis/internal/storage/io/ChannelFactory.java | 29 +-
.../internal/storage/io/FileCacheByteChannel.java | 117 ++++++-
.../sis/internal/storage/io/HttpByteChannel.java | 14 +-
.../org/apache/sis/storage/StorageConnector.java | 31 +-
.../storage/io/FileCacheByteChannelTest.java | 16 +-
9 files changed, 474 insertions(+), 144 deletions(-)
diff --git a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessItem.java b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessItem.java
index 1b11bbe936..328c516af9 100644
--- a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessItem.java
+++ b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessItem.java
@@ -18,7 +18,10 @@ package org.apache.sis.internal.gui.io;
import java.util.List;
import java.util.ListIterator;
+import java.util.EnumMap;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import javafx.application.Platform;
@@ -35,6 +38,7 @@ import javafx.scene.shape.StrokeType;
import javafx.animation.FadeTransition;
import javafx.util.Duration;
import org.apache.sis.measure.Range;
+import org.apache.sis.internal.util.Numerics;
import org.apache.sis.util.collection.RangeSet;
@@ -43,7 +47,7 @@ import org.apache.sis.util.collection.RangeSet;
* This is a row in the table shown by {@link FileAccessView} table.
*
* @author Martin Desruisseaux (Geomatys)
- * @version 1.2
+ * @version 1.4
* @since 1.2
*/
final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
@@ -62,16 +66,6 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
*/
private static final int MARGIN_RIGHT = 6;
- /**
- * Color to use for filling the rectangles.
- */
- private static final Color FILL_COLOR = Color.LIGHTSEAGREEN;
-
- /**
- * Color to use for rectangles border.
- */
- private static final Color BORDER_COLOR = FILL_COLOR.darker();
-
/**
* Width of the cursor in pixels.
*/
@@ -100,9 +94,33 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
final String filename;
/**
- * Range of bytes on which a read or write operation has been performed.
+ * The access mode. Rendering are done in enumeration order.
+ */
+ private enum Mode {
+ /** Cache a range of bytes. */ CACHE(Color.LIGHTGRAY),
+ /** Read a range of bytes. */ READ(Color.LIGHTSEAGREEN),
+ /** Write a range of bytes. */ WRITE(Color.LIGHTCORAL);
+
+ /** The color to use for rendering the rectangle. */
+ private final Color border, fill;
+
+ /** Creates a new enumeration value. */
+ private Mode(final Color fill) {
+ this.fill = fill;
+ border = fill.darker();
+ }
+
+ /** Sets the colors of the given rectangle for representing this mode. */
+ final void colorize(final Rectangle r) {
+ r.setStroke(border);
+ r.setFill(fill);
+ }
+ }
+
+ /**
+ * Range of bytes on which read or write operations have been performed.
*/
- private final RangeSet<Long> accessRanges;
+ private final EnumMap<Mode, RangeSet<Long>> accessRanges;
/**
* Visual representation of {@link #accessRanges}.
@@ -159,7 +177,7 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
staticGroup = staticView.getChildren();
seeksGroup = seeksView .getChildren();
accessView = new Pane(staticView, seeksView);
- accessRanges = RangeSet.create(Long.class, true, false);
+ accessRanges = new EnumMap<>(Mode.class);
staticView.setAutoSizeChildren(false);
/*
* Background rectangle.
@@ -167,7 +185,7 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
final Rectangle background = new Rectangle();
background.setY(MARGIN_TOP);
background.setHeight(HEIGHT);
- background.setStroke(FILL_COLOR.brighter());
+ background.setStroke(Mode.READ.fill.brighter());
background.setFill(Color.TRANSPARENT);
background.setStrokeType(StrokeType.INSIDE);
staticGroup.add(background);
@@ -218,35 +236,58 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
* Reports a read or write operation on a range of bytes.
* This method must be invoked from JavaFX thread.
*
- * @param position offset of the first byte read or written.
- * @param count number of bytes read or written.
- * @param write {@code false} for a read operation, or {@code true} for a write operation.
+ * @param lower offset of the first byte read or written.
+ * @param count offset after the last byte read or written.
+ * @param mode whether a read, write or cache operation is performed.
+ *
+ * @see #addRangeLater(long, long, Mode)
*/
- private void addRange(final long position, final int count, final boolean write) {
- cursorPosition = position;
- final boolean add = accessRanges.add(position, position + count);
+ private void addRange(final long lower, final long upper, final Mode mode) {
+ cursorPosition = lower;
+ /*
+ * Add the range for the specified mode and remove it for all other modes.
+ * Consequently the visual component will show the last access mode for
+ * the specified range of bytes.
+ */
+ RangeSet<Long> ranges = accessRanges.get(mode);
+ if (ranges == null) {
+ ranges = RangeSet.create(Long.class, true, false);
+ accessRanges.put(mode, ranges);
+ }
+ boolean add = ranges.add(lower, upper);
+ for (final RangeSet<Long> other : accessRanges.values()) {
+ if (other != null && other != ranges) {
+ add |= other.remove(lower, upper);
+ }
+ }
+ /*
+ * Update the visual component showing the position of last operation.
+ * An animation effect is used.
+ */
final double scale = columnWidth / fileSize;
if (Double.isFinite(scale)) {
if (add) {
adjustSizes(scale, false);
}
- final Rectangle r;
- if (cursor == null) {
- r = new Rectangle(0, MARGIN_TOP, CURSOR_WIDTH, HEIGHT);
- r.setArcWidth(CURSOR_WIDTH/2 - 1);
- r.setArcHeight(HEIGHT/2 - 2);
- r.setStroke(Color.ORANGE);
- r.setFill(Color.YELLOW);
- accessView.getChildren().add(r);
- cursor = new FadeTransition(CURSOR_DURATION, r);
- cursor.setOnFinished(this);
- cursor.setFromValue(1);
- cursor.setToValue(0);
- } else {
- r = (Rectangle) cursor.getNode();
+ if (mode != Mode.CACHE) {
+ final Rectangle r;
+ if (cursor == null) {
+ r = new Rectangle(0, MARGIN_TOP, CURSOR_WIDTH, HEIGHT);
+ r.setArcWidth(CURSOR_WIDTH/2 - 1);
+ r.setArcHeight(HEIGHT/2 - 2);
+ r.setStroke(Color.ORANGE);
+ r.setFill(Color.YELLOW);
+ accessView.getChildren().add(r);
+ cursor = new FadeTransition(CURSOR_DURATION, r);
+ cursor.setOnFinished(this);
+ cursor.setFromValue(1);
+ cursor.setToValue(0);
+ } else {
+ r = (Rectangle) cursor.getNode();
+ }
+ r.setX(Math.max(0, Math.min(scale*lower - CURSOR_WIDTH/2, columnWidth - CURSOR_WIDTH)));
+ cursor.playFromStart();
}
- r.setX(Math.max(0, Math.min(scale*position - CURSOR_WIDTH/2, columnWidth - CURSOR_WIDTH)));
- cursor.playFromStart();
}
}
@@ -287,36 +328,95 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
/*
* Adjust the position and width of all rectangles.
*/
- for (final Range<Long> range : accessRanges) {
- final long min = range.getMinValue();
- final long max = range.getMaxValue();
- final double x = scale * min;
- final double width = scale * (max - min);
- if (bars.hasNext()) {
- final Rectangle r = (Rectangle) bars.next();
- if (resized || r.getX() + r.getWidth() >= x) {
- r.setX(x);
- r.setWidth(width);
- continue;
+ for (final EnumMap.Entry<Mode, RangeSet<Long>> entry : accessRanges.entrySet()) {
+ final Mode mode = entry.getKey();
+ for (final Range<Long> range : entry.getValue()) {
+ final long min = range.getMinValue();
+ final long max = range.getMaxValue();
+ final double x = scale * min;
+ final double width = scale * (max - min);
+ if (bars.hasNext()) {
+ final Rectangle r = (Rectangle) bars.next();
+ if (resized || r.getX() + r.getWidth() >= x) {
+ r.setX(x);
+ r.setWidth(width);
+ mode.colorize(r);
+ continue;
+ }
+ /*
+ * Newly added range may have merged two or more ranges in a single one.
+ * Discard all ranges that are fully on the left side of current range.
+ * This is not really mandatory, but we do that in an effort to keep the
+ * most "relevant" rectangles (before change) for the new set of ranges.
+ */
+ bars.remove();
}
- /*
- * Newly added range may have merged two or more ranges in a single one.
- * Discard all ranges that are fully on the left side of current range.
- * This is not really mandatory, but we do that in an effort to keep the
- * most "relevant" rectangles (before change) for the new set of ranges.
- */
- bars.remove();
+ final Rectangle r = new Rectangle(x, MARGIN_TOP, width, HEIGHT);
+ r.setStrokeType(StrokeType.INSIDE);
+ mode.colorize(r);
+ bars.add(r);
}
- final Rectangle r = new Rectangle(x, MARGIN_TOP, width, HEIGHT);
- r.setStrokeType(StrokeType.INSIDE);
- r.setStroke(BORDER_COLOR);
- r.setFill(FILL_COLOR);
- bars.add(r);
}
// Remove all remaining children, if any.
staticGroup.remove(bars.nextIndex(), staticGroup.size());
}
+ /**
+ * A range of bytes determined from the background thread and to be consumed in the JavaFX thread.
+ * This range can be updated as long as it has not been consumed. Those modifications reduce the
+ * amount of events to be consumed by the JavaFX thread.
+ */
+ private final class NextAddRange implements Runnable {
+ /** Whether the range of bytes has been read, written or cached. */
+ private final Mode mode;
+
+ /** The range of bytes, modifiable as long as the event has not been consumed. */
+ long lower, upper;
+
+ /** Creates a new range of bytes for the given access mode. */
+ NextAddRange(final Mode mode) {
+ this.mode = mode;
+ }
+
+ /** Invoked in the JavaFX thread for saving the range in {@link #accessRanges}. */
+ @Override public void run() {
+ synchronized (FileAccessItem.this) {
+ if (next == this) {
+ next = null;
+ }
+ }
+ addRange(lower, upper, mode);
+ }
+ }
+
+ /**
+ * The next range of bytes to be merged into {@link #accessRanges}.
+ * Accesses to this field must be synchronized on {@code this}.
+ * The instance is created in a background thread and consumed in the JavaFX thread.
+ */
+ private NextAddRange next;
+
+ /**
+ * Reports a read or write operation on a range of bytes.
+ * This method should be invoked from a background thread.
+ *
+ * @param lower offset of the first byte read or written.
+ * @param count number of bytes read or written.
+ * @param mode whether a read, write or cache operation is performed.
+ */
+ private synchronized void addRangeLater(long lower, final long count, final Mode mode) {
+ long upper = Numerics.saturatingAdd(lower, count);
+ if (next != null && next.mode == mode && next.upper >= lower && next.lower <= upper) {
+ lower = Math.min(next.lower, lower);
+ upper = Math.max(next.upper, upper);
+ } else {
+ next = new NextAddRange(mode);
+ Platform.runLater(next);
+ }
+ next.lower = lower;
+ next.upper = upper;
+ }
+
/**
* Wrapper around a {@link SeekableByteChannel} which will observe the ranges of bytes read or written.
*/
@@ -341,7 +441,7 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
public int read(final ByteBuffer dst) throws IOException {
final long position = position();
final int count = channel.read(dst);
- Platform.runLater(() -> addRange(position, count, false));
+ addRangeLater(position, count, Mode.READ);
return count;
}
@@ -352,7 +452,7 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
public int write(final ByteBuffer src) throws IOException {
final long position = position();
final int count = channel.write(src);
- Platform.runLater(() -> addRange(position, count, true));
+ addRangeLater(position, count, Mode.WRITE);
return count;
}
@@ -409,6 +509,130 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
}
}
+ /**
+ * Wrapper around an {@link InputStream} which will observe the ranges of bytes read.
+ * It can be used directly when the input is an {@link InputStream}, or indirectly
+ * when the channel observed by {@link Observer} is itself wrapping an input stream.
+ * In such case, the bytes read from the input stream are typically cached in some temporary file.
+ *
+ * <h2>Implementation note</h2>
+ * We do not extend {@link java.io.FilterInputStream} because we override almost all methods anyway.
+ * This implementation avoids using a non-final {@code volatile} field for the wrapped input stream.
+ */
+ final class InputObserver extends InputStream {
+ /** The source input stream. */
+ private final InputStream in;
+
+ /** The mode, either read or cache. */
+ private final Mode mode;
+
+ /** Position of the stream, current and marked. */
+ private long position, mark;
+
+ /** Creates a new observer for the given input stream. */
+ InputObserver(final InputStream in) {
+ this.in = in;
+ this.mode = Mode.READ;
+ }
+
+ /** Creates a new observer for the given input stream used as a cache. */
+ InputObserver(final InputStream in, final long start) {
+ this.in = in;
+ this.mode = Mode.CACHE;
+ position = start;
+ }
+
+ /**
+ * Declares that a range of bytes has been read.
+ * This method update the rectangles in the JavaFX view.
+ *
+ * @param count number of bytes that have been read.
+ * @param mode the mode, usually {@link #mode}.
+ */
+ private void range(final long count, final Mode mode) {
+ if (count > 0) {
+ addRangeLater(position, count, mode);
+ if (position > (position += count)) {
+ position = Long.MAX_VALUE;
+ }
+ }
+ }
+
+ /**
+ * Declares that a range of bytes has been read.
+ * This method update the rectangles in the JavaFX view.
+ *
+ * @param count number of bytes that have been read.
+ */
+ private void range(final long count) {
+ range(count, mode);
+ }
+
+ /** Returns the next byte or -1 on EOF. */
+ @Override public int read() throws IOException {
+ final int b = in.read();
+ if (b >= 0) range(1);
+ return b;
+ }
+
+ /** Stores a sequence of bytes in the specified array. */
+ @Override public int read(final byte[] b, final int off, int len) throws IOException {
+ range(len = in.read(b, off, len));
+ return len;
+ }
+
+ /** Stores a sequence of bytes in a newly allocated array. */
+ @Override public byte[] readNBytes(final int len) throws IOException {
+ final byte[] b = in.readNBytes(len);
+ range(b.length);
+ return b;
+ }
+
+ /** Stores a sequence of bytes in the specified output stream. */
+ @Override public long transferTo(final OutputStream out) throws IOException {
+ final long n = in.transferTo(out);
+ range(n);
+ return n;
+ }
+
+ /** Skips some bytes without reporting them as read. */
+ @Override public long skip(long n) throws IOException {
+ range(n = in.skip(n), Mode.CACHE);
+ return n;
+ }
+
+ /** Returns an estimate of the number of bytes that can be read. */
+ @Override public int available() throws IOException {
+ return in.available();
+ }
+
+ /** Tells whether the input stream supports marks. */
+ @Override public boolean markSupported() {
+ return in.markSupported();
+ }
+
+ /** Marks the current position in this input stream. */
+ @Override public void mark(final int readlimit) {
+ in.mark(readlimit);
+ mark = position;
+ }
+
+ /** Repositions this stream to the position of the last mark. */
+ @Override public void reset() throws IOException {
+ in.reset();
+ position = mark;
+ }
+
+ /** Closes the wrapped input stream. */
+ @Override public void close() throws IOException {
+ if (mode == Mode.READ) {
+ Platform.runLater(FileAccessItem.this);
+ // Otherwise will be removed by `Observer`.
+ }
+ in.close();
+ }
+ }
+
/**
* Invoked in JavaFX thread for removing this row from the table.
*/
diff --git a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessView.java b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessView.java
index 4404007173..0de8fef6d6 100644
--- a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessView.java
+++ b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessView.java
@@ -17,6 +17,7 @@
package org.apache.sis.internal.gui.io;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
@@ -32,6 +33,7 @@ import org.apache.sis.internal.gui.FixedHeaderColumnSize;
import org.apache.sis.internal.gui.ImmutableObjectProperty;
import org.apache.sis.internal.gui.Resources;
import org.apache.sis.internal.storage.io.ChannelFactory;
+import org.apache.sis.internal.storage.io.FileCacheByteChannel;
import org.apache.sis.storage.DataStoreException;
import org.apache.sis.storage.event.StoreListeners;
@@ -45,7 +47,7 @@ import org.apache.sis.storage.event.StoreListeners;
* in the vast majority of cases when user has no interest in those information.</p>
*
* @author Martin Desruisseaux (Geomatys)
- * @version 1.2
+ * @version 1.4
* @since 1.2
*/
public final class FileAccessView extends Widget implements UnaryOperator<ChannelFactory> {
@@ -113,6 +115,36 @@ public final class FileAccessView extends Widget implements UnaryOperator<Channe
return factory.canOpen();
}
+ /**
+ * Returns a new visual item and adds it as a row in the table of opened files.
+ * This method can be invoked from any thread (usually not the JavaFX one).
+ *
+ * @param filename data store name.
+ * @return the view of the row added in the table.
+ */
+ private FileAccessItem newItem(final String filename) {
+ final FileAccessItem item = new FileAccessItem(table.getItems(), filename);
+ Platform.runLater(() -> item.owner.add(item));
+ return item;
+ }
+
+ /**
+ * Returns the readable channel as an input stream.
+ *
+ * @param filename data store name.
+ * @param listeners set of registered {@code StoreListener}s for the data store, or {@code null} if none.
+ * @return the input stream.
+ * @throws DataStoreException if the channel is read-once.
+ * @throws IOException if the input stream or its underlying byte channel cannot be created.
+ */
+ @Override
+ public InputStream inputStream(final String filename, final StoreListeners listeners)
+ throws DataStoreException, IOException
+ {
+ final InputStream input = factory.inputStream(filename, listeners);
+ return newItem(filename).new InputObserver(input);
+ }
+
/**
* Creates a readable channel and listens (if possible) read operations.
* Current implementation listens only to {@link SeekableByteChannel}
@@ -130,8 +162,11 @@ public final class FileAccessView extends Widget implements UnaryOperator<Channe
{
final ReadableByteChannel channel = factory.readable(filename, listeners);
if (channel instanceof SeekableByteChannel) {
- final FileAccessItem item = new FileAccessItem(table.getItems(), filename);
- Platform.runLater(() -> item.owner.add(item));
+ final FileAccessItem item = newItem(filename);
+ if (channel instanceof FileCacheByteChannel) {
+ ((FileCacheByteChannel) channel).setFilter((input, start, end) ->
+ item.new InputObserver(input, start));
+ }
return item.new Observer((SeekableByteChannel) channel);
}
return channel;
diff --git a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/package-info.java b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/package-info.java
index a929e6e171..2f26f52e0a 100644
--- a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/package-info.java
+++ b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/package-info.java
@@ -24,7 +24,7 @@
* may change in incompatible ways in any future version without notice.
*
* @author Martin Desruisseaux (Geomatys)
- * @version 1.2
+ * @version 1.4
*
* @see org.apache.sis.internal.gui.DataStoreOpener
*
diff --git a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
index 5d4b80845a..66ada480a6 100644
--- a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
+++ b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
@@ -18,7 +18,6 @@ package org.apache.sis.cloud.aws.s3;
import java.util.List;
import java.io.IOException;
-import java.io.InputStream;
import org.apache.sis.internal.storage.io.FileCacheByteChannel;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
@@ -86,9 +85,9 @@ final class CachedByteChannel extends FileCacheByteChannel {
if (contentRange == null) {
final Long contentLength = response.contentLength();
final long length = (contentLength != null) ? contentLength : -1;
- return new Connection(stream, length, rangeUnits);
+ return new Connection(this, stream, length, rangeUnits);
} else {
- return new Connection(stream, contentRange, rangeUnits);
+ return new Connection(this, stream, contentRange, rangeUnits);
}
} catch (IllegalArgumentException e) {
throw new IOException(e);
@@ -99,18 +98,19 @@ final class CachedByteChannel extends FileCacheByteChannel {
}
/**
- * Invoked when this channel is no longer interested in reading bytes from the specified stream.
+ * Invoked when this channel is no longer interested in reading bytes from the specified connection.
*
- * @param input the input stream to eventually close.
- * @return whether the given input stream has been closed by this method.
+ * @param connection contains the input stream to eventually close.
+ * @return whether the input stream has been closed by this method.
+ * @throws IOException if an error occurred while closing the stream or preparing for next read operations.
*/
@Override
- protected boolean abort(final InputStream input) throws IOException {
- if (input instanceof Abortable) {
- ((Abortable) input).abort();
+ protected boolean abort(final Connection connection) throws IOException {
+ if (connection.rawInput instanceof Abortable) {
+ ((Abortable) connection.rawInput).abort();
return true;
} else {
- return super.abort(input);
+ return super.abort(connection);
}
}
}
diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelFactory.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelFactory.java
index 0da1f94cab..87a533cd72 100644
--- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelFactory.java
+++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelFactory.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
import java.util.Arrays;
import java.util.logging.Level;
import java.util.logging.LogRecord;
-import java.util.function.UnaryOperator;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -132,36 +131,10 @@ public abstract class ChannelFactory {
* If the URL is not encoded, then {@code null}. This argument is ignored if the given
* input does not need to be converted from URL to {@code File}.
* @param options the options to use for creating a new byte channel. Can be null or empty for read-only.
- * @param wrapper a function for creating wrapper around the factory, or {@code null} if none.
- * It can be used for installing listener or for transforming data on the fly.
* @return the channel factory for the given input, or {@code null} if the given input is of unknown type.
* @throws IOException if an error occurred while processing the given input.
*/
- public static ChannelFactory prepare(
- final Object storage, final boolean allowWriteOnly,
- final String encoding, final OpenOption[] options,
- final UnaryOperator<ChannelFactory> wrapper) throws IOException
- {
- ChannelFactory factory = prepare(storage, allowWriteOnly, encoding, options);
- if (factory != null && wrapper != null) {
- factory = wrapper.apply(factory);
- }
- return factory;
- }
-
- /**
- * Returns a byte channel factory without wrappers, or {@code null} if unsupported.
- * This method performs the same work than {@linkplain #prepare(Object, boolean, String,
- * OpenOption[], UnaryOperator, UnaryOperator) above method}, but without wrappers.
- *
- * @param storage the stream or the file to open, or {@code null}.
- * @param allowWriteOnly whether to allow wrapping {@link WritableByteChannel} and {@link OutputStream}.
- * @param encoding if the input is an encoded URL, the character encoding (normally {@code "UTF-8"}).
- * @param options the options to use for creating a new byte channel. Can be null or empty for read-only.
- * @return the channel factory for the given input, or {@code null} if the given input is of unknown type.
- * @throws IOException if an error occurred while processing the given input.
- */
- private static ChannelFactory prepare(Object storage, final boolean allowWriteOnly,
+ public static ChannelFactory prepare(Object storage, final boolean allowWriteOnly,
final String encoding, final OpenOption[] options) throws IOException
{
/*
diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
index 15fe5a29e6..5edbffec53 100644
--- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
+++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
@@ -17,6 +17,7 @@
package org.apache.sis.internal.storage.io;
import java.util.Collection;
+import java.util.function.UnaryOperator;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -86,8 +87,17 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
/** The unit of ranges used in HTTP connections. */
private static final String RANGES_UNIT = "bytes";
- /** The input stream for reading the bytes. */
- final InputStream input;
+ /**
+ * The input stream without filtering, as specified at construction time.
+ * This is the same instance than {@link #input} when no filtering is applied.
+ */
+ public final InputStream rawInput;
+
+ /**
+ * The input stream for reading the bytes. It may be a wrapper around the input stream
+ * specified at construction time if {@linkplain #setFilter a filter has been set}.
+ */
+ public final InputStream input;
/** Position of the first byte read by the input stream (inclusive). */
final long start;
@@ -104,6 +114,7 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
/**
* Creates information about a connection.
*
+ * @param owner the channel which is opening this connection, or {@code null} if none.
* @param input the input stream for reading the bytes.
* @param start position of the first byte read by the input stream (inclusive).
* @param end position of the last byte read by the input stream (inclusive).
@@ -112,12 +123,15 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
*
* @see #openConnection(long, long)
*/
- public Connection(final InputStream input, final long start, final long end, final long length, final boolean acceptRanges) {
- this.input = input;
+ public Connection(final FileCacheByteChannel owner, final InputStream input,
+ final long start, final long end, final long length, final boolean acceptRanges)
+ {
+ rawInput = input;
this.start = start;
this.end = end;
this.length = length;
this.acceptRanges = acceptRanges;
+ this.input = filter(owner, input); // Must be last.
}
/**
@@ -125,17 +139,21 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
* The "Content-Length" header value is useful to this class only if the connection was
* opened for the full file.
*
+ * @param owner the channel which is opening this connection, or {@code null} if none.
* @param input the input stream for reading the bytes.
* @param contentLength length of the response content, or -1 if unknown.
* @param rangeUnits value of "Accept-Ranges" in HTTP header, which lists the accepted units.
* @throws IllegalArgumentException if the start, end or length cannot be parsed.
*/
- public Connection(final InputStream input, final long contentLength, final Iterable<String> rangeUnits) {
- this.input = input;
+ public Connection(final FileCacheByteChannel owner, final InputStream input,
+ final long contentLength, final Iterable<String> rangeUnits)
+ {
+ rawInput = input;
this.start = 0;
this.end = (contentLength > 0) ? contentLength - 1 : Long.MAX_VALUE;
this.length = contentLength;
acceptRanges = acceptRanges(rangeUnits);
+ this.input = filter(owner, input);
}
/**
@@ -145,13 +163,16 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
*
* <p>Example of content range value: {@code "Content-Range: bytes 25000-75000/100000"}.</p>
*
+ * @param owner the channel which is opening this connection, or {@code null} if none.
* @param input the input stream for reading the bytes.
* @param contentRange value of "Content-Range" in HTTP header.
* @param rangeUnits value of "Accept-Ranges" in HTTP header, which lists the accepted units.
* @throws IllegalArgumentException if the start, end or length cannot be parsed.
*/
- public Connection(final InputStream input, String contentRange, final Collection<String> rangeUnits) {
- this.input = input;
+ public Connection(final FileCacheByteChannel owner, final InputStream input,
+ String contentRange, final Collection<String> rangeUnits)
+ {
+ rawInput = input;
long contentLength = -1;
contentRange = contentRange.trim();
int s = contentRange.indexOf(' ');
@@ -175,6 +196,25 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
* supported units did not changed.
*/
acceptRanges = rangeUnits.isEmpty() || acceptRanges(rangeUnits);
+ this.input = filter(owner, input);
+ }
+
+ /**
+ * If an optional filtering has been specified, applied it on the given input stream.
+ * This method should be invoked last in constructor, because it needs other fields.
+ *
+ * @param owner the channel which is opening a connection, or {@code null} if none.
+ * @param input the input stream created for a new connection.
+ * @return the filtered input stream, or {@code input} if there is no filtering.
+ *
+ * @see #setFilter(UnaryOperator)
+ */
+ private InputStream filter(final FileCacheByteChannel owner, InputStream input) {
+ final Filter filter;
+ if (owner != null && (filter = owner.filter) != null) {
+ input = filter.apply(input, start, end);
+ }
+ return input;
}
/**
@@ -235,6 +275,36 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
*/
private Connection connection;
+ /**
+ * An optional filter to apply on the input stream opened for a connections.
+ * A filter may be installed for example for being notified of the ranges of
+ * bytes that are read, or for transforming the data.
+ *
+ * @see #setFilter(Filter)
+ * @see java.io.FilterInputStream
+ * @see java.io.FilterOutputStream
+ */
+ public interface Filter {
+ /**
+ * Invoked when an input stream is created for a new connection.
+ *
+ * @param input the input stream for the new connection.
+ * @param start position of the first byte to be returned by the input stream.
+ * @param end position (inclusive) of the last byte to be returned.
+ * @return the input stream to use for reading data.
+ *
+ * @see java.io.FilterInputStream
+ */
+ InputStream apply(InputStream input, long start, long end);
+ }
+
+ /**
+ * Optional filters to apply on the streams opened for a connection.
+ *
+ * @see #setFilter(Filter)
+ */
+ private Filter filter;
+
/**
* Input/output channel on the temporary or cached file where data are copied.
* The {@linkplain FileChannel#position() position of this file channel} shall
@@ -305,6 +375,19 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
StandardOpenOption.DELETE_ON_CLOSE);
}
+ /**
+ * Applies an optional filter on the streams opened for each new connection.
+ * A filter may be installed for example for being notified of the ranges of
+ * bytes that are read, or for transforming the data.
+ *
+ * @param filter a function which receives in argument the stream created
+ * for a new connection, and returns the stream to use.
+ * A {@code null} function remove filtering.
+ */
+ public final void setFilter(final Filter filter) {
+ this.filter = filter;
+ }
+
/**
* Returns the filename to use in error messages.
*
@@ -328,24 +411,24 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
/**
* Invoked when this channel is no longer interested in reading bytes from the specified stream.
* This method is invoked for example when this channel needs to skip an arbitrarily large number
- * of bytes because the {@linkplain #position(long) position changed}. The {@code input} argument
- * is the value in the record returned by a previous call to {@link #openConnection(long, long)}.
+ * of bytes because the {@linkplain #position(long) position changed}. The {@code connection}
+ * argument is the value returned by a previous call to {@link #openConnection(long, long)}.
* The boolean return value tells what this method has done:
*
* <ul class="verbose">
- * <li>If this method returns {@code true}, then the given stream has been closed by this method and this
+ * <li>If this method returns {@code true}, then the input stream has been closed by this method and this
* channel is ready to create a new stream on the next call to {@link #openConnection(long, long)}.</li>
- * <li>If this method returns {@code false}, then the given stream is still alive and should continue to be used.
+ * <li>If this method returns {@code false}, then the input stream is still alive and should continue to be used.
* The {@link #openConnection(long, long)} method will <em>not</em> be invoked.
* Instead, bytes will be skipped by reading them from the current input stream and caching them.</li>
* </ul>
*
- * @param input the input stream to eventually close.
- * @return whether the given input stream has been closed by this method. If {@code false},
+ * @param connection container of the input stream to eventually close.
+ * @return whether the input stream has been closed by this method. If {@code false},
* then this channel should continue to use that input stream instead of opening a new connection.
* @throws IOException if an error occurred while closing the stream or preparing for next read operations.
*/
- protected boolean abort(InputStream input) throws IOException {
+ protected boolean abort(Connection connection) throws IOException {
return false;
}
@@ -673,7 +756,7 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
cache(buffer.limit(n));
count += n;
}
- if (abort(input)) {
+ if (abort(connection)) {
connection = null;
}
return count;
@@ -729,7 +812,7 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
transfer = null;
idleHandler = null;
try (file) {
- if (c != null && !abort(c.input)) {
+ if (c != null && !abort(c)) {
c.input.close();
}
}
diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java
index b9e91523a0..47d2ef6664 100644
--- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java
+++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java
@@ -122,9 +122,9 @@ final class HttpByteChannel extends FileCacheByteChannel {
try {
if (range == null) {
final long length = headers.firstValueAsLong("Content-Length").orElse(-1);
- return new Connection(stream, length, rangeUnits);
+ return new Connection(this, stream, length, rangeUnits);
} else {
- return new Connection(stream, range, rangeUnits);
+ return new Connection(this, stream, range, rangeUnits);
}
} catch (IllegalArgumentException e) {
throw new IOException(e);
@@ -132,15 +132,15 @@ final class HttpByteChannel extends FileCacheByteChannel {
}
/**
- * Invoked when this channel is no longer interested in reading bytes from the specified stream.
+ * Invoked when this channel is no longer interested in reading bytes from the specified connection.
*
- * @param input the input stream to eventually close.
- * @return whether the given input stream has been closed by this method.
+ * @param connection contains the input stream to eventually close.
+ * @return whether the input stream has been closed by this method.
* @throws IOException if an error occurred while closing the stream or preparing for next read operations.
*/
@Override
- protected boolean abort(final InputStream input) throws IOException {
- input.close();
+ protected boolean abort(final Connection connection) throws IOException {
+ connection.input.close();
return true;
}
}
diff --git a/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java b/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java
index 808db94167..269ba41729 100644
--- a/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java
+++ b/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java
@@ -19,6 +19,7 @@ package org.apache.sis.storage;
import java.util.Map;
import java.util.Iterator;
import java.util.IdentityHashMap;
+import java.util.function.UnaryOperator;
import java.io.Reader;
import java.io.DataInput;
import java.io.DataOutput;
@@ -35,6 +36,7 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.NoSuchFileException;
+import java.nio.file.OpenOption;
import javax.imageio.stream.ImageInputStream;
import javax.imageio.stream.ImageOutputStream;
import javax.imageio.IIOException;
@@ -985,10 +987,7 @@ public class StorageConnector implements Serializable {
* URL, URI, File, Path or other types that may be added in future Apache SIS versions.
* If the given storage is already a ReadableByteChannel, then the factory will return it as-is.
*/
- final ChannelFactory factory = ChannelFactory.prepare(storage, false,
- getOption(OptionKey.URL_ENCODING),
- getOption(OptionKey.OPEN_OPTIONS),
- getOption(InternalOptionKey.CHANNEL_FACTORY_WRAPPER));
+ final ChannelFactory factory = createChannelFactory(false);
if (factory == null) {
return null;
}
@@ -1309,6 +1308,25 @@ public class StorageConnector implements Serializable {
addView(type, view, null, (byte) 0);
}
+ /**
+ * Returns a byte channel factory from the storage, or {@code null} if the storage is unsupported.
+ * See {@link ChannelFactory#prepare(Object, boolean, String, OpenOption[])} for more information.
+ *
+ * @param allowWriteOnly whether to allow wrapping {@link WritableByteChannel} and {@link OutputStream}.
+ * @return the channel factory for the given input, or {@code null} if the given input is of unknown type.
+ * @throws IOException if an error occurred while processing the given input.
+ */
+ private ChannelFactory createChannelFactory(final boolean allowWriteOnly) throws IOException {
+ ChannelFactory factory = ChannelFactory.prepare(storage, allowWriteOnly,
+ getOption(OptionKey.URL_ENCODING),
+ getOption(OptionKey.OPEN_OPTIONS));
+ final UnaryOperator<ChannelFactory> wrapper = getOption(InternalOptionKey.CHANNEL_FACTORY_WRAPPER);
+ if (factory != null && wrapper != null) {
+ factory = wrapper.apply(factory);
+ }
+ return factory;
+ }
+
/**
* Creates a view for the storage as a {@link ChannelDataOutput} if possible.
* This code is a partial copy of {@link #createDataInput()} adapted for output.
@@ -1328,10 +1346,7 @@ public class StorageConnector implements Serializable {
* URL, URI, File, Path or other types that may be added in future Apache SIS versions.
* If the given storage is already a WritableByteChannel, then the factory will return it as-is.
*/
- final ChannelFactory factory = ChannelFactory.prepare(storage, true,
- getOption(OptionKey.URL_ENCODING),
- getOption(OptionKey.OPEN_OPTIONS),
- getOption(InternalOptionKey.CHANNEL_FACTORY_WRAPPER));
+ final ChannelFactory factory = createChannelFactory(true);
if (factory == null) {
return null;
}
diff --git a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java
index 5da0e3dce7..441cae8bea 100644
--- a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java
+++ b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java
@@ -99,15 +99,15 @@ public final class FileCacheByteChannelTest extends TestCase {
end = Math.min(end + random.nextInt(40), length);
} while (start >= end);
var input = new ComputedInputStream(Math.toIntExact(start), Math.toIntExact(end), random);
- return new Connection(input, start, end-1, length, true);
+ return new Connection(null, input, start, end-1, length, true);
}
/**
- * Marks the given input stream as closed and notify that a new one can be created.
+ * Marks the input stream as closed and notify that a new one can be created.
*/
@Override
- protected boolean abort(final InputStream input) throws IOException {
- input.close();
+ protected boolean abort(final Connection connection) throws IOException {
+ connection.input.close();
return true;
}
@@ -211,23 +211,23 @@ public final class FileCacheByteChannelTest extends TestCase {
public void testParseRange() {
final List<String> rangesUnit = List.of("bytes");
FileCacheByteChannel.Connection c;
- c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000/100000", rangesUnit);
+ c = new FileCacheByteChannel.Connection(null, null, "bytes 25000-75000/100000", rangesUnit);
assertEquals( 25000, c.start);
assertEquals( 75000, c.end);
assertEquals(100000, c.length);
- c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000", rangesUnit);
+ c = new FileCacheByteChannel.Connection(null, null, "bytes 25000-75000", rangesUnit);
assertEquals( 25000, c.start);
assertEquals( 75000, c.end);
assertEquals( -1, c.length);
- c = new FileCacheByteChannel.Connection(null, "bytes 25000/100000", rangesUnit);
+ c = new FileCacheByteChannel.Connection(null, null, "bytes 25000/100000", rangesUnit);
assertEquals( 25000, c.start);
assertEquals(100000, c.end);
assertEquals(100000, c.length);
// Not legal, but we test robustness.
- c = new FileCacheByteChannel.Connection(null, "25000", rangesUnit);
+ c = new FileCacheByteChannel.Connection(null, null, "25000", rangesUnit);
assertEquals( 25000, c.start);
assertEquals( -1, c.end);
assertEquals( -1, c.length);