You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by gg...@apache.org on 2023/06/13 21:43:45 UTC
[commons-io] branch master updated: Add IOUtils.skip[Fully](InputStream, long, Supplier<byte[]>)
This is an automated email from the ASF dual-hosted git repository.
ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git
The following commit(s) were added to refs/heads/master by this push:
new fcb1ce52 Add IOUtils.skip[Fully](InputStream, long, Supplier<byte[]>)
fcb1ce52 is described below
commit fcb1ce52f8662c9dda205be344ab02721c00aa99
Author: Gary Gregory <ga...@gmail.com>
AuthorDate: Tue Jun 13 17:43:40 2023 -0400
Add IOUtils.skip[Fully](InputStream, long, Supplier<byte[]>)
---
src/changes/changes.xml | 3 +
src/main/java/org/apache/commons/io/IOUtils.java | 68 +++++++++-
.../commons/io/IOUtilsMultithreadedSkipTest.java | 147 +++++++++++++++++++++
.../java/org/apache/commons/io/IOUtilsTest.java | 66 +++++++--
.../resources/org/apache/commons/io/TIKA-4065.bin | Bin 0 -> 1952 bytes
5 files changed, 268 insertions(+), 16 deletions(-)
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index ec5c285d..9265b572 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -55,6 +55,9 @@ The <action> type attribute can be add,update,fix,remove.
<action dev="ggregory" type="add" due-to="Gary Gregory">
Add FileCleaningTracker.track(Path, Object[, FileDeleteStrategy]).
</action>
+ <action dev="ggregory" type="add" due-to="Gary Gregory">
+ Add IOUtils.skip[Fully](InputStream, long, Supplier<byte[]>).
+ </action>
<!-- FIX -->
<action dev="ggregory" type="fix" issue="IO-799" due-to="Jeroen van der Vegt, Gary Gregory">
ReaderInputStream.read() throws an exception instead of returning -1 when called again after returning -1.
diff --git a/src/main/java/org/apache/commons/io/IOUtils.java b/src/main/java/org/apache/commons/io/IOUtils.java
index a8d8ffb2..b6b6f137 100644
--- a/src/main/java/org/apache/commons/io/IOUtils.java
+++ b/src/main/java/org/apache/commons/io/IOUtils.java
@@ -52,8 +52,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import java.util.zip.InflaterInputStream;
import org.apache.commons.io.function.IOConsumer;
import org.apache.commons.io.function.IOSupplier;
@@ -2375,6 +2377,36 @@ public class IOUtils {
* @since 2.0
*/
public static long skip(final InputStream input, final long toSkip) throws IOException {
+ return skip(input, toSkip, IOUtils::getScratchByteArrayWriteOnly);
+ }
+
+ /**
+ * Skips bytes from an input byte stream.
+ * <p>
+ * Intended for special cases when customization of the temporary buffer is needed because, for example, a nested input stream has requirements for the
+ * bytes read. For example, when using {@link InflaterInputStream}s from multiple threads.
+ * </p>
+ * <p>
+ * This implementation guarantees that it will read as many bytes as possible before giving up; this may not always be the case for skip() implementations
+ * in subclasses of {@link InputStream}.
+ * </p>
+ * <p>
+ * Note that the implementation uses {@link InputStream#read(byte[], int, int)} rather than delegating to {@link InputStream#skip(long)}. This means that
+ * the method may be considerably less efficient than using the actual skip implementation, this is done to guarantee that the correct number of bytes are
+ * skipped.
+ * </p>
+ *
+ * @param input byte stream to skip
+ * @param toSkip number of bytes to skip.
+ * @param skipBufferSupplier Supplies the buffer to use for reading.
+ * @return number of bytes actually skipped.
+ * @throws IOException if there is a problem reading the file
+ * @throws IllegalArgumentException if toSkip is negative
+ * @see InputStream#skip(long)
+ * @see <a href="https://issues.apache.org/jira/browse/IO-203">IO-203 - Add skipFully() method for InputStreams</a>
+ * @since 2.14.0
+ */
+ public static long skip(final InputStream input, final long toSkip, final Supplier<byte[]> skipBufferSupplier) throws IOException {
if (toSkip < 0) {
throw new IllegalArgumentException("Skip count must be non-negative, actual: " + toSkip);
}
@@ -2385,9 +2417,9 @@ public class IOUtils {
//
long remain = toSkip;
while (remain > 0) {
+ final byte[] skipBuffer = skipBufferSupplier.get();
// See https://issues.apache.org/jira/browse/IO-203 for why we use read() rather than delegating to skip()
- final byte[] byteArray = getScratchByteArrayWriteOnly();
- final long n = input.read(byteArray, 0, (int) Math.min(remain, byteArray.length));
+ final long n = input.read(skipBuffer, 0, (int) Math.min(remain, skipBuffer.length));
if (n < 0) { // EOF
break;
}
@@ -2485,10 +2517,40 @@ public class IOUtils {
* @since 2.0
*/
public static void skipFully(final InputStream input, final long toSkip) throws IOException {
+ final long skipped = skip(input, toSkip, IOUtils::getScratchByteArrayWriteOnly);
+ if (skipped != toSkip) {
+ throw new EOFException("Bytes to skip: " + toSkip + " actual: " + skipped);
+ }
+ }
+
+ /**
+ * Skips the requested number of bytes or fails if there are not enough left.
+ * <p>
+ * Intended for special cases when customization of the temporary buffer is needed because, for example, a nested input stream has requirements for the
+ * bytes read. For example, when using {@link InflaterInputStream}s from multiple threads.
+ * </p>
+ * <p>
+ * This allows for the possibility that {@link InputStream#skip(long)} may not skip as many bytes as requested (most likely because of reaching EOF).
+ * </p>
+ * <p>
+ * Note that the implementation uses {@link #skip(InputStream, long, Supplier)}. This means that the method may be considerably less efficient than using the actual
+ * skip implementation; this is done to guarantee that the correct number of bytes are skipped.
+ * </p>
+ *
+ * @param input stream to skip
+ * @param toSkip the number of bytes to skip
+ * @param skipBufferSupplier Supplies the buffer to use for reading.
+ * @throws IOException if there is a problem reading the file
+ * @throws IllegalArgumentException if toSkip is negative
+ * @throws EOFException if the number of bytes skipped was incorrect
+ * @see InputStream#skip(long)
+ * @since 2.14.0
+ */
+ public static void skipFully(final InputStream input, final long toSkip, final Supplier<byte[]> skipBufferSupplier) throws IOException {
if (toSkip < 0) {
throw new IllegalArgumentException("Bytes to skip must not be negative: " + toSkip);
}
- final long skipped = skip(input, toSkip);
+ final long skipped = skip(input, toSkip, skipBufferSupplier);
if (skipped != toSkip) {
throw new EOFException("Bytes to skip: " + toSkip + " actual: " + skipped);
}
diff --git a/src/test/java/org/apache/commons/io/IOUtilsMultithreadedSkipTest.java b/src/test/java/org/apache/commons/io/IOUtilsMultithreadedSkipTest.java
new file mode 100644
index 00000000..cd196c8b
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/IOUtilsMultithreadedSkipTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.io;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * See Jira ticket IO-802.
+ */
+public class IOUtilsMultithreadedSkipTest {
+
+ private static final String FIXTURE = "TIKA-4065.bin";
+ long seed = 1;
+ private final ThreadLocal<byte[]> threadLocal = ThreadLocal.withInitial(() -> new byte[4096]);
+
+ private int[] generateExpected(final InputStream is, final int[] skips) throws IOException {
+ final int[] testBytes = new int[skips.length];
+ for (int i = 0; i < skips.length; i++) {
+ try {
+ IOUtils.skipFully(is, skips[i]);
+ testBytes[i] = is.read();
+ } catch (final EOFException e) {
+ testBytes[i] = -1;
+ }
+ }
+ return testBytes;
+ }
+
+ private int[] generateSkips(final byte[] bytes, final int numSkips, final Random random) {
+ final int[] skips = new int[numSkips];
+ for (int i = 0; i < skips.length; i++) {
+ skips[i] = random.nextInt(bytes.length / numSkips) + bytes.length / 10;
+ }
+ return skips;
+ }
+
+ private InputStream inflate(final byte[] deflated) throws IOException {
+ final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ IOUtils.copy(new InflaterInputStream(new ByteArrayInputStream(deflated), new Inflater(true)), bos);
+ return new ByteArrayInputStream(bos.toByteArray());
+ }
+
+ @BeforeEach
+ public void setUp() {
+ // Not the best random we can use but good enough here.
+ seed = new Random().nextLong();
+ }
+
+ private void testSkipFullyOnInflaterInputStream(final Supplier<byte[]> baSupplier) throws Exception {
+ final long thisSeed = seed;
+ // thisSeed = -727624427837034313l;
+ final Random random = new Random(thisSeed);
+ final byte[] bytes;
+ try (final InputStream inputStream = getClass().getResourceAsStream(FIXTURE)) {
+ bytes = IOUtils.toByteArray(inputStream);
+ }
+ final int numSkips = (random.nextInt(bytes.length) / 100) + 1;
+
+ final int skips[] = generateSkips(bytes, numSkips, random);
+ final int[] expected;
+ try (final InputStream inflate = inflate(bytes)) {
+ expected = generateExpected(inflate, skips);
+ }
+
+ final int numThreads = 2;
+ final int iterations = 100;
+ final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+ final ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
+
+ for (int i = 0; i < numThreads; i++) {
+ executorCompletionService.submit(() -> {
+ for (int iteration = 0; iteration < iterations; iteration++) {
+ try (InputStream is = new InflaterInputStream(new ByteArrayInputStream(bytes), new Inflater(true))) {
+ for (int skipIndex = 0; skipIndex < skips.length; skipIndex++) {
+ try {
+ IOUtils.skipFully(is, skips[skipIndex], baSupplier);
+ final int c = is.read();
+ assertEquals(expected[skipIndex], c, "failed on seed=" + seed + " iteration=" + iteration);
+ } catch (final EOFException e) {
+ assertEquals(expected[skipIndex], is.read(), "failed on " + "seed=" + seed + " iteration=" + iteration);
+ }
+ }
+ }
+ }
+ return 1;
+ });
+ }
+
+ int finished = 0;
+ while (finished < numThreads) {
+ // blocking
+ final Future<Integer> future = executorCompletionService.take();
+ try {
+ future.get();
+ } catch (final Exception e) {
+ // printStackTrace() for simpler debugging
+ e.printStackTrace();
+ fail("failed on seed=" + seed);
+ }
+ finished++;
+ }
+ }
+
+ @Test
+ public void testSkipFullyOnInflaterInputStream_New_bytes() throws Exception {
+ testSkipFullyOnInflaterInputStream(() -> new byte[4096]);
+ }
+
+ @Test
+ public void testSkipFullyOnInflaterInputStream_ThreadLocal() throws Exception {
+ testSkipFullyOnInflaterInputStream(threadLocal::get);
+ }
+
+}
diff --git a/src/test/java/org/apache/commons/io/IOUtilsTest.java b/src/test/java/org/apache/commons/io/IOUtilsTest.java
index 909893df..c84bb2aa 100644
--- a/src/test/java/org/apache/commons/io/IOUtilsTest.java
+++ b/src/test/java/org/apache/commons/io/IOUtilsTest.java
@@ -60,6 +60,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
+import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.io.function.IOConsumer;
@@ -1310,13 +1311,53 @@ public class IOUtilsTest {
public void testSkipFully_InputStream() throws Exception {
final int size = 1027;
- final InputStream input = new ByteArrayInputStream(new byte[size]);
- assertThrows(IllegalArgumentException.class, () -> IOUtils.skipFully(input, -1), "Should have failed with IllegalArgumentException");
+ try (final InputStream input = new ByteArrayInputStream(new byte[size])) {
+ assertThrows(IllegalArgumentException.class, () -> IOUtils.skipFully(input, -1), "Should have failed with IllegalArgumentException");
- IOUtils.skipFully(input, 0);
- IOUtils.skipFully(input, size - 1);
- assertThrows(IOException.class, () -> IOUtils.skipFully(input, 2), "Should have failed with IOException");
- IOUtils.closeQuietly(input);
+ IOUtils.skipFully(input, 0);
+ IOUtils.skipFully(input, size - 1);
+ assertThrows(IOException.class, () -> IOUtils.skipFully(input, 2), "Should have failed with IOException");
+ }
+ }
+
+ @Test
+ public void testSkipFully_InputStream_Buffer_New_bytes() throws Exception {
+ final int size = 1027;
+ final Supplier<byte[]> bas = () -> new byte[size];
+ try (final InputStream input = new ByteArrayInputStream(new byte[size])) {
+ assertThrows(IllegalArgumentException.class, () -> IOUtils.skipFully(input, -1, bas), "Should have failed with IllegalArgumentException");
+
+ IOUtils.skipFully(input, 0, bas);
+ IOUtils.skipFully(input, size - 1, bas);
+ assertThrows(IOException.class, () -> IOUtils.skipFully(input, 2, bas), "Should have failed with IOException");
+ }
+ }
+
+ @Test
+ public void testSkipFully_InputStream_Buffer_Resuse_bytes() throws Exception {
+ final int size = 1027;
+ final byte[] ba = new byte[size];
+ final Supplier<byte[]> bas = () -> ba;
+ try (final InputStream input = new ByteArrayInputStream(new byte[size])) {
+ assertThrows(IllegalArgumentException.class, () -> IOUtils.skipFully(input, -1, bas), "Should have failed with IllegalArgumentException");
+
+ IOUtils.skipFully(input, 0, bas);
+ IOUtils.skipFully(input, size - 1, bas);
+ assertThrows(IOException.class, () -> IOUtils.skipFully(input, 2, bas), "Should have failed with IOException");
+ }
+ }
+
+ @Test
+ public void testSkipFully_InputStream_Buffer_Resuse_ThreadLocal() throws Exception {
+ final int size = 1027;
+ final ThreadLocal<byte[]> tl = ThreadLocal.withInitial(() -> new byte[size]);
+ try (final InputStream input = new ByteArrayInputStream(new byte[size])) {
+ assertThrows(IllegalArgumentException.class, () -> IOUtils.skipFully(input, -1, tl::get), "Should have failed with IllegalArgumentException");
+
+ IOUtils.skipFully(input, 0, tl::get);
+ IOUtils.skipFully(input, size - 1, tl::get);
+ assertThrows(IOException.class, () -> IOUtils.skipFully(input, 2, tl::get), "Should have failed with IOException");
+ }
}
@Test
@@ -1336,13 +1377,12 @@ public class IOUtilsTest {
@Test
public void testSkipFully_Reader() throws Exception {
final int size = 1027;
- final Reader input = new CharArrayReader(new char[size]);
-
- IOUtils.skipFully(input, 0);
- IOUtils.skipFully(input, size - 3);
- assertThrows(IllegalArgumentException.class, () -> IOUtils.skipFully(input, -1), "Should have failed with IllegalArgumentException");
- assertThrows(IOException.class, () -> IOUtils.skipFully(input, 5), "Should have failed with IOException");
- IOUtils.closeQuietly(input);
+ try (final Reader input = new CharArrayReader(new char[size])) {
+ IOUtils.skipFully(input, 0);
+ IOUtils.skipFully(input, size - 3);
+ assertThrows(IllegalArgumentException.class, () -> IOUtils.skipFully(input, -1), "Should have failed with IllegalArgumentException");
+ assertThrows(IOException.class, () -> IOUtils.skipFully(input, 5), "Should have failed with IOException");
+ }
}
@Test
diff --git a/src/test/resources/org/apache/commons/io/TIKA-4065.bin b/src/test/resources/org/apache/commons/io/TIKA-4065.bin
new file mode 100644
index 00000000..1803df9d
Binary files /dev/null and b/src/test/resources/org/apache/commons/io/TIKA-4065.bin differ