You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by gg...@apache.org on 2023/12/26 18:57:17 UTC

(commons-io) branch master updated: Add ThrottledInputStream

This is an automated email from the ASF dual-hosted git repository.

ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git


The following commit(s) were added to refs/heads/master by this push:
     new 703b2839 Add ThrottledInputStream
703b2839 is described below

commit 703b283970e375d7651e093e389989482f5bb8c9
Author: Gary Gregory <ga...@gmail.com>
AuthorDate: Tue Dec 26 13:54:13 2023 -0500

    Add ThrottledInputStream
---
 src/changes/changes.xml                            |   1 +
 .../commons/io/input/ThrottledInputStream.java     | 170 +++++++++++++++++++++
 .../commons/io/input/ProxyInputStreamTest.java     |  41 +++--
 .../commons/io/input/ThrottledInputStreamTest.java |  71 +++++++++
 4 files changed, 270 insertions(+), 13 deletions(-)

diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index f9439d24..a4ed6e26 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -93,6 +93,7 @@ The <action> type attribute can be add,update,fix,remove.
       <action dev="ggregory" type="add"                due-to="Gary Gregory">Add FileTimes.toUnixTime(FileTime).</action>
       <action dev="ggregory" type="add"                due-to="Gary Gregory">Add BrokenInputStream.Builder.</action>
       <action dev="ggregory" type="add"                due-to="Gary Gregory">Add PathUtils.getExtension(Path).</action>
+      <action dev="ggregory" type="add"                due-to="Gary Gregory">Add ThrottledInputStream.</action>
       <!--  UPDATE -->
       <action dev="ggregory" type="fix"                due-to="Gary Gregory">Bump commons.bytebuddy.version from 1.14.10 to 1.14.11 #534.</action>
     </release>
