You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sis.apache.org by de...@apache.org on 2023/02/10 22:32:31 UTC

[sis] 01/03: Show which data are cached when a connection does not support HTTP range. This is for avoiding the impression that the application is blocked.

This is an automated email from the ASF dual-hosted git repository.

desruisseaux pushed a commit to branch geoapi-4.0
in repository https://gitbox.apache.org/repos/asf/sis.git

commit 67914142d82d61a67bd95779bac1767ae67de5d9
Author: Martin Desruisseaux <ma...@geomatys.com>
AuthorDate: Fri Feb 10 17:22:42 2023 +0100

    Show which data are cached when a connection does not support HTTP range.
    This is for avoiding the impression that the application is blocked.
---
 .../apache/sis/internal/gui/io/FileAccessItem.java | 348 +++++++++++++++++----
 .../apache/sis/internal/gui/io/FileAccessView.java |  41 ++-
 .../apache/sis/internal/gui/io/package-info.java   |   2 +-
 .../apache/sis/cloud/aws/s3/CachedByteChannel.java |  20 +-
 .../sis/internal/storage/io/ChannelFactory.java    |  29 +-
 .../internal/storage/io/FileCacheByteChannel.java  | 117 ++++++-
 .../sis/internal/storage/io/HttpByteChannel.java   |  14 +-
 .../org/apache/sis/storage/StorageConnector.java   |  31 +-
 .../storage/io/FileCacheByteChannelTest.java       |  16 +-
 9 files changed, 474 insertions(+), 144 deletions(-)

diff --git a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessItem.java b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessItem.java
index 1b11bbe936..328c516af9 100644
--- a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessItem.java
+++ b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessItem.java
@@ -18,7 +18,10 @@ package org.apache.sis.internal.gui.io;
 
 import java.util.List;
 import java.util.ListIterator;
+import java.util.EnumMap;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.SeekableByteChannel;
 import javafx.application.Platform;
@@ -35,6 +38,7 @@ import javafx.scene.shape.StrokeType;
 import javafx.animation.FadeTransition;
 import javafx.util.Duration;
 import org.apache.sis.measure.Range;
+import org.apache.sis.internal.util.Numerics;
 import org.apache.sis.util.collection.RangeSet;
 
 
@@ -43,7 +47,7 @@ import org.apache.sis.util.collection.RangeSet;
  * This is a row in the table shown by {@link FileAccessView} table.
  *
  * @author  Martin Desruisseaux (Geomatys)
- * @version 1.2
+ * @version 1.4
  * @since   1.2
  */
 final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
@@ -62,16 +66,6 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
      */
     private static final int MARGIN_RIGHT = 6;
 
-    /**
-     * Color to use for filling the rectangles.
-     */
-    private static final Color FILL_COLOR = Color.LIGHTSEAGREEN;
-
-    /**
-     * Color to use for rectangles border.
-     */
-    private static final Color BORDER_COLOR = FILL_COLOR.darker();
-
     /**
      * Width of the cursor in pixels.
      */
@@ -100,9 +94,33 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
     final String filename;
 
     /**
-     * Range of bytes on which a read or write operation has been performed.
+     * The access mode. Rendering are done in enumeration order.
+     */
+    private enum Mode {
+        /** Cache a range of bytes. */ CACHE(Color.LIGHTGRAY),
+        /** Read  a range of bytes. */ READ(Color.LIGHTSEAGREEN),
+        /** Write a range of bytes. */ WRITE(Color.LIGHTCORAL);
+
+        /** The color to use for rendering the rectangle. */
+        private final Color border, fill;
+
+        /** Creates a new enumeration value. */
+        private Mode(final Color fill) {
+            this.fill = fill;
+            border = fill.darker();
+        }
+
+        /** Sets the colors of the given rectangle for representing this mode. */
+        final void colorize(final Rectangle r) {
+            r.setStroke(border);
+            r.setFill(fill);
+        }
+    }
+
+    /**
+     * Range of bytes on which read or write operations have been performed.
      */
-    private final RangeSet<Long> accessRanges;
+    private final EnumMap<Mode, RangeSet<Long>> accessRanges;
 
     /**
      * Visual representation of {@link #accessRanges}.
@@ -159,7 +177,7 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
         staticGroup   = staticView.getChildren();
         seeksGroup    = seeksView .getChildren();
         accessView    = new Pane(staticView, seeksView);
-        accessRanges  = RangeSet.create(Long.class, true, false);
+        accessRanges  = new EnumMap<>(Mode.class);
         staticView.setAutoSizeChildren(false);
         /*
          * Background rectangle.
@@ -167,7 +185,7 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
         final Rectangle background = new Rectangle();
         background.setY(MARGIN_TOP);
         background.setHeight(HEIGHT);
-        background.setStroke(FILL_COLOR.brighter());
+        background.setStroke(Mode.READ.fill.brighter());
         background.setFill(Color.TRANSPARENT);
         background.setStrokeType(StrokeType.INSIDE);
         staticGroup.add(background);
@@ -218,35 +236,58 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
      * Reports a read or write operation on a range of bytes.
      * This method must be invoked from JavaFX thread.
      *
-     * @param  position  offset of the first byte read or written.
-     * @param  count     number of bytes read or written.
-     * @param  write     {@code false} for a read operation, or {@code true} for a write operation.
+     * @param  lower  offset of the first byte read or written.
+     * @param  count  offset after the last byte read or written.
+     * @param  mode   whether a read, write or cache operation is performed.
+     *
+     * @see #addRangeLater(long, long, Mode)
      */
