You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by bo...@apache.org on 2019/08/18 14:59:13 UTC
[commons-compress] branch master updated: COMPRESS-231 add a
concatenated SeekableByteChannel based on code by Tim Underwood
This is an automated email from the ASF dual-hosted git repository.
bodewig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-compress.git
The following commit(s) were added to refs/heads/master by this push:
new e959762 COMPRESS-231 add a concatenated SeekableByteChannel based on code by Tim Underwood
e959762 is described below
commit e959762ed200d9bc03fd858b8c96e35f95cada90
Author: Stefan Bodewig <bo...@apache.org>
AuthorDate: Sun Aug 18 16:58:21 2019 +0200
COMPRESS-231 add a concatenated SeekableByteChannel based on code by Tim Underwood
---
.../utils/MultiReadOnlySeekableByteChannel.java | 219 +++++++++++++++++++++
.../MultiReadOnlySeekableByteChannelTest.java | 217 ++++++++++++++++++++
2 files changed, 436 insertions(+)
diff --git a/src/main/java/org/apache/commons/compress/utils/MultiReadOnlySeekableByteChannel.java b/src/main/java/org/apache/commons/compress/utils/MultiReadOnlySeekableByteChannel.java
new file mode 100644
index 0000000..fd78f9d
--- /dev/null
+++ b/src/main/java/org/apache/commons/compress/utils/MultiReadOnlySeekableByteChannel.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.commons.compress.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NonWritableChannelException;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
/**
 * Read-Only Implementation of {@link SeekableByteChannel} that
 * concatenates a collection of other {@link SeekableByteChannel}s.
 *
 * <p>This is a loose port of <a
 * href="https://github.com/frugalmechanic/fm-common/blob/master/jvm/src/main/scala/fm/common/MultiReadOnlySeekableByteChannel.scala">MultiReadOnlySeekableByteChannel</a>
 * by Tim Underwood.</p>
 *
 * <p>The wrapped channels are owned by this instance: {@link #close()} closes all of
 * them and {@link #position(long)} repositions all of them, so they should not be used
 * independently while wrapped. {@link #read(ByteBuffer)} and {@link #position(long)}
 * are {@code synchronized} on this instance.</p>
 *
 * @since 1.19
 */
public class MultiReadOnlySeekableByteChannel implements SeekableByteChannel {

    /** The channels being concatenated, in order (unmodifiable copy of the constructor argument). */
    private final List<SeekableByteChannel> channels;
    /** Current position within the concatenated data. */
    private long globalPosition;
    /** Index into {@link #channels} of the channel the next read starts with. */
    private int currentChannelIdx;

    /**
     * Concatenates the given channels.
     *
     * <p>The list is copied, so later modifications of it do not affect this channel.</p>
     *
     * @param channels the channels to concatenate
     * @throws NullPointerException if channels is null
     */
    public MultiReadOnlySeekableByteChannel(List<SeekableByteChannel> channels) {
        this.channels = Collections.unmodifiableList(new ArrayList<>(
            Objects.requireNonNull(channels, "channels must not be null")));
    }

    /**
     * Reads bytes from the concatenated channels into {@code dst}, moving on to the
     * next channel whenever the current one is exhausted.
     *
     * @param dst the buffer to read into
     * @return the number of bytes read, {@code 0} if {@code dst} has no space remaining,
     *         or {@code -1} if the end of the last channel has been reached
     * @throws ClosedChannelException if this channel is closed
     * @throws IOException if reading from one of the wrapped channels fails
     */
    @Override
    public synchronized int read(ByteBuffer dst) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        if (!dst.hasRemaining()) {
            // Nothing fits into a full buffer; per the ReadableByteChannel contract
            // that is a zero-byte read, not end-of-stream.
            return 0;
        }

