You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by gg...@apache.org on 2020/12/10 19:25:20 UTC

[commons-io] branch master updated: Add QueueInput/OutputStream as simpler alternatives to PipedInput/OutputStream (#171)

This is an automated email from the ASF dual-hosted git repository.

ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git


The following commit(s) were added to refs/heads/master by this push:
     new d9237fc  Add QueueInput/OutputStream as simpler alternatives to PipedInput/OutputStream (#171)
d9237fc is described below

commit d9237fcadb398f514beddf35c8933a9fc5925a5d
Author: maxxedev <50...@users.noreply.github.com>
AuthorDate: Thu Dec 10 11:23:52 2020 -0800

    Add QueueInput/OutputStream as simpler alternatives to PipedInput/OutputStream (#171)
    
    * Add QueueInput/OutputStream as simpler alternatives to PipedInput/OutputStream.
    
    PipedInput/OutputStream in JDK can have surprisingly complex behavior with respect
    to how threads need to be arranged. QueueInput/OutputStream are much simpler
    alternatives that are easier to use correctly.
    
    * add more tests for edge cases
    
    * remove static imports on Objects::requireNonNull
    
    * improve documentation and exception handling
---
 .../apache/commons/io/input/QueueInputStream.java  |  90 ++++++++++++++
 .../commons/io/output/QueueOutputStream.java       |  98 +++++++++++++++
 .../commons/io/input/QueueInputStreamTest.java     | 137 +++++++++++++++++++++
 .../commons/io/output/QueueOutputStreamTest.java   | 122 ++++++++++++++++++
 4 files changed, 447 insertions(+)

diff --git a/src/main/java/org/apache/commons/io/input/QueueInputStream.java b/src/main/java/org/apache/commons/io/input/QueueInputStream.java
new file mode 100644
index 0000000..4f3ff3a
--- /dev/null
+++ b/src/main/java/org/apache/commons/io/input/QueueInputStream.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import org.apache.commons.io.output.QueueOutputStream;
+
+import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream.
+ * 
+ * Example usage:
+ * <pre>
+ * QueueInputStream inputStream = new QueueInputStream();
+ * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
+ * 
+ * outputStream.write("hello world".getBytes(UTF_8));
+ * inputStream.read();
+ * </pre>
+ * 
+ * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be
+ * used safely in a single thread or multiple threads. Also, unlike JDK classes, no special meaning is
+ * attached to initial or current thread. Instances can be used long after the initial threads have exited.
+ * 
+ * Closing a {@code QueueInputStream} has no effect. The methods in this class can be called after
+ * the stream has been closed without generating an {@code IOException}.
+ * 
+ * @see QueueOutputStream
+ * @since 2.9.0
+ */
+public class QueueInputStream extends InputStream {
+
+    private final BlockingQueue<Integer> queue; // polled by read(); also handed to streams made by newQueueOutputStream()
+
+    /**
+     * Constructs a QueueInputStream backed by a new {@link LinkedBlockingQueue} created with its default capacity.
+     */
+    public QueueInputStream() {
+        this(new LinkedBlockingQueue<>());
+    }
+
+    /**
+     * Constructs a QueueInputStream that reads from the given queue.
+     * @param queue backing queue for the stream
+     * @throws NullPointerException if {@code queue} is {@code null}
+     */
+    public QueueInputStream(final BlockingQueue<Integer> queue) {
+        this.queue = Objects.requireNonNull(queue, "queue is required");
+    }
+
+    /**
+     * Creates a new QueueOutputStream that shares this stream's backing queue, so bytes written to it can be read here.
+     * Each call returns a new instance.
+     * @return QueueOutputStream connected to this stream
+     */
+    public QueueOutputStream newQueueOutputStream() {
+        return new QueueOutputStream(queue);
+    }
+
+    /**
+     * Reads a single byte without blocking, by polling the backing queue.
+     *
+     * @return the byte read, in the range 0-255, or {@code -1} if the queue is currently empty (more bytes may be written later)
+     */
+    @Override
+    public int read() {
+        final Integer value = queue.poll();
+        return value == null ? -1 : ((0xFF) & value);
+    }
+
+}
diff --git a/src/main/java/org/apache/commons/io/output/QueueOutputStream.java b/src/main/java/org/apache/commons/io/output/QueueOutputStream.java
new file mode 100644
index 0000000..28b3c7d
--- /dev/null
+++ b/src/main/java/org/apache/commons/io/output/QueueOutputStream.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.output;
+
+import org.apache.commons.io.input.QueueInputStream;
+
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Simple alternative to JDK {@link java.io.PipedOutputStream}; queue input stream provides what's
+ * written in queue output stream.
+ * 
+ * Example usage:
+ * <pre>
+ * QueueOutputStream outputStream = new QueueOutputStream();
+ * QueueInputStream inputStream = outputStream.newQueueInputStream();
+ * 
+ * outputStream.write("hello world".getBytes(UTF_8));
+ * inputStream.read();
+ * </pre>
+ * 
+ * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be
+ * used safely in a single thread or multiple threads. Also, unlike JDK classes, no special meaning is
+ * attached to initial or current thread. Instances can be used long after the initial threads have exited.
+ * 
+ * Closing a {@code QueueOutputStream} has no effect. The methods in this class can be called after
+ * the stream has been closed without generating an {@code IOException}.
+ * 
+ * @see QueueInputStream
+ * @since 2.9.0
+ */
+public class QueueOutputStream extends OutputStream {
+
+    private final BlockingQueue<Integer> queue; // receives one element per written byte; shared with newQueueInputStream() results
+
+    /**
+     * Constructs a QueueOutputStream backed by a new {@link LinkedBlockingQueue} created with its default capacity.
+     */
+    public QueueOutputStream() {
+        this(new LinkedBlockingQueue<>());
+    }
+
+    /**
+     * Constructs a QueueOutputStream that writes to the given queue.
+     * @param queue backing queue for the stream
+     * @throws NullPointerException if {@code queue} is {@code null}
+     */
+    public QueueOutputStream(final BlockingQueue<Integer> queue) {
+        this.queue = Objects.requireNonNull(queue, "queue is required");
+    }
+
+    /**
+     * Creates a new QueueInputStream that shares this stream's backing queue, so bytes written here can be read from it.
+     * Each call returns a new instance.
+     * @return QueueInputStream connected to this stream
+     */
+    public QueueInputStream newQueueInputStream() {
+        return new QueueInputStream(queue);
+    }
+
+    /**
+     * Writes a single byte by putting {@code 0xFF & b} into the backing queue, waiting for space if the queue is full.
+     * @param b the byte to write; only its low-order 8 bits are kept
+     * @throws InterruptedIOException if the thread is interrupted while writing to the queue.
+     */
+    @Override
+    public void write(final int b) throws InterruptedIOException {
+        try {
+            queue.put(0xFF & b);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
+            final InterruptedIOException interruptedIoException = new InterruptedIOException();
+            interruptedIoException.initCause(e); // InterruptedIOException has no constructor that accepts a cause
+            throw interruptedIoException;
+        }
+    }
+}
+
diff --git a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
new file mode 100644
index 0000000..774b985
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.QueueOutputStream;
+import org.apache.commons.io.output.QueueOutputStreamTest;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Stream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test {@link QueueInputStream}.
+ * 
+ * @see QueueOutputStreamTest
+ */
+public class QueueInputStreamTest {
+    /** Supplies payloads: the empty string, a few short strings, and sizes straddling 4096 and 8192 bytes. */
+    public static Stream<Arguments> inputData() {
+        return Stream.of(Arguments.of(""),
+                Arguments.of("1"),
+                Arguments.of("12"),
+                Arguments.of("1234"),
+                Arguments.of("12345678"),
+                Arguments.of(StringUtils.repeat("A", 4095)),
+                Arguments.of(StringUtils.repeat("A", 4096)),
+                Arguments.of(StringUtils.repeat("A", 4097)),
+                Arguments.of(StringUtils.repeat("A", 8191)),
+                Arguments.of(StringUtils.repeat("A", 8192)),
+                Arguments.of(StringUtils.repeat("A", 8193)),
+                Arguments.of(StringUtils.repeat("A", 8192 * 4)));
+    }
+
+    @ParameterizedTest(name = "inputData={0}")
+    @MethodSource("inputData")
+    public void unbufferedReadWrite(final String inputData) throws IOException {
+        try (final QueueInputStream inputStream = new QueueInputStream();
+                final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
+            writeUnbuffered(outputStream, inputData);
+            final String actualData = readUnbuffered(inputStream);
+            assertEquals(inputData, actualData);
+        }
+    }
+
+    @ParameterizedTest(name = "inputData={0}")
+    @MethodSource("inputData")
+    public void bufferedReads(final String inputData) throws IOException {
+        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
+        try (final BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
+                final QueueOutputStream outputStream = new QueueOutputStream(queue)) {
+            outputStream.write(inputData.getBytes(UTF_8));
+            final String actualData = IOUtils.toString(inputStream, UTF_8);
+            assertEquals(inputData, actualData);
+        }
+    }
+
+    @ParameterizedTest(name = "inputData={0}")
+    @MethodSource("inputData")
+    public void bufferedWrites(final String inputData) throws IOException {
+        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
+        try (final QueueInputStream inputStream = new QueueInputStream(queue);
+                final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
+            outputStream.write(inputData.getBytes(UTF_8));
+            outputStream.flush(); // push buffered bytes into the queue before reading
+            final String actualData = readUnbuffered(inputStream);
+            assertEquals(inputData, actualData);
+        }
+    }
+
+    @ParameterizedTest(name = "inputData={0}")
+    @MethodSource("inputData")
+    public void bufferedReadWrite(final String inputData) throws IOException {
+        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
+        try (final BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
+                final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
+            outputStream.write(inputData.getBytes(UTF_8));
+            outputStream.flush(); // push buffered bytes into the queue before reading
+            final String dataCopy = IOUtils.toString(inputStream, UTF_8);
+            assertEquals(inputData, dataCopy);
+        }
+    }
+
+    @Test
+    public void testNullArgument() {
+        final NullPointerException e = assertThrows(NullPointerException.class, () -> new QueueInputStream(null));
+        assertEquals("queue is required", e.getMessage()); // assertThrows' String argument is only a failure message
+    }
+
+    private int defaultBufferSize() {
+        return 8192;
+    }
+
+    private void writeUnbuffered(final QueueOutputStream outputStream, final String inputData) throws InterruptedIOException {
+        final byte[] bytes = inputData.getBytes(UTF_8);
+        for (final byte oneByte : bytes) {
+            outputStream.write(oneByte);
+        }
+    }
+
+    private String readUnbuffered(final InputStream inputStream) throws IOException {
+        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        for (int n = inputStream.read(); n != -1; n = inputStream.read()) { // stop at the first -1
+            byteArrayOutputStream.write(n);
+        }
+        return byteArrayOutputStream.toString("UTF-8");
+    }
+}
diff --git a/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java b/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java
new file mode 100644
index 0000000..28a726c
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.output;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.QueueInputStream;
+import org.apache.commons.io.input.QueueInputStreamTest;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.InterruptedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test {@link QueueOutputStream} and {@link QueueInputStream}
+ * 
+ * @see QueueInputStreamTest
+ */
+public class QueueOutputStreamTest {
+
+    private static final ExecutorService executorService = Executors.newFixedThreadPool(5);
+
+    @AfterAll
+    public static void afterAll() {
+        executorService.shutdown();
+    }
+
+    @Test
+    public void writeString() throws Exception {
+        try (final QueueOutputStream outputStream = new QueueOutputStream();
+                final QueueInputStream inputStream = outputStream.newQueueInputStream()) {
+            outputStream.write("ABC".getBytes(UTF_8));
+            final String value = IOUtils.toString(inputStream, UTF_8);
+            assertEquals("ABC", value);
+        }
+    }
+
+    @Test
+    public void writeStringMultiThread() throws Exception {
+        try (final QueueOutputStream outputStream = callInThrowAwayThread(QueueOutputStream::new);
+                final QueueInputStream inputStream = callInThrowAwayThread(outputStream::newQueueInputStream)) {
+            callInThrowAwayThread(() -> {
+                outputStream.write("ABC".getBytes(UTF_8));
+                return null;
+            });
+
+            final String value = callInThrowAwayThread(() -> IOUtils.toString(inputStream, UTF_8));
+            assertEquals("ABC", value);
+        }
+    }
+
+    @Test
+    public void writeInterrupted() throws Exception {
+        try (final QueueOutputStream outputStream = new QueueOutputStream(new LinkedBlockingQueue<>(1));
+                final QueueInputStream inputStream = outputStream.newQueueInputStream()) {
+
+            final int timeout = 1;
+            final Exchanger<Thread> writerThreadExchanger = new Exchanger<>();
+            final Exchanger<Exception> exceptionExchanger = new Exchanger<>();
+            executorService.submit(() -> { // interrupter: waits for the writer thread's handle, then interrupts it
+                final Thread writerThread = writerThreadExchanger.exchange(null, timeout, SECONDS);
+                writerThread.interrupt();
+                return null;
+            });
+
+            executorService.submit(() -> { // writer: queue capacity is 1 and nothing reads, so writing 3 bytes cannot finish
+                try {
+                    writerThreadExchanger.exchange(Thread.currentThread(), timeout, SECONDS);
+                    outputStream.write("ABC".getBytes(StandardCharsets.UTF_8));
+                } catch (Exception e) {
+                    Thread.interrupted(); //clear interrupt
+                    exceptionExchanger.exchange(e, timeout, SECONDS);
+                }
+                return null;
+            });
+
+            final Exception exception = exceptionExchanger.exchange(null, timeout, SECONDS);
+            assertNotNull(exception);
+            assertEquals(InterruptedIOException.class, exception.getClass()); // expected value first, then actual
+        }
+    }
+
+    @Test
+    public void testNullArgument() {
+        final NullPointerException e = assertThrows(NullPointerException.class, () -> new QueueOutputStream(null));
+        assertEquals("queue is required", e.getMessage()); // assertThrows' String argument is only a failure message
+    }
+
+    /**
+     * Runs {@code callable} on a thread of the shared pool and waits for it to finish.
+     * Going through the Future means a throwing callable fails the caller instead of leaving it blocked forever.
+     * @return the value produced by {@code callable}
+     */
+    private static <T> T callInThrowAwayThread(final Callable<T> callable) throws Exception {
+        return executorService.submit(callable).get();
+    }
+}