You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by gg...@apache.org on 2023/12/26 18:57:17 UTC
(commons-io) branch master updated: Add ThrottledInputStream
This is an automated email from the ASF dual-hosted git repository.
ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git
The following commit(s) were added to refs/heads/master by this push:
new 703b2839 Add ThrottledInputStream
703b2839 is described below
commit 703b283970e375d7651e093e389989482f5bb8c9
Author: Gary Gregory <ga...@gmail.com>
AuthorDate: Tue Dec 26 13:54:13 2023 -0500
Add ThrottledInputStream
---
src/changes/changes.xml | 1 +
.../commons/io/input/ThrottledInputStream.java | 170 +++++++++++++++++++++
.../commons/io/input/ProxyInputStreamTest.java | 41 +++--
.../commons/io/input/ThrottledInputStreamTest.java | 71 +++++++++
4 files changed, 270 insertions(+), 13 deletions(-)
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index f9439d24..a4ed6e26 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -93,6 +93,7 @@ The <action> type attribute can be add,update,fix,remove.
<action dev="ggregory" type="add" due-to="Gary Gregory">Add FileTimes.toUnixTime(FileTime).</action>
<action dev="ggregory" type="add" due-to="Gary Gregory">Add BrokenInputStream.Builder.</action>
<action dev="ggregory" type="add" due-to="Gary Gregory">Add PathUtils.getExtension(Path).</action>
+ <action dev="ggregory" type="add" due-to="Gary Gregory">Add ThrottledInputStream.</action>
<!-- UPDATE -->
<action dev="ggregory" type="fix" due-to="Gary Gregory">Bump commons.bytebuddy.version from 1.14.10 to 1.14.11 #534.</action>
</release>
diff --git a/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java b/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java
new file mode 100644
index 00000000..d94712a6
--- /dev/null
+++ b/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.io.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.build.AbstractStreamBuilder;
+
+/**
+ * Provides bandwidth throttling on a specified InputStream. It is implemented as a wrapper on top of another InputStream instance. The throttling works by
+ * examining the number of bytes read from the underlying InputStream from the beginning, and sleep()ing for a time interval if the byte-transfer is found
+ * exceed the specified tolerable maximum. (Thus, while the read-rate might exceed the maximum for a given short interval, the average tends towards the
+ * specified maximum, overall.)
+ * <p>
+ * Inspired by Apache HBase's class of the same name.
+ * </p>
+ *
+ * @since 2.16.0
+ */
+public final class ThrottledInputStream extends CountingInputStream {
+
+ /**
+ * Builds a new {@link QueueInputStream} instance.
+ * <h2>Using NIO</h2>
+ *
+ * <pre>{@code
+ * ThrottledInputStream s = ThrottledInputStream.builder().setPath(Paths.get("MyFile.xml")).setMaxBytesPerSecond(100_000).get();
+ * }
+ * </pre>
+ *
+ * <h2>Using IO</h2>
+ *
+ * <pre>{@code
+ * ThrottledInputStream s = ThrottledInputStream.builder().setFile(new File("MyFile.xml")).setMaxBytesPerSecond(100_000).get();
+ * }
+ * </pre>
+ *
+ * <pre>{@code
+ * ThrottledInputStream s = ThrottledInputStream.builder().setInputStream(inputStream).setMaxBytesPerSecond(100_000).get();
+ * }
+ * </pre>
+ */
+ public static class Builder extends AbstractStreamBuilder<ThrottledInputStream, Builder> {
+
+ /**
+ * Effectively not throttled.
+ */
+ private long maxBytesPerSecond = Long.MAX_VALUE;
+
+ @SuppressWarnings("resource")
+ @Override
+ public ThrottledInputStream get() throws IOException {
+ return new ThrottledInputStream(getInputStream(), maxBytesPerSecond);
+ }
+
+ /**
+ * Sets the maximum bytes per second.
+ *
+ * @param maxBytesPerSecond the maximum bytes per second.
+ */
+ public void setMaxBytesPerSecond(final long maxBytesPerSecond) {
+ this.maxBytesPerSecond = maxBytesPerSecond;
+ }
+
+ }
+
+ /**
+ * Constructs a new {@link Builder}.
+ *
+ * @return a new {@link Builder}.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ static long toSleepMillis(final long bytesRead, final long maxBytesPerSec, final long elapsedMillis) {
+ assert elapsedMillis >= 0 : "The elapsed time should be greater or equal to zero";
+ if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) {
+ return 0;
+ }
+ // We use this class to load the single source file, so the bytesRead
+ // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
+ // We can get the precise sleep time by using the double value.
+ final long millis = (long) ((double) bytesRead / (double) maxBytesPerSec * 1000 - elapsedMillis);
+ if (millis <= 0) {
+ return 0;
+ }
+ return millis;
+ }
+
+ private final long maxBytesPerSecond;
+ private final long startTime = System.currentTimeMillis();
+ private Duration totalSleepDuration = Duration.ZERO;
+
+ private ThrottledInputStream(final InputStream proxy, final long maxBytesPerSecond) {
+ super(proxy);
+ assert maxBytesPerSecond > 0 : "Bandwidth " + maxBytesPerSecond + " is invalid.";
+ this.maxBytesPerSecond = maxBytesPerSecond;
+ }
+
+ /**
+ * Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart.
+ *
+ * @return Read rate, in bytes/sec.
+ */
+ public long getBytesPerSecond() {
+ final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
+ if (elapsedSeconds == 0) {
+ return getByteCount();
+ }
+ return getByteCount() / elapsedSeconds;
+ }
+
+ private long getSleepMillis() {
+ return toSleepMillis(getByteCount(), maxBytesPerSecond, System.currentTimeMillis() - startTime);
+ }
+
+ /**
+ * Gets the total duration spent in sleep.
+ *
+ * @return Duration spent in sleep.
+ */
+ public Duration getTotalSleepDuration() {
+ return totalSleepDuration;
+ }
+
+ @Override
+ protected void beforeRead(final int n) throws IOException {
+ throttle();
+ }
+
+ private void throttle() throws InterruptedIOException {
+ final long sleepMillis = getSleepMillis();
+ if (sleepMillis > 0) {
+ totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS);
+ try {
+ TimeUnit.MILLISECONDS.sleep(sleepMillis);
+ } catch (final InterruptedException e) {
+ throw new InterruptedIOException("Thread aborted");
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return "ThrottledInputStream[bytesRead=" + getCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond()
+ + ", totalSleepDuration=" + totalSleepDuration + ']';
+ }
+}
diff --git a/src/test/java/org/apache/commons/io/input/ProxyInputStreamTest.java b/src/test/java/org/apache/commons/io/input/ProxyInputStreamTest.java
index 9fd65b32..d14833be 100644
--- a/src/test/java/org/apache/commons/io/input/ProxyInputStreamTest.java
+++ b/src/test/java/org/apache/commons/io/input/ProxyInputStreamTest.java
@@ -29,8 +29,10 @@ import org.junit.jupiter.api.Test;
/**
* Tests {@link ProxyInputStream}.
+ *
+ * @param <T> The actual type tested.
*/
-public class ProxyInputStreamTest {
+public class ProxyInputStreamTest<T extends ProxyInputStream> {
private static final class ProxyInputStreamFixture extends ProxyInputStream {
@@ -40,10 +42,22 @@ public class ProxyInputStreamTest {
}
+ @SuppressWarnings({ "resource", "unused" }) // For subclasses
+ protected T createFixture() throws IOException {
+ return (T) new ProxyInputStreamFixture(createProxySource());
+ }
+
+ protected InputStream createProxySource() {
+ return CharSequenceInputStream.builder().setCharSequence("abc").get();
+ }
+
+ protected void testEos(final T inputStream) {
+ // empty
+ }
+
@Test
public void testRead() throws IOException {
- try (CharSequenceInputStream proxy = CharSequenceInputStream.builder().setCharSequence("abc").get();
- ProxyInputStream inputStream = new ProxyInputStreamFixture(proxy)) {
+ try (T inputStream = createFixture()) {
int found = inputStream.read();
assertEquals('a', found);
found = inputStream.read();
@@ -52,39 +66,39 @@ public class ProxyInputStreamTest {
assertEquals('c', found);
found = inputStream.read();
assertEquals(-1, found);
+ testEos(inputStream);
}
}
@Test
public void testReadArrayAtMiddleFully() throws IOException {
- try (CharSequenceInputStream proxy = CharSequenceInputStream.builder().setCharSequence("abc").get();
- ProxyInputStream inputStream = new ProxyInputStreamFixture(proxy)) {
+ try (T inputStream = createFixture()) {
final byte[] dest = new byte[5];
int found = inputStream.read(dest, 2, 3);
assertEquals(3, found);
assertArrayEquals(new byte[] { 0, 0, 'a', 'b', 'c' }, dest);
found = inputStream.read(dest, 2, 3);
assertEquals(-1, found);
+ testEos(inputStream);
}
}
@Test
public void testReadArrayAtStartFully() throws IOException {
- try (CharSequenceInputStream proxy = CharSequenceInputStream.builder().setCharSequence("abc").get();
- ProxyInputStream inputStream = new ProxyInputStreamFixture(proxy)) {
+ try (T inputStream = createFixture()) {
final byte[] dest = new byte[5];
int found = inputStream.read(dest, 0, 5);
assertEquals(3, found);
assertArrayEquals(new byte[] { 'a', 'b', 'c', 0, 0 }, dest);
found = inputStream.read(dest, 0, 5);
assertEquals(-1, found);
+ testEos(inputStream);
}
}
@Test
public void testReadArrayAtStartPartial() throws IOException {
- try (CharSequenceInputStream proxy = CharSequenceInputStream.builder().setCharSequence("abc").get();
- ProxyInputStream inputStream = new ProxyInputStreamFixture(proxy)) {
+ try (T inputStream = createFixture()) {
final byte[] dest = new byte[5];
int found = inputStream.read(dest, 0, 2);
assertEquals(2, found);
@@ -95,26 +109,26 @@ public class ProxyInputStreamTest {
assertArrayEquals(new byte[] { 'c', 0, 0, 0, 0 }, dest);
found = inputStream.read(dest, 0, 2);
assertEquals(-1, found);
+ testEos(inputStream);
}
}
@Test
public void testReadArrayFully() throws IOException {
- try (CharSequenceInputStream proxy = CharSequenceInputStream.builder().setCharSequence("abc").get();
- ProxyInputStream inputStream = new ProxyInputStreamFixture(proxy)) {
+ try (T inputStream = createFixture()) {
final byte[] dest = new byte[5];
int found = inputStream.read(dest);
assertEquals(3, found);
assertArrayEquals(new byte[] { 'a', 'b', 'c', 0, 0 }, dest);
found = inputStream.read(dest);
assertEquals(-1, found);
+ testEos(inputStream);
}
}
@Test
public void testReadArrayPartial() throws IOException {
- try (CharSequenceInputStream proxy = CharSequenceInputStream.builder().setCharSequence("abc").get();
- ProxyInputStream inputStream = new ProxyInputStreamFixture(proxy)) {
+ try (T inputStream = createFixture()) {
final byte[] dest = new byte[2];
int found = inputStream.read(dest);
assertEquals(2, found);
@@ -125,6 +139,7 @@ public class ProxyInputStreamTest {
assertArrayEquals(new byte[] { 'c', 0 }, dest);
found = inputStream.read(dest);
assertEquals(-1, found);
+ testEos(inputStream);
}
}
diff --git a/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java b/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java
new file mode 100644
index 00000000..0478b581
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.io.input;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.time.Duration;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests {@link ThrottledInputStream}.
+ */
+public class ThrottledInputStreamTest extends ProxyInputStreamTest<ThrottledInputStream> {
+
+ @Override
+ @SuppressWarnings("resource")
+ protected ThrottledInputStream createFixture() throws IOException {
+ return ThrottledInputStream.builder().setInputStream(createProxySource()).get();
+ }
+
+ @Test
+ public void testCalSleepTimeMs() {
+ // case 0: initial - no read, no sleep
+ assertEquals(0, ThrottledInputStream.toSleepMillis(0, 10_000, 1_000));
+
+ // case 1: no threshold
+ assertEquals(0, ThrottledInputStream.toSleepMillis(Long.MAX_VALUE, 0, 1_000));
+ assertEquals(0, ThrottledInputStream.toSleepMillis(Long.MAX_VALUE, -1, 1_000));
+
+ // case 2: too fast
+ assertEquals(1500, ThrottledInputStream.toSleepMillis(5, 2, 1_000));
+ assertEquals(500, ThrottledInputStream.toSleepMillis(5, 2, 2_000));
+ assertEquals(6500, ThrottledInputStream.toSleepMillis(15, 2, 1_000));
+
+ // case 3: too slow
+ assertEquals(0, ThrottledInputStream.toSleepMillis(1, 2, 1_000));
+ assertEquals(0, ThrottledInputStream.toSleepMillis(2, 2, 2_000));
+ assertEquals(0, ThrottledInputStream.toSleepMillis(1, 2, 1_000));
+ }
+
+ @Override
+ protected void testEos(final ThrottledInputStream inputStream) {
+ assertEquals(3, inputStream.getByteCount());
+ }
+
+ @Test
+ public void testGet() throws IOException {
+ try (ThrottledInputStream inputStream = createFixture()) {
+ inputStream.read();
+ assertEquals(Duration.ZERO, inputStream.getTotalSleepDuration());
+ }
+ }
+
+}