        int totalBytesRead = 0;
        while (dst.hasRemaining() && currentChannelIdx < channels.size()) {
            final SeekableByteChannel currentChannel = channels.get(currentChannelIdx);
            final int newBytesRead = currentChannel.read(dst);
            if (newBytesRead == -1) {
                // EOF for this channel -- advance to next channel idx
                currentChannelIdx += 1;
                continue;
            }
            if (currentChannel.position() >= currentChannel.size()) {
                // we are at the end of the current channel
                currentChannelIdx++;
            }
            totalBytesRead += newBytesRead;
        }
        if (totalBytesRead > 0) {
            globalPosition += totalBytesRead;
            return totalBytesRead;
        }
        return -1;
    }

    /**
     * Closes all wrapped channels.
     *
     * <p>Every channel is closed even if closing an earlier one fails. The first failure
     * becomes the cause of the thrown exception; any later failures are attached to it
     * as suppressed exceptions.</p>
     *
     * @throws IOException if closing any of the wrapped channels fails
     */
    @Override
    public void close() throws IOException {
        IOException first = null;
        for (SeekableByteChannel ch : channels) {
            try {
                ch.close();
            } catch (IOException ex) {
                if (first == null) {
                    first = ex;
                } else {
                    first.addSuppressed(ex);
                }
            }
        }
        if (first != null) {
            throw new IOException("failed to close wrapped channel", first);
        }
    }

    /**
     * Tells whether this channel is open.
     *
     * @return {@code true} if every wrapped channel is open (trivially {@code true} when
     *         no channels are wrapped), {@code false} otherwise
     */
    @Override
    public boolean isOpen() {
        for (SeekableByteChannel ch : channels) {
            if (!ch.isOpen()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the position within the concatenated data.
     *
     * @return the current position
     */
    @Override
    public long position() {
        return globalPosition;
    }

    /**
     * Returns the total size of the concatenated data.
     *
     * @return the sum of the sizes of all wrapped channels
     * @throws IOException if querying the size of a wrapped channel fails
     */
    @Override
    public long size() throws IOException {
        long acc = 0;
        for (SeekableByteChannel ch : channels) {
            acc += ch.size();
        }
        return acc;
    }

    /**
     * @throws NonWritableChannelException since this implementation is read-only.
     */
    @Override
    public SeekableByteChannel truncate(long size) {
        throw new NonWritableChannelException();
    }

    /**
     * @throws NonWritableChannelException since this implementation is read-only.
     */
    @Override
    public int write(ByteBuffer src) {
        throw new NonWritableChannelException();
    }

    /**
     * Sets the position within the concatenated data.
     *
     * <p>Channels before the one containing {@code newPosition} are positioned at their
     * end, that channel at the matching offset, and all later channels at 0. If
     * {@code newPosition} lies beyond the total size, every channel is positioned at its
     * end, so subsequent reads report end-of-stream.</p>
     *
     * @param newPosition the new position, must not be negative
     * @return this channel
     * @throws IllegalArgumentException if {@code newPosition} is negative
     * @throws ClosedChannelException if this channel is closed
     * @throws IOException if repositioning a wrapped channel fails
     */
    @Override
    public synchronized SeekableByteChannel position(long newPosition) throws IOException {
        if (newPosition < 0) {
            throw new IllegalArgumentException("Negative position: " + newPosition);
        }
        if (!isOpen()) {
            throw new ClosedChannelException();
        }

        globalPosition = newPosition;

        long pos = newPosition;

        for (int i = 0; i < channels.size(); i++) {
            final SeekableByteChannel currentChannel = channels.get(i);
            final long size = currentChannel.size();

            final long newChannelPos;
            if (pos == -1L) {
                // Position is already set for the correct channel,
                // the rest of the channels get reset to 0
                newChannelPos = 0;
            } else if (pos <= size) {
                // This channel is where we want to be
                currentChannelIdx = i;
                long tmp = pos;
                pos = -1L; // Mark pos as already being set
                newChannelPos = tmp;
            } else {
                // newPosition is past this channel. Set channel
                // position to the end and subtract channel size from
                // pos
                pos -= size;
                newChannelPos = size;
            }

            currentChannel.position(newChannelPos);
        }
        return this;
    }

    /**
     * Concatenates the given channels.
     *
     * @param channels the channels to concatenate
     * @return the channel itself if exactly one channel is given, otherwise a new
     *         {@link MultiReadOnlySeekableByteChannel} wrapping all of them
     * @throws NullPointerException if channels is null
     */
    public static SeekableByteChannel forSeekableByteChannels(SeekableByteChannel... channels) {
        if (Objects.requireNonNull(channels, "channels must not be null").length == 1) {
            return channels[0];
        }
        return new MultiReadOnlySeekableByteChannel(Arrays.asList(channels));
    }

    /**
     * Concatenates the given files.
     *
     * <p>If opening any of the files fails, the channels already opened for the
     * preceding files are closed before the exception propagates.</p>
     *
     * @param files the files to concatenate
     * @return a read-only channel for the single file if exactly one file is given,
     *         otherwise a {@link MultiReadOnlySeekableByteChannel} over all of them
     * @throws NullPointerException if files is null
     * @throws IOException if opening a channel for one of the files fails
     */
    public static SeekableByteChannel forFiles(File... files) throws IOException {
        final List<SeekableByteChannel> channels = new ArrayList<>();
        try {
            for (File f : Objects.requireNonNull(files, "files must not be null")) {
                channels.add(Files.newByteChannel(f.toPath(), StandardOpenOption.READ));
            }
        } catch (IOException | RuntimeException ex) {
            // Don't leak the channels opened so far when a later file can't be opened.
            for (SeekableByteChannel ch : channels) {
                try {
                    ch.close();
                } catch (IOException suppressed) {
                    ex.addSuppressed(suppressed);
                }
            }
            throw ex;
        }
        if (channels.size() == 1) {
            return channels.get(0);
        }
        return new MultiReadOnlySeekableByteChannel(channels);
    }

}
diff --git a/src/test/java/org/apache/commons/compress/utils/MultiReadOnlySeekableByteChannelTest.java b/src/test/java/org/apache/commons/compress/utils/MultiReadOnlySeekableByteChannelTest.java
new file mode 100644
index 0000000..0892077
--- /dev/null
+++ b/src/test/java/org/apache/commons/compress/utils/MultiReadOnlySeekableByteChannelTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.commons.compress.utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Initially based on <a
+ * href="https://github.com/frugalmechanic/fm-common/blob/master/jvm/src/test/scala/fm/common/TestMultiReadOnlySeekableByteChannel.scala">TestMultiReadOnlySeekableByteChannel.scala</a>
+ * by Tim Underwood.
+ */
+public class MultiReadOnlySeekableByteChannelTest {
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void constructorThrowsOnNullArg() {
+ thrown.expect(NullPointerException.class);
+ new MultiReadOnlySeekableByteChannel(null);
+ }
+
+ @Test
+ public void forSeekableByteChannelsThrowsOnNullArg() {
+ thrown.expect(NullPointerException.class);
+ MultiReadOnlySeekableByteChannel.forSeekableByteChannels(null);
+ }
+
+ @Test
+ public void forFilesThrowsOnNullArg() throws IOException {
+ thrown.expect(NullPointerException.class);
+ MultiReadOnlySeekableByteChannel.forFiles(null);
+ }
+
+ @Test
+ public void forSeekableByteChannelsReturnsIdentityForSingleElement() {
+ final SeekableByteChannel e = makeEmpty();
+ final SeekableByteChannel m = MultiReadOnlySeekableByteChannel.forSeekableByteChannels(e);
+ Assert.assertSame(e, m);
+ }
+
+ @Test
+ public void referenceBehaviorForEmptyChannel() throws IOException {
+ checkEmpty(makeEmpty());
+ }
+
+ @Test
+ public void twoEmptyChannelsConcatenateAsEmptyChannel() throws IOException {
+ checkEmpty(MultiReadOnlySeekableByteChannel.forSeekableByteChannels(makeEmpty(), makeEmpty()));
+ }
+
+ @Test
+ public void checkForSingleByte() throws IOException {
+ check(new byte[] { 0 });
+ }
+
+ @Test
+ public void checkForString() throws IOException {
+ check("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+ .getBytes(StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void verifyGrouped() {
+ Assert.assertArrayEquals(new byte[][] {
+ new byte[] { 1, 2, 3, },
+ new byte[] { 4, 5, 6, },
+ new byte[] { 7, },
+ }, grouped(new byte[] { 1, 2, 3, 4, 5, 6, 7 }, 3));
+ Assert.assertArrayEquals(new byte[][] {
+ new byte[] { 1, 2, 3, },
+ new byte[] { 4, 5, 6, },
+ }, grouped(new byte[] { 1, 2, 3, 4, 5, 6 }, 3));
+ Assert.assertArrayEquals(new byte[][] {
+ new byte[] { 1, 2, 3, },
+ new byte[] { 4, 5, },
+ }, grouped(new byte[] { 1, 2, 3, 4, 5, }, 3));
+ }
+
+ private SeekableByteChannel makeEmpty() {
+ return makeSingle(new byte[0]);
+ }
+
+ private SeekableByteChannel makeSingle(byte[] arr) {
+ return new SeekableInMemoryByteChannel(arr);
+ }
+
+ private SeekableByteChannel makeMulti(byte[][] arr) {
+ SeekableByteChannel[] s = new SeekableByteChannel[arr.length];
+ for (int i = 0; i < s.length; i++) {
+ s[i] = makeSingle(arr[i]);
+ }
+ return MultiReadOnlySeekableByteChannel.forSeekableByteChannels(s);
+ }
+
+ private void checkEmpty(SeekableByteChannel channel) throws IOException {
+ ByteBuffer buf = ByteBuffer.allocate(10);
+
+ Assert.assertTrue(channel.isOpen());
+ Assert.assertEquals(0, channel.size());
+ Assert.assertEquals(0, channel.position());
+ Assert.assertEquals(-1, channel.read(buf));
+
+ channel.position(5);
+ Assert.assertEquals(-1, channel.read(buf));
+
+ channel.close();
+ Assert.assertFalse(channel.isOpen());
+
+ try {
+ channel.read(buf);
+ Assert.fail("expected a ClosedChannelException");
+ } catch (ClosedChannelException expected) {
+ }
+ try {
+ channel.position(100);
+ Assert.fail("expected a ClosedChannelException");
+ } catch (ClosedChannelException expected) {
+ }
+ }
+
+ private void check(final byte[] expected) throws IOException {
+ for (int channelSize = 1; channelSize <= expected.length; channelSize++) {
+ // Sanity check that all operations work for SeekableInMemoryByteChannel
+ check(expected, makeSingle(expected));
+ // Checks against our MultiReadOnlySeekableByteChannel instance
+ check(expected, makeMulti(grouped(expected, channelSize)));
+ }
+ }
+
+ private void check(final byte[] expected, SeekableByteChannel channel) throws IOException {
+ for (int readBufferSize = 1; readBufferSize <= expected.length + 5; readBufferSize++) {
+ check(expected, channel, readBufferSize);
+ }
+ }
+
+ private void check(final byte[] expected, final SeekableByteChannel channel, final int readBufferSize)
+ throws IOException {
+ Assert.assertTrue("readBufferSize " + readBufferSize, channel.isOpen());
+ Assert.assertEquals("readBufferSize " + readBufferSize, expected.length, channel.size());
+ channel.position(0);
+ Assert.assertEquals("readBufferSize " + readBufferSize, 0, channel.position());
+
+ // Will hold the entire result that we read
+ final ByteBuffer resultBuffer = ByteBuffer.allocate(expected.length + 100);
+
+ // Used for each read() method call
+ final ByteBuffer buf = ByteBuffer.allocate(readBufferSize);
+
+ int bytesRead = channel.read(buf);
+
+ while (bytesRead != -1) {
+ int remaining = buf.remaining();
+
+ buf.flip();
+ resultBuffer.put(buf);
+ buf.clear();
+ bytesRead = channel.read(buf);
+
+ // If this isn't the last read() then we expect the buf
+ // ByteBuffer to be full (i.e. have no remaining)
+ if (resultBuffer.position() < expected.length) {
+ Assert.assertEquals("readBufferSize " + readBufferSize, 0, remaining);
+ }
+
+ if (bytesRead == -1) {
+ Assert.assertEquals("readBufferSize " + readBufferSize, 0, buf.position());
+ } else {
+ Assert.assertEquals("readBufferSize " + readBufferSize, bytesRead, buf.position());
+ }
+ }
+
+ resultBuffer.flip();
+ byte[] arr = new byte[resultBuffer.remaining()];
+ resultBuffer.get(arr);
+ Assert.assertArrayEquals("readBufferSize " + readBufferSize, expected, arr);
+ }
+
+ private byte[][] grouped(final byte[] input, final int chunkSize) {
+ List<byte[]> groups = new ArrayList<>();
+ int idx = 0;
+ for (; idx + chunkSize <= input.length; idx += chunkSize) {
+ groups.add(Arrays.copyOfRange(input, idx, idx + chunkSize));
+ }
+ if (idx < input.length) {
+ groups.add(Arrays.copyOfRange(input, idx, input.length));
+ }
+ return groups.toArray(new byte[0][]);
+ }
+}