You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by gg...@apache.org on 2023/04/17 03:52:40 UTC

[commons-io] branch master updated: Add PollingQueueInputStream and TakingQueueInputStream to replace QueueInputStream

This is an automated email from the ASF dual-hosted git repository.

ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git


The following commit(s) were added to refs/heads/master by this push:
     new b21c2ac4 Add PollingQueueInputStream and TakingQueueInputStream to replace QueueInputStream
b21c2ac4 is described below

commit b21c2ac4009a31def29055468e05f290b1019bb5
Author: Gary Gregory <ga...@gmail.com>
AuthorDate: Sun Apr 16 23:52:36 2023 -0400

    Add PollingQueueInputStream and TakingQueueInputStream to replace
    QueueInputStream
---
 src/changes/changes.xml                            |   3 +
 .../io/input/AbstractBlockingQueueInputStream.java | 193 +++++++++++++++++++++
 .../apache/commons/io/input/QueueInputStream.java  |  26 +--
 ...a => AbstractBlockingQueueInputStreamTest.java} |  28 ++-
 .../io/input/PollingQueueInputStreamTest.java      |  39 +++++
 .../commons/io/input/QueueInputStreamTest.java     | 113 +-----------
 6 files changed, 268 insertions(+), 134 deletions(-)

diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 4500183e..5d97f7e4 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -471,6 +471,9 @@ The <action> type attribute can be add,update,fix,remove.
       <action dev="ggregory" type="add" due-to="Gary Gregory">
         Add builders and avoid creating more constructors for all permutations of current options.
       </action>
+      <action dev="ggregory" type="add" due-to="Gary Gregory, maxxedev">
+        Add PollingQueueInputStream and TakingQueueInputStream to replace QueueInputStream. See also #447.
+      </action>
       <!-- UPDATE -->
       <action dev="kinow" type="update" due-to="Dependabot, Gary Gregory">
         Bump actions/cache from 2.1.6 to 3.0.10 #307, #337, #393.
