You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by gg...@apache.org on 2023/06/13 21:43:45 UTC

[commons-io] branch master updated: Add IOUtils.skip[Fully](InputStream, long, Supplier<byte[]>)

This is an automated email from the ASF dual-hosted git repository.

ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git


The following commit(s) were added to refs/heads/master by this push:
     new fcb1ce52 Add IOUtils.skip[Fully](InputStream, long, Supplier<byte[]>)
fcb1ce52 is described below

commit fcb1ce52f8662c9dda205be344ab02721c00aa99
Author: Gary Gregory <ga...@gmail.com>
AuthorDate: Tue Jun 13 17:43:40 2023 -0400

    Add IOUtils.skip[Fully](InputStream, long, Supplier<byte[]>)
---
 src/changes/changes.xml                            |   3 +
 src/main/java/org/apache/commons/io/IOUtils.java   |  68 +++++++++-
 .../commons/io/IOUtilsMultithreadedSkipTest.java   | 147 +++++++++++++++++++++
 .../java/org/apache/commons/io/IOUtilsTest.java    |  66 +++++++--
 .../resources/org/apache/commons/io/TIKA-4065.bin  | Bin 0 -> 1952 bytes
 5 files changed, 268 insertions(+), 16 deletions(-)

diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index ec5c285d..9265b572 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -55,6 +55,9 @@ The <action> type attribute can be add,update,fix,remove.
       <action dev="ggregory" type="add" due-to="Gary Gregory">
         Add FileCleaningTracker.track(Path, Object[, FileDeleteStrategy]).
       </action>
+      <action dev="ggregory" type="add" due-to="Gary Gregory">
+        Add IOUtils.skip[Fully](InputStream, long, Supplier&lt;byte[]&gt;).
+      </action>
       <!-- FIX -->
       <action dev="ggregory" type="fix" issue="IO-799" due-to="Jeroen van der Vegt, Gary Gregory">
         ReaderInputStream.read() throws an exception instead of returning -1 when called again after returning -1.
diff --git a/src/main/java/org/apache/commons/io/IOUtils.java b/src/main/java/org/apache/commons/io/IOUtils.java
index a8d8ffb2..b6b6f137 100644
--- a/src/main/java/org/apache/commons/io/IOUtils.java
+++ b/src/main/java/org/apache/commons/io/IOUtils.java
@@ -52,8 +52,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.zip.InflaterInputStream;
 
 import org.apache.commons.io.function.IOConsumer;
 import org.apache.commons.io.function.IOSupplier;
