You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2022/09/20 08:33:49 UTC
[jena] branch main updated: GH-1477: TestAsyncParser: Fixed non-deterministically failing test case and lowered number of created threads.
This is an automated email from the ASF dual-hosted git repository.
andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git
The following commit(s) were added to refs/heads/main by this push:
new 76c28b3855 GH-1477: TestAsyncParser: Fixed non-deterministically failing test case and lowered number of created threads.
new c19b12832d Merge pull request #1547 from Aklakan/asyncparser-testfix
76c28b3855 is described below
commit 76c28b3855dca66bfb97e4c762792918ffb1827b
Author: Claus Stadler <Ra...@googlemail.com>
AuthorDate: Mon Sep 19 22:30:12 2022 +0200
GH-1477: TestAsyncParser: Fixed non-deterministically failing test case and lowered number of created threads.
---
.../apache/jena/riot/system/TestAsyncParser.java | 32 ++++++++++++++--------
1 file changed, 20 insertions(+), 12 deletions(-)
diff --git a/jena-arq/src/test/java/org/apache/jena/riot/system/TestAsyncParser.java b/jena-arq/src/test/java/org/apache/jena/riot/system/TestAsyncParser.java
index 290e346574..8c1a5afb1b 100644
--- a/jena-arq/src/test/java/org/apache/jena/riot/system/TestAsyncParser.java
+++ b/jena-arq/src/test/java/org/apache/jena/riot/system/TestAsyncParser.java
@@ -70,8 +70,6 @@ public class TestAsyncParser {
test(DIR + "no-suchfile.ttl");
}
- // RDFParserBuilder b = RDFParser.fromString("<s> <p> <o>.").lang(Lang.NT);
-
@Test
public void async_iterator1() {
Iterator<Triple> iter = AsyncParser.asyncParseTriples(DIR+"empty.ttl");
@@ -112,12 +110,13 @@ public class TestAsyncParser {
assertTrue( IsoMatcher.isomorphic(graph1, graph2));
}
- /** Tests the parser on an infinite amount of data with a failing client.
- * This should stop the parser. */
+ /** Repeatedly tests the parser on an infinite amount of data with a failing sink.
+ * This should terminate the parser thread. */
@Test
public void failingSink() {
+ int numThreadsToCreate = 20;
int beforeThreadCount = ManagementFactory.getThreadMXBean().getThreadCount();
- for (int i = 0; i < 100; ++i) {
+ for (int i = 0; i < numThreadsToCreate; ++i) {
try (InputStream in = openInfiniteNtStream();
OutputStream out = new FailingOutputStream()) {
StreamRDF sink = StreamRDFWriter.getWriterStream(out, RDFFormat.NT);
@@ -126,7 +125,9 @@ public class TestAsyncParser {
AsyncParser.of(in, Lang.NT, null).setChunkSize(10).setQueueSize(3).asyncParseSources(sink);
sink.finish();
} catch (Exception e) {
- // Expected to fail
+ // Expected to fail with FailingOutputStream's error message
+ Assert.assertNotNull("Unexpected exception: " + e, e.getCause());
+ Assert.assertEquals(FailingOutputStream.ERROR_MSG, e.getCause().getMessage());
continue;
}
throw new RuntimeException("Parsing unexpectedly succeeded");
@@ -143,8 +144,9 @@ public class TestAsyncParser {
/** Tests to ensure threads are not piling up when repeatedly canceling parsers */
@Test
public void repeatedParsingCancellation_1() throws Exception {
+ int numThreadsToCreate = 20;
int beforeThreadCount = ManagementFactory.getThreadMXBean().getThreadCount();
- for (int i = 0; i < 1000; ++i) {
+ for (int i = 0; i < numThreadsToCreate; ++i) {
IteratorCloseable<Triple> iter = null;
try {
iter = AsyncParser.of(DIR+"data.ttl").setDaemonMode(false).asyncParseTriples();
@@ -162,7 +164,7 @@ public class TestAsyncParser {
// Give some room in how much the number of threads may differ
// The main point is that the acceptable margin is significantly lower than
- // the number of started parser threads
+ // the number of created parser threads
int maxAllowedThreadCountDifference = 5;
Assert.assertTrue("Cancelling RDF parsing resulted in too many dangling threads ("
@@ -183,9 +185,14 @@ public class TestAsyncParser {
}
long pos = channel.position();
- // The exact number of bytes consumed from the channel when writing the test was 10050 bytes
- // However to give room for implementation changes the value tested against here is about twice as large
- Assert.assertTrue("Too many bytes consumed from input stream (" + pos + ")", pos < 20000);
+ // This test should consume up to 60.000 bytes:
+ // - 50 bytes/triple and 100 triples/chunk results in 5K bytes per chunk.
+ // - A queue size of 10 means that a full queue amounts to 50K bytes.
+ // - One chunk is removed from the queue when reading which gets filled up -> 55K.
+ // - The close action clears the queue and aborts on the next chunk -> 60K.
+ // In order to give room for implementation changes and to account for possible buffering in the parser
+ // the value tested against here is about twice as large
+ Assert.assertTrue("Too many bytes consumed from input stream (" + pos + ")", pos < 120_000);
}
/** This test first creates some 'good' data for reference and then appends some 'bad' data in order
@@ -278,9 +285,10 @@ public class TestAsyncParser {
private static class FailingOutputStream
extends OutputStream
{
+ public static final String ERROR_MSG = "Mocked IO Error";
@Override
public void write(int b) throws IOException {
- throw new IOException("Mocked IO Error");
+ throw new IOException(ERROR_MSG);
}
}