diff --git a/src/main/java/org/apache/commons/io/input/AbstractBlockingQueueInputStream.java b/src/main/java/org/apache/commons/io/input/AbstractBlockingQueueInputStream.java
new file mode 100644
index 00000000..6ba9a64d
--- /dev/null
+++ b/src/main/java/org/apache/commons/io/input/AbstractBlockingQueueInputStream.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.io.input;
+
+import static org.apache.commons.io.IOUtils.EOF;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.io.output.QueueOutputStream;
+
+/**
+ * Alternative to {@link java.io.PipedInputStream}, where this queue input stream provides what's written in a queue output stream.
+ * <p>
+ * Example usage, see {@link PollingQueueInputStream} and {@link TakingQueueInputStream}.
+ * </p>
+ *
+ * @see PollingQueueInputStream
+ * @see TakingQueueInputStream
+ * @since 2.12.0
+ */
+public abstract class AbstractBlockingQueueInputStream extends InputStream {
+
+    /**
+     * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream.
+     *
+     * <p>
+     * Example usage:
+     * </p>
+     *
+     * <pre>
+     * BlockingQueueInputStream inputStream = new BlockingQueueInputStream();
+     * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
+     *
+     * outputStream.write("hello world".getBytes(UTF_8));
+     * inputStream.read();
+     * </pre>
+     * <p>
+     * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
+     * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited.
+     * </p>
+     * <p>
+     * Closing a {@link PollingQueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
+     * {@link IOException}.
+     * </p>
+     *
+     * @see QueueOutputStream
+     * @since 2.12.0
+     */
+    public static class PollingQueueInputStream extends AbstractBlockingQueueInputStream {
+
+        /**
+         * Constructs a new instance.
+         */
+        public PollingQueueInputStream() {
+        }
+
+        /**
+         * Constructs a new instance with given queue.
+         *
+         * @param blockingQueue backing queue for the stream.
+         */
+        public PollingQueueInputStream(final BlockingQueue<Integer> blockingQueue) {
+            super(blockingQueue);
+        }
+
+        @Override
+        public int read() throws IOException {
+            final Integer value = getBlockingQueue().poll();
+            return value == null ? EOF : 0xFF & value;
+        }
+    }
+
+    /**
+     * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream.
+     *
+     * <p>
+     * Example usage:
+     * </p>
+     *
+     * <pre>
+     * TakingQueueInputStream inputStream = new TakingQueueInputStream();
+     * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
+     *
+     * outputStream.write("hello world".getBytes(UTF_8));
+     * inputStream.read();
+     * </pre>
+     * <p>
+     * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
+     * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited.
+     * </p>
+     * <p>
+     * Closing a {@link TakingQueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
+     * {@link IOException}.
+     * </p>
+     *
+     * @see QueueOutputStream
+     * @since 2.12.0
+     */
+    public static class TakingQueueInputStream extends AbstractBlockingQueueInputStream {
+
+        /**
+         * Constructs a new instance.
+         */
+        public TakingQueueInputStream() {
+        }
+
+        /**
+         * Constructs a new instance with given queue.
+         *
+         * @param blockingQueue backing queue for the stream.
+         */
+        public TakingQueueInputStream(final BlockingQueue<Integer> blockingQueue) {
+            super(blockingQueue);
+        }
+
+        /**
+         * Reads and returns a single byte.
+         *
+         * @return either the byte read or {@code -1} if the end of the stream has been reached
+         * @throws InterruptedIOException if interrupted while waiting to take.
+         */
+        @Override
+        public int read() throws InterruptedIOException {
+            try {
+                return getBlockingQueue().take();
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+                final InterruptedIOException ioException = new InterruptedIOException();
+                ioException.initCause(e);
+                throw ioException;
+            }
+        }
+    }
+
+    private final BlockingQueue<Integer> blockingQueue;
+
+    /**
+     * Constructs a new instance with no limit to its internal buffer size.
+     */
+    protected AbstractBlockingQueueInputStream() {
+        this(new LinkedBlockingQueue<>());
+    }
+
+    /**
+     * Constructs a new instance with given buffer
+     *
+     * @param blockingQueue backing queue for the stream
+     */
+    protected AbstractBlockingQueueInputStream(final BlockingQueue<Integer> blockingQueue) {
+        this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue");
+    }
+
+    /**
+     * Gets the underlying BlockingQueue.
+     *
+     * @return the underlying BlockingQueue.
+     */
+    protected BlockingQueue<Integer> getBlockingQueue() {
+        return blockingQueue;
+    }
+
+    /**
+     * Creates a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
+     *
+     * @return QueueOutputStream connected to this stream
+     */
+    public QueueOutputStream newQueueOutputStream() {
+        return new QueueOutputStream(blockingQueue);
+    }
+
+}
diff --git a/src/main/java/org/apache/commons/io/input/QueueInputStream.java b/src/main/java/org/apache/commons/io/input/QueueInputStream.java
index 48cdadda..97be7a33 100644
--- a/src/main/java/org/apache/commons/io/input/QueueInputStream.java
+++ b/src/main/java/org/apache/commons/io/input/QueueInputStream.java
@@ -19,10 +19,8 @@ package org.apache.commons.io.input;
 import static org.apache.commons.io.IOUtils.EOF;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
-import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -54,10 +52,10 @@ import org.apache.commons.io.output.QueueOutputStream;
  *
  * @see QueueOutputStream
  * @since 2.9.0
+ * @deprecated Use {@link AbstractBlockingQueueInputStream.PollingQueueInputStream}
  */
