You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by gg...@apache.org on 2020/12/10 19:25:20 UTC
[commons-io] branch master updated: Add QueueInput/OutputStream as
simpler alternatives to PipedInput/OutputStream (#171)
This is an automated email from the ASF dual-hosted git repository.
ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git
The following commit(s) were added to refs/heads/master by this push:
new d9237fc Add QueueInput/OutputStream as simpler alternatives to PipedInput/OutputStream (#171)
d9237fc is described below
commit d9237fcadb398f514beddf35c8933a9fc5925a5d
Author: maxxedev <50...@users.noreply.github.com>
AuthorDate: Thu Dec 10 11:23:52 2020 -0800
Add QueueInput/OutputStream as simpler alternatives to PipedInput/OutputStream (#171)
* Add QueueInput/OutputStream as simpler alternatives to PipedInput/OutputStream.
PipedInput/OutputStream in JDK can have surprisingly complex behavior with respect
to how threads need to be arranged. QueueInput/OutputStream are much simpler
alternatives that are easier to use correctly.
* add more tests for edge cases
* remove static imports on Objects::requireNonNull
* improve documentation and exception handling
---
.../apache/commons/io/input/QueueInputStream.java | 90 ++++++++++++++
.../commons/io/output/QueueOutputStream.java | 98 +++++++++++++++
.../commons/io/input/QueueInputStreamTest.java | 137 +++++++++++++++++++++
.../commons/io/output/QueueOutputStreamTest.java | 122 ++++++++++++++++++
4 files changed, 447 insertions(+)
diff --git a/src/main/java/org/apache/commons/io/input/QueueInputStream.java b/src/main/java/org/apache/commons/io/input/QueueInputStream.java
new file mode 100644
index 0000000..4f3ff3a
--- /dev/null
+++ b/src/main/java/org/apache/commons/io/input/QueueInputStream.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import org.apache.commons.io.output.QueueOutputStream;
+
+import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream.
+ *
+ * Example usage:
+ * <pre>
+ * QueueInputStream inputStream = new QueueInputStream();
+ * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
+ *
+ * outputStream.write("hello world".getBytes(UTF_8));
+ * inputStream.read();
+ * </pre>
+ *
+ * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be
+ * used safely in a single thread or multiple threads. Also, unlike JDK classes, no special meaning is
+ * attached to initial or current thread. Instances can be used long after the initial threads have exited.
+ *
+ * Closing a {@code QueueInputStream} has no effect. The methods in this class can be called after
+ * the stream has been closed without generating an {@code IOException}.
+ *
+ * @see QueueOutputStream
+ * @since 2.9.0
+ */
+public class QueueInputStream extends InputStream {
+
+ private final BlockingQueue<Integer> queue; // also handed to every stream created by newQueueOutputStream()
+
+ /**
+ * Constructs a QueueInputStream backed by a new {@link LinkedBlockingQueue} with no limit to internal buffer size.
+ */
+ public QueueInputStream() {
+ this(new LinkedBlockingQueue<>());
+ }
+
+ /**
+ * Constructs a QueueInputStream with given buffer; a {@code null} queue is rejected with a {@code NullPointerException}.
+ *
+ * @param queue backing queue for the stream, must not be {@code null}
+ */
+ public QueueInputStream(final BlockingQueue<Integer> queue) {
+ this.queue = Objects.requireNonNull(queue, "queue is required");
+ }
+
+ /**
+ * Creates a new QueueOutputStream instance that shares this stream's backing queue. Writes to the output stream will be visible to this input stream.
+ *
+ * @return QueueOutputStream connected to this stream
+ */
+ public QueueOutputStream newQueueOutputStream() {
+ return new QueueOutputStream(queue);
+ }
+
+ /**
+ * Reads a single byte without blocking, by polling the backing queue.
+ *
+ * @return the byte read (0-255), or {@code -1} (end of stream) if the queue is currently empty
+ */
+ @Override
+ public int read() {
+ final Integer value = queue.poll(); // poll() returns null instead of waiting when nothing is queued
+ return value == null ? -1 : ((0xFF) & value);
+ }
+
+}
diff --git a/src/main/java/org/apache/commons/io/output/QueueOutputStream.java b/src/main/java/org/apache/commons/io/output/QueueOutputStream.java
new file mode 100644
index 0000000..28b3c7d
--- /dev/null
+++ b/src/main/java/org/apache/commons/io/output/QueueOutputStream.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.output;
+
+import org.apache.commons.io.input.QueueInputStream;
+
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Simple alternative to JDK {@link java.io.PipedOutputStream}; queue input stream provides what's
+ * written in queue output stream.
+ *
+ * Example usage:
+ * <pre>
+ * QueueOutputStream outputStream = new QueueOutputStream();
+ * QueueInputStream inputStream = outputStream.newQueueInputStream();
+ *
+ * outputStream.write("hello world".getBytes(UTF_8));
+ * inputStream.read();
+ * </pre>
+ *
+ * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be
+ * used safely in a single thread or multiple threads. Also, unlike JDK classes, no special meaning is
+ * attached to initial or current thread. Instances can be used long after the initial threads have exited.
+ *
+ * Closing a {@code QueueOutputStream} has no effect. The methods in this class can be called after
+ * the stream has been closed without generating an {@code IOException}.
+ *
+ * @see QueueInputStream
+ * @since 2.9.0
+ */
+public class QueueOutputStream extends OutputStream {
+
+ private final BlockingQueue<Integer> queue; // also handed to every stream created by newQueueInputStream()
+
+ /**
+ * Constructs a QueueOutputStream backed by a new {@link LinkedBlockingQueue} with no limit to internal buffer size.
+ */
+ public QueueOutputStream() {
+ this(new LinkedBlockingQueue<>());
+ }
+
+ /**
+ * Constructs a QueueOutputStream with given buffer; a {@code null} queue is rejected with a {@code NullPointerException}.
+ *
+ * @param queue backing queue for the stream, must not be {@code null}
+ */
+ public QueueOutputStream(final BlockingQueue<Integer> queue) {
+ this.queue = Objects.requireNonNull(queue, "queue is required");
+ }
+
+ /**
+ * Creates a new QueueInputStream instance that shares this stream's backing queue. Writes to this output stream will be visible to the input stream.
+ *
+ * @return QueueInputStream connected to this stream
+ */
+ public QueueInputStream newQueueInputStream() {
+ return new QueueInputStream(queue);
+ }
+
+ /**
+ * Writes a single byte, waiting for space if the backing queue is full.
+ * @param b the byte to write; only its low-order eight bits are queued
+ * @throws InterruptedIOException if the thread is interrupted while writing to the queue; the interrupt status is restored first.
+ */
+ @Override
+ public void write(final int b) throws InterruptedIOException {
+ try {
+ queue.put(0xFF & b);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // keep the interruption observable to callers
+ final InterruptedIOException interruptedIoException = new InterruptedIOException();
+ interruptedIoException.initCause(e);
+ throw interruptedIoException;
+ }
+ }
+}
+
diff --git a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
new file mode 100644
index 0000000..774b985
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.QueueOutputStream;
+import org.apache.commons.io.output.QueueOutputStreamTest;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Stream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test {@link QueueInputStream}.
+ *
+ * @see QueueOutputStreamTest
+ */
+public class QueueInputStreamTest {
+
+ public static Stream<Arguments> inputData() { // payload lengths sit just below, at, and just above 4096 and 8192
+ return Stream.of(Arguments.of(""),
+ Arguments.of("1"),
+ Arguments.of("12"),
+ Arguments.of("1234"),
+ Arguments.of("12345678"),
+ Arguments.of(StringUtils.repeat("A", 4095)),
+ Arguments.of(StringUtils.repeat("A", 4096)),
+ Arguments.of(StringUtils.repeat("A", 4097)),
+ Arguments.of(StringUtils.repeat("A", 8191)),
+ Arguments.of(StringUtils.repeat("A", 8192)),
+ Arguments.of(StringUtils.repeat("A", 8193)),
+ Arguments.of(StringUtils.repeat("A", 8192 * 4)));
+ }
+
+ @ParameterizedTest(name = "inputData={0}")
+ @MethodSource("inputData")
+ public void unbufferedReadWrite(final String inputData) throws IOException { // byte-at-a-time write, then byte-at-a-time read
+ try (final QueueInputStream inputStream = new QueueInputStream();
+ final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
+ writeUnbuffered(outputStream, inputData);
+ final String actualData = readUnbuffered(inputStream);
+ assertEquals(inputData, actualData);
+ }
+ }
+
+ @ParameterizedTest(name = "inputData={0}")
+ @MethodSource("inputData")
+ public void bufferedReads(final String inputData) throws IOException { // bulk write, then read back through a BufferedInputStream
+ final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
+ try (final BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
+ final QueueOutputStream outputStream = new QueueOutputStream(queue)) {
+ outputStream.write(inputData.getBytes(UTF_8));
+ final String actualData = IOUtils.toString(inputStream, UTF_8);
+ assertEquals(inputData, actualData);
+ }
+ }
+
+ @ParameterizedTest(name = "inputData={0}")
+ @MethodSource("inputData")
+ public void bufferedWrites(final String inputData) throws IOException { // write through a flushed BufferedOutputStream, then read byte-at-a-time
+ final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
+ try (final QueueInputStream inputStream = new QueueInputStream(queue);
+ final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
+ outputStream.write(inputData.getBytes(UTF_8));
+ outputStream.flush();
+ final String actualData = readUnbuffered(inputStream);
+ assertEquals(inputData, actualData);
+ }
+ }
+
+ @ParameterizedTest(name = "inputData={0}")
+ @MethodSource("inputData")
+ public void bufferedReadWrite(final String inputData) throws IOException { // buffered on both the write side and the read side
+ final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
+ try (final BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
+ final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
+ outputStream.write(inputData.getBytes(UTF_8));
+ outputStream.flush();
+ final String dataCopy = IOUtils.toString(inputStream, UTF_8);
+ assertEquals(inputData, dataCopy);
+ }
+ }
+
+ @Test
+ public void testNullArgument() {
+ assertThrows(NullPointerException.class, () -> new QueueInputStream(null), "queue is required"); // NOTE(review): the third argument is the assertion failure message; the exception's own message is not checked
+ }
+
+ private int defaultBufferSize() {
+ return 8192;
+ }
+
+ private void writeUnbuffered(final QueueOutputStream outputStream, final String inputData) throws InterruptedIOException {
+ final byte[] bytes = inputData.getBytes(UTF_8);
+ for (byte oneByte : bytes) {
+ outputStream.write(oneByte);
+ }
+ }
+
+ private String readUnbuffered(final InputStream inputStream) throws IOException {
+ final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ int n = -1;
+ while ((n = inputStream.read()) != -1) { // drain one byte at a time until the stream reports -1
+ byteArrayOutputStream.write(n);
+ }
+ return byteArrayOutputStream.toString("UTF-8");
+ }
+}
diff --git a/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java b/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java
new file mode 100644
index 0000000..28a726c
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.output;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.QueueInputStream;
+import org.apache.commons.io.input.QueueInputStreamTest;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.InterruptedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test {@link QueueOutputStream} and {@link QueueInputStream}.
+ *
+ * @see QueueInputStreamTest
+ */
+public class QueueOutputStreamTest {
+
+ private static final ExecutorService executorService = Executors.newFixedThreadPool(5);
+
+ @AfterAll
+ public static void afterAll() {
+ executorService.shutdown();
+ }
+
+ @Test
+ public void writeString() throws Exception {
+ try (final QueueOutputStream outputStream = new QueueOutputStream();
+ final QueueInputStream inputStream = outputStream.newQueueInputStream()) {
+ outputStream.write("ABC".getBytes(UTF_8));
+ final String value = IOUtils.toString(inputStream, UTF_8);
+ assertEquals("ABC", value);
+ }
+ }
+
+ @Test
+ public void writeStringMultiThread() throws Exception { // construction, write and read are each submitted to the executor instead of running on the test thread
+ try (final QueueOutputStream outputStream = callInThrowAwayThread(QueueOutputStream::new);
+ final QueueInputStream inputStream = callInThrowAwayThread(outputStream::newQueueInputStream)) {
+ callInThrowAwayThread(() -> {
+ outputStream.write("ABC".getBytes(UTF_8));
+ return null;
+ });
+
+ final String value = callInThrowAwayThread(() -> IOUtils.toString(inputStream, UTF_8));
+ assertEquals("ABC", value);
+ }
+ }
+
+ @Test
+ public void writeInterrupted() throws Exception {
+ try (final QueueOutputStream outputStream = new QueueOutputStream(new LinkedBlockingQueue<>(1)); // capacity 1 and nothing reads, so writing "ABC" blocks after the first byte
+ final QueueInputStream inputStream = outputStream.newQueueInputStream()) {
+
+ final int timeout = 1;
+ final Exchanger<Thread> writerThreadExchanger = new Exchanger<>();
+ final Exchanger<Exception> exceptionExchanger = new Exchanger<>();
+ executorService.submit(() -> {
+ final Thread writerThread = writerThreadExchanger.exchange(null, timeout, SECONDS);
+ writerThread.interrupt();
+ return null;
+ });
+
+ executorService.submit(() -> {
+ try {
+ writerThreadExchanger.exchange(Thread.currentThread(), timeout, SECONDS);
+ outputStream.write("ABC".getBytes(StandardCharsets.UTF_8));
+ } catch (Exception e) {
+ Thread.interrupted(); // clear the interrupt status so the exchange below is not itself interrupted
+ exceptionExchanger.exchange(e, timeout, SECONDS);
+ }
+ return null;
+ });
+
+ final Exception exception = exceptionExchanger.exchange(null, timeout, SECONDS);
+ assertNotNull(exception);
+ assertEquals(exception.getClass(), InterruptedIOException.class); // NOTE(review): assertEquals takes (expected, actual); the arguments here are in the opposite order
+ }
+ }
+
+ @Test
+ public void testNullArgument() {
+ assertThrows(NullPointerException.class, () -> new QueueOutputStream(null), "queue is required"); // NOTE(review): the third argument is the assertion failure message; the exception's own message is not checked
+ }
+
+ private static <T> T callInThrowAwayThread(final Callable<T> callable) throws Exception { // runs the callable on an executor thread and hands its result back through an Exchanger
+ final Exchanger<T> exchanger = new Exchanger<>();
+ executorService.submit(() -> {
+ final T value = callable.call();
+ exchanger.exchange(value);
+ return null;
+ });
+ return exchanger.exchange(null); // NOTE(review): no timeout - waits indefinitely if the callable throws before exchange(value) is reached
+ }
+}