diff --git a/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java b/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java
new file mode 100644
index 00000000..d94712a6
--- /dev/null
+++ b/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.io.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.build.AbstractStreamBuilder;
+
+/**
+ * Provides bandwidth throttling on a specified InputStream. It is implemented as a wrapper on top of another InputStream instance. The throttling works by
+ * examining the number of bytes read from the underlying InputStream from the beginning, and sleep()ing for a time interval if the byte-transfer is found
+ * exceed the specified tolerable maximum. (Thus, while the read-rate might exceed the maximum for a given short interval, the average tends towards the
+ * specified maximum, overall.)
+ * <p>
+ * Inspired by Apache HBase's class of the same name.
+ * </p>
+ *
+ * @since 2.16.0
+ */
+public final class ThrottledInputStream extends CountingInputStream {
+
+    /**
+     * Builds a new {@link QueueInputStream} instance.
+     * <h2>Using NIO</h2>
+     *
+     * <pre>{@code
+     * ThrottledInputStream s = ThrottledInputStream.builder().setPath(Paths.get("MyFile.xml")).setMaxBytesPerSecond(100_000).get();
+     * }
+     * </pre>
+     *
+     * <h2>Using IO</h2>
+     *
+     * <pre>{@code
+     * ThrottledInputStream s = ThrottledInputStream.builder().setFile(new File("MyFile.xml")).setMaxBytesPerSecond(100_000).get();
+     * }
+     * </pre>
+     *
+     * <pre>{@code
+     * ThrottledInputStream s = ThrottledInputStream.builder().setInputStream(inputStream).setMaxBytesPerSecond(100_000).get();
+     * }
+     * </pre>
+     */
+    public static class Builder extends AbstractStreamBuilder<ThrottledInputStream, Builder> {
+
+        /**
+         * Effectively not throttled.
+         */
+        private long maxBytesPerSecond = Long.MAX_VALUE;
+
+        @SuppressWarnings("resource")
+        @Override
+        public ThrottledInputStream get() throws IOException {
+            return new ThrottledInputStream(getInputStream(), maxBytesPerSecond);
+        }
+
+        /**
+         * Sets the maximum bytes per second.
+         *
+         * @param maxBytesPerSecond the maximum bytes per second.
+         */
+        public void setMaxBytesPerSecond(final long maxBytesPerSecond) {
+            this.maxBytesPerSecond = maxBytesPerSecond;
+        }
+
+    }
+
+    /**
+     * Constructs a new {@link Builder}.
+     *
+     * @return a new {@link Builder}.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    static long toSleepMillis(final long bytesRead, final long maxBytesPerSec, final long elapsedMillis) {
+        assert elapsedMillis >= 0 : "The elapsed time should be greater or equal to zero";
+        if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) {
+            return 0;
+        }
+        // We use this class to load the single source file, so the bytesRead
+        // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
+        // We can get the precise sleep time by using the double value.
+        final long millis = (long) ((double) bytesRead / (double) maxBytesPerSec * 1000 - elapsedMillis);
+        if (millis <= 0) {
+            return 0;
+        }
+        return millis;
+    }
+
+    private final long maxBytesPerSecond;
+    private final long startTime = System.currentTimeMillis();
+    private Duration totalSleepDuration = Duration.ZERO;
+
+    private ThrottledInputStream(final InputStream proxy, final long maxBytesPerSecond) {
+        super(proxy);
+        assert maxBytesPerSecond > 0 : "Bandwidth " + maxBytesPerSecond + " is invalid.";
+        this.maxBytesPerSecond = maxBytesPerSecond;
+    }
+
+    /**
+     * Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart.
+     *
+     * @return Read rate, in bytes/sec.
+     */
+    public long getBytesPerSecond() {
+        final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
+        if (elapsedSeconds == 0) {
+            return getByteCount();
+        }
+        return getByteCount() / elapsedSeconds;
+    }
+
+    private long getSleepMillis() {
+        return toSleepMillis(getByteCount(), maxBytesPerSecond, System.currentTimeMillis() - startTime);
+    }
+
+    /**
+     * Gets the total duration spent in sleep.
+     *
+     * @return Duration spent in sleep.
+     */
+    public Duration getTotalSleepDuration() {
+        return totalSleepDuration;
+    }
+
+    @Override
+    protected void beforeRead(final int n) throws IOException {
+        throttle();
+    }
+
+    private void throttle() throws InterruptedIOException {
+        final long sleepMillis = getSleepMillis();
+        if (sleepMillis > 0) {
+            totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS);
+            try {
+                TimeUnit.MILLISECONDS.sleep(sleepMillis);
+            } catch (final InterruptedException e) {
+                throw new InterruptedIOException("Thread aborted");
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return "ThrottledInputStream[bytesRead=" + getCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond()
+                + ", totalSleepDuration=" + totalSleepDuration + ']';
+    }
+}
diff --git a/src/test/java/org/apache/commons/io/input/ProxyInputStreamTest.java b/src/test/java/org/apache/commons/io/input/ProxyInputStreamTest.java
index 9fd65b32..d14833be 100644
--- a/src/test/java/org/apache/commons/io/input/ProxyInputStreamTest.java
+++ b/src/test/java/org/apache/commons/io/input/ProxyInputStreamTest.java
@@ -29,8 +29,10 @@ import org.junit.jupiter.api.Test;
 
 /**
  * Tests {@link ProxyInputStream}.
+ *
+ * @param <T> The actual type tested.
  */
-public class ProxyInputStreamTest {
+public class ProxyInputStreamTest<T extends ProxyInputStream> {
 
     private static final class ProxyInputStreamFixture extends ProxyInputStream {
 
@@ -40,10 +42,22 @@ public class ProxyInputStreamTest {
 
     }
 
+    @SuppressWarnings({ "resource", "unused" }) // For subclasses
+    protected T createFixture() throws IOException {
+        return (T) new ProxyInputStreamFixture(createProxySource());
+    }
+
+    protected InputStream createProxySource() {
+        return CharSequenceInputStream.builder().setCharSequence("abc").get();
+    }
+
+    protected void testEos(final T inputStream) {
+        // empty
+    }
+
     @Test
     public void testRead() throws IOException {
-        try (CharSequenceInputStream proxy = CharSequenceInputStream.builder().setCharSequence("abc").get();
-                ProxyInputStream inputStream = new ProxyInputStreamFixture(proxy)) {
+        try (T inputStream = createFixture()) {
             int found = inputStream.read();
             assertEquals('a', found);
             found = inputStream.read();
@@ -52,39 +66,39 @@ public class ProxyInputStreamTest {
             assertEquals('c', found);
             found = inputStream.read();
             assertEquals(-1, found);
+            testEos(inputStream);
         }
     }
 
     @Test
     public void testReadArrayAtMiddleFully() throws IOException {
-        try (CharSequenceInputStream proxy = CharSequenceInputStream.builder().setCharSequence("abc").get();
-                ProxyInputStream inputStream = new ProxyInputStreamFixture(proxy)) {
+        try (T inputStream = createFixture()) {
             final byte[] dest = new byte[5];
             int found = inputStream.read(dest, 2, 3);
             assertEquals(3, found);
             assertArrayEquals(new byte[] { 0, 0, 'a', 'b', 'c' }, dest);
             found = inputStream.read(dest, 2, 3);
             assertEquals(-1, found);
+            testEos(inputStream);
         }
     }
 
     @Test
     public void testReadArrayAtStartFully() throws IOException {
-        try (CharSequenceInputStream proxy = CharSequenceInputStream.builder().setCharSequence("abc").get();
-                ProxyInputStream inputStream = new ProxyInputStreamFixture(proxy)) {
+        try (T inputStream = createFixture()) {
             final byte[] dest = new byte[5];
             int found = inputStream.read(dest, 0, 5);
             assertEquals(3, found);
             assertArrayEquals(new byte[] { 'a', 'b', 'c', 0, 0 }, dest);
             found = inputStream.read(dest, 0, 5);
             assertEquals(-1, found);
+            testEos(inputStream);
         }
     }
 
     @Test
     public void testReadArrayAtStartPartial() throws IOException {
-        try (CharSequenceInputStream proxy = CharSequenceInputStream.builder().setCharSequence("abc").get();
-                ProxyInputStream inputStream = new ProxyInputStreamFixture(proxy)) {
+        try (T inputStream = createFixture()) {
             final byte[] dest = new byte[5];
             int found = inputStream.read(dest, 0, 2);
             assertEquals(2, found);
@@ -95,26 +109,26 @@ public class ProxyInputStreamTest {
             assertArrayEquals(new byte[] { 'c', 0, 0, 0, 0 }, dest);
             found = inputStream.read(dest, 0, 2);
             assertEquals(-1, found);
+            testEos(inputStream);
         }
     }
 
     @Test
     public void testReadArrayFully() throws IOException {
-        try (CharSequenceInputStream proxy = CharSequenceInputStream.builder().setCharSequence("abc").get();
-                ProxyInputStream inputStream = new ProxyInputStreamFixture(proxy)) {
+        try (T inputStream = createFixture()) {
             final byte[] dest = new byte[5];
             int found = inputStream.read(dest);
             assertEquals(3, found);
             assertArrayEquals(new byte[] { 'a', 'b', 'c', 0, 0 }, dest);
             found = inputStream.read(dest);
             assertEquals(-1, found);
+            testEos(inputStream);
         }
     }
 
     @Test
     public void testReadArrayPartial() throws IOException {
-        try (CharSequenceInputStream proxy = CharSequenceInputStream.builder().setCharSequence("abc").get();
-                ProxyInputStream inputStream = new ProxyInputStreamFixture(proxy)) {
+        try (T inputStream = createFixture()) {
             final byte[] dest = new byte[2];
             int found = inputStream.read(dest);
             assertEquals(2, found);
@@ -125,6 +139,7 @@ public class ProxyInputStreamTest {
             assertArrayEquals(new byte[] { 'c', 0 }, dest);
             found = inputStream.read(dest);
             assertEquals(-1, found);
+            testEos(inputStream);
         }
     }
 
diff --git a/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java b/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java
new file mode 100644
index 00000000..0478b581
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.io.input;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.time.Duration;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests {@link ThrottledInputStream}.
+ */
+public class ThrottledInputStreamTest extends ProxyInputStreamTest<ThrottledInputStream> {
+
+    @Override
+    @SuppressWarnings("resource")
+    protected ThrottledInputStream createFixture() throws IOException {
+        return ThrottledInputStream.builder().setInputStream(createProxySource()).get();
+    }
+
+    @Test
+    public void testCalSleepTimeMs() {
+        // case 0: initial - no read, no sleep
+        assertEquals(0, ThrottledInputStream.toSleepMillis(0, 10_000, 1_000));
+
+        // case 1: no threshold
+        assertEquals(0, ThrottledInputStream.toSleepMillis(Long.MAX_VALUE, 0, 1_000));
+        assertEquals(0, ThrottledInputStream.toSleepMillis(Long.MAX_VALUE, -1, 1_000));
+
+        // case 2: too fast
+        assertEquals(1500, ThrottledInputStream.toSleepMillis(5, 2, 1_000));
+        assertEquals(500, ThrottledInputStream.toSleepMillis(5, 2, 2_000));
+        assertEquals(6500, ThrottledInputStream.toSleepMillis(15, 2, 1_000));
+
+        // case 3: too slow
+        assertEquals(0, ThrottledInputStream.toSleepMillis(1, 2, 1_000));
+        assertEquals(0, ThrottledInputStream.toSleepMillis(2, 2, 2_000));
+        assertEquals(0, ThrottledInputStream.toSleepMillis(1, 2, 1_000));
+    }
+
+    @Override
+    protected void testEos(final ThrottledInputStream inputStream) {
+        assertEquals(3, inputStream.getByteCount());
+    }
+
+    @Test
+    public void testGet() throws IOException {
+        try (ThrottledInputStream inputStream = createFixture()) {
+            inputStream.read();
+            assertEquals(Duration.ZERO, inputStream.getTotalSleepDuration());
+        }
+    }
+
+}