-    private void addRange(final long position, final int count, final boolean write) {
-        cursorPosition = position;
-        final boolean add = accessRanges.add(position, position + count);
+    private void addRange(final long lower, final long upper, final Mode mode) {
+        cursorPosition = lower;
+        /*
+         * Add the range for the specified mode and remove it for all other modes.
+         * Consequently the visual component will show the last access mode for
+         * the specified range of bytes.
+         */
+        RangeSet<Long> ranges = accessRanges.get(mode);
+        if (ranges == null) {
+            ranges = RangeSet.create(Long.class, true, false);
+            accessRanges.put(mode, ranges);
+        }
+        boolean add = ranges.add(lower, upper);
+        for (final RangeSet<Long> other : accessRanges.values()) {
+            if (other != null && other != ranges) {
+                add |= other.remove(lower, upper);
+            }
+        }
+        /*
+         * Update the visual component showing the position of last operation.
+         * An animation effect is used.
+         */
         final double scale = columnWidth / fileSize;
         if (Double.isFinite(scale)) {
             if (add) {
                 adjustSizes(scale, false);
             }
-            final Rectangle r;
-            if (cursor == null) {
-                r = new Rectangle(0, MARGIN_TOP, CURSOR_WIDTH, HEIGHT);
-                r.setArcWidth(CURSOR_WIDTH/2 - 1);
-                r.setArcHeight(HEIGHT/2 - 2);
-                r.setStroke(Color.ORANGE);
-                r.setFill(Color.YELLOW);
-                accessView.getChildren().add(r);
-                cursor = new FadeTransition(CURSOR_DURATION, r);
-                cursor.setOnFinished(this);
-                cursor.setFromValue(1);
-                cursor.setToValue(0);
-            } else {
-                r = (Rectangle) cursor.getNode();
+            if (mode != Mode.CACHE) {
+                final Rectangle r;
+                if (cursor == null) {
+                    r = new Rectangle(0, MARGIN_TOP, CURSOR_WIDTH, HEIGHT);
+                    r.setArcWidth(CURSOR_WIDTH/2 - 1);
+                    r.setArcHeight(HEIGHT/2 - 2);
+                    r.setStroke(Color.ORANGE);
+                    r.setFill(Color.YELLOW);
+                    accessView.getChildren().add(r);
+                    cursor = new FadeTransition(CURSOR_DURATION, r);
+                    cursor.setOnFinished(this);
+                    cursor.setFromValue(1);
+                    cursor.setToValue(0);
+                } else {
+                    r = (Rectangle) cursor.getNode();
+                }
+                r.setX(Math.max(0, Math.min(scale*lower - CURSOR_WIDTH/2, columnWidth - CURSOR_WIDTH)));
+                cursor.playFromStart();
             }
-            r.setX(Math.max(0, Math.min(scale*position - CURSOR_WIDTH/2, columnWidth - CURSOR_WIDTH)));
-            cursor.playFromStart();
         }
     }
 
@@ -287,36 +328,95 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
         /*
          * Adjust the position and width of all rectangles.
          */
-        for (final Range<Long> range : accessRanges) {
-            final long min = range.getMinValue();
-            final long max = range.getMaxValue();
-            final double x = scale * min;
-            final double width = scale * (max - min);
-            if (bars.hasNext()) {
-                final Rectangle r = (Rectangle) bars.next();
-                if (resized || r.getX() + r.getWidth() >= x) {
-                    r.setX(x);
-                    r.setWidth(width);
-                    continue;
+        for (final EnumMap.Entry<Mode, RangeSet<Long>> entry : accessRanges.entrySet()) {
+            final Mode mode = entry.getKey();
+            for (final Range<Long> range : entry.getValue()) {
+                final long min = range.getMinValue();
+                final long max = range.getMaxValue();
+                final double x = scale * min;
+                final double width = scale * (max - min);
+                if (bars.hasNext()) {
+                    final Rectangle r = (Rectangle) bars.next();
+                    if (resized || r.getX() + r.getWidth() >= x) {
+                        r.setX(x);
+                        r.setWidth(width);
+                        mode.colorize(r);
+                        continue;
+                    }
+                    /*
+                     * Newly added range may have merged two or more ranges in a single one.
+                     * Discard all ranges that are fully on the left side of current range.
+                     * This is not really mandatory, but we do that in an effort to keep the
+                     * most "relevant" rectangles (before change) for the new set of ranges.
+                     */
+                    bars.remove();
                 }
-                /*
-                 * Newly added range may have merged two or more ranges in a single one.
-                 * Discard all ranges that are fully on the left side of current range.
-                 * This is not really mandatory, but we do that in an effort to keep the
-                 * most "relevant" rectangles (before change) for the new set of ranges.
-                 */
-                bars.remove();
+                final Rectangle r = new Rectangle(x, MARGIN_TOP, width, HEIGHT);
+                r.setStrokeType(StrokeType.INSIDE);
+                mode.colorize(r);
+                bars.add(r);
             }
-            final Rectangle r = new Rectangle(x, MARGIN_TOP, width, HEIGHT);
-            r.setStrokeType(StrokeType.INSIDE);
-            r.setStroke(BORDER_COLOR);
-            r.setFill(FILL_COLOR);
-            bars.add(r);
         }
         // Remove all remaining children, if any.
         staticGroup.remove(bars.nextIndex(), staticGroup.size());
     }
 
+    /**
+     * A range of bytes determined from the background thread and to be consumed in the JavaFX thread.
+     * This range can be updated as long as it has not been consumed. Those modifications reduce the
+     * amount of events to be consumed by the JavaFX thread.
+     */
+    private final class NextAddRange implements Runnable {
+        /** Whether the range of bytes has been read, written or cached. */
+        private final Mode mode;
+
+        /** The range of bytes, modifiable as long as the event has not been consumed. */
+        long lower, upper;
+
+        /** Creates a new range of bytes for the given access mode. */
+        NextAddRange(final Mode mode) {
+            this.mode = mode;
+        }
+
+        /** Invoked in the JavaFX thread for saving the range in {@link #accessRanges}. */
+        @Override public void run() {
+            synchronized (FileAccessItem.this) {
+                if (next == this) {
+                    next = null;
+                }
+            }
+            addRange(lower, upper, mode);
+        }
+    }
+
+    /**
+     * The next range of bytes to be merged into {@link #accessRanges}.
+     * Accesses to this field must be synchronized on {@code this}.
+     * The instance is created in a background thread and consumed in the JavaFX thread.
+     */
+    private NextAddRange next;
+
+    /**
+     * Reports a read or write operation on a range of bytes.
+     * This method should be invoked from a background thread.
+     *
+     * @param  lower  offset of the first byte read or written.
+     * @param  count  number of bytes read or written.
+     * @param  mode   whether a read, write or cache operation is performed.
+     */
+    private synchronized void addRangeLater(long lower, final long count, final Mode mode) {
+        long upper = Numerics.saturatingAdd(lower, count);
+        if (next != null && next.mode == mode && next.upper >= lower && next.lower <= upper) {
+            lower = Math.min(next.lower, lower);
+            upper = Math.max(next.upper, upper);
+        } else {
+            next = new NextAddRange(mode);
+            Platform.runLater(next);
+        }
+        next.lower = lower;
+        next.upper = upper;
+    }
+
     /**
      * Wrapper around a {@link SeekableByteChannel} which will observe the ranges of bytes read or written.
      */
@@ -341,7 +441,7 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
         public int read(final ByteBuffer dst) throws IOException {
             final long position = position();
             final int count = channel.read(dst);
-            Platform.runLater(() -> addRange(position, count, false));
+            addRangeLater(position, count, Mode.READ);
             return count;
         }
 
@@ -352,7 +452,7 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
         public int write(final ByteBuffer src) throws IOException {
             final long position = position();
             final int count = channel.write(src);
-            Platform.runLater(() -> addRange(position, count, true));
+            addRangeLater(position, count, Mode.WRITE);
             return count;
         }
 
@@ -409,6 +509,130 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> {
         }
     }
 
+    /**
+     * Wrapper around an {@link InputStream} which will observe the ranges of bytes read.
+     * It can be used directly when the input is an {@link InputStream}, or indirectly
+     * when the channel observed by {@link Observer} is itself wrapping an input stream.
+     * In such case, the bytes read from the input stream are typically cached in some temporary file.
+     *
+     * <h2>Implementation note</h2>
+     * We do not extend {@link java.io.FilterInputStream} because we override almost all methods anyway.
+     * This implementation avoids using a non-final {@code volatile} field for the wrapped input stream.
+     */
+    final class InputObserver extends InputStream {
+        /** The source input stream. */
+        private final InputStream in;
+
+        /** The mode, either read or cache. */
+        private final Mode mode;
+
+        /** Position of the stream, current and marked. */
+        private long position, mark;
+
+        /** Creates a new observer for the given input stream. */
+        InputObserver(final InputStream in) {
+            this.in   = in;
+            this.mode = Mode.READ;
+        }
+
+        /** Creates a new observer for the given input stream used as a cache. */
+        InputObserver(final InputStream in, final long start) {
+            this.in   = in;
+            this.mode = Mode.CACHE;
+            position  = start;
+        }
+
+        /**
+         * Declares that a range of bytes has been read.
+         * This method update the rectangles in the JavaFX view.
+         *
+         * @param  count  number of bytes that have been read.
+         * @param  mode   the mode, usually {@link #mode}.
+         */
+        private void range(final long count, final Mode mode) {
+            if (count > 0) {
+                addRangeLater(position, count, mode);
+                if (position > (position += count)) {
+                    position = Long.MAX_VALUE;
+                }
+            }
+        }
+
+        /**
+         * Declares that a range of bytes has been read.
+         * This method update the rectangles in the JavaFX view.
+         *
+         * @param  count  number of bytes that have been read.
+         */
+        private void range(final long count) {
+            range(count, mode);
+        }
+
+        /** Returns the next byte or -1 on EOF. */
+        @Override public int read() throws IOException {
+            final int b = in.read();
+            if (b >= 0) range(1);
+            return b;
+        }
+
+        /** Stores a sequence of bytes in the specified array. */
+        @Override public int read(final byte[] b, final int off, int len) throws IOException {
+            range(len = in.read(b, off, len));
+            return len;
+        }
+
+        /** Stores a sequence of bytes in a newly allocated array. */
+        @Override public byte[] readNBytes(final int len) throws IOException {
+            final byte[] b = in.readNBytes(len);
+            range(b.length);
+            return b;
+        }
+
+        /** Stores a sequence of bytes in the specified output stream. */
+        @Override public long transferTo(final OutputStream out) throws IOException {
+            final long n = in.transferTo(out);
+            range(n);
+            return n;
+        }
+
+        /** Skips some bytes without reporting them as read. */
+        @Override public long skip(long n) throws IOException {
+            range(n = in.skip(n), Mode.CACHE);
+            return n;
+        }
+
+        /** Returns an estimate of the number of bytes that can be read. */
+        @Override public int available() throws IOException {
+            return in.available();
+        }
+
+        /** Tells whether the input stream supports marks. */
+        @Override public boolean markSupported() {
+            return in.markSupported();
+        }
+
+        /** Marks the current position in this input stream. */
+        @Override public void mark(final int readlimit) {
+            in.mark(readlimit);
+            mark = position;
+        }
+
+        /** Repositions this stream to the position of the last mark. */
+        @Override public void reset() throws IOException {
+            in.reset();
+            position = mark;
+        }
+
+        /** Closes the wrapped input stream. */
+        @Override public void close() throws IOException {
+            if (mode == Mode.READ) {
+                Platform.runLater(FileAccessItem.this);
+                // Otherwise will be removed by `Observer`.
+            }
+            in.close();
+        }
+    }
+
     /**
      * Invoked in JavaFX thread for removing this row from the table.
      */
diff --git a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessView.java b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessView.java
index 4404007173..0de8fef6d6 100644
--- a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessView.java
+++ b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessView.java
@@ -17,6 +17,7 @@
 package org.apache.sis.internal.gui.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.channels.WritableByteChannel;
@@ -32,6 +33,7 @@ import org.apache.sis.internal.gui.FixedHeaderColumnSize;
 import org.apache.sis.internal.gui.ImmutableObjectProperty;
 import org.apache.sis.internal.gui.Resources;
 import org.apache.sis.internal.storage.io.ChannelFactory;
+import org.apache.sis.internal.storage.io.FileCacheByteChannel;
 import org.apache.sis.storage.DataStoreException;
 import org.apache.sis.storage.event.StoreListeners;
 
@@ -45,7 +47,7 @@ import org.apache.sis.storage.event.StoreListeners;
  * in the vast majority of cases when user has no interest in those information.</p>
  *
  * @author  Martin Desruisseaux (Geomatys)
- * @version 1.2
+ * @version 1.4
  * @since   1.2
  */
 public final class FileAccessView extends Widget implements UnaryOperator<ChannelFactory> {
@@ -113,6 +115,36 @@ public final class FileAccessView extends Widget implements UnaryOperator<Channe
                 return factory.canOpen();
             }
 
+            /**
+             * Returns a new visual item and adds it as a row in the table of opened files.
+             * This method can be invoked from any thread (usually not the JavaFX one).
+             *
+             * @param  filename  data store name.
+             * @return the view of the row added in the table.
+             */
+            private FileAccessItem newItem(final String filename) {
+                final FileAccessItem item = new FileAccessItem(table.getItems(), filename);
+                Platform.runLater(() -> item.owner.add(item));
+                return item;
+            }
+
+            /**
+             * Returns the readable channel as an input stream.
+             *
+             * @param  filename  data store name.
+             * @param  listeners set of registered {@code StoreListener}s for the data store, or {@code null} if none.
+             * @return the input stream.
+             * @throws DataStoreException if the channel is read-once.
+             * @throws IOException if the input stream or its underlying byte channel cannot be created.
+             */
+            @Override
+            public InputStream inputStream(final String filename, final StoreListeners listeners)
+                    throws DataStoreException, IOException
+            {
+                final InputStream input = factory.inputStream(filename, listeners);
+                return newItem(filename).new InputObserver(input);
+            }
+
             /**
              * Creates a readable channel and listens (if possible) read operations.
              * Current implementation listens only to {@link SeekableByteChannel}
@@ -130,8 +162,11 @@ public final class FileAccessView extends Widget implements UnaryOperator<Channe
             {
                 final ReadableByteChannel channel = factory.readable(filename, listeners);
                 if (channel instanceof SeekableByteChannel) {
-                    final FileAccessItem item = new FileAccessItem(table.getItems(), filename);
-                    Platform.runLater(() -> item.owner.add(item));
+                    final FileAccessItem item = newItem(filename);
+                    if (channel instanceof FileCacheByteChannel) {
+                        ((FileCacheByteChannel) channel).setFilter((input, start, end) ->
+                                item.new InputObserver(input, start));
+                    }
                     return item.new Observer((SeekableByteChannel) channel);
                 }
                 return channel;
diff --git a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/package-info.java b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/package-info.java
index a929e6e171..2f26f52e0a 100644
--- a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/package-info.java
+++ b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/package-info.java
@@ -24,7 +24,7 @@
  * may change in incompatible ways in any future version without notice.
  *
  * @author  Martin Desruisseaux (Geomatys)
- * @version 1.2
+ * @version 1.4
  *
  * @see org.apache.sis.internal.gui.DataStoreOpener
  *
diff --git a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
index 5d4b80845a..66ada480a6 100644
--- a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
+++ b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
@@ -18,7 +18,6 @@ package org.apache.sis.cloud.aws.s3;
 
 import java.util.List;
 import java.io.IOException;
-import java.io.InputStream;
 import org.apache.sis.internal.storage.io.FileCacheByteChannel;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
@@ -86,9 +85,9 @@ final class CachedByteChannel extends FileCacheByteChannel {
                 if (contentRange == null) {
                     final Long contentLength = response.contentLength();
                     final long length = (contentLength != null) ? contentLength : -1;
-                    return new Connection(stream, length, rangeUnits);
+                    return new Connection(this, stream, length, rangeUnits);
                 } else {
-                    return new Connection(stream, contentRange, rangeUnits);
+                    return new Connection(this, stream, contentRange, rangeUnits);
                 }
             } catch (IllegalArgumentException e) {
                 throw new IOException(e);
@@ -99,18 +98,19 @@ final class CachedByteChannel extends FileCacheByteChannel {
     }
 
     /**
-     * Invoked when this channel is no longer interested in reading bytes from the specified stream.
+     * Invoked when this channel is no longer interested in reading bytes from the specified connection.
      *
-     * @param  input  the input stream to eventually close.
-     * @return whether the given input stream has been closed by this method.
+     * @param  connection  contains the input stream to eventually close.
+     * @return whether the input stream has been closed by this method.
+     * @throws IOException if an error occurred while closing the stream or preparing for next read operations.
      */
     @Override
-    protected boolean abort(final InputStream input) throws IOException {
-        if (input instanceof Abortable) {
-            ((Abortable) input).abort();
+    protected boolean abort(final Connection connection) throws IOException {
+        if (connection.rawInput instanceof Abortable) {
+            ((Abortable) connection.rawInput).abort();
             return true;
         } else {
-            return super.abort(input);
+            return super.abort(connection);
         }
     }
 }
diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelFactory.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelFactory.java
index 0da1f94cab..87a533cd72 100644
--- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelFactory.java
+++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelFactory.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
 import java.util.Arrays;
 import java.util.logging.Level;
 import java.util.logging.LogRecord;
-import java.util.function.UnaryOperator;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -132,36 +131,10 @@ public abstract class ChannelFactory {
      *                         If the URL is not encoded, then {@code null}. This argument is ignored if the given
      *                         input does not need to be converted from URL to {@code File}.
      * @param  options         the options to use for creating a new byte channel. Can be null or empty for read-only.
-     * @param  wrapper         a function for creating wrapper around the factory, or {@code null} if none.
-     *                         It can be used for installing listener or for transforming data on the fly.
      * @return the channel factory for the given input, or {@code null} if the given input is of unknown type.
      * @throws IOException if an error occurred while processing the given input.
      */
-    public static ChannelFactory prepare(
-            final Object storage, final boolean allowWriteOnly,
-            final String encoding, final OpenOption[] options,
-            final UnaryOperator<ChannelFactory> wrapper) throws IOException
-    {
-        ChannelFactory factory = prepare(storage, allowWriteOnly, encoding, options);
-        if (factory != null && wrapper != null) {
-            factory = wrapper.apply(factory);
-        }
-        return factory;
-    }
-
-    /**
-     * Returns a byte channel factory without wrappers, or {@code null} if unsupported.
-     * This method performs the same work than {@linkplain #prepare(Object, boolean, String,
-     * OpenOption[], UnaryOperator, UnaryOperator) above method}, but without wrappers.
-     *
-     * @param  storage         the stream or the file to open, or {@code null}.
-     * @param  allowWriteOnly  whether to allow wrapping {@link WritableByteChannel} and {@link OutputStream}.
-     * @param  encoding        if the input is an encoded URL, the character encoding (normally {@code "UTF-8"}).
-     * @param  options         the options to use for creating a new byte channel. Can be null or empty for read-only.
-     * @return the channel factory for the given input, or {@code null} if the given input is of unknown type.
-     * @throws IOException if an error occurred while processing the given input.
-     */
-    private static ChannelFactory prepare(Object storage, final boolean allowWriteOnly,
+    public static ChannelFactory prepare(Object storage, final boolean allowWriteOnly,
             final String encoding, final OpenOption[] options) throws IOException
     {
         /*
diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
index 15fe5a29e6..5edbffec53 100644
--- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
+++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
@@ -17,6 +17,7 @@
 package org.apache.sis.internal.storage.io;
 
 import java.util.Collection;
+import java.util.function.UnaryOperator;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -86,8 +87,17 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
         /** The unit of ranges used in HTTP connections. */
         private static final String RANGES_UNIT = "bytes";
 
-        /** The input stream for reading the bytes. */
-        final InputStream input;
+        /**
+         * The input stream without filtering, as specified at construction time.
+         * This is the same instance than {@link #input} when no filtering is applied.
+         */
+        public final InputStream rawInput;
+
+        /**
+         * The input stream for reading the bytes. It may be a wrapper around the input stream
+         * specified at construction time if {@linkplain #setFilter a filter has been set}.
+         */
+        public final InputStream input;
 
         /** Position of the first byte read by the input stream (inclusive). */
         final long start;
@@ -104,6 +114,7 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
         /**
          * Creates information about a connection.
          *
+         * @param owner         the channel which is opening this connection, or {@code null} if none.
          * @param input         the input stream for reading the bytes.
          * @param start         position of the first byte read by the input stream (inclusive).
          * @param end           position of the last byte read by the input stream (inclusive).
@@ -112,12 +123,15 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
          *
          * @see #openConnection(long, long)
          */
-        public Connection(final InputStream input, final long start, final long end, final long length, final boolean acceptRanges) {
-            this.input  = input;
+        public Connection(final FileCacheByteChannel owner, final InputStream input,
+                final long start, final long end, final long length, final boolean acceptRanges)
+        {
+            rawInput    = input;
             this.start  = start;
             this.end    = end;
             this.length = length;
             this.acceptRanges = acceptRanges;
+            this.input  = filter(owner, input);     // Must be last.
         }
 
         /**
@@ -125,17 +139,21 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
          * The "Content-Length" header value is useful to this class only if the connection was
          * opened for the full file.
          *
+         * @param  owner          the channel which is opening this connection, or {@code null} if none.
          * @param  input          the input stream for reading the bytes.
          * @param  contentLength  length of the response content, or -1 if unknown.
          * @param  rangeUnits     value of "Accept-Ranges" in HTTP header, which lists the accepted units.
          * @throws IllegalArgumentException if the start, end or length cannot be parsed.
          */
-        public Connection(final InputStream input, final long contentLength, final Iterable<String> rangeUnits) {
-            this.input   = input;
+        public Connection(final FileCacheByteChannel owner, final InputStream input,
+                          final long contentLength, final Iterable<String> rangeUnits)
+        {
+            rawInput     = input;
             this.start   = 0;
             this.end     = (contentLength > 0) ? contentLength - 1 : Long.MAX_VALUE;
             this.length  = contentLength;
             acceptRanges = acceptRanges(rangeUnits);
+            this.input   = filter(owner, input);
         }
 
         /**
@@ -145,13 +163,16 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
          *
          * <p>Example of content range value: {@code "Content-Range: bytes 25000-75000/100000"}.</p>
          *
+         * @param  owner         the channel which is opening this connection, or {@code null} if none.
          * @param  input         the input stream for reading the bytes.
          * @param  contentRange  value of "Content-Range" in HTTP header.
          * @param  rangeUnits    value of "Accept-Ranges" in HTTP header, which lists the accepted units.
          * @throws IllegalArgumentException if the start, end or length cannot be parsed.
          */
-        public Connection(final InputStream input, String contentRange, final Collection<String> rangeUnits) {
-            this.input = input;
+        public Connection(final FileCacheByteChannel owner, final InputStream input,
+                          String contentRange, final Collection<String> rangeUnits)
+        {
+            rawInput = input;
             long contentLength = -1;
             contentRange = contentRange.trim();
             int s = contentRange.indexOf(' ');
@@ -175,6 +196,25 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
              * supported units did not changed.
              */
             acceptRanges = rangeUnits.isEmpty() || acceptRanges(rangeUnits);
+            this.input = filter(owner, input);
+        }
+
+        /**
+         * If an optional filtering has been specified, applied it on the given input stream.
+         * This method should be invoked last in constructor, because it needs other fields.
+         *
+         * @param  owner  the channel which is opening a connection, or {@code null} if none.
+         * @param  input  the input stream created for a new connection.
+         * @return the filtered input stream, or {@code input} if there is no filtering.
+         *
+         * @see #setFilter(UnaryOperator)
+         */
+        private InputStream filter(final FileCacheByteChannel owner, InputStream input) {
+            final Filter filter;
+            if (owner != null && (filter = owner.filter) != null) {
+                input = filter.apply(input, start, end);
+            }
+            return input;
         }
 
         /**
@@ -235,6 +275,36 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
      */
     private Connection connection;
 
+    /**
+     * An optional filter to apply on the input stream opened for a connections.
+     * A filter may be installed for example for being notified of the ranges of
+     * bytes that are read, or for transforming the data.
+     *
+     * @see #setFilter(Filter)
+     * @see java.io.FilterInputStream
+     * @see java.io.FilterOutputStream
+     */
+    public interface Filter {
+        /**
+         * Invoked when an input stream is created for a new connection.
+         *
+         * @param  input  the input stream for the new connection.
+         * @param  start  position of the first byte to be returned by the input stream.
+         * @param  end    position (inclusive) of the last byte to be returned.
+         * @return the input stream to use for reading data.
+         *
+         * @see java.io.FilterInputStream
+         */
+        InputStream apply(InputStream input, long start, long end);
+    }
+
+    /**
+     * Optional filters to apply on the streams opened for a connection.
+     *
+     * @see #setFilter(Filter)
+     */
+    private Filter filter;
+
     /**
      * Input/output channel on the temporary or cached file where data are copied.
      * The {@linkplain FileChannel#position() position of this file channel} shall
@@ -305,6 +375,19 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
                 StandardOpenOption.DELETE_ON_CLOSE);
     }
 
+    /**
+     * Applies an optional filter on the streams opened for each new connection.
+     * A filter may be installed for example for being notified of the ranges of
+     * bytes that are read, or for transforming the data.
+     *
+     * @param  filter  a function which receives in argument the stream created
+     *         for a new connection, and returns the stream to use.
+     *         A {@code null} function remove filtering.
+     */
+    public final void setFilter(final Filter filter) {
+        this.filter = filter;
+    }
+
     /**
      * Returns the filename to use in error messages.
      *
@@ -328,24 +411,24 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
     /**
      * Invoked when this channel is no longer interested in reading bytes from the specified stream.
      * This method is invoked for example when this channel needs to skip an arbitrarily large number
-     * of bytes because the {@linkplain #position(long) position changed}. The {@code input} argument
-     * is the value in the record returned by a previous call to {@link #openConnection(long, long)}.
+     * of bytes because the {@linkplain #position(long) position changed}. The {@code connection}
+     * argument is the value returned by a previous call to {@link #openConnection(long, long)}.
      * The boolean return value tells what this method has done:
      *
      * <ul class="verbose">
-     *   <li>If this method returns {@code true}, then the given stream has been closed by this method and this
+     *   <li>If this method returns {@code true}, then the input stream has been closed by this method and this
      *       channel is ready to create a new stream on the next call to {@link #openConnection(long, long)}.</li>
-     *   <li>If this method returns {@code false}, then the given stream is still alive and should continue to be used.
+     *   <li>If this method returns {@code false}, then the input stream is still alive and should continue to be used.
      *       The {@link #openConnection(long, long)} method will <em>not</em> be invoked.
      *       Instead, bytes will be skipped by reading them from the current input stream and caching them.</li>
      * </ul>
      *
-     * @param  input  the input stream to eventually close.
-     * @return whether the given input stream has been closed by this method. If {@code false},
+     * @param  connection  container of the input stream to eventually close.
+     * @return whether the input stream has been closed by this method. If {@code false},
      *         then this channel should continue to use that input stream instead of opening a new connection.
      * @throws IOException if an error occurred while closing the stream or preparing for next read operations.
      */
-    protected boolean abort(InputStream input) throws IOException {
+    protected boolean abort(Connection connection) throws IOException {
         return false;
     }
 
@@ -673,7 +756,7 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
             cache(buffer.limit(n));
             count += n;
         }
-        if (abort(input)) {
+        if (abort(connection)) {
             connection = null;
         }
         return count;
@@ -729,7 +812,7 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel {
         transfer    = null;
         idleHandler = null;
         try (file) {
-            if (c != null && !abort(c.input)) {
+            if (c != null && !abort(c)) {
                 c.input.close();
             }
         }
diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java
index b9e91523a0..47d2ef6664 100644
--- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java
+++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java
@@ -122,9 +122,9 @@ final class HttpByteChannel extends FileCacheByteChannel {
         try {
             if (range == null) {
                 final long length = headers.firstValueAsLong("Content-Length").orElse(-1);
-                return new Connection(stream, length, rangeUnits);
+                return new Connection(this, stream, length, rangeUnits);
             } else {
-                return new Connection(stream, range, rangeUnits);
+                return new Connection(this, stream, range, rangeUnits);
             }
         } catch (IllegalArgumentException e) {
             throw new IOException(e);
@@ -132,15 +132,15 @@ final class HttpByteChannel extends FileCacheByteChannel {
     }
 
     /**
-     * Invoked when this channel is no longer interested in reading bytes from the specified stream.
+     * Invoked when this channel is no longer interested in reading bytes from the specified connection.
      *
-     * @param  input  the input stream to eventually close.
-     * @return whether the given input stream has been closed by this method.
+     * @param  connection  contains the input stream to eventually close.
+     * @return whether the input stream has been closed by this method.
      * @throws IOException if an error occurred while closing the stream or preparing for next read operations.
      */
     @Override
-    protected boolean abort(final InputStream input) throws IOException {
-        input.close();
+    protected boolean abort(final Connection connection) throws IOException {
+        connection.input.close();
         return true;
     }
 }
diff --git a/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java b/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java
index 808db94167..269ba41729 100644
--- a/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java
+++ b/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java
@@ -19,6 +19,7 @@ package org.apache.sis.storage;
 import java.util.Map;
 import java.util.Iterator;
 import java.util.IdentityHashMap;
+import java.util.function.UnaryOperator;
 import java.io.Reader;
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -35,6 +36,7 @@ import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.file.NoSuchFileException;
+import java.nio.file.OpenOption;
 import javax.imageio.stream.ImageInputStream;
 import javax.imageio.stream.ImageOutputStream;
 import javax.imageio.IIOException;
@@ -985,10 +987,7 @@ public class StorageConnector implements Serializable {
          * URL, URI, File, Path or other types that may be added in future Apache SIS versions.
          * If the given storage is already a ReadableByteChannel, then the factory will return it as-is.
          */
-        final ChannelFactory factory = ChannelFactory.prepare(storage, false,
-                getOption(OptionKey.URL_ENCODING),
-                getOption(OptionKey.OPEN_OPTIONS),
-                getOption(InternalOptionKey.CHANNEL_FACTORY_WRAPPER));
+        final ChannelFactory factory = createChannelFactory(false);
         if (factory == null) {
             return null;
         }
@@ -1309,6 +1308,25 @@ public class StorageConnector implements Serializable {
         addView(type, view, null, (byte) 0);
     }
 
+    /**
+     * Returns a byte channel factory from the storage, or {@code null} if the storage is unsupported.
+     * See {@link ChannelFactory#prepare(Object, boolean, String, OpenOption[])} for more information.
+     *
+     * @param  allowWriteOnly  whether to allow wrapping {@link WritableByteChannel} and {@link OutputStream}.
+     * @return the channel factory for the given input, or {@code null} if the given input is of unknown type.
+     * @throws IOException if an error occurred while processing the given input.
+     */
+    private ChannelFactory createChannelFactory(final boolean allowWriteOnly) throws IOException {
+        ChannelFactory factory = ChannelFactory.prepare(storage, allowWriteOnly,
+                getOption(OptionKey.URL_ENCODING),
+                getOption(OptionKey.OPEN_OPTIONS));
+        final UnaryOperator<ChannelFactory> wrapper = getOption(InternalOptionKey.CHANNEL_FACTORY_WRAPPER);
+        if (factory != null && wrapper != null) {
+            factory = wrapper.apply(factory);
+        }
+        return factory;
+    }
+
     /**
      * Creates a view for the storage as a {@link ChannelDataOutput} if possible.
      * This code is a partial copy of {@link #createDataInput()} adapted for output.
@@ -1328,10 +1346,7 @@ public class StorageConnector implements Serializable {
          * URL, URI, File, Path or other types that may be added in future Apache SIS versions.
          * If the given storage is already a WritableByteChannel, then the factory will return it as-is.
          */
-        final ChannelFactory factory = ChannelFactory.prepare(storage, true,
-                getOption(OptionKey.URL_ENCODING),
-                getOption(OptionKey.OPEN_OPTIONS),
-                getOption(InternalOptionKey.CHANNEL_FACTORY_WRAPPER));
+        final ChannelFactory factory = createChannelFactory(true);
         if (factory == null) {
             return null;
         }
diff --git a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java
index 5da0e3dce7..441cae8bea 100644
--- a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java
+++ b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java
@@ -99,15 +99,15 @@ public final class FileCacheByteChannelTest extends TestCase {
                 end = Math.min(end + random.nextInt(40), length);
             } while (start >= end);
             var input = new ComputedInputStream(Math.toIntExact(start), Math.toIntExact(end), random);
-            return new Connection(input, start, end-1, length, true);
+            return new Connection(null, input, start, end-1, length, true);
         }
 
         /**
-         * Marks the given input stream as closed and notify that a new one can be created.
+         * Marks the input stream as closed and notify that a new one can be created.
          */
         @Override
-        protected boolean abort(final InputStream input) throws IOException {
-            input.close();
+        protected boolean abort(final Connection connection) throws IOException {
+            connection.input.close();
             return true;
         }
 
@@ -211,23 +211,23 @@ public final class FileCacheByteChannelTest extends TestCase {
     public void testParseRange() {
         final List<String> rangesUnit = List.of("bytes");
         FileCacheByteChannel.Connection c;
-        c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000/100000", rangesUnit);
+        c = new FileCacheByteChannel.Connection(null, null, "bytes 25000-75000/100000", rangesUnit);
         assertEquals( 25000, c.start);
         assertEquals( 75000, c.end);
         assertEquals(100000, c.length);
 
-        c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000", rangesUnit);
+        c = new FileCacheByteChannel.Connection(null, null, "bytes 25000-75000", rangesUnit);
         assertEquals( 25000, c.start);
         assertEquals( 75000, c.end);
         assertEquals(    -1, c.length);
 
-        c = new FileCacheByteChannel.Connection(null, "bytes 25000/100000", rangesUnit);
+        c = new FileCacheByteChannel.Connection(null, null, "bytes 25000/100000", rangesUnit);
         assertEquals( 25000, c.start);
         assertEquals(100000, c.end);
         assertEquals(100000, c.length);
 
         // Not legal, but we test robustness.
-        c = new FileCacheByteChannel.Connection(null, "25000", rangesUnit);
+        c = new FileCacheByteChannel.Connection(null, null, "25000", rangesUnit);
         assertEquals( 25000, c.start);
         assertEquals(    -1, c.end);
         assertEquals(    -1, c.length);