-public class QueueInputStream extends InputStream {
-
-    private final BlockingQueue<Integer> blockingQueue;
+@Deprecated
+public class QueueInputStream extends AbstractBlockingQueueInputStream {
 
     /**
      * Constructs a new instance with no limit to its internal buffer size.
@@ -67,22 +65,12 @@ public class QueueInputStream extends InputStream {
     }
 
     /**
-     * Constructs a new instance with given buffer
+     * Constructs a new instance with given queue.
      *
-     * @param blockingQueue backing queue for the stream
+     * @param blockingQueue backing queue for the stream.
      */
     public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
-        this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue");
-    }
-
-    /**
-     * Creates a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this
-     * input stream.
-     *
-     * @return QueueOutputStream connected to this stream
-     */
-    public QueueOutputStream newQueueOutputStream() {
-        return new QueueOutputStream(blockingQueue);
+        super(blockingQueue);
     }
 
     /**
@@ -92,7 +80,7 @@ public class QueueInputStream extends InputStream {
      */
     @Override
     public int read() {
-        final Integer value = blockingQueue.poll();
+        final Integer value = getBlockingQueue().poll();
         return value == null ? EOF : 0xFF & value;
     }
 
diff --git a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java b/src/test/java/org/apache/commons/io/input/AbstractBlockingQueueInputStreamTest.java
similarity index 84%
copy from src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
copy to src/test/java/org/apache/commons/io/input/AbstractBlockingQueueInputStreamTest.java
index 2902bb97..31de3671 100644
--- a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
+++ b/src/test/java/org/apache/commons/io/input/AbstractBlockingQueueInputStreamTest.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.commons.io.input;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -32,6 +33,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Stream;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.AbstractBlockingQueueInputStream.PollingQueueInputStream;
 import org.apache.commons.io.output.QueueOutputStream;
 import org.apache.commons.io.output.QueueOutputStreamTest;
 import org.apache.commons.lang3.StringUtils;
@@ -41,11 +43,11 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 /**
- * Test {@link QueueInputStream}.
+ * Test {@link PollingQueueInputStream}.
  *
- * @see {@link QueueOutputStreamTest}
+ * @see QueueOutputStreamTest
  */
-public class QueueInputStreamTest {
+public abstract class AbstractBlockingQueueInputStreamTest {
 
     public static Stream<Arguments> inputData() {
         return Stream.of(Arguments.of(""),
@@ -66,7 +68,7 @@ public class QueueInputStreamTest {
     @MethodSource("inputData")
     public void bufferedReads(final String inputData) throws IOException {
         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
-        try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
+        try (BufferedInputStream inputStream = new BufferedInputStream(newQueueInputStream(queue));
                 final QueueOutputStream outputStream = new QueueOutputStream(queue)) {
             outputStream.write(inputData.getBytes(UTF_8));
             final String actualData = IOUtils.toString(inputStream, UTF_8);
@@ -78,7 +80,7 @@ public class QueueInputStreamTest {
     @MethodSource("inputData")
     public void bufferedReadWrite(final String inputData) throws IOException {
         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
-        try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
+        try (BufferedInputStream inputStream = new BufferedInputStream(newQueueInputStream(queue));
                 final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
             outputStream.write(inputData.getBytes(UTF_8));
             outputStream.flush();
@@ -91,7 +93,7 @@ public class QueueInputStreamTest {
     @MethodSource("inputData")
     public void bufferedWrites(final String inputData) throws IOException {
         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
-        try (QueueInputStream inputStream = new QueueInputStream(queue);
+        try (AbstractBlockingQueueInputStream inputStream = newQueueInputStream(queue);
                 final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
             outputStream.write(inputData.getBytes(UTF_8));
             outputStream.flush();
@@ -104,6 +106,14 @@ public class QueueInputStreamTest {
         return 8192;
     }
 
+    protected abstract AbstractBlockingQueueInputStream newQueueInputStream();
+
+    protected abstract AbstractBlockingQueueInputStream newQueueInputStream(final BlockingQueue<Integer> queue);
+
+    protected QueueOutputStream newQueueOutputStream(AbstractBlockingQueueInputStream inputStream) {
+        return inputStream.newQueueOutputStream();
+    }
+
     private String readUnbuffered(final InputStream inputStream) throws IOException {
         final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
         int n = -1;
@@ -115,14 +125,14 @@ public class QueueInputStreamTest {
 
     @Test
     public void testNullArgument() {
-        assertThrows(NullPointerException.class, () -> new QueueInputStream(null), "queue is required");
+        assertThrows(NullPointerException.class, () -> newQueueInputStream(null), "queue is required");
     }
 
     @ParameterizedTest(name = "inputData={0}")
     @MethodSource("inputData")
     public void unbufferedReadWrite(final String inputData) throws IOException {
-        try (QueueInputStream inputStream = new QueueInputStream();
-                final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
+        try (AbstractBlockingQueueInputStream inputStream = newQueueInputStream();
+                final QueueOutputStream outputStream = newQueueOutputStream(inputStream)) {
             writeUnbuffered(outputStream, inputData);
             final String actualData = readUnbuffered(inputStream);
             assertEquals(inputData, actualData);
diff --git a/src/test/java/org/apache/commons/io/input/PollingQueueInputStreamTest.java b/src/test/java/org/apache/commons/io/input/PollingQueueInputStreamTest.java
new file mode 100644
index 00000000..303a55ea
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/input/PollingQueueInputStreamTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.io.input;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.io.input.AbstractBlockingQueueInputStream.PollingQueueInputStream;
+
+/**
+ * Tests {@link PollingQueueInputStream}.
+ */
+public class PollingQueueInputStreamTest extends AbstractBlockingQueueInputStreamTest {
+
+    @Override
+    protected AbstractBlockingQueueInputStream newQueueInputStream() {
+        return new PollingQueueInputStream();
+    }
+
+    @Override
+    protected AbstractBlockingQueueInputStream newQueueInputStream(final BlockingQueue<Integer> queue) {
+        return new PollingQueueInputStream(queue);
+    }
+
+}
diff --git a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
index 2902bb97..97c234a5 100644
--- a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
+++ b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
@@ -16,123 +16,24 @@
  */
 package org.apache.commons.io.input;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.nio.charset.StandardCharsets;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Stream;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.output.QueueOutputStream;
 import org.apache.commons.io.output.QueueOutputStreamTest;
-import org.apache.commons.lang3.StringUtils;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
 
 /**
  * Test {@link QueueInputStream}.
  *
  * @see {@link QueueOutputStreamTest}
  */
-public class QueueInputStreamTest {
-
-    public static Stream<Arguments> inputData() {
-        return Stream.of(Arguments.of(""),
-                Arguments.of("1"),
-                Arguments.of("12"),
-                Arguments.of("1234"),
-                Arguments.of("12345678"),
-                Arguments.of(StringUtils.repeat("A", 4095)),
-                Arguments.of(StringUtils.repeat("A", 4096)),
-                Arguments.of(StringUtils.repeat("A", 4097)),
-                Arguments.of(StringUtils.repeat("A", 8191)),
-                Arguments.of(StringUtils.repeat("A", 8192)),
-                Arguments.of(StringUtils.repeat("A", 8193)),
-                Arguments.of(StringUtils.repeat("A", 8192 * 4)));
-    }
-
-    @ParameterizedTest(name = "inputData={0}")
-    @MethodSource("inputData")
-    public void bufferedReads(final String inputData) throws IOException {
-        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
-        try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
-                final QueueOutputStream outputStream = new QueueOutputStream(queue)) {
-            outputStream.write(inputData.getBytes(UTF_8));
-            final String actualData = IOUtils.toString(inputStream, UTF_8);
-            assertEquals(inputData, actualData);
-        }
-    }
-
-    @ParameterizedTest(name = "inputData={0}")
-    @MethodSource("inputData")
-    public void bufferedReadWrite(final String inputData) throws IOException {
-        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
-        try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
-                final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
-            outputStream.write(inputData.getBytes(UTF_8));
-            outputStream.flush();
-            final String dataCopy = IOUtils.toString(inputStream, UTF_8);
-            assertEquals(inputData, dataCopy);
-        }
-    }
-
-    @ParameterizedTest(name = "inputData={0}")
-    @MethodSource("inputData")
-    public void bufferedWrites(final String inputData) throws IOException {
-        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
-        try (QueueInputStream inputStream = new QueueInputStream(queue);
-                final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
-            outputStream.write(inputData.getBytes(UTF_8));
-            outputStream.flush();
-            final String actualData = readUnbuffered(inputStream);
-            assertEquals(inputData, actualData);
-        }
-    }
-
-    private int defaultBufferSize() {
-        return 8192;
-    }
-
-    private String readUnbuffered(final InputStream inputStream) throws IOException {
-        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-        int n = -1;
-        while ((n = inputStream.read()) != -1) {
-            byteArrayOutputStream.write(n);
-        }
-        return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name());
-    }
-
-    @Test
-    public void testNullArgument() {
-        assertThrows(NullPointerException.class, () -> new QueueInputStream(null), "queue is required");
-    }
+public class QueueInputStreamTest extends AbstractBlockingQueueInputStreamTest {
 
-    @ParameterizedTest(name = "inputData={0}")
-    @MethodSource("inputData")
-    public void unbufferedReadWrite(final String inputData) throws IOException {
-        try (QueueInputStream inputStream = new QueueInputStream();
-                final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
-            writeUnbuffered(outputStream, inputData);
-            final String actualData = readUnbuffered(inputStream);
-            assertEquals(inputData, actualData);
-        }
+    @Override
+    protected AbstractBlockingQueueInputStream newQueueInputStream() {
+        return new QueueInputStream();
     }
 
-    private void writeUnbuffered(final QueueOutputStream outputStream, final String inputData) throws InterruptedIOException {
-        final byte[] bytes = inputData.getBytes(UTF_8);
-        for (final byte oneByte : bytes) {
-            outputStream.write(oneByte);
-        }
+    @Override
+    protected AbstractBlockingQueueInputStream newQueueInputStream(final BlockingQueue<Integer> queue) {
+        return new QueueInputStream(queue);
     }
 }