@@ -2375,6 +2377,36 @@ public class IOUtils {
      * @since 2.0
      */
     public static long skip(final InputStream input, final long toSkip) throws IOException {
+        return skip(input, toSkip, IOUtils::getScratchByteArrayWriteOnly);
+    }
+
+    /**
+     * Skips bytes from an input byte stream.
+     * <p>
+     * Intended for special cases when customization of the temporary buffer is needed because, for example, a nested input stream has requirements for the
+     * bytes read. For example, when using {@link InflaterInputStream}s from multiple threads.
+     * </p>
+     * <p>
+     * This implementation guarantees that it will read as many bytes as possible before giving up; this may not always be the case for skip() implementations
+     * in subclasses of {@link InputStream}.
+     * </p>
+     * <p>
+     * Note that the implementation uses {@link InputStream#read(byte[], int, int)} rather than delegating to {@link InputStream#skip(long)}. This means that
+     * the method may be considerably less efficient than using the actual skip implementation; this is done to guarantee that the correct number of bytes are
+     * skipped.
+     * </p>
+     *
+     * @param input              byte stream to skip
+     * @param toSkip             number of bytes to skip.
+     * @param skipBufferSupplier Supplies the buffer to use for reading.
+     * @return number of bytes actually skipped.
+     * @throws IOException              if there is a problem reading the file
+     * @throws IllegalArgumentException if toSkip is negative
+     * @see InputStream#skip(long)
+     * @see <a href="https://issues.apache.org/jira/browse/IO-203">IO-203 - Add skipFully() method for InputStreams</a>
+     * @since 2.14.0
+     */
+    public static long skip(final InputStream input, final long toSkip, final Supplier<byte[]> skipBufferSupplier) throws IOException {
         if (toSkip < 0) {
             throw new IllegalArgumentException("Skip count must be non-negative, actual: " + toSkip);
         }
@@ -2385,9 +2417,9 @@ public class IOUtils {
         //
         long remain = toSkip;
         while (remain > 0) {
+            final byte[] skipBuffer = skipBufferSupplier.get();
             // See https://issues.apache.org/jira/browse/IO-203 for why we use read() rather than delegating to skip()
-            final byte[] byteArray = getScratchByteArrayWriteOnly();
-            final long n = input.read(byteArray, 0, (int) Math.min(remain, byteArray.length));
+            final long n = input.read(skipBuffer, 0, (int) Math.min(remain, skipBuffer.length));
             if (n < 0) { // EOF
                 break;
             }
@@ -2485,10 +2517,40 @@ public class IOUtils {
      * @since 2.0
      */
     public static void skipFully(final InputStream input, final long toSkip) throws IOException {
+        final long skipped = skip(input, toSkip, IOUtils::getScratchByteArrayWriteOnly);
+        if (skipped != toSkip) {
+            throw new EOFException("Bytes to skip: " + toSkip + " actual: " + skipped);
+        }
+    }
+
+    /**
+     * Skips the requested number of bytes or fails if there are not enough left.
+     * <p>
+     * Intended for special cases when customization of the temporary buffer is needed because, for example, a nested input stream has requirements for the
+     * bytes read. For example, when using {@link InflaterInputStream}s from multiple threads.
+     * </p>
+     * <p>
+     * This allows for the possibility that {@link InputStream#skip(long)} may not skip as many bytes as requested (most likely because of reaching EOF).
+     * </p>
+     * <p>
+     * Note that the implementation uses {@link #skip(InputStream, long, Supplier)}. This means that the method may be considerably less efficient than using the
+     * actual skip implementation; this is done to guarantee that the correct number of bytes are skipped.
+     * </p>
+     *
+     * @param input              stream to skip
+     * @param toSkip             the number of bytes to skip
+     * @param skipBufferSupplier Supplies the buffer to use for reading.
+     * @throws IOException              if there is a problem reading the file
+     * @throws IllegalArgumentException if toSkip is negative
+     * @throws EOFException             if the number of bytes skipped was incorrect
+     * @see InputStream#skip(long)
+     * @since 2.14.0
+     */
+    public static void skipFully(final InputStream input, final long toSkip, final Supplier<byte[]> skipBufferSupplier) throws IOException {
         if (toSkip < 0) {
             throw new IllegalArgumentException("Bytes to skip must not be negative: " + toSkip);
         }
-        final long skipped = skip(input, toSkip);
+        final long skipped = skip(input, toSkip, skipBufferSupplier);
         if (skipped != toSkip) {
             throw new EOFException("Bytes to skip: " + toSkip + " actual: " + skipped);
         }
diff --git a/src/test/java/org/apache/commons/io/IOUtilsMultithreadedSkipTest.java b/src/test/java/org/apache/commons/io/IOUtilsMultithreadedSkipTest.java
new file mode 100644
index 00000000..cd196c8b
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/IOUtilsMultithreadedSkipTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.io;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * See Jira ticket IO-802.
+ */
+public class IOUtilsMultithreadedSkipTest {
+
+    private static final String FIXTURE = "TIKA-4065.bin";
+    long seed = 1;
+    private final ThreadLocal<byte[]> threadLocal = ThreadLocal.withInitial(() -> new byte[4096]);
+
+    private int[] generateExpected(final InputStream is, final int[] skips) throws IOException {
+        final int[] testBytes = new int[skips.length];
+        for (int i = 0; i < skips.length; i++) {
+            try {
+                IOUtils.skipFully(is, skips[i]);
+                testBytes[i] = is.read();
+            } catch (final EOFException e) {
+                testBytes[i] = -1;
+            }
+        }
+        return testBytes;
+    }
+
+    private int[] generateSkips(final byte[] bytes, final int numSkips, final Random random) {
+        final int[] skips = new int[numSkips];
+        for (int i = 0; i < skips.length; i++) {
+            skips[i] = random.nextInt(bytes.length / numSkips) + bytes.length / 10;
+        }
+        return skips;
+    }
+
+    private InputStream inflate(final byte[] deflated) throws IOException {
+        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        IOUtils.copy(new InflaterInputStream(new ByteArrayInputStream(deflated), new Inflater(true)), bos);
+        return new ByteArrayInputStream(bos.toByteArray());
+    }
+
+    @BeforeEach
+    public void setUp() {
+        // Not the best random we can use but good enough here.
+        seed = new Random().nextLong();
+    }
+
+    private void testSkipFullyOnInflaterInputStream(final Supplier<byte[]> baSupplier) throws Exception {
+        final long thisSeed = seed;
+        // thisSeed = -727624427837034313l;
+        final Random random = new Random(thisSeed);
+        final byte[] bytes;
+        try (final InputStream inputStream = getClass().getResourceAsStream(FIXTURE)) {
+            bytes = IOUtils.toByteArray(inputStream);
+        }
+        final int numSkips = (random.nextInt(bytes.length) / 100) + 1;
+
+        final int skips[] = generateSkips(bytes, numSkips, random);
+        final int[] expected;
+        try (final InputStream inflate = inflate(bytes)) {
+            expected = generateExpected(inflate, skips);
+        }
+
+        final int numThreads = 2;
+        final int iterations = 100;
+        final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+        final ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
+
+        for (int i = 0; i < numThreads; i++) {
+            executorCompletionService.submit(() -> {
+                for (int iteration = 0; iteration < iterations; iteration++) {
+                    try (InputStream is = new InflaterInputStream(new ByteArrayInputStream(bytes), new Inflater(true))) {
+                        for (int skipIndex = 0; skipIndex < skips.length; skipIndex++) {
+                            try {
+                                IOUtils.skipFully(is, skips[skipIndex], baSupplier);
+                                final int c = is.read();
+                                assertEquals(expected[skipIndex], c, "failed on seed=" + seed + " iteration=" + iteration);
+                            } catch (final EOFException e) {
+                                assertEquals(expected[skipIndex], is.read(), "failed on " + "seed=" + seed + " iteration=" + iteration);
+                            }
+                        }
+                    }
+                }
+                return 1;
+            });
+        }
+
+        int finished = 0;
+        while (finished < numThreads) {
+            // blocking
+            final Future<Integer> future = executorCompletionService.take();
+            try {
+                future.get();
+            } catch (final Exception e) {
+                // printStackTrace() for simpler debugging
+                e.printStackTrace();
+                fail("failed on seed=" + seed);
+            }
+            finished++;
+        }
+    }
+
+    @Test
+    public void testSkipFullyOnInflaterInputStream_New_bytes() throws Exception {
+        testSkipFullyOnInflaterInputStream(() -> new byte[4096]);
+    }
+
+    @Test
+    public void testSkipFullyOnInflaterInputStream_ThreadLocal() throws Exception {
+        testSkipFullyOnInflaterInputStream(threadLocal::get);
+    }
+
+}
diff --git a/src/test/java/org/apache/commons/io/IOUtilsTest.java b/src/test/java/org/apache/commons/io/IOUtilsTest.java
index 909893df..c84bb2aa 100644
--- a/src/test/java/org/apache/commons/io/IOUtilsTest.java
+++ b/src/test/java/org/apache/commons/io/IOUtilsTest.java
@@ -60,6 +60,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.List;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 import org.apache.commons.io.function.IOConsumer;
@@ -1310,13 +1311,53 @@ public class IOUtilsTest {
     public void testSkipFully_InputStream() throws Exception {
         final int size = 1027;
 
-        final InputStream input = new ByteArrayInputStream(new byte[size]);
-        assertThrows(IllegalArgumentException.class, () -> IOUtils.skipFully(input, -1), "Should have failed with IllegalArgumentException");
+        try (final InputStream input = new ByteArrayInputStream(new byte[size])) {
+            assertThrows(IllegalArgumentException.class, () -> IOUtils.skipFully(input, -1), "Should have failed with IllegalArgumentException");
 
-        IOUtils.skipFully(input, 0);
-        IOUtils.skipFully(input, size - 1);
-        assertThrows(IOException.class, () -> IOUtils.skipFully(input, 2), "Should have failed with IOException");
-        IOUtils.closeQuietly(input);
+            IOUtils.skipFully(input, 0);
+            IOUtils.skipFully(input, size - 1);
+            assertThrows(IOException.class, () -> IOUtils.skipFully(input, 2), "Should have failed with IOException");
+        }
+    }
+
+    @Test
+    public void testSkipFully_InputStream_Buffer_New_bytes() throws Exception {
+        final int size = 1027;
+        final Supplier<byte[]> bas = () -> new byte[size];
+        try (final InputStream input = new ByteArrayInputStream(new byte[size])) {
+            assertThrows(IllegalArgumentException.class, () -> IOUtils.skipFully(input, -1, bas), "Should have failed with IllegalArgumentException");
+
+            IOUtils.skipFully(input, 0, bas);
+            IOUtils.skipFully(input, size - 1, bas);
+            assertThrows(IOException.class, () -> IOUtils.skipFully(input, 2, bas), "Should have failed with IOException");
+        }
+    }
+
+    @Test
+    public void testSkipFully_InputStream_Buffer_Resuse_bytes() throws Exception {
+        final int size = 1027;
+        final byte[] ba = new byte[size];
+        final Supplier<byte[]> bas = () -> ba;
+        try (final InputStream input = new ByteArrayInputStream(new byte[size])) {
+            assertThrows(IllegalArgumentException.class, () -> IOUtils.skipFully(input, -1, bas), "Should have failed with IllegalArgumentException");
+
+            IOUtils.skipFully(input, 0, bas);
+            IOUtils.skipFully(input, size - 1, bas);
+            assertThrows(IOException.class, () -> IOUtils.skipFully(input, 2, bas), "Should have failed with IOException");
+        }
+    }
+
+    @Test
+    public void testSkipFully_InputStream_Buffer_Resuse_ThreadLocal() throws Exception {
+        final int size = 1027;
+        final ThreadLocal<byte[]> tl = ThreadLocal.withInitial(() -> new byte[size]);
+        try (final InputStream input = new ByteArrayInputStream(new byte[size])) {
+            assertThrows(IllegalArgumentException.class, () -> IOUtils.skipFully(input, -1, tl::get), "Should have failed with IllegalArgumentException");
+
+            IOUtils.skipFully(input, 0, tl::get);
+            IOUtils.skipFully(input, size - 1, tl::get);
+            assertThrows(IOException.class, () -> IOUtils.skipFully(input, 2, tl::get), "Should have failed with IOException");
+        }
     }
 
     @Test
@@ -1336,13 +1377,12 @@ public class IOUtilsTest {
     @Test
     public void testSkipFully_Reader() throws Exception {
         final int size = 1027;
-        final Reader input = new CharArrayReader(new char[size]);
-
-        IOUtils.skipFully(input, 0);
-        IOUtils.skipFully(input, size - 3);
-        assertThrows(IllegalArgumentException.class, () -> IOUtils.skipFully(input, -1), "Should have failed with IllegalArgumentException");
-        assertThrows(IOException.class, () -> IOUtils.skipFully(input, 5), "Should have failed with IOException");
-        IOUtils.closeQuietly(input);
+        try (final Reader input = new CharArrayReader(new char[size])) {
+            IOUtils.skipFully(input, 0);
+            IOUtils.skipFully(input, size - 3);
+            assertThrows(IllegalArgumentException.class, () -> IOUtils.skipFully(input, -1), "Should have failed with IllegalArgumentException");
+            assertThrows(IOException.class, () -> IOUtils.skipFully(input, 5), "Should have failed with IOException");
+        }
     }
 
     @Test
diff --git a/src/test/resources/org/apache/commons/io/TIKA-4065.bin b/src/test/resources/org/apache/commons/io/TIKA-4065.bin
new file mode 100644
index 00000000..1803df9d
Binary files /dev/null and b/src/test/resources/org/apache/commons/io/TIKA-4065.bin differ