You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sis.apache.org by de...@apache.org on 2023/10/25 03:04:09 UTC
[sis] 01/03: Redesign the way that readers and writers co-exist in `DataStore` implementations: - `isWritable(…)` needs to distinguish between opening an existing file or creating a new one. - `setStreamPosition(long)` removed. It was misused in most places, causing probable bugs. - Allow creation of `ChannelDataOutput` from a `ChannelDataInput`, sharing same internal. - Add `synchronize(…)` for making input `ChannelData` consistent with output, or conversely.
This is an automated email from the ASF dual-hosted git repository.
desruisseaux pushed a commit to branch geoapi-4.0
in repository https://gitbox.apache.org/repos/asf/sis.git
commit 3762fd80fe3177d3070e0b601a6387fc85e6b78d
Author: Martin Desruisseaux <ma...@geomatys.com>
AuthorDate: Sun Oct 22 15:34:50 2023 +0200
Redesign the way that readers and writers co-exist in `DataStore` implementations:
- `isWritable(…)` needs to distinguish between opening an existing file or creating a new one.
- `setStreamPosition(long)` removed. It was misused in most places, causing probable bugs.
- Allow creation of `ChannelDataOutput` from a `ChannelDataInput`, sharing same internal.
- Add `synchronize(…)` for making input `ChannelData` consistent with output, or conversely.
---
.../apache/sis/storage/geotiff/GeoTiffStore.java | 2 +-
.../sis/storage/geotiff/ReversedBitsChannel.java | 19 +-
.../org/apache/sis/storage/geotiff/Writer.java | 2 +-
.../sis/storage/geotiff/inflater/Inflater.java | 3 +-
.../org/apache/sis/storage/gpx/StoreProvider.java | 2 +-
.../main/org/apache/sis/io/stream/ChannelData.java | 235 +++++++++++++++------
.../org/apache/sis/io/stream/ChannelDataInput.java | 86 +++++---
.../apache/sis/io/stream/ChannelDataOutput.java | 191 ++++++++++++-----
.../main/org/apache/sis/io/stream/Markable.java | 2 +-
.../org/apache/sis/io/stream/UpdatableWrite.java | 39 +++-
.../org/apache/sis/storage/StorageConnector.java | 25 ++-
.../org/apache/sis/storage/base/URIDataStore.java | 26 ++-
.../sis/storage/esri/AsciiGridStoreProvider.java | 2 +-
.../org/apache/sis/storage/image/FormatFinder.java | 2 +-
.../storage/internal/WritableResourceSupport.java | 1 -
.../main/org/apache/sis/storage/package-info.java | 2 +-
.../io/stream/ChannelImageOutputStreamTest.java | 2 +-
17 files changed, 457 insertions(+), 184 deletions(-)
diff --git a/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/GeoTiffStore.java b/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/GeoTiffStore.java
index cd0d520035..4b8fa1f835 100644
--- a/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/GeoTiffStore.java
+++ b/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/GeoTiffStore.java
@@ -224,7 +224,7 @@ public class GeoTiffStore extends DataStore implements Aggregate {
location = connector.getStorageAs(URI.class);
path = connector.getStorageAs(Path.class);
try {
- if (URIDataStore.Provider.isWritable(connector)) {
+ if (URIDataStore.Provider.isWritable(connector, true)) {
ChannelDataOutput output = connector.commit(ChannelDataOutput.class, Constants.GEOTIFF);
writer = new Writer(this, output, connector.getOption(GeoTiffOption.OPTION_KEY));
} else {
diff --git a/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/ReversedBitsChannel.java b/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/ReversedBitsChannel.java
index 394d10adf0..8d181cb727 100644
--- a/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/ReversedBitsChannel.java
+++ b/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/ReversedBitsChannel.java
@@ -63,11 +63,10 @@ final class ReversedBitsChannel implements ReadableByteChannel, SeekableByteChan
* and because a new buffer is created for each strip or tile to read.
*/
static ChannelDataInput wrap(final ChannelDataInput input) throws IOException {
- final ChannelDataInput output = new ChannelDataInput(
- input.filename, new ReversedBitsChannel(input),
- ByteBuffer.allocate(2048).order(input.buffer.order()).limit(0), true);
- output.setStreamPosition(input.getStreamPosition());
- return output;
+ final var buffer = ByteBuffer.allocate(2048).order(input.buffer.order());
+ final var reverse = new ChannelDataInput(input.filename, new ReversedBitsChannel(input), buffer, true);
+ input.yield(reverse);
+ return reverse;
}
/**
@@ -75,11 +74,9 @@ final class ReversedBitsChannel implements ReadableByteChannel, SeekableByteChan
*/
@Override
public long size() throws IOException {
- if (input.channel instanceof SeekableByteChannel) {
- return ((SeekableByteChannel) input.channel).size();
- } else {
- throw unsupported("size");
- }
+ final long size = input.length();
+ if (size >= 0) return size;
+ throw unsupported("size");
}
/**
@@ -95,7 +92,7 @@ final class ReversedBitsChannel implements ReadableByteChannel, SeekableByteChan
*/
@Override
public SeekableByteChannel position(final long p) throws IOException {
- input.setStreamPosition(p);
+ input.seek(p);
return this;
}
diff --git a/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/Writer.java b/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/Writer.java
index 360504e823..4331f9a5d4 100644
--- a/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/Writer.java
+++ b/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/Writer.java
@@ -172,7 +172,7 @@ final class Writer extends GeoTIFF implements Flushable {
* Write the TIFF file header before first IFD. Stream position matter and must start at zero.
* Note that it does not necessarily mean that the stream has no bytes before current position.
*/
- output.setStreamPosition(0); // Not a seek, only setting the counter.
+ output.relocateOrigin();
output.writeShort(ByteOrder.LITTLE_ENDIAN.equals(output.buffer.order()) ? LITTLE_ENDIAN : BIG_ENDIAN);
output.writeShort(isBigTIFF ? BIG_TIFF : CLASSIC);
if (isBigTIFF) {
diff --git a/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/inflater/Inflater.java b/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/inflater/Inflater.java
index 308ade26c4..3adadb067f 100644
--- a/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/inflater/Inflater.java
+++ b/endorsed/src/org.apache.sis.storage.geotiff/main/org/apache/sis/storage/geotiff/inflater/Inflater.java
@@ -265,8 +265,7 @@ public abstract class Inflater implements Closeable {
*/
if (input.channel instanceof PixelChannel) {
((PixelChannel) input.channel).setInputRegion(start, byteCount);
- input.buffer.limit(0);
- input.setStreamPosition(start); // Must be after above call to `limit(0)`.
+ input.refresh(start);
}
}
diff --git a/endorsed/src/org.apache.sis.storage.xml/main/org/apache/sis/storage/gpx/StoreProvider.java b/endorsed/src/org.apache.sis.storage.xml/main/org/apache/sis/storage/gpx/StoreProvider.java
index 52a176f91f..48e63bbf5b 100644
--- a/endorsed/src/org.apache.sis.storage.xml/main/org/apache/sis/storage/gpx/StoreProvider.java
+++ b/endorsed/src/org.apache.sis.storage.xml/main/org/apache/sis/storage/gpx/StoreProvider.java
@@ -99,7 +99,7 @@ public final class StoreProvider extends StaxDataStoreProvider {
*/
@Override
public DataStore open(final StorageConnector connector) throws DataStoreException {
- if (isWritable(connector)) {
+ if (isWritable(connector, false)) {
return new WritableStore(this, connector);
} else {
return new Store(this, connector);
diff --git a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/ChannelData.java b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/ChannelData.java
index 6df0d552ec..3e67483dc9 100644
--- a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/ChannelData.java
+++ b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/ChannelData.java
@@ -62,8 +62,10 @@ public abstract class ChannelData implements Markable {
*
* <p>This value is added to the argument given to the {@link #seek(long)} method. Users can ignore
* this field, unless they want to invoke {@link SeekableByteChannel#position(long)} directly.</p>
+ *
+ * @see #toSeekableByteChannelPosition(long)
*/
- public final long channelOffset;
+ private long channelOffset;
/**
* The channel position where is located the {@link #buffer} value at index 0.
@@ -117,25 +119,31 @@ public abstract class ChannelData implements Markable {
* @throws IOException if an error occurred while reading the channel.
*/
ChannelData(final String filename, final Channel channel, final ByteBuffer buffer) throws IOException {
- this.filename = filename;
- this.buffer = buffer;
- this.channelOffset = (channel instanceof SeekableByteChannel) ? ((SeekableByteChannel) channel).position() : 0;
+ this.filename = filename;
+ this.buffer = buffer;
+ if (channel instanceof SeekableByteChannel) {
+ channelOffset = ((SeekableByteChannel) channel).position();
+ }
}
/**
* Creates a new instance from the given {@code ChannelData}.
* This constructor is invoked when we need to change the implementation class.
- * The old {@code ChannelData} should not be used anymore after this constructor.
+ * If {@code replacing} is {@code true}, then the old {@code ChannelData} should
+ * not be used anymore after this constructor.
*
- * @param old the existing instance from which to takes the channel and buffer.
+ * @param other the existing instance from which to take the channel and buffer.
+ * @param replacing {@code true} if {@code other} will be discarded in favor of the new instance.
*/
- ChannelData(final ChannelData old) {
- filename = old.filename;
- buffer = old.buffer;
- channelOffset = old.channelOffset;
- bufferOffset = old.bufferOffset;
- bitPosition = old.bitPosition;
- mark = old.mark;
+ ChannelData(final ChannelData other, final boolean replacing) {
+ filename = other.filename;
+ buffer = other.buffer;
+ channelOffset = other.channelOffset;
+ bufferOffset = other.bufferOffset;
+ bitPosition = other.bitPosition;
+ if (replacing) {
+ mark = other.mark;
+ }
}
/**
@@ -150,7 +158,35 @@ public abstract class ChannelData implements Markable {
this.filename = filename;
buffer = data.asReadOnlyBuffer();
buffer.order(data.order());
- channelOffset = 0;
+ }
+
+ /**
+ * {@return the wrapped channel where data are read or written}.
+ * This is the {@code channel} field of the {@code ChannelData} subclass.
+ *
+ * @see ChannelDataInput#channel
+ * @see ChannelDataOutput#channel
+ */
+ public abstract Channel channel();
+
+ /**
+ * Returns the length of the stream (in bytes), or -1 if unknown.
+ * The length is relative to the position during the last call to {@link #relocateOrigin()}.
+ * If the latter method has never been invoked, then the length is relative to the channel
+ * position at {@code ChannelData} construction time.
+ *
+ * @return the length of the stream (in bytes) relative to origin, or -1 if unknown.
+ * @throws IOException if an error occurred while fetching the stream length.
+ */
+ public final long length() throws IOException { // Method signature must match ImageInputStream.length().
+ final Channel channel = channel();
+ if (channel instanceof SeekableByteChannel) {
+ final long length = Math.subtractExact(((SeekableByteChannel) channel).size(), channelOffset);
+ if (length >= 0) {
+ return Math.max(length, Math.addExact(bufferOffset, buffer.limit()));
+ }
+ }
+ return -1;
}
/**
@@ -220,47 +256,25 @@ public abstract class ChannelData implements Markable {
/**
* Returns the current byte position of the stream.
- * The returned value is relative to the position that the stream had at {@code ChannelData} construction time.
+ * The returned value is relative to the position during the last call to {@link #relocateOrigin()}.
+ * If the latter method has never been invoked, then the returned value is relative to the channel
+ * position at {@code ChannelData} construction time.
*
* @return the position of the stream.
*/
@Override
- public long getStreamPosition() {
- return position();
- }
+ public abstract long getStreamPosition();
/**
* Returns the current byte position of the stream, ignoring overriding by subclasses.
- * The returned value is relative to the position that the stream had at {@code ChannelData} construction time.
+ * The returned value is relative to the position during the last call to {@link #relocateOrigin()}.
+ * If the latter method has never been invoked, then the returned value is relative to the channel
+ * position at {@code ChannelData} construction time.
*/
- private long position() {
+ final long position() {
return Math.addExact(bufferOffset, buffer.position());
}
- /**
- * Sets the current byte position of the stream. This method does <strong>not</strong> seeks the stream;
- * this method only modifies the value to be returned by {@link #getStreamPosition()}. This method can
- * be invoked when some external code has performed some work with the channel and wants to inform this
- * {@code ChannelData} about the new position resulting from this work.
- *
- * <b>Notes:</b>
- * <ul>
- * <li>Invoking this method clears the {@linkplain #getBitOffset() bit offset}
- * and the {@linkplain #mark() marks}.</li>
- * <li>This method does not need to be invoked when only the {@linkplain ByteBuffer#position() buffer position}
- * has changed.</li>
- * </ul>
- *
- * @param position the new position of the stream.
- */
- public final void setStreamPosition(final long position) {
- bufferOffset = Math.subtractExact(position, buffer.position());
- // Clearing the bit offset is needed if we don't want to handle the case of ChannelDataOutput,
- // which use a different stream position calculation when the bit offset is non-zero.
- clearBitOffset();
- mark = null;
- }
-
/**
* Returns the earliest position in the stream to which {@linkplain #seek(long) seeking} may be performed.
*
@@ -291,11 +305,10 @@ public abstract class ChannelData implements Markable {
if (buffer.isReadOnly()) {
return;
}
- final int n = (int) Math.max(position - bufferOffset, 0);
- final int p = buffer.position() - n;
- final int r = buffer.limit() - n;
- flushAndSetPosition(n); // Number of bytes to forget.
- buffer.compact().position(p).limit(r);
+ final long count = Math.subtractExact(position, bufferOffset);
+ if (count > 0) {
+ flushNBytes((int) Math.min(count, buffer.limit()));
+ }
/*
* Discard trailing obsolete marks. Note that obsolete marks between valid marks
* cannot be discarded - only the trailing obsolete marks can be removed.
@@ -314,18 +327,20 @@ public abstract class ChannelData implements Markable {
}
/**
- * Writes (if applicable) the buffer content up to the given position, then sets the buffer position
- * to the given value. The {@linkplain ByteBuffer#limit() buffer limit} is unchanged, and the buffer
- * offset is incremented by the given value.
+ * Flushes the given number of bytes in the buffer.
+ * This is invoked for making room for more bytes.
+ * If the given count is larger than the buffer content, then this method flushes everything.
+ * If the given count is zero or negative, then this method does nothing.
+ *
+ * @param count number of bytes to write, between 1 and buffer limit.
+ * @throws IOException if an error occurred while writing the bytes to the channel.
*/
- void flushAndSetPosition(final int position) throws IOException {
- buffer.position(position);
- bufferOffset += position;
- }
+ abstract void flushNBytes(final int count) throws IOException;
/**
- * Moves to the given position in the stream. The given position is relative to
- * the position that the stream had at {@code ChannelData} construction time.
+ * Moves to the given position in the stream. The given position is relative to the position during
+ * the last call to {@link #relocateOrigin()}. If the latter method has never been invoked, then the
+ * argument is relative to the channel position at {@code ChannelData} construction time.
*
* @param position the position where to move.
* @throws IOException if the stream cannot be moved to the given position.
@@ -339,7 +354,7 @@ public abstract class ChannelData implements Markable {
*/
@Override
public final void mark() {
- mark = new Mark(getStreamPosition(), (byte) getBitOffset(), mark);
+ mark = new Mark(position(), (byte) getBitOffset(), mark);
}
/**
@@ -388,6 +403,108 @@ public abstract class ChannelData implements Markable {
}
}
+ /**
+ * Empties the buffer and sets the channel position to the beginning of this stream (the origin).
+ * This method is similar to {@code seek(0)} except that the buffer content and all marks are discarded,
+ * and that this method returns {@code false} instead of throwing an exception if the channel is not seekable.
+ *
+ * @return {@code true} on success, or {@code false} if it is not possible to reset the position.
+ * @throws IOException if an error occurred while setting the channel position.
+ */
+ public final boolean rewind() throws IOException {
+ final Channel channel = channel();
+ if (channel instanceof SeekableByteChannel) {
+ ((SeekableByteChannel) channel).position(channelOffset);
+ buffer.clear().limit(0);
+ bufferOffset = 0;
+ clearBitOffset();
+ mark = null;
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Notifies two {@code ChannelData} instances that operations will continue with the specified take over.
+ * The two {@code ChannelData} instances should share the same {@link Channel}, or use two channels that
+ * are at the same {@linkplain SeekableByteChannel#position() channel position}.
+ *
+ * <h4>Usage</h4>
+ * This method is used when a {@link ChannelDataInput} and a {@link ChannelDataOutput} are wrapping
+ * the same {@link java.nio.channels.ByteChannel} and used alternatively for reading and writing.
+ * After a read operation, {@code in.yield(out)} should be invoked for ensuring that the output
+ * position is valid for the new channel position.
+ *
+ * <h4>Implementation note</h4>
+ * Subclasses <strong>must</strong> override this method and set the buffer position to zero.
+ * Whether this needs to be done before or after {@code super.yield(takeOver)} depends on whether
+ * this {@code ChannelData} is for input or output.
+ *
+ * @param takeOver the {@code ChannelData} which will continue operations after this one.
+ *
+ * @see ChannelDataOutput#ChannelDataOutput(ChannelDataInput)
+ */
+ public void yield(final ChannelData takeOver) throws IOException {
+ takeOver.bufferOffset = bufferOffset;
+ takeOver.channelOffset = channelOffset;
+ takeOver.bitPosition = bitPosition;
+ }
+
+ /**
+ * Invalidates the buffer content and updates the value reported as the stream position.
+ * This method is not a {@linkplain #seek(long) seek},
+ * i.e. it does not change the {@linkplain #channel() channel} position.
+ * This method only modifies the value returned by the {@link #getStreamPosition()} method.
+ * This {@code refresh(long)} method can be invoked when external code has performed some work
+ * directly on the {@linkplain #channel() channel} and wants to inform this {@code ChannelData}
+ * about the new position resulting from this work.
+ *
+ * <b>Notes:</b>
+ * <ul>
+ * <li>Invoking this method clears the {@linkplain #getBitOffset() bit offset} and {@linkplain #mark() marks}.</li>
+ * <li>Invoking this method sets the {@linkplain #buffer} {@linkplain ByteBuffer#limit() limit} to zero.</li>
+ * <li>This method does not need to be invoked when only the {@linkplain ByteBuffer#position() buffer position}
+ * has changed.</li>
+ * </ul>
+ *
+ * @param position the new position of the stream.
+ */
+ public final void refresh(final long position) {
+ buffer.limit(0);
+ bufferOffset = position;
+ /*
+ * Clearing the bit offset is needed if we don't want to handle the case of `ChannelDataOutput`,
+ * which use a different stream position calculation when the bit offset is non-zero.
+ */
+ clearBitOffset();
+ mark = null;
+ }
+
+ /**
+ * Sets the current position as the new origin of this {@code ChannelData}.
+ * After this method call, {@link #getStreamPosition()} will return zero when
+ * {@code ChannelData} is about to read the byte located at the current position.
+ *
+ * <p>Note that invoking this method may change the value returned by {@link #length()},
+ * because the length is relative to the origin.</p>
+ */
+ public final void relocateOrigin() {
+ final long position = getStreamPosition();
+ channelOffset = toSeekableByteChannelPosition(position);
+ bufferOffset = Math.subtractExact(bufferOffset, position);
+ }
+
+ /**
+ * Converts a position in this {@code ChannelData} to position in the Java NIO channel.
+ * This is often the same value, but not necessarily.
+ *
+ * @param position position in this {@code ChannelData}.
+ * @return Corresponding position in the {@code SeekableByteChannel}.
+ */
+ final long toSeekableByteChannelPosition(final long position) {
+ return Math.addExact(channelOffset, position);
+ }
+
/**
* Invoked when an operation between the channel and the buffer transferred no byte. Note that this is unrelated
* to end-of-file, in which case {@link java.nio.channels.ReadableByteChannel#read(ByteBuffer)} returns -1.
diff --git a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/ChannelDataInput.java b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/ChannelDataInput.java
index a7aa272888..7480b27156 100644
--- a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/ChannelDataInput.java
+++ b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/ChannelDataInput.java
@@ -30,6 +30,7 @@ import java.nio.LongBuffer;
import java.nio.FloatBuffer;
import java.nio.DoubleBuffer;
import java.nio.charset.Charset;
+import java.nio.channels.Channel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import org.apache.sis.storage.internal.Resources;
@@ -127,22 +128,19 @@ public class ChannelDataInput extends ChannelData implements DataInput {
* @param input the existing instance from which to takes the channel and buffer.
*/
ChannelDataInput(final ChannelDataInput input) {
- super(input);
+ super(input, true);
channel = input.channel;
}
/**
- * Returns the length of the stream (in bytes), or -1 if unknown.
- * The length is relative to the channel position at {@linkplain #ChannelDataInput construction time}.
+ * {@return the wrapped channel where data are read}.
+ * This is the {@link #channel} field value.
*
- * @return the length of the stream (in bytes) relative to {@link #channelOffset}, or -1 if unknown.
- * @throws IOException if an error occurred while fetching the stream length.
+ * @see #channel
*/
- public final long length() throws IOException { // Method signature must match ImageInputStream.length().
- if (channel instanceof SeekableByteChannel) {
- return Math.subtractExact(((SeekableByteChannel) channel).size(), channelOffset);
- }
- return -1;
+ @Override
+ public final Channel channel() {
+ return channel;
}
/**
@@ -236,6 +234,16 @@ public class ChannelDataInput extends ChannelData implements DataInput {
}
}
+ /**
+ * Returns the current byte position of the stream.
+ *
+ * @return the position of the stream.
+ */
+ @Override
+ public final long getStreamPosition() {
+ return position();
+ }
+
/**
* Returns the "end of file" error message, for {@link EOFException} creations.
*/
@@ -998,8 +1006,7 @@ loop: while (hasRemaining()) {
}
/**
- * Moves to the given position in the stream. The given position is relative to
- * the position that the stream had at {@code ChannelDataInput} construction time.
+ * Moves to the given position in this stream.
*
* @param position the position where to move.
* @throws IOException if the stream cannot be moved to the given position.
@@ -1017,7 +1024,7 @@ loop: while (hasRemaining()) {
* Requested position is outside the current limits of the buffer,
* but we can set the new position directly in the channel.
*/
- ((SeekableByteChannel) channel).position(Math.addExact(channelOffset, position));
+ ((SeekableByteChannel) channel).position(toSeekableByteChannelPosition(position));
bufferOffset = position;
buffer.clear().limit(0);
} else if (p >= 0) {
@@ -1058,27 +1065,52 @@ loop: while (hasRemaining()) {
*/
public final void rangeOfInterest(long lower, long upper) {
if (channel instanceof ByteRangeChannel) {
- lower = Math.addExact(lower, channelOffset);
- upper = Math.addExact(upper, channelOffset);
+ lower = toSeekableByteChannelPosition(lower);
+ upper = toSeekableByteChannelPosition(upper);
((ByteRangeChannel) channel).rangeOfInterest(lower, upper);
}
}
/**
- * Empties the buffer and reset the channel position at the beginning of the stream.
- * This method is similar to {@code seek(0)} except that the buffer content is discarded.
+ * Forgets the given number of bytes in the buffer.
+ * This is invoked for making room for more bytes.
*
- * @return {@code true} on success, or {@code false} if it is not possible to reset the position.
- * @throws IOException if the stream cannot be moved to the original position.
+ * @param count number of bytes to forget, between 1 and buffer limit.
*/
- public final boolean rewind() throws IOException {
- if (channel instanceof SeekableByteChannel) {
- ((SeekableByteChannel) channel).position(channelOffset);
- buffer.clear().limit(0);
- bufferOffset = 0;
- clearBitOffset();
- return true;
+ @Override
+ final void flushNBytes(final int count) throws IOException {
+ final int p = buffer.position();
+ buffer.position(count).compact()
+ .limit(buffer.position()) // Not the same value as `p`. It is rather equal to `limit - count`.
+ .position(p - count);
+ bufferOffset = Math.addExact(bufferOffset, count);
+ }
+
+ /**
+ * Notifies two {@code ChannelData} instances that operations will continue with the specified take over.
+ * This method should be invoked when read operations with this {@code ChannelDataInput} are completed for
+ * now, and write operations are about to begin with a {@link ChannelDataOutput} sharing the same channel.
+ *
+ * @param takeOver the {@link ChannelDataOutput} which will continue operations after this instance.
+ */
+ @Override
+ public void yield(final ChannelData takeOver) throws IOException {
+ /*
+ * If we filled the buffer with more bytes than the buffer position,
+ * the channel position is too far ahead. We need to seek backward.
+ */
+ if (buffer.hasRemaining()) {
+ if (channel instanceof SeekableByteChannel) {
+ ((SeekableByteChannel) channel).position(toSeekableByteChannelPosition(position()));
+ } else {
+ throw new IOException(Resources.format(Resources.Keys.StreamIsForwardOnly_1, takeOver.filename));
+ }
+ }
+ clearBitOffset();
+ bufferOffset = position();
+ if (buffer.limit(0) != takeOver.buffer) { // Must be after `position()`.
+ takeOver.buffer.limit(0);
}
- return false;
+ super.yield(takeOver);
}
}
diff --git a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/ChannelDataOutput.java b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/ChannelDataOutput.java
index ccf2e6701e..419410c591 100644
--- a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/ChannelDataOutput.java
+++ b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/ChannelDataOutput.java
@@ -22,13 +22,13 @@ import java.io.Flushable;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.DoubleBuffer;
import java.nio.FloatBuffer;
import java.nio.IntBuffer;
import java.nio.LongBuffer;
import java.nio.ShortBuffer;
+import java.nio.channels.Channel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
@@ -53,6 +53,11 @@ import static org.apache.sis.util.ArgumentChecks.ensureBetween;
* <p>Since this class is only a helper tool, it does not "own" the channel and consequently does not provide
* {@code close()} method. It is users responsibility to close the channel after usage.</p>
*
+ * <h2>Interpretation of buffer position and limit</h2>
+ * The buffer position is the position where to write the next byte.
+ * It may be either a new byte appended to the channel, or byte overwriting an existing byte.
+ * Those two cases are differentiated by the buffer limit, which is the number of valid bytes in the buffer.
+ *
* <h2>Relationship with {@code ImageOutputStream}</h2>
* This class API is compatibly with the {@link javax.imageio.stream.ImageOutputStream} interface, so subclasses
* can implement that interface if they wish. This class does not implement {@code ImageOutputStream} because it
@@ -84,20 +89,65 @@ public class ChannelDataOutput extends ChannelData implements DataOutput, Flusha
buffer.limit(0);
}
+ /**
+ * Creates a new data output which will write in the same channel as the given input.
+ * The new instance will share the same channel and buffer as the given {@code input}.
+ * Callers should not use the two {@code ChannelData} at the same time, to avoid chaos.
+ * Bytes will be written starting at the current position of the given input.
+ *
+ * <p>Callers <strong>must</strong> invoke {@link ChannelDataInput#yield(ChannelData)}
+ * before the first use of this output. Example:</p>
+ *
+ * {@snippet lang="java":
+ * ChannelDataInput input = ...;
+ * ChannelDataOutput output = new ChannelDataOutput(input);
+ * input.yield(output);
+ * // ...some writing to `output` here...
+ * output.yield(input);
+ * // ...some reading from `input` here...
+ * input.yield(output);
+ * // ...some writing to `output` here...
+ * }
+ *
+ * @param input the input to make writable.
+ * @throws ClassCastException if the given input is not writable.
+ *
+ * @see #flush()
+ * @see #yield(ChannelData)
+ */
+ public ChannelDataOutput(final ChannelDataInput input) {
+ super(input, false);
+ channel = (WritableByteChannel) input.channel; // `ClassCastException` is part of the contract.
+ // Do not invoke `input.yield(this)` here because the caller may want to do some more read operations first.
+ }
+
+ /**
+ * {@return the wrapped channel where data are written}.
+ * This is the {@link #channel} field value.
+ *
+ * @see #channel
+ */
+ @Override
+ public final Channel channel() {
+ return channel;
+ }
+
/**
* Makes sure that the buffer can accept at least <var>n</var> more bytes.
* It is caller's responsibility to ensure that the given number of bytes is
* not greater than the {@linkplain ByteBuffer#capacity() buffer capacity}.
*
* <p>After this method call, the buffer {@linkplain ByteBuffer#limit() limit}
- * will be equal or greater than {@code position + n}.</p>
+ * will be equal or greater than {@code position + n}. This limit is the number
+ * of valid bytes in the buffer, i.e. bytes that already exist in the channel.
+ * If the caller is appending new bytes and does not use all the space specified
+ * to this method, then the caller should adjust the limit after writing.</p>
*
* @param n the minimal number of additional bytes that the {@linkplain #buffer buffer} shall accept.
* @throws IOException if an error occurred while writing to the channel.
*/
public final void ensureBufferAccepts(final int n) throws IOException {
- final int capacity = buffer.capacity();
- assert n >= 0 && n <= capacity : n;
+ assert n >= 0 && n <= buffer.capacity() : n;
int after = buffer.position() + n;
if (after > buffer.limit()) {
/*
@@ -105,7 +155,7 @@ public class ChannelDataOutput extends ChannelData implements DataOutput, Flusha
* of valid bytes in the buffer. If the new limit would exceed the buffer capacity, then we
* need to write some bytes now.
*/
- if (after > capacity) {
+ if (after > buffer.capacity()) {
buffer.flip();
do {
final int c = channel.write(buffer);
@@ -113,14 +163,14 @@ public class ChannelDataOutput extends ChannelData implements DataOutput, Flusha
onEmptyTransfer();
}
after -= c;
- } while (after > capacity);
+ } while (after > buffer.capacity());
/*
* We wrote a sufficient amount of bytes - usually all of them, but not necessarily.
* If there is some unwritten bytes, move them the beginning of the buffer.
*/
bufferOffset += buffer.position();
buffer.compact();
- assert after >= buffer.position();
+ assert after >= buffer.position() : after;
}
buffer.limit(after);
}
@@ -132,8 +182,8 @@ public class ChannelDataOutput extends ChannelData implements DataOutput, Flusha
* @return the position of the stream.
*/
@Override
- public long getStreamPosition() {
- long position = super.getStreamPosition();
+ public final long getStreamPosition() {
+ long position = position();
/*
* ChannelDataOutput uses a different strategy than ChannelDataInput: if some bits were in process
* of being written, the buffer position is set to the byte AFTER the byte containing the bits.
@@ -540,11 +590,11 @@ public class ChannelDataOutput extends ChannelData implements DataOutput, Flusha
* @param length the number of chars to write.
* @throws IOException if an error occurred while writing the stream.
*/
- public final void writeChars(final char[] src, int offset, int length) throws IOException {
+ public final void writeChars(final char[] src, final int offset, final int length) throws IOException {
new ArrayWriter() {
private CharBuffer view;
@Override Buffer createView() {return view = buffer.asCharBuffer();}
- @Override void transfer(int offset, int n) {view.put(src, offset, n);}
+ @Override void transfer(int start, int n) {view.put(src, start, n);}
}.writeFully(Character.BYTES, offset, length);
}
@@ -556,11 +606,11 @@ public class ChannelDataOutput extends ChannelData implements DataOutput, Flusha
* @param length the number of shorts to write.
* @throws IOException if an error occurred while writing the stream.
*/
- public final void writeShorts(final short[] src, int offset, int length) throws IOException {
+ public final void writeShorts(final short[] src, final int offset, final int length) throws IOException {
new ArrayWriter() {
private ShortBuffer view;
@Override Buffer createView() {return view = buffer.asShortBuffer();}
- @Override void transfer(int offset, int length) {view.put(src, offset, length);}
+ @Override void transfer(int start, int n) {view.put(src, start, n);}
}.writeFully(Short.BYTES, offset, length);
}
@@ -572,11 +622,11 @@ public class ChannelDataOutput extends ChannelData implements DataOutput, Flusha
* @param length the number of integers to write.
* @throws IOException if an error occurred while writing the stream.
*/
- public final void writeInts(final int[] src, int offset, int length) throws IOException {
+ public final void writeInts(final int[] src, final int offset, final int length) throws IOException {
new ArrayWriter() {
private IntBuffer view;
@Override Buffer createView() {return view = buffer.asIntBuffer();}
- @Override void transfer(int offset, int n) {view.put(src, offset, n);}
+ @Override void transfer(int start, int n) {view.put(src, start, n);}
}.writeFully(Integer.BYTES, offset, length);
}
@@ -588,11 +638,11 @@ public class ChannelDataOutput extends ChannelData implements DataOutput, Flusha
* @param length the number of longs to write.
* @throws IOException if an error occurred while writing the stream.
*/
- public final void writeLongs(final long[] src, int offset, int length) throws IOException {
+ public final void writeLongs(final long[] src, final int offset, final int length) throws IOException {
new ArrayWriter() {
private LongBuffer view;
@Override Buffer createView() {return view = buffer.asLongBuffer();}
- @Override void transfer(int offset, int n) {view.put(src, offset, n);}
+ @Override void transfer(int start, int n) {view.put(src, start, n);}
}.writeFully(Long.BYTES, offset, length);
}
@@ -604,11 +654,11 @@ public class ChannelDataOutput extends ChannelData implements DataOutput, Flusha
* @param length the number of floats to write.
* @throws IOException if an error occurred while writing the stream.
*/
- public final void writeFloats(final float[] src, int offset, int length) throws IOException {
+ public final void writeFloats(final float[] src, final int offset, final int length) throws IOException {
new ArrayWriter() {
private FloatBuffer view;
@Override Buffer createView() {return view = buffer.asFloatBuffer();}
- @Override void transfer(int offset, int n) {view.put(src, offset, n);}
+ @Override void transfer(int start, int n) {view.put(src, start, n);}
}.writeFully(Float.BYTES, offset, length);
}
@@ -620,11 +670,11 @@ public class ChannelDataOutput extends ChannelData implements DataOutput, Flusha
* @param length the number of doubles to write.
* @throws IOException if an error occurred while writing the stream.
*/
- public final void writeDoubles(final double[] src, int offset, int length) throws IOException {
+ public final void writeDoubles(final double[] src, final int offset, final int length) throws IOException {
new ArrayWriter() {
private DoubleBuffer view;
@Override Buffer createView() {return view = buffer.asDoubleBuffer();}
- @Override void transfer(int offset, int n) {view.put(src, offset, n);}
+ @Override void transfer(int start, int n) {view.put(src, start, n);}
}.writeFully(Double.BYTES, offset, length);
}
@@ -669,18 +719,15 @@ public class ChannelDataOutput extends ChannelData implements DataOutput, Flusha
@Override
public void writeUTF(final String s) throws IOException {
byte[] data = s.getBytes(StandardCharsets.UTF_8);
- if (data.length > Short.MAX_VALUE) {
+ final int length = data.length;
+ if (length > Short.MAX_VALUE) {
throw new IllegalArgumentException(Resources.format(
- Resources.Keys.ExcessiveStringSize_3, filename, Short.MAX_VALUE, data.length));
- }
- final ByteOrder oldOrder = buffer.order();
- buffer.order(ByteOrder.BIG_ENDIAN);
- try {
- writeShort(data.length);
- write(data);
- } finally {
- buffer.order(oldOrder);
+ Resources.Keys.ExcessiveStringSize_3, filename, Short.MAX_VALUE, length));
}
+ ensureBufferAccepts(Short.BYTES);
+ buffer.put((byte) (length >>> Byte.SIZE)); // Write using ByteOrder.BIG_ENDIAN.
+ buffer.put((byte) length);
+ write(data);
}
/**
@@ -734,9 +781,9 @@ public class ChannelDataOutput extends ChannelData implements DataOutput, Flusha
}
/**
- * Moves to the given position in the stream, relative to the stream position at construction time.
- * If the given position is greater than the stream length, then the values of bytes between the
- * previous stream length and the given position are unspecified. The limit is unchanged.
+ * Moves to the given position in this stream.
+ * If the given position is greater than the stream length, then the values of all bytes between
+ * the previous stream length and the given position are unspecified. The limit is unchanged.
*
* @param position the position where to move.
* @throws IOException if the stream cannot be moved to the given position.
@@ -756,7 +803,7 @@ public class ChannelDataOutput extends ChannelData implements DataOutput, Flusha
* but we can set the new position directly in the channel.
*/
flush();
- ((SeekableByteChannel) channel).position(Math.addExact(channelOffset, position));
+ ((SeekableByteChannel) channel).position(toSeekableByteChannelPosition(position));
bufferOffset = position;
} else if ((p -= buffer.position()) >= 0) {
/*
@@ -768,52 +815,86 @@ public class ChannelDataOutput extends ChannelData implements DataOutput, Flusha
// We cannot move position beyond the buffered part.
throw new IOException(Resources.format(Resources.Keys.StreamIsForwardOnly_1, filename));
}
- assert super.getStreamPosition() == position;
+ assert position() == position;
}
/**
- * Flushes the {@link #buffer buffer} content to the channel.
+ * Flushes the buffer content to the channel, from buffer beginning to buffer limit.
+ * If the buffer position is not already at the buffer limit, the position is moved.
+ * The buffer is empty after this method call, i.e. the limit is zero.
* This method does <strong>not</strong> flush the channel itself.
*
* @throws IOException if an error occurred while writing to the channel.
*/
@Override
public final void flush() throws IOException {
- buffer.rewind();
+ clearBitOffset();
writeFully();
+ bufferOffset = position();
buffer.limit(0);
- clearBitOffset();
}
/**
- * Writes the buffer content up to the given position, then set the buffer position to the given value.
- * The {@linkplain ByteBuffer#limit() buffer limit} is unchanged, and the buffer offset is incremented
- * by the given value.
+ * Writes the given number of bytes from the buffer.
+ * This is invoked for making room for more bytes.
+ *
+ * @param count number of bytes to write, between 1 and buffer limit.
+ * @throws IOException if an error occurred while writing the bytes to the channel.
*/
@Override
- final void flushAndSetPosition(final int position) throws IOException {
- final int limit = buffer.limit();
- buffer.rewind().limit(position);
+ final void flushNBytes(final int count) throws IOException {
+ final int position = buffer.position();
+ final int validity = buffer.limit();
+ buffer.limit(count);
writeFully();
- buffer.limit(limit);
+ bufferOffset = position();
+ buffer.limit(validity).compact().limit(buffer.position()).position(position - count);
}
/**
- * Writes fully the buffer content from its position to its limit.
- * After this method call, the buffer position is equal to its limit.
+ * Writes fully the buffer content from beginning to buffer limit.
+ * Caller must update the buffer position after this method call.
*
* @throws IOException if an error occurred while writing to the channel.
*/
private void writeFully() throws IOException {
- int n = buffer.remaining();
- bufferOffset += n;
- while (n != 0) {
- final int c = channel.write(buffer);
- if (c == 0) {
+ buffer.rewind();
+ while (buffer.hasRemaining()) {
+ if (channel.write(buffer) == 0) {
onEmptyTransfer();
}
- n -= c;
}
- assert !buffer.hasRemaining() : buffer;
+ }
+
+ /**
+ * Notifies two {@code ChannelData} instances that operations will continue with the specified take over.
+ * This method should be invoked when write operations with this {@code ChannelDataOutput} are completed
+ * for now, and read operations are about to begin with a {@link ChannelDataInput} sharing the same channel.
+ *
+ * @param takeOver the {@link ChannelDataInput} which will continue operations after this instance.
+ */
+ @Override
+ public void yield(final ChannelData takeOver) throws IOException {
+ final int position = buffer.position();
+ final int limit = buffer.limit();
+ /*
+ * Flush the full buffer content for avoiding data loss. Note that the buffer position
+ * is not necessarily at the end, so we may write more bytes than the stream position.
+ * It may force us to seek backward after flushing.
+ */
+ clearBitOffset();
+ writeFully();
+ if (position >= limit) {
+ // We were at the end of the buffer, so the channel is already at the right position.
+ bufferOffset = position();
+ buffer.limit(0);
+ } else if (channel instanceof SeekableByteChannel) {
+ // Do not move `bufferOffset`. Instead make the channel position consistent with it.
+ buffer.limit(limit).position(position);
+ ((SeekableByteChannel) channel).position(toSeekableByteChannelPosition(position()));
+ } else {
+ throw new IOException(Resources.format(Resources.Keys.StreamIsForwardOnly_1, filename));
+ }
+ super.yield(takeOver);
}
}
diff --git a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/Markable.java b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/Markable.java
index a4e1acd9a8..2400329043 100644
--- a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/Markable.java
+++ b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/Markable.java
@@ -30,7 +30,7 @@ import java.io.IOException;
*
* <h2>Design note</h2>
* An alternative could be to support the {@code seek(long)} method. But using marks instead allows the stream
- * to invalidate the marks if needed (for example when {@link ChannelData#setStreamPosition(long)} is invoked).
+ * to invalidate the marks if needed (for example when {@link ChannelData#refresh(long)} is invoked).
*
* @author Martin Desruisseaux (Geomatys)
*
diff --git a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/UpdatableWrite.java b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/UpdatableWrite.java
index 22416314be..26b5e103ce 100644
--- a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/UpdatableWrite.java
+++ b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/io/stream/UpdatableWrite.java
@@ -43,7 +43,7 @@ public abstract class UpdatableWrite<V> implements CheckedContainer<V> {
public final long position;
/**
- * Prepares a new updatable value.
+ * Prepares a new updatable value at the current output position.
*
* @param position stream where to write the value.
*/
@@ -52,7 +52,16 @@ public abstract class UpdatableWrite<V> implements CheckedContainer<V> {
}
/**
- * Creates a pseudo-updatable associated to no value.
+ * Prepares a new updatable value at the specified position.
+ *
+ * @param position position where to write the value.
+ */
+ private UpdatableWrite(final long position) {
+ this.position = position;
+ }
+
+ /**
+ * Creates a pseudo-updatable associated to no value at the current output position.
* This variant can be used when the caller only want to record the position, with no write operation.
*
* @param output stream where to write the value.
@@ -65,7 +74,7 @@ public abstract class UpdatableWrite<V> implements CheckedContainer<V> {
}
/**
- * Creates an updatable unsigned short value.
+ * Creates an updatable unsigned short value at the current output position.
*
* @param output stream where to write the value.
* @param value the unsigned short value to write.
@@ -79,7 +88,7 @@ public abstract class UpdatableWrite<V> implements CheckedContainer<V> {
}
/**
- * Creates an updatable unsigned integer value.
+ * Creates an updatable unsigned integer value at the current output position.
*
* @param output stream where to write the value.
* @param value the unsigned integer value to write.
@@ -93,7 +102,7 @@ public abstract class UpdatableWrite<V> implements CheckedContainer<V> {
}
/**
- * Creates an updatable long value.
+ * Creates an updatable long value at the current output position.
*
* @param output stream where to write the value.
* @param value the value to write.
@@ -106,6 +115,23 @@ public abstract class UpdatableWrite<V> implements CheckedContainer<V> {
return dw;
}
+ /**
+ * Creates an updatable value at the specified position.
+ * The existing value is assumed to be zero.
+ *
+ * @param <V> compile-time value of {@code type}.
+ * @param position position where the value is written. Current value must be zero (this is not verified).
+ * @param type class of the value as {@code Short.class}, {@code Integer.class} or {@code Long.class}.
+ * @return handler for modifying the value later.
+ */
+ @SuppressWarnings("unchecked")
+ public static <V extends Number> UpdatableWrite<V> ofZeroAt(final long position, final Class<V> type) {
+ if (type == Integer.class) return (UpdatableWrite<V>) new OfInt (position);
+ if (type == Short.class) return (UpdatableWrite<V>) new OfShort(position);
+ if (type == Long.class) return (UpdatableWrite<V>) new OfLong (position);
+ throw new IllegalArgumentException(Errors.format(Errors.Keys.IllegalArgumentValue_2, "type", type));
+ }
+
/**
* Implementation of {@link UpdatableWrite#of(ChannelDataOutput)}.
* This class only records the stream position, with no associated value.
@@ -136,6 +162,7 @@ public abstract class UpdatableWrite<V> implements CheckedContainer<V> {
defined = value;
}
+ OfShort(long position) {super(position);}
@Override public Class<Short> getElementType() {return Short.class;}
@Override public int sizeInBytes() {return Short.BYTES;}
@Override public boolean changed() {return defined != current;}
@@ -164,6 +191,7 @@ public abstract class UpdatableWrite<V> implements CheckedContainer<V> {
defined = value;
}
+ OfInt(long position) {super(position);}
@Override public Class<Integer> getElementType() {return Integer.class;}
@Override public int sizeInBytes() {return Integer.BYTES;}
@Override public boolean changed() {return defined != current;}
@@ -192,6 +220,7 @@ public abstract class UpdatableWrite<V> implements CheckedContainer<V> {
defined = value;
}
+ OfLong(long position) {super(position);}
@Override public Class<Long> getElementType() {return Long.class;}
@Override public int sizeInBytes() {return Long.BYTES;}
@Override public boolean changed() {return defined != current;}
diff --git a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/StorageConnector.java b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/StorageConnector.java
index eb2f6d4628..c1cef85344 100644
--- a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/StorageConnector.java
+++ b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/StorageConnector.java
@@ -37,7 +37,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
-import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.nio.file.OpenOption;
import java.nio.file.NoSuchFileException;
@@ -64,6 +63,7 @@ import org.apache.sis.storage.base.StoreUtilities;
import org.apache.sis.io.InvalidSeekException;
import org.apache.sis.io.stream.IOUtilities;
import org.apache.sis.io.stream.ChannelFactory;
+import org.apache.sis.io.stream.ChannelData;
import org.apache.sis.io.stream.ChannelDataInput;
import org.apache.sis.io.stream.ChannelDataOutput;
import org.apache.sis.io.stream.ChannelImageInputStream;
@@ -106,7 +106,7 @@ import org.apache.sis.setup.OptionKey;
*
* @author Martin Desruisseaux (Geomatys)
* @author Alexis Manin (Geomatys)
- * @version 1.4
+ * @version 1.5
* @since 0.3
*/
public class StorageConnector implements Serializable {
@@ -513,15 +513,14 @@ public class StorageConnector implements Serializable {
* (except in BufferedReader if the original storage does not support mark/reset).
*/
((Reader) view).reset();
- } else if (view instanceof ChannelDataInput) {
+ } else if (view instanceof ChannelData) {
/*
- * ChannelDataInput can be recycled without the need to discard and recreate them. Note that
- * this code requires that SeekableByteChannel has been seek to the channel beginning first.
- * This should be done by the above `wrapperFor.reset()` call.
+ * `ChannelDataInput` can be recycled without the need to discard and recreate it.
+ * However, if a `Channel` was used directly, it should have been repositioned to the channel
+ * beginning first. This seek should be done by the above call to `wrapperFor.reset()`,
+ * which should cause the block below (with a call to `rewind()`) to be executed.
*/
- final ChannelDataInput input = (ChannelDataInput) view;
- input.buffer.limit(0); // Must be after channel reset.
- input.setStreamPosition(0); // Must be after buffer.limit(0).
+ ((ChannelData) view).seek(0);
} else if (view instanceof Channel) {
/*
* Searches for a ChannelDataInput wrapping the channel, because it contains the original position
@@ -532,10 +531,10 @@ public class StorageConnector implements Serializable {
String name = null;
if (wrappedBy != null) {
for (Coupled c : wrappedBy) {
- if (c.view instanceof ChannelDataInput) {
- final ChannelDataInput in = ((ChannelDataInput) c.view);
- if (view instanceof SeekableByteChannel) {
- ((SeekableByteChannel) view).position(in.channelOffset);
+ if (c.view instanceof ChannelData) {
+ final var in = (ChannelData) c.view;
+ assert in.channel() == view : c;
+ if (in.rewind()) {
return true;
}
name = in.filename; // For the error message.
diff --git a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/base/URIDataStore.java b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/base/URIDataStore.java
index acfbb6cb9b..e7f4bbf143 100644
--- a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/base/URIDataStore.java
+++ b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/base/URIDataStore.java
@@ -24,6 +24,8 @@ import java.io.OutputStream;
import java.io.File;
import java.net.URI;
import java.nio.file.Path;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.nio.file.FileSystemNotFoundException;
import java.nio.charset.Charset;
@@ -317,17 +319,35 @@ public abstract class URIDataStore extends DataStore implements StoreResource, R
/**
* Returns {@code true} if the open options contains {@link StandardOpenOption#WRITE}
- * or if the storage type is some kind of output stream.
+ * or if the storage type is some kind of output stream. An ambiguity may exist between
+ * the case when a new file would be created and when an existing file would be updated.
+ * This ambiguity is resolved by the {@code ifNew} argument:
+ * if {@code false}, then the two cases are not distinguished.
+ * If {@code true}, then this method returns {@code true} only if a new file would be created.
*
* @param connector the connector to use for opening a file.
+ * @param ifNew whether to return {@code true} only if a new file would be created.
* @return whether the specified connector should open a writable data store.
* @throws DataStoreException if the storage object has already been used and cannot be reused.
*/
- public static boolean isWritable(final StorageConnector connector) throws DataStoreException {
+ public static boolean isWritable(final StorageConnector connector, final boolean ifNew) throws DataStoreException {
final Object storage = connector.getStorage();
if (storage instanceof OutputStream || storage instanceof DataOutput) return true; // Must be tested first.
if (storage instanceof InputStream || storage instanceof DataInput) return false; // Ignore options.
- return ArraysExt.contains(connector.getOption(OptionKey.OPEN_OPTIONS), StandardOpenOption.WRITE);
+ final OpenOption[] options = connector.getOption(OptionKey.OPEN_OPTIONS);
+ if (ArraysExt.contains(options, StandardOpenOption.WRITE)) {
+ if (!ifNew || ArraysExt.contains(options, StandardOpenOption.TRUNCATE_EXISTING)) {
+ return true;
+ }
+ if (ArraysExt.contains(options, StandardOpenOption.CREATE_NEW)) {
+ return IOUtilities.isKindOfPath(storage);
+ }
+ if (ArraysExt.contains(options, StandardOpenOption.CREATE)) {
+ final Path path = connector.getStorageAs(Path.class);
+ return (path != null) && Files.notExists(path);
+ }
+ }
+ return false;
}
}
diff --git a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/esri/AsciiGridStoreProvider.java b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/esri/AsciiGridStoreProvider.java
index 6af287a2d7..65a5791c12 100644
--- a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/esri/AsciiGridStoreProvider.java
+++ b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/esri/AsciiGridStoreProvider.java
@@ -135,7 +135,7 @@ cellsize: if (!header.containsKey(AsciiGridStore.CELLSIZE)) {
*/
@Override
public DataStore open(final StorageConnector connector) throws DataStoreException {
- if (isWritable(connector)) {
+ if (isWritable(connector, false)) {
return new WritableStore(this, connector);
} else {
return new AsciiGridStore(this, connector, true);
diff --git a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/image/FormatFinder.java b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/image/FormatFinder.java
index 5df4febc2e..8110dca82f 100644
--- a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/image/FormatFinder.java
+++ b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/image/FormatFinder.java
@@ -155,7 +155,7 @@ final class FormatFinder implements AutoCloseable {
openAsWriter = false;
fileIsEmpty = false;
} else {
- isWritable = WorldFileStoreProvider.isWritable(connector);
+ isWritable = WorldFileStoreProvider.isWritable(connector, false);
if (isWritable) {
final Path path = connector.getStorageAs(Path.class);
if (path != null) {
diff --git a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/internal/WritableResourceSupport.java b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/internal/WritableResourceSupport.java
index 8b73eef922..a36e06002e 100644
--- a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/internal/WritableResourceSupport.java
+++ b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/internal/WritableResourceSupport.java
@@ -34,7 +34,6 @@ import org.apache.sis.storage.ReadOnlyStorageException;
import org.apache.sis.storage.ResourceAlreadyExistsException;
import org.apache.sis.storage.IncompatibleResourceException;
import org.apache.sis.storage.WritableGridCoverageResource;
-import org.apache.sis.storage.internal.Resources;
import org.apache.sis.io.stream.ChannelDataInput;
import org.apache.sis.io.stream.ChannelDataOutput;
import org.apache.sis.referencing.operation.matrix.AffineTransforms2D;
diff --git a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/package-info.java b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/package-info.java
index 5c474f44c4..6a1020cb19 100644
--- a/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/package-info.java
+++ b/endorsed/src/org.apache.sis.storage/main/org/apache/sis/storage/package-info.java
@@ -26,7 +26,7 @@
*
* @author Johann Sorel (Geomatys)
* @author Martin Desruisseaux (Geomatys)
- * @version 1.4
+ * @version 1.5
* @since 0.3
*/
package org.apache.sis.storage;
diff --git a/endorsed/src/org.apache.sis.storage/test/org/apache/sis/io/stream/ChannelImageOutputStreamTest.java b/endorsed/src/org.apache.sis.storage/test/org/apache/sis/io/stream/ChannelImageOutputStreamTest.java
index 9fe710f87d..818a3feacd 100644
--- a/endorsed/src/org.apache.sis.storage/test/org/apache/sis/io/stream/ChannelImageOutputStreamTest.java
+++ b/endorsed/src/org.apache.sis.storage/test/org/apache/sis/io/stream/ChannelImageOutputStreamTest.java
@@ -84,7 +84,7 @@ public final class ChannelImageOutputStreamTest extends ChannelDataOutputTest {
*/
@Test
public void testMarkAndReset() throws IOException {
- initialize("testMarkAndReset", STREAM_LENGTH, 1000); // We need a larger buffer for this test.
+ initialize("testMarkAndReset", STREAM_LENGTH, 1000); // We need a larger buffer for this test.
/*
* Fill both streams with random data.
* During this process, randomly takes mark.