You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by gg...@apache.org on 2023/04/17 03:52:40 UTC
[commons-io] branch master updated: Add PollingQueueInputStream and TakingQueueInputStream to replace QueueInputStream
This is an automated email from the ASF dual-hosted git repository.
ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git
The following commit(s) were added to refs/heads/master by this push:
new b21c2ac4 Add PollingQueueInputStream and TakingQueueInputStream to replace QueueInputStream
b21c2ac4 is described below
commit b21c2ac4009a31def29055468e05f290b1019bb5
Author: Gary Gregory <ga...@gmail.com>
AuthorDate: Sun Apr 16 23:52:36 2023 -0400
Add PollingQueueInputStream and TakingQueueInputStream to replace
QueueInputStream
---
src/changes/changes.xml | 3 +
.../io/input/AbstractBlockingQueueInputStream.java | 193 +++++++++++++++++++++
.../apache/commons/io/input/QueueInputStream.java | 26 +--
...a => AbstractBlockingQueueInputStreamTest.java} | 28 ++-
.../io/input/PollingQueueInputStreamTest.java | 39 +++++
.../commons/io/input/QueueInputStreamTest.java | 113 +-----------
6 files changed, 268 insertions(+), 134 deletions(-)
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 4500183e..5d97f7e4 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -471,6 +471,9 @@ The <action> type attribute can be add,update,fix,remove.
<action dev="ggregory" type="add" due-to="Gary Gregory">
Add builders and avoid creating more constructors for all permutations of current options.
</action>
+ <action dev="ggregory" type="add" due-to="Gary Gregory, maxxedev">
+ Add PollingQueueInputStream and TakingQueueInputStream to replace QueueInputStream. See also #447.
+ </action>
<!-- UPDATE -->
<action dev="kinow" type="update" due-to="Dependabot, Gary Gregory">
Bump actions/cache from 2.1.6 to 3.0.10 #307, #337, #393.
diff --git a/src/main/java/org/apache/commons/io/input/AbstractBlockingQueueInputStream.java b/src/main/java/org/apache/commons/io/input/AbstractBlockingQueueInputStream.java
new file mode 100644
index 00000000..6ba9a64d
--- /dev/null
+++ b/src/main/java/org/apache/commons/io/input/AbstractBlockingQueueInputStream.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.io.input;
+
+import static org.apache.commons.io.IOUtils.EOF;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.io.output.QueueOutputStream;
+
+/**
+ * Alternative to {@link java.io.PipedInputStream}, where this queue input stream provides what's written in a queue output stream.
+ * <p>
+ * Example usage, see {@link PollingQueueInputStream} and {@link TakingQueueInputStream}.
+ * </p>
+ *
+ * @see PollingQueueInputStream
+ * @see TakingQueueInputStream
+ * @since 2.12.0
+ */
+public abstract class AbstractBlockingQueueInputStream extends InputStream {
+
+ /**
+ * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream.
+ *
+ * <p>
+ * Example usage:
+ * </p>
+ *
+ * <pre>
+ * PollingQueueInputStream inputStream = new PollingQueueInputStream();
+ * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
+ *
+ * outputStream.write("hello world".getBytes(UTF_8));
+ * inputStream.read();
+ * </pre>
+ * <p>
+ * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
+ * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used long after the initial threads have exited.
+ * </p>
+ * <p>
+ * Closing a {@link PollingQueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
+ * {@link IOException}.
+ * </p>
+ *
+ * @see QueueOutputStream
+ * @since 2.12.0
+ */
+ public static class PollingQueueInputStream extends AbstractBlockingQueueInputStream {
+
+ /**
+ * Constructs a new instance.
+ */
+ public PollingQueueInputStream() {
+ }
+
+ /**
+ * Constructs a new instance with given queue.
+ *
+ * @param blockingQueue backing queue for the stream.
+ */
+ public PollingQueueInputStream(final BlockingQueue<Integer> blockingQueue) {
+ super(blockingQueue);
+ }
+
+ @Override
+ public int read() throws IOException {
+ final Integer value = getBlockingQueue().poll(); // non-blocking: yields null when nothing is queued
+ return value == null ? EOF : 0xFF & value; // null is reported as EOF; otherwise mask to 0..255
+ }
+ }
+
+ /**
+ * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream.
+ *
+ * <p>
+ * Example usage:
+ * </p>
+ *
+ * <pre>
+ * TakingQueueInputStream inputStream = new TakingQueueInputStream();
+ * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
+ *
+ * outputStream.write("hello world".getBytes(UTF_8));
+ * inputStream.read();
+ * </pre>
+ * <p>
+ * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
+ * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used long after the initial threads have exited.
+ * </p>
+ * <p>
+ * Closing a {@link TakingQueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
+ * {@link IOException}.
+ * </p>
+ *
+ * @see QueueOutputStream
+ * @since 2.12.0
+ */
+ public static class TakingQueueInputStream extends AbstractBlockingQueueInputStream {
+
+ /**
+ * Constructs a new instance.
+ */
+ public TakingQueueInputStream() {
+ }
+
+ /**
+ * Constructs a new instance with given queue.
+ *
+ * @param blockingQueue backing queue for the stream.
+ */
+ public TakingQueueInputStream(final BlockingQueue<Integer> blockingQueue) {
+ super(blockingQueue);
+ }
+
+ /**
+ * Reads and returns a single byte, blocking until one is available in the queue.
+ *
+ * @return the byte read, masked to the range {@code 0} to {@code 255} like {@link PollingQueueInputStream#read()}
+ * @throws InterruptedIOException if interrupted while waiting to take.
+ */
+ @Override
+ public int read() throws InterruptedIOException {
+ try {
+ return 0xFF & getBlockingQueue().take();
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag for callers
+ final InterruptedIOException ioException = new InterruptedIOException();
+ ioException.initCause(e);
+ throw ioException;
+ }
+ }
+ }
+
+ private final BlockingQueue<Integer> blockingQueue;
+
+ /**
+ * Constructs a new instance with no limit to its internal buffer size.
+ */
+ protected AbstractBlockingQueueInputStream() {
+ this(new LinkedBlockingQueue<>());
+ }
+
+ /**
+ * Constructs a new instance with the given queue.
+ *
+ * @param blockingQueue backing queue for the stream
+ */
+ protected AbstractBlockingQueueInputStream(final BlockingQueue<Integer> blockingQueue) {
+ this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue");
+ }
+
+ /**
+ * Gets the underlying BlockingQueue.
+ *
+ * @return the underlying BlockingQueue.
+ */
+ protected BlockingQueue<Integer> getBlockingQueue() {
+ return blockingQueue;
+ }
+
+ /**
+ * Creates a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
+ *
+ * @return QueueOutputStream connected to this stream
+ */
+ public QueueOutputStream newQueueOutputStream() {
+ return new QueueOutputStream(blockingQueue);
+ }
+
+}
diff --git a/src/main/java/org/apache/commons/io/input/QueueInputStream.java b/src/main/java/org/apache/commons/io/input/QueueInputStream.java
index 48cdadda..97be7a33 100644
--- a/src/main/java/org/apache/commons/io/input/QueueInputStream.java
+++ b/src/main/java/org/apache/commons/io/input/QueueInputStream.java
@@ -19,10 +19,8 @@ package org.apache.commons.io.input;
import static org.apache.commons.io.IOUtils.EOF;
import java.io.IOException;
-import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
-import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -54,10 +52,10 @@ import org.apache.commons.io.output.QueueOutputStream;
*
* @see QueueOutputStream
* @since 2.9.0
+ * @deprecated Use {@link AbstractBlockingQueueInputStream.PollingQueueInputStream}
*/
-public class QueueInputStream extends InputStream {
-
- private final BlockingQueue<Integer> blockingQueue;
+@Deprecated
+public class QueueInputStream extends AbstractBlockingQueueInputStream {
/**
* Constructs a new instance with no limit to its internal buffer size.
@@ -67,22 +65,12 @@ public class QueueInputStream extends InputStream {
}
/**
- * Constructs a new instance with given buffer
+ * Constructs a new instance with given queue.
*
- * @param blockingQueue backing queue for the stream
+ * @param blockingQueue backing queue for the stream.
*/
public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
- this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue");
- }
-
- /**
- * Creates a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this
- * input stream.
- *
- * @return QueueOutputStream connected to this stream
- */
- public QueueOutputStream newQueueOutputStream() {
- return new QueueOutputStream(blockingQueue);
+ super(blockingQueue);
}
/**
@@ -92,7 +80,7 @@ public class QueueInputStream extends InputStream {
*/
@Override
public int read() {
- final Integer value = blockingQueue.poll();
+ final Integer value = getBlockingQueue().poll();
return value == null ? EOF : 0xFF & value;
}
diff --git a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java b/src/test/java/org/apache/commons/io/input/AbstractBlockingQueueInputStreamTest.java
similarity index 84%
copy from src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
copy to src/test/java/org/apache/commons/io/input/AbstractBlockingQueueInputStreamTest.java
index 2902bb97..31de3671 100644
--- a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
+++ b/src/test/java/org/apache/commons/io/input/AbstractBlockingQueueInputStreamTest.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.commons.io.input;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -32,6 +33,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.AbstractBlockingQueueInputStream.PollingQueueInputStream;
import org.apache.commons.io.output.QueueOutputStream;
import org.apache.commons.io.output.QueueOutputStreamTest;
import org.apache.commons.lang3.StringUtils;
@@ -41,11 +43,11 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
/**
- * Test {@link QueueInputStream}.
+ * Tests {@link AbstractBlockingQueueInputStream} implementations such as {@link PollingQueueInputStream}.
*
- * @see {@link QueueOutputStreamTest}
+ * @see QueueOutputStreamTest
*/
-public class QueueInputStreamTest {
+public abstract class AbstractBlockingQueueInputStreamTest {
public static Stream<Arguments> inputData() {
return Stream.of(Arguments.of(""),
@@ -66,7 +68,7 @@ public class QueueInputStreamTest {
@MethodSource("inputData")
public void bufferedReads(final String inputData) throws IOException {
final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
- try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
+ try (BufferedInputStream inputStream = new BufferedInputStream(newQueueInputStream(queue));
final QueueOutputStream outputStream = new QueueOutputStream(queue)) {
outputStream.write(inputData.getBytes(UTF_8));
final String actualData = IOUtils.toString(inputStream, UTF_8);
@@ -78,7 +80,7 @@ public class QueueInputStreamTest {
@MethodSource("inputData")
public void bufferedReadWrite(final String inputData) throws IOException {
final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
- try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
+ try (BufferedInputStream inputStream = new BufferedInputStream(newQueueInputStream(queue));
final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
outputStream.write(inputData.getBytes(UTF_8));
outputStream.flush();
@@ -91,7 +93,7 @@ public class QueueInputStreamTest {
@MethodSource("inputData")
public void bufferedWrites(final String inputData) throws IOException {
final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
- try (QueueInputStream inputStream = new QueueInputStream(queue);
+ try (AbstractBlockingQueueInputStream inputStream = newQueueInputStream(queue);
final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
outputStream.write(inputData.getBytes(UTF_8));
outputStream.flush();
@@ -104,6 +106,14 @@ public class QueueInputStreamTest {
return 8192;
}
+ protected abstract AbstractBlockingQueueInputStream newQueueInputStream();
+
+ protected abstract AbstractBlockingQueueInputStream newQueueInputStream(final BlockingQueue<Integer> queue);
+
+ protected QueueOutputStream newQueueOutputStream(AbstractBlockingQueueInputStream inputStream) {
+ return inputStream.newQueueOutputStream();
+ }
+
private String readUnbuffered(final InputStream inputStream) throws IOException {
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
int n = -1;
@@ -115,14 +125,14 @@ public class QueueInputStreamTest {
@Test
public void testNullArgument() {
- assertThrows(NullPointerException.class, () -> new QueueInputStream(null), "queue is required");
+ assertThrows(NullPointerException.class, () -> newQueueInputStream(null), "queue is required");
}
@ParameterizedTest(name = "inputData={0}")
@MethodSource("inputData")
public void unbufferedReadWrite(final String inputData) throws IOException {
- try (QueueInputStream inputStream = new QueueInputStream();
- final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
+ try (AbstractBlockingQueueInputStream inputStream = newQueueInputStream();
+ final QueueOutputStream outputStream = newQueueOutputStream(inputStream)) {
writeUnbuffered(outputStream, inputData);
final String actualData = readUnbuffered(inputStream);
assertEquals(inputData, actualData);
diff --git a/src/test/java/org/apache/commons/io/input/PollingQueueInputStreamTest.java b/src/test/java/org/apache/commons/io/input/PollingQueueInputStreamTest.java
new file mode 100644
index 00000000..303a55ea
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/input/PollingQueueInputStreamTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.io.input;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.io.input.AbstractBlockingQueueInputStream.PollingQueueInputStream;
+
+/**
+ * Tests {@link PollingQueueInputStream}.
+ */
+public class PollingQueueInputStreamTest extends AbstractBlockingQueueInputStreamTest {
+
+ @Override
+ protected AbstractBlockingQueueInputStream newQueueInputStream() {
+ return new PollingQueueInputStream();
+ }
+
+ @Override
+ protected AbstractBlockingQueueInputStream newQueueInputStream(final BlockingQueue<Integer> queue) {
+ return new PollingQueueInputStream(queue);
+ }
+
+}
diff --git a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
index 2902bb97..97c234a5 100644
--- a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
+++ b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
@@ -16,123 +16,24 @@
*/
package org.apache.commons.io.input;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Stream;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.output.QueueOutputStream;
import org.apache.commons.io.output.QueueOutputStreamTest;
-import org.apache.commons.lang3.StringUtils;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
/**
* Test {@link QueueInputStream}.
*
* @see {@link QueueOutputStreamTest}
*/
-public class QueueInputStreamTest {
-
- public static Stream<Arguments> inputData() {
- return Stream.of(Arguments.of(""),
- Arguments.of("1"),
- Arguments.of("12"),
- Arguments.of("1234"),
- Arguments.of("12345678"),
- Arguments.of(StringUtils.repeat("A", 4095)),
- Arguments.of(StringUtils.repeat("A", 4096)),
- Arguments.of(StringUtils.repeat("A", 4097)),
- Arguments.of(StringUtils.repeat("A", 8191)),
- Arguments.of(StringUtils.repeat("A", 8192)),
- Arguments.of(StringUtils.repeat("A", 8193)),
- Arguments.of(StringUtils.repeat("A", 8192 * 4)));
- }
-
- @ParameterizedTest(name = "inputData={0}")
- @MethodSource("inputData")
- public void bufferedReads(final String inputData) throws IOException {
- final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
- try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
- final QueueOutputStream outputStream = new QueueOutputStream(queue)) {
- outputStream.write(inputData.getBytes(UTF_8));
- final String actualData = IOUtils.toString(inputStream, UTF_8);
- assertEquals(inputData, actualData);
- }
- }
-
- @ParameterizedTest(name = "inputData={0}")
- @MethodSource("inputData")
- public void bufferedReadWrite(final String inputData) throws IOException {
- final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
- try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
- final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
- outputStream.write(inputData.getBytes(UTF_8));
- outputStream.flush();
- final String dataCopy = IOUtils.toString(inputStream, UTF_8);
- assertEquals(inputData, dataCopy);
- }
- }
-
- @ParameterizedTest(name = "inputData={0}")
- @MethodSource("inputData")
- public void bufferedWrites(final String inputData) throws IOException {
- final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
- try (QueueInputStream inputStream = new QueueInputStream(queue);
- final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
- outputStream.write(inputData.getBytes(UTF_8));
- outputStream.flush();
- final String actualData = readUnbuffered(inputStream);
- assertEquals(inputData, actualData);
- }
- }
-
- private int defaultBufferSize() {
- return 8192;
- }
-
- private String readUnbuffered(final InputStream inputStream) throws IOException {
- final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- int n = -1;
- while ((n = inputStream.read()) != -1) {
- byteArrayOutputStream.write(n);
- }
- return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name());
- }
-
- @Test
- public void testNullArgument() {
- assertThrows(NullPointerException.class, () -> new QueueInputStream(null), "queue is required");
- }
+public class QueueInputStreamTest extends AbstractBlockingQueueInputStreamTest {
- @ParameterizedTest(name = "inputData={0}")
- @MethodSource("inputData")
- public void unbufferedReadWrite(final String inputData) throws IOException {
- try (QueueInputStream inputStream = new QueueInputStream();
- final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
- writeUnbuffered(outputStream, inputData);
- final String actualData = readUnbuffered(inputStream);
- assertEquals(inputData, actualData);
- }
+ @Override
+ protected AbstractBlockingQueueInputStream newQueueInputStream() {
+ return new QueueInputStream();
}
- private void writeUnbuffered(final QueueOutputStream outputStream, final String inputData) throws InterruptedIOException {
- final byte[] bytes = inputData.getBytes(UTF_8);
- for (final byte oneByte : bytes) {
- outputStream.write(oneByte);
- }
+ @Override
+ protected AbstractBlockingQueueInputStream newQueueInputStream(final BlockingQueue<Integer> queue) {
+ return new QueueInputStream(queue);
}
}