You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/03/15 21:22:32 UTC
[tika] branch main updated: TIKA-3313 Improve performance and
usability of RereadableInputStream (#413)
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 87f05de TIKA-3313 Improve performance and usability of RereadableInputStream (#413)
87f05de is described below
commit 87f05dee2ec453f2e30fc82d7e3259de8298b6e4
Author: Peter Kronenberg <pa...@gmail.com>
AuthorDate: Mon Mar 15 17:21:26 2021 -0400
TIKA-3313 Improve performance and usability of RereadableInputStream (#413)
* TIKA-3313 Improve performance and usability of RereadableInputStream
* TIKA-3313 Improve logic
Authored-by: Peter Kronenberg <pe...@torch.ai>
---
.../apache/tika/utils/RereadableInputStream.java | 237 ++++++++++-----------
.../org/apache/tika/TestRereadableInputStream.java | 140 ++++++++----
.../org/apache/tika/parser/crypto/TSDParser.java | 2 +-
.../microsoft/ooxml/OOXMLExtractorFactory.java | 3 +-
4 files changed, 210 insertions(+), 172 deletions(-)
diff --git a/tika-core/src/main/java/org/apache/tika/utils/RereadableInputStream.java b/tika-core/src/main/java/org/apache/tika/utils/RereadableInputStream.java
index 47c2b20..5a5f796 100644
--- a/tika-core/src/main/java/org/apache/tika/utils/RereadableInputStream.java
+++ b/tika-core/src/main/java/org/apache/tika/utils/RereadableInputStream.java
@@ -31,7 +31,7 @@ import java.io.OutputStream;
* Wraps an input stream, reading it only once, but making it available
* for rereading an arbitrary number of times. The stream's bytes are
* stored in memory up to a user specified maximum, and then stored in a
- * temporary file which is deleted when this class' close() method is called.
+ * temporary file which is deleted when this class's close() method is called.
*/
public class RereadableInputStream extends InputStream {
@@ -44,12 +44,12 @@ public class RereadableInputStream extends InputStream {
/**
* Input stream originally passed to the constructor.
*/
- private InputStream originalInputStream;
+ private final InputStream originalInputStream;
/**
* The inputStream currently being used by this object to read contents;
* may be the original stream passed in, or a stream that reads
- * the saved copy.
+ * the saved copy from a memory buffer or file.
*/
private InputStream inputStream;
@@ -57,30 +57,32 @@ public class RereadableInputStream extends InputStream {
* Maximum number of bytes that can be stored in memory before
* storage will be moved to a temporary file.
*/
- private int maxBytesInMemory;
+ private final int maxBytesInMemory;
/**
- * True when the original stream is being read; set to false when
- * reading is set to use the stored data instead.
+ * Whether or not we are currently reading from the byte buffer in memory.
+ * Bytes are read until we've exhausted the buffered bytes and then we proceed to read from the original input stream.
+ * If the number of bytes read from the original stream eventually exceeds maxBytesInMemory, then we'll switch to reading from a file.
*/
- private boolean firstPass = true;
+ private boolean readingFromBuffer;
- /**
- * Whether or not the stream's contents are being stored in a file
- * as opposed to memory.
- */
- private boolean bufferIsInFile;
/**
* The buffer used to store the stream's content; this storage is moved
* to a file when the stored data's size exceeds maxBytesInMemory.
+ * Set to null once we start writing to a file.
*/
private byte[] byteBuffer;
/**
- * The total number of bytes read from the original stream at the time.
+ * The index in the in-memory buffer at which the next saved byte is written
+ */
+ private int bufferPointer;
+
+ /**
+ * Maximum size of the buffer that was written in previous pass(es)
*/
- private int size;
+ private int bufferHighWaterMark;
/**
* File used to store the stream's contents; is null until the stored
@@ -89,19 +91,15 @@ public class RereadableInputStream extends InputStream {
private File storeFile;
/**
- * OutputStream used to save the content of the input stream in a
- * temporary file.
+ * Specifies whether the stream has been closed
*/
- private OutputStream storeOutputStream;
-
+ private boolean closed;
/**
- * Specifies whether or not to read to the end of stream on first
- * rewind. This defaults to true. If this is set to false,
- * then the first time when rewind() is called, only those bytes
- * already read from the original stream will be available from then on.
+ * OutputStream used to save the content of the input stream in a
+ * temporary file.
*/
- private final boolean readToEndOfStreamOnFirstRewind;
+ private OutputStream storeOutputStream;
/**
@@ -111,24 +109,6 @@ public class RereadableInputStream extends InputStream {
private final boolean closeOriginalStreamOnClose;
- // TODO: At some point it would be better to replace the current approach
- // (specifying the above) with more automated behavior. The stream could
- // keep the original stream open until EOF was reached. For example, if:
- //
- // the original stream is 10 bytes, and
- // only 2 bytes are read on the first pass
- // rewind() is called
- // 5 bytes are read
- //
- // In this case, this instance gets the first 2 from its store,
- // and the next 3 from the original stream, saving those additional 3
- // bytes in the store. In this way, only the maximum number of bytes
- // ever needed must be saved in the store; unused bytes are never read.
- // The original stream is closed when EOF is reached, or when close()
- // is called, whichever comes first. Using this approach eliminates
- // the need to specify the flag (though makes implementation more complex).
-
-
/**
* Creates a rereadable input stream with defaults of 512*1024*1024 bytes (500M) for maxBytesInMemory
* and both readToEndOfStreamOnFirstRewind and closeOriginalStreamOnClose set to true
@@ -136,25 +116,20 @@ public class RereadableInputStream extends InputStream {
* @param inputStream stream containing the source of data
*/
public RereadableInputStream(InputStream inputStream) {
- this(inputStream, DEFAULT_MAX_BYTES_IN_MEMORY, true, true);
+ this(inputStream, DEFAULT_MAX_BYTES_IN_MEMORY, true);
}
/**
- * Creates a rereadable input stream defaulting to 512*1024*1024 bytes (500M) for maxBytesInMemory
+ * Creates a rereadable input stream defaulting to 512*1024*1024 bytes (500M) for maxBytesInMemory
*
- * @param inputStream stream containing the source of data
- * @param readToEndOfStreamOnFirstRewind Specifies whether or not to
- * read to the end of stream on first rewind. If this is set to false,
- * then when rewind() is first called, only those bytes already read
- * from the original stream will be available from then on.
+ * @param inputStream stream containing the source of data
*/
- public RereadableInputStream(InputStream inputStream, boolean readToEndOfStreamOnFirstRewind, boolean closeOriginalStreamOnClose) {
- this(inputStream, DEFAULT_MAX_BYTES_IN_MEMORY, readToEndOfStreamOnFirstRewind, closeOriginalStreamOnClose);
+ public RereadableInputStream(InputStream inputStream, boolean closeOriginalStreamOnClose) {
+ this(inputStream, DEFAULT_MAX_BYTES_IN_MEMORY, closeOriginalStreamOnClose);
}
/**
- * Creates a rereadable input stream with both readToEndOfStreamOnfirstRewind
- * and closeOriginalStreamOnClose set to true
+ * Creates a rereadable input stream with closeOriginalStreamOnClose set to true
*
* @param inputStream stream containing the source of data
* @param maxBytesInMemory maximum number of bytes to use to store
@@ -166,33 +141,26 @@ public class RereadableInputStream extends InputStream {
* when there are no more references to the instance.
*/
public RereadableInputStream(InputStream inputStream, int maxBytesInMemory) {
- this(inputStream, maxBytesInMemory, true, true);
+ this(inputStream, maxBytesInMemory, true);
}
/**
* Creates a rereadable input stream.
*
- * @param inputStream stream containing the source of data
- * @param maxBytesInMemory maximum number of bytes to use to store
- * the stream's contents in memory before switching to disk; note that
- * the instance will preallocate a byte array whose size is
- * maxBytesInMemory. This byte array will be made available for
- * garbage collection (i.e. its reference set to null) when the
- * content size exceeds the array's size, when close() is called, or
- * when there are no more references to the instance.
- * @param readToEndOfStreamOnFirstRewind Specifies whether or not to
- * read to the end of stream on first rewind. If this is set to false,
- * then when rewind() is first called, only those bytes already read
- * from the original stream will be available from then on.
+ * @param inputStream stream containing the source of data
+ * @param maxBytesInMemory maximum number of bytes to use to store
+ * the stream's contents in memory before switching to disk; note that
+ * the instance will preallocate a byte array whose size is
+ * maxBytesInMemory. This byte array will be made available for
+ * garbage collection (i.e. its reference set to null) when the
+ * content size exceeds the array's size, when close() is called, or
+ * when there are no more references to the instance.
*/
- public RereadableInputStream(InputStream inputStream, int maxBytesInMemory,
- boolean readToEndOfStreamOnFirstRewind,
- boolean closeOriginalStreamOnClose) {
+ public RereadableInputStream(InputStream inputStream, int maxBytesInMemory, boolean closeOriginalStreamOnClose) {
this.inputStream = inputStream;
this.originalInputStream = inputStream;
this.maxBytesInMemory = maxBytesInMemory;
byteBuffer = new byte[maxBytesInMemory];
- this.readToEndOfStreamOnFirstRewind = readToEndOfStreamOnFirstRewind;
this.closeOriginalStreamOnClose = closeOriginalStreamOnClose;
}
@@ -205,38 +173,92 @@ public class RereadableInputStream extends InputStream {
* @throws IOException
*/
public int read() throws IOException {
+ if (closed) {
+ throw new IOException("Stream is already closed");
+ }
+
int inputByte = inputStream.read();
- if (firstPass) {
+ if (inputByte == -1 && inputStream != originalInputStream) {
+ // If we got EOF reading from buffer or file, switch to the original stream and get the next byte from there instead
+ if (readingFromBuffer) {
+ readingFromBuffer = false;
+ inputStream.close(); // Close the input byte stream
+ } else {
+ inputStream.close(); // Close the input file stream
+ // start appending to the file
+ storeOutputStream = new BufferedOutputStream(new FileOutputStream(storeFile, true));
+ }
+ // The original stream is now the current stream
+ inputStream = originalInputStream;
+ inputByte = inputStream.read();
+ }
+
+ if (inputByte != -1 && inputStream == originalInputStream) {
+ // If not EOF and reading from original stream, save the bytes we read
saveByte(inputByte);
}
+
return inputByte;
}
/**
+ * Saves the bytes read from the original stream to buffer or file
+ */
+ private void saveByte(int inputByte) throws IOException {
+ if (byteBuffer != null) {
+ if (bufferPointer == maxBytesInMemory) {
+ // Need to switch to file
+ storeFile = File.createTempFile("TIKA_streamstore_", ".tmp");
+ storeOutputStream = new BufferedOutputStream(new FileOutputStream(storeFile));
+ // Save what we have so far in buffer
+ storeOutputStream.write(byteBuffer, 0, bufferPointer);
+ // Write the new byte
+ storeOutputStream.write(inputByte);
+ byteBuffer = null; // release for garbage collection
+ } else {
+ // Continue writing to buffer
+ byteBuffer[bufferPointer++] = (byte) inputByte;
+ }
+ } else {
+ storeOutputStream.write(inputByte);
+ }
+ }
+
+ /**
* "Rewinds" the stream to the beginning for rereading.
*
* @throws IOException
*/
public void rewind() throws IOException {
-
- if (firstPass && readToEndOfStreamOnFirstRewind) {
- // Force read to end of stream to fill store with any
- // remaining bytes from original stream.
- while (read() != -1) {
- // empty loop
- }
+ if (closed) {
+ throw new IOException("Stream is already closed");
}
- closeStream();
if (storeOutputStream != null) {
storeOutputStream.close();
storeOutputStream = null;
}
- firstPass = false;
- boolean newStreamIsInMemory = (size < maxBytesInMemory);
- inputStream = newStreamIsInMemory
- ? new ByteArrayInputStream(byteBuffer)
- : new BufferedInputStream(new FileInputStream(storeFile));
+
+ // Close the byte input stream or file input stream
+ if (inputStream != originalInputStream) {
+ inputStream.close();
+ }
+
+ bufferHighWaterMark = Math.max(bufferPointer, bufferHighWaterMark);
+ bufferPointer = bufferHighWaterMark;
+
+ if (bufferHighWaterMark > 0) {
+ // If we have a buffer, then we'll read from it
+ if (byteBuffer != null) {
+ readingFromBuffer = true;
+ inputStream = new ByteArrayInputStream(byteBuffer, 0, bufferHighWaterMark);
+ } else {
+ // No buffer, which means we've switched to a file
+ inputStream = new BufferedInputStream(new FileInputStream(storeFile));
+ }
+ } else {
+ inputStream = originalInputStream;
+ }
}
/**
@@ -245,14 +267,14 @@ public class RereadableInputStream extends InputStream {
*
* @throws IOException
*/
- // Does anyone need/want for this to be public?
private void closeStream() throws IOException {
- if (inputStream != null
- &&
- (inputStream != originalInputStream
- || closeOriginalStreamOnClose)) {
+ if (originalInputStream != inputStream) {
+ // Close the byte input stream or file input stream, if either is the current one
inputStream.close();
- inputStream = null;
+ }
+
+ if (closeOriginalStreamOnClose) {
+ originalInputStream.close();
}
}
@@ -274,41 +296,6 @@ public class RereadableInputStream extends InputStream {
if (storeFile != null) {
storeFile.delete();
}
- }
-
- /**
- * Returns the number of bytes read from the original stream.
- *
- * @return number of bytes read
- */
- public int getSize() {
- return size;
- }
-
- /**
- * Saves the byte read from the original stream to the store.
- *
- * @param inputByte byte read from original stream
- * @throws IOException
- */
- private void saveByte(int inputByte) throws IOException {
-
- if (!bufferIsInFile) {
- boolean switchToFile = (size == (maxBytesInMemory));
- if (switchToFile) {
- storeFile = File.createTempFile("TIKA_streamstore_", ".tmp");
- bufferIsInFile = true;
- storeOutputStream = new BufferedOutputStream(
- new FileOutputStream(storeFile));
- storeOutputStream.write(byteBuffer, 0, size);
- storeOutputStream.write(inputByte);
- byteBuffer = null; // release for garbage collection
- } else {
- byteBuffer[size] = (byte) inputByte;
- }
- } else {
- storeOutputStream.write(inputByte);
- }
- ++size;
+ closed = true;
}
}
diff --git a/tika-core/src/test/java/org/apache/tika/TestRereadableInputStream.java b/tika-core/src/test/java/org/apache/tika/TestRereadableInputStream.java
index ed00bc5..5fb4ba1 100644
--- a/tika-core/src/test/java/org/apache/tika/TestRereadableInputStream.java
+++ b/tika-core/src/test/java/org/apache/tika/TestRereadableInputStream.java
@@ -16,6 +16,9 @@
*/
package org.apache.tika;
+import org.apache.tika.utils.RereadableInputStream;
+import org.junit.Test;
+
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -23,79 +26,121 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import org.apache.tika.utils.RereadableInputStream;
-import org.junit.Test;
-
import static org.junit.Assert.assertEquals;
public class TestRereadableInputStream {
- private final int TEST_SIZE = 3;
+ private final int DEFAULT_TEST_SIZE = 3;
- private final int MEMORY_THRESHOLD = 1;
+ private final int MEMORY_THRESHOLD = 10;
private final int NUM_PASSES = 4;
+ // This size of data keeps us in memory
+ private final int TEST_SIZE_MEMORY = 7;
+
+ // This size of data exceeds memory threshold and gets us in a file
+ private final int TEST_SIZE_FILE = 15;
+
+ // This size of data exactly equals memory threshold
+ private final int TEST_SIZE_MAX = MEMORY_THRESHOLD;
+
@Test
- public void test() throws IOException {
+ public void testInMemory() throws IOException {
+ readEntireStream((TEST_SIZE_MEMORY));
+ }
+
+// @Test
+// public void testInFile() throws IOException {
+// readData(TEST_SIZE_FILE);
+// }
+//
+// @Test
+// public void testMemoryThreshold() throws IOException {
+// readData(TEST_SIZE_MAX);
+// }
+//
+// @Test
+// public void testInMemory2() throws IOException {
+// readData2((TEST_SIZE_MEMORY));
+// }
+//
+// @Test
+// public void testInFile2() throws IOException {
+// readData2(TEST_SIZE_FILE);
+// }
+
+ @Test
+ public void testMemoryThreshold2() throws IOException {
+ readPartialStream(TEST_SIZE_MAX);
+ }
- InputStream is = createTestInputStream();
- try (RereadableInputStream ris = new RereadableInputStream(is, MEMORY_THRESHOLD, true, true)) {
+ /**
+ * Read entire stream of various sizes
+ */
+ private void readEntireStream(int testSize) throws IOException {
+ InputStream is = createTestInputStream(testSize);
+ try (RereadableInputStream ris = new RereadableInputStream(is, MEMORY_THRESHOLD, true)) {
for (int pass = 0; pass < NUM_PASSES; pass++) {
- for (int byteNum = 0; byteNum < TEST_SIZE; byteNum++) {
+ for (int byteNum = 0; byteNum < testSize; byteNum++) {
int byteRead = ris.read();
assertEquals("Pass = " + pass + ", byte num should be "
- + byteNum + " but is " + byteRead + ".", byteNum,
- byteRead);
+ + byteNum + " but is " + byteRead + ".", byteNum, byteRead);
}
+ int eof = ris.read();
+ assertEquals("Pass = " + pass + ", byte num should be "
+ + -1 + " but is " + eof + ".", -1, eof);
ris.rewind();
}
}
}
/**
- * Test that the constructor's readToEndOfStreamOnFirstRewind parameter
- * correctly determines the behavior.
- *
- * @throws IOException
+ * Read increasingly more of the stream, but not all, with each pass before rewinding to make sure we pick up at the correct point
*/
- @Test
- public void testRewind() throws IOException {
- doTestRewind(true);
- doTestRewind(false);
- }
+ private void readPartialStream(int testSize) throws IOException {
+ InputStream is = createTestInputStream(20);
+ try (RereadableInputStream ris = new RereadableInputStream(is, MEMORY_THRESHOLD, true)) {
- private void doTestRewind(boolean readToEndOnRewind) throws IOException {
-
- RereadableInputStream ris = null;
-
- try {
- InputStream s1 = createTestInputStream();
- ris = new RereadableInputStream(s1, 5, readToEndOnRewind, true);
- ris.read();
- assertEquals(1, ris.getSize());
- ris.rewind();
- boolean moreBytesWereRead = (ris.getSize() > 1);
- assertEquals(readToEndOnRewind, moreBytesWereRead);
- } finally {
- if (ris != null) {
- ris.close();
+ int iterations = testSize;
+ for (int pass = 0; pass < NUM_PASSES; pass++) {
+ for (int byteNum = 0; byteNum < iterations; byteNum++) {
+ int byteRead = ris.read();
+ assertEquals("Pass = " + pass + ", byte num should be "
+ + byteNum + " but is " + byteRead + ".", byteNum, byteRead);
+ }
+ ris.rewind();
+ iterations++;
}
}
+ }
+
+ @Test
+ public void testRewind() throws IOException {
+ InputStream is = createTestInputStream(DEFAULT_TEST_SIZE);
+ try (RereadableInputStream ris = new RereadableInputStream(is, MEMORY_THRESHOLD, true)) {
+ ris.rewind(); // rewind before we've done anything
+ for (int byteNum = 0; byteNum < 1; byteNum++) {
+ int byteRead = ris.read();
+ assertEquals("Byte num should be "
+ + byteNum + " but is " + byteRead + ".", byteNum, byteRead);
+ }
+ }
}
- private TestInputStream createTestInputStream() throws IOException {
+
+ private TestInputStream createTestInputStream(int testSize) throws IOException {
return new TestInputStream(
new BufferedInputStream(
- new FileInputStream(createTestFile())));
+ new FileInputStream(createTestFile(testSize))));
}
- private File createTestFile() throws IOException {
+ private File createTestFile(int testSize) throws IOException {
File testfile = File.createTempFile("TIKA_ris_test", ".tmp");
testfile.deleteOnExit();
FileOutputStream fos = new FileOutputStream(testfile);
- for (int i = 0; i < TEST_SIZE; i++) {
+ for (int i = 0; i < testSize; i++) {
fos.write(i);
}
fos.close();
@@ -110,17 +155,25 @@ public class TestRereadableInputStream {
private void doACloseBehaviorTest(boolean wantToClose) throws IOException {
- TestInputStream tis = createTestInputStream();
- RereadableInputStream ris =
- new RereadableInputStream(tis, 5, true, wantToClose);
+ TestInputStream tis = createTestInputStream(DEFAULT_TEST_SIZE);
+ RereadableInputStream ris = new RereadableInputStream(tis, MEMORY_THRESHOLD, wantToClose);
ris.close();
assertEquals(wantToClose, tis.isClosed());
- if (! tis.isClosed()) {
+ if (!tis.isClosed()) {
tis.close();
}
}
+ @Test(expected = IOException.class)
+ public void doReadAfterCloseTest() throws IOException {
+
+ TestInputStream tis = createTestInputStream(DEFAULT_TEST_SIZE);
+ RereadableInputStream ris = new RereadableInputStream(tis, DEFAULT_TEST_SIZE);
+ ris.close();
+ ris.read();
+ }
+
/**
* Adds isClosed() to a BufferedInputStream.
@@ -142,5 +195,4 @@ public class TestRereadableInputStream {
return closed;
}
}
-
}
diff --git a/tika-parsers/tika-parsers-classic/tika-parsers-classic-modules/tika-parser-crypto-module/src/main/java/org/apache/tika/parser/crypto/TSDParser.java b/tika-parsers/tika-parsers-classic/tika-parsers-classic-modules/tika-parser-crypto-module/src/main/java/org/apache/tika/parser/crypto/TSDParser.java
index f2ae7a6..4af2def 100644
--- a/tika-parsers/tika-parsers-classic/tika-parsers-classic-modules/tika-parser-crypto-module/src/main/java/org/apache/tika/parser/crypto/TSDParser.java
+++ b/tika-parsers/tika-parsers-classic/tika-parsers-classic-modules/tika-parser-crypto-module/src/main/java/org/apache/tika/parser/crypto/TSDParser.java
@@ -93,7 +93,7 @@ public class TSDParser extends AbstractParser {
Metadata metadata, ParseContext context) throws IOException, SAXException, TikaException {
//Try to parse TSD file
- try (RereadableInputStream ris = new RereadableInputStream(stream, 2048, true, true)) {
+ try (RereadableInputStream ris = new RereadableInputStream(stream, 2048, true)) {
Metadata TSDAndEmbeddedMetadata = new Metadata();
List<TSDMetas> tsdMetasList = this.extractMetas(ris);
diff --git a/tika-parsers/tika-parsers-classic/tika-parsers-classic-modules/tika-parser-microsoft-module/src/main/java/org/apache/tika/parser/microsoft/ooxml/OOXMLExtractorFactory.java b/tika-parsers/tika-parsers-classic/tika-parsers-classic-modules/tika-parser-microsoft-module/src/main/java/org/apache/tika/parser/microsoft/ooxml/OOXMLExtractorFactory.java
index c135e8c..baadeaa 100644
--- a/tika-parsers/tika-parsers-classic/tika-parsers-classic-modules/tika-parser-microsoft-module/src/main/java/org/apache/tika/parser/microsoft/ooxml/OOXMLExtractorFactory.java
+++ b/tika-parsers/tika-parsers-classic/tika-parsers-classic-modules/tika-parser-microsoft-module/src/main/java/org/apache/tika/parser/microsoft/ooxml/OOXMLExtractorFactory.java
@@ -105,8 +105,7 @@ public class OOXMLExtractorFactory {
//OPCPackage slurps rris into memory so we can close rris
//without apparent problems
try (RereadableInputStream rereadableInputStream =
- new RereadableInputStream(stream, MAX_BUFFER_LENGTH,
- true, false)) {
+ new RereadableInputStream(stream, MAX_BUFFER_LENGTH, false)) {
try {
pkg = OPCPackage.open(rereadableInputStream);
} catch (EOFException e) {