You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by bo...@apache.org on 2019/08/18 14:59:13 UTC

[commons-compress] branch master updated: COMPRESS-231 add a concatenated SeekableByteChannel based on code by Tim Underwood

This is an automated email from the ASF dual-hosted git repository.

bodewig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-compress.git


The following commit(s) were added to refs/heads/master by this push:
     new e959762  COMPRESS-231 add a concatenated SeekableByteChannel based on code by Tim Underwood
e959762 is described below

commit e959762ed200d9bc03fd858b8c96e35f95cada90
Author: Stefan Bodewig <bo...@apache.org>
AuthorDate: Sun Aug 18 16:58:21 2019 +0200

    COMPRESS-231 add a concatenated SeekableByteChannel based on code by Tim Underwood
---
 .../utils/MultiReadOnlySeekableByteChannel.java    | 219 +++++++++++++++++++++
 .../MultiReadOnlySeekableByteChannelTest.java      | 217 ++++++++++++++++++++
 2 files changed, 436 insertions(+)

diff --git a/src/main/java/org/apache/commons/compress/utils/MultiReadOnlySeekableByteChannel.java b/src/main/java/org/apache/commons/compress/utils/MultiReadOnlySeekableByteChannel.java
new file mode 100644
index 0000000..fd78f9d
--- /dev/null
+++ b/src/main/java/org/apache/commons/compress/utils/MultiReadOnlySeekableByteChannel.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.commons.compress.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NonWritableChannelException;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Read-Only Implementation of {@link SeekableByteChannel} that
+ * concatenates a collection of other {@link SeekableByteChannel}s.
+ *
+ * <p>This is a loose port of <a
+ * href="https://github.com/frugalmechanic/fm-common/blob/master/jvm/src/main/scala/fm/common/MultiReadOnlySeekableByteChannel.scala">MultiReadOnlySeekableByteChannel</a>
+ * by Tim Underwood.</p>
+ *
+ * @since 1.19
+ */
+public class MultiReadOnlySeekableByteChannel implements SeekableByteChannel {
+
+    private final List<SeekableByteChannel> channels;
+    private long globalPosition;
+    private int currentChannelIdx;
+
+    /**
+     * Concatenates the given channels.
+     *
+     * @param channels the channels to concatenate
+     * @throws NullPointerException if channels is null
+     */
+    public MultiReadOnlySeekableByteChannel(List<SeekableByteChannel> channels) {
+        this.channels = Collections.unmodifiableList(new ArrayList<>(
+            Objects.requireNonNull(channels, "channels must not be null")));
+    }
+
+    @Override
+    public synchronized int read(ByteBuffer dst) throws IOException {
+        if (!isOpen()) {
+            throw new ClosedChannelException();
+        }
+
+        int totalBytesRead = 0;
+        while (dst.hasRemaining() && currentChannelIdx < channels.size()) {
+            final SeekableByteChannel currentChannel = channels.get(currentChannelIdx);
+            final int newBytesRead = currentChannel.read(dst);
+            if (newBytesRead == -1) {
+                // EOF for this channel -- advance to next channel idx
+                currentChannelIdx += 1;
+                continue;
+            }
+            if (currentChannel.position() >= currentChannel.size()) {
+                // we are at the end of the current channel
+                currentChannelIdx++;
+            }
+            totalBytesRead += newBytesRead;
+        }
+        if (totalBytesRead > 0) {
+            globalPosition += totalBytesRead;
+            return totalBytesRead;
+        }
+        return -1;
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOException first = null;
+        for (SeekableByteChannel ch : channels) {
+            try {
+                ch.close();
+            } catch (IOException ex) {
+                if (first == null) {
+                    first = ex;
+                }
+            }
+        }
+        if (first != null) {
+            throw new IOException("failed to close wrapped channel", first);
+        }
+    }
+
+    @Override
+    public boolean isOpen() {
+        for (SeekableByteChannel ch : channels) {
+            if (!ch.isOpen()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public long position() {
+        return globalPosition;
+    }
+
+    @Override
+    public long size() throws IOException {
+        long acc = 0;
+        for (SeekableByteChannel ch : channels) {
+            acc += ch.size();
+        }
+        return acc;
+    }
+
+    /**
+     * @throws NonWritableChannelException since this implementation is read-only.
+     */
+    @Override
+    public SeekableByteChannel truncate(long size) {
+        throw new NonWritableChannelException();
+    }
+
+    /**
+     * @throws NonWritableChannelException since this implementation is read-only.
+     */
+    @Override
+    public int write(ByteBuffer src) {
+        throw new NonWritableChannelException();
+    }
+    
+    @Override
+    public synchronized SeekableByteChannel position(long newPosition) throws IOException {
+        if (newPosition < 0) {
+            throw new IllegalArgumentException("Negative position: " + newPosition);
+        }
+        if (!isOpen()) {
+            throw new ClosedChannelException();
+        }
+
+        globalPosition = newPosition;
+
+        long pos = newPosition;
+
+        for (int i = 0; i < channels.size(); i++) {
+            final SeekableByteChannel currentChannel = channels.get(i);
+            final long size = currentChannel.size();
+
+            final long newChannelPos;
+            if (pos == -1L) {
+                // Position is already set for the correct channel,
+                // the rest of the channels get reset to 0
+                newChannelPos = 0;
+            } else if (pos <= size) {
+                // This channel is where we want to be
+                currentChannelIdx = i;
+                long tmp = pos;
+                pos = -1L; // Mark pos as already being set
+                newChannelPos = tmp;
+            } else {
+                // newPosition is past this channel.  Set channel
+                // position to the end and substract channel size from
+                // pos
+                pos -= size;
+                newChannelPos = size;
+            }
+
+            currentChannel.position(newChannelPos);
+        }
+        return this;
+    }
+
+    /**
+     * Concatenates the given channels.
+     *
+     * @param channels the channels to concatenate
+     * @throws NullPointerException if channels is null
+     */
+    public static SeekableByteChannel forSeekableByteChannels(SeekableByteChannel... channels) {
+        if (Objects.requireNonNull(channels, "channels must not be null").length == 1) {
+            return channels[0];
+        }
+        return new MultiReadOnlySeekableByteChannel(Arrays.asList(channels));
+    }
+
+    /**
+     * Concatenates the given files.
+     *
+     * @param files the files to concatenate
+     * @throws NullPointerException if files is null
+     * @throws IOException if opening a channel for one of the files fails
+     */
+    public static SeekableByteChannel forFiles(File... files) throws IOException {
+        List<SeekableByteChannel> channels = new ArrayList<>();
+        for (File f : Objects.requireNonNull(files, "files must not be null")) {
+            channels.add(Files.newByteChannel(f.toPath(), StandardOpenOption.READ));
+        }
+        if (channels.size() == 1) {
+            return channels.get(0);
+        }
+        return new MultiReadOnlySeekableByteChannel(channels);
+    }
+
+}
diff --git a/src/test/java/org/apache/commons/compress/utils/MultiReadOnlySeekableByteChannelTest.java b/src/test/java/org/apache/commons/compress/utils/MultiReadOnlySeekableByteChannelTest.java
new file mode 100644
index 0000000..0892077
--- /dev/null
+++ b/src/test/java/org/apache/commons/compress/utils/MultiReadOnlySeekableByteChannelTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.commons.compress.utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Initially based on <a
+ * href="https://github.com/frugalmechanic/fm-common/blob/master/jvm/src/test/scala/fm/common/TestMultiReadOnlySeekableByteChannel.scala">TestMultiReadOnlySeekableByteChannel.scala</a>
+ * by Tim Underwood.
+ */
+public class MultiReadOnlySeekableByteChannelTest {
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void constructorThrowsOnNullArg() {
+        thrown.expect(NullPointerException.class);
+        new MultiReadOnlySeekableByteChannel(null);
+    }
+    
+    @Test
+    public void forSeekableByteChannelsThrowsOnNullArg() {
+        thrown.expect(NullPointerException.class);
+        MultiReadOnlySeekableByteChannel.forSeekableByteChannels(null);
+    }
+    
+    @Test
+    public void forFilesThrowsOnNullArg() throws IOException {
+        thrown.expect(NullPointerException.class);
+        MultiReadOnlySeekableByteChannel.forFiles(null);
+    }
+
+    @Test
+    public void forSeekableByteChannelsReturnsIdentityForSingleElement() {
+        final SeekableByteChannel e = makeEmpty();
+        final SeekableByteChannel m = MultiReadOnlySeekableByteChannel.forSeekableByteChannels(e);
+        Assert.assertSame(e, m);
+    }
+
+    @Test
+    public void referenceBehaviorForEmptyChannel() throws IOException {
+        checkEmpty(makeEmpty());
+    }
+
+    @Test
+    public void twoEmptyChannelsConcatenateAsEmptyChannel() throws IOException {
+        checkEmpty(MultiReadOnlySeekableByteChannel.forSeekableByteChannels(makeEmpty(), makeEmpty()));
+    }
+
+    @Test
+    public void checkForSingleByte() throws IOException {
+        check(new byte[] { 0 });
+    }
+    
+    @Test
+    public void checkForString() throws IOException {
+        check("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+            .getBytes(StandardCharsets.UTF_8));
+    }
+    
+    @Test
+    public void verifyGrouped() {
+        Assert.assertArrayEquals(new byte[][] {
+                new byte[] { 1, 2, 3, },
+                new byte[] { 4, 5, 6, },
+                new byte[] { 7, },
+            }, grouped(new byte[] { 1, 2, 3, 4, 5, 6, 7 }, 3));
+        Assert.assertArrayEquals(new byte[][] {
+                new byte[] { 1, 2, 3, },
+                new byte[] { 4, 5, 6, },
+            }, grouped(new byte[] { 1, 2, 3, 4, 5, 6 }, 3));
+        Assert.assertArrayEquals(new byte[][] {
+                new byte[] { 1, 2, 3, },
+                new byte[] { 4, 5, },
+            }, grouped(new byte[] { 1, 2, 3, 4, 5, }, 3));
+    }
+
+    private SeekableByteChannel makeEmpty() {
+        return makeSingle(new byte[0]);
+    }
+
+    private SeekableByteChannel makeSingle(byte[] arr) {
+        return new SeekableInMemoryByteChannel(arr);
+    }
+
+    private SeekableByteChannel makeMulti(byte[][] arr) {
+        SeekableByteChannel[] s = new SeekableByteChannel[arr.length];
+        for (int i = 0; i < s.length; i++) {
+            s[i] = makeSingle(arr[i]);
+        }
+        return MultiReadOnlySeekableByteChannel.forSeekableByteChannels(s);
+    }
+
+    private void checkEmpty(SeekableByteChannel channel) throws IOException {
+        ByteBuffer buf = ByteBuffer.allocate(10);
+
+        Assert.assertTrue(channel.isOpen());
+        Assert.assertEquals(0, channel.size());
+        Assert.assertEquals(0, channel.position());
+        Assert.assertEquals(-1, channel.read(buf));
+
+        channel.position(5);
+        Assert.assertEquals(-1, channel.read(buf));
+
+        channel.close();
+        Assert.assertFalse(channel.isOpen());
+
+        try {
+            channel.read(buf);
+            Assert.fail("expected a ClosedChannelException");
+        } catch (ClosedChannelException expected) {
+        }
+        try {
+            channel.position(100);
+            Assert.fail("expected a ClosedChannelException");
+        } catch (ClosedChannelException expected) {
+        }
+    }
+
+    private void check(final byte[] expected) throws IOException {
+        for (int channelSize = 1; channelSize <= expected.length; channelSize++) {
+            // Sanity check that all operations work for SeekableInMemoryByteChannel
+            check(expected, makeSingle(expected));
+            // Checks against our MultiReadOnlySeekableByteChannel instance
+            check(expected, makeMulti(grouped(expected, channelSize)));
+        }
+    }
+
+    private void check(final byte[] expected, SeekableByteChannel channel) throws IOException {
+        for (int readBufferSize = 1; readBufferSize <= expected.length + 5; readBufferSize++) {
+            check(expected, channel, readBufferSize);
+        }
+    }
+
+    private void check(final byte[] expected, final SeekableByteChannel channel, final int readBufferSize)
+        throws IOException {
+        Assert.assertTrue("readBufferSize " + readBufferSize, channel.isOpen());
+        Assert.assertEquals("readBufferSize " + readBufferSize, expected.length, channel.size());
+        channel.position(0);
+        Assert.assertEquals("readBufferSize " + readBufferSize, 0, channel.position());
+
+        // Will hold the entire result that we read
+        final ByteBuffer resultBuffer = ByteBuffer.allocate(expected.length + 100);
+
+        // Used for each read() method call
+        final ByteBuffer buf = ByteBuffer.allocate(readBufferSize);
+
+        int bytesRead = channel.read(buf);
+
+        while (bytesRead != -1) {
+            int remaining = buf.remaining();
+
+            buf.flip();
+            resultBuffer.put(buf);
+            buf.clear();
+            bytesRead = channel.read(buf);
+
+            // If this isn't the last read() then we expect the buf
+            // ByteBuffer to be full (i.e. have no remaining)
+            if (resultBuffer.position() < expected.length) {
+                Assert.assertEquals("readBufferSize " + readBufferSize, 0, remaining);
+            }
+
+            if (bytesRead == -1) {
+                Assert.assertEquals("readBufferSize " + readBufferSize, 0, buf.position());
+            } else {
+                Assert.assertEquals("readBufferSize " + readBufferSize, bytesRead, buf.position());
+            }
+        }
+
+        resultBuffer.flip();
+        byte[] arr = new byte[resultBuffer.remaining()];
+        resultBuffer.get(arr);
+        Assert.assertArrayEquals("readBufferSize " + readBufferSize, expected, arr);
+    }
+
+    private byte[][] grouped(final byte[] input, final int chunkSize) {
+        List<byte[]> groups = new ArrayList<>();
+        int idx = 0;
+        for (; idx + chunkSize <= input.length; idx += chunkSize) {
+            groups.add(Arrays.copyOfRange(input, idx, idx + chunkSize));
+        }
+        if (idx < input.length) {
+            groups.add(Arrays.copyOfRange(input, idx, input.length));
+        }
+        return groups.toArray(new byte[0][]);
+    }
+}