You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2022/09/20 08:33:49 UTC

[jena] branch main updated: GH-1477: TestAsyncParser: Fixed non-deterministically failing test case and lowered number of created threads.

This is an automated email from the ASF dual-hosted git repository.

andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git


The following commit(s) were added to refs/heads/main by this push:
     new 76c28b3855 GH-1477: TestAsyncParser: Fixed non-deterministically failing test case and lowered number of created threads.
     new c19b12832d Merge pull request #1547 from Aklakan/asyncparser-testfix
76c28b3855 is described below

commit 76c28b3855dca66bfb97e4c762792918ffb1827b
Author: Claus Stadler <Ra...@googlemail.com>
AuthorDate: Mon Sep 19 22:30:12 2022 +0200

    GH-1477: TestAsyncParser: Fixed non-deterministically failing test case and lowered number of created threads.
---
 .../apache/jena/riot/system/TestAsyncParser.java   | 32 ++++++++++++++--------
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/jena-arq/src/test/java/org/apache/jena/riot/system/TestAsyncParser.java b/jena-arq/src/test/java/org/apache/jena/riot/system/TestAsyncParser.java
index 290e346574..8c1a5afb1b 100644
--- a/jena-arq/src/test/java/org/apache/jena/riot/system/TestAsyncParser.java
+++ b/jena-arq/src/test/java/org/apache/jena/riot/system/TestAsyncParser.java
@@ -70,8 +70,6 @@ public class TestAsyncParser {
         test(DIR + "no-suchfile.ttl");
     }
 
-    // RDFParserBuilder b = RDFParser.fromString("<s> <p> <o>.").lang(Lang.NT);
-
     @Test
     public void async_iterator1() {
         Iterator<Triple> iter = AsyncParser.asyncParseTriples(DIR+"empty.ttl");
@@ -112,12 +110,13 @@ public class TestAsyncParser {
         assertTrue( IsoMatcher.isomorphic(graph1, graph2));
     }
 
-    /** Tests the parser on an infinite amount of data with a failing client.
-     * This should stop the parser. */
+    /** Repeatedly tests the parser on an infinite amount of data with a failing sink.
+     * This should terminate the parser thread. */
     @Test
     public void failingSink() {
+        int numThreadsToCreate = 20;
         int beforeThreadCount = ManagementFactory.getThreadMXBean().getThreadCount();
-        for (int i = 0; i < 100; ++i) {
+        for (int i = 0; i < numThreadsToCreate; ++i) {
             try (InputStream in = openInfiniteNtStream();
                     OutputStream out = new FailingOutputStream()) {
                 StreamRDF sink = StreamRDFWriter.getWriterStream(out, RDFFormat.NT);
@@ -126,7 +125,9 @@ public class TestAsyncParser {
                 AsyncParser.of(in, Lang.NT, null).setChunkSize(10).setQueueSize(3).asyncParseSources(sink);
                 sink.finish();
             } catch (Exception e) {
-                // Expected to fail
+                // Expected to fail with FailingOutputStream's error message
+                Assert.assertNotNull("Unexpected exception: " + e, e.getCause());
+                Assert.assertEquals(FailingOutputStream.ERROR_MSG, e.getCause().getMessage());
                 continue;
             }
             throw new RuntimeException("Parsing unexpectedly succeeded");
@@ -143,8 +144,9 @@ public class TestAsyncParser {
     /** Tests to ensure threads are not piling up when repeatedly canceling parsers */
     @Test
     public void repeatedParsingCancellation_1() throws Exception {
+        int numThreadsToCreate = 20;
         int beforeThreadCount = ManagementFactory.getThreadMXBean().getThreadCount();
-        for (int i = 0; i < 1000; ++i) {
+        for (int i = 0; i < numThreadsToCreate; ++i) {
             IteratorCloseable<Triple> iter = null;
             try {
                 iter = AsyncParser.of(DIR+"data.ttl").setDaemonMode(false).asyncParseTriples();
@@ -162,7 +164,7 @@ public class TestAsyncParser {
 
         // Give some room in how much the number of threads may differ
         // The main point is that the acceptable margin is significantly lower than
-        // the number of started parser threads
+        // the number of created parser threads
         int maxAllowedThreadCountDifference = 5;
 
         Assert.assertTrue("Cancelling RDF parsing resulted in too many dangling threads ("
@@ -183,9 +185,14 @@ public class TestAsyncParser {
         }
 
         long pos = channel.position();
-        // The exact number of bytes consumed from the channel when writing the test was 10050 bytes
-        // However to give room for implementation changes the value tested against here is about twice as large
-        Assert.assertTrue("Too many bytes consumed from input stream (" + pos + ")", pos < 20000);
+        // This test should consume up to 60,000 bytes:
+        // - 50 bytes/triple and 100 triples/chunk results in 5K bytes per chunk.
+        // - A queue size of 10 means that a full queue amounts to 50K bytes.
+        // - One chunk is removed from the queue when reading which gets filled up -> 55K.
+        // - The close action clears the queue and aborts on the next chunk -> 60K.
+        // In order to give room for implementation changes and to account for possible buffering in the parser,
+        // the value tested against here is about twice as large.
+        Assert.assertTrue("Too many bytes consumed from input stream (" + pos + ")", pos < 120_000);
     }
 
     /** This test first creates some 'good' data for reference and then appends some 'bad' data in order
@@ -278,9 +285,10 @@ public class TestAsyncParser {
     private static class FailingOutputStream
         extends OutputStream
     {
+        public static final String ERROR_MSG = "Mocked IO Error";
         @Override
         public void write(int b) throws IOException {
-            throw new IOException("Mocked IO Error");
+            throw new IOException(ERROR_MSG);
         }
     }