Posted to pr@jena.apache.org by GitBox <gi...@apache.org> on 2022/08/12 15:38:35 UTC

[GitHub] [jena] Aklakan opened a new pull request, #1478: Improved AsyncParser

Aklakan opened a new pull request, #1478:
URL: https://github.com/apache/jena/pull/1478

   GitHub issue resolved #1477
   
   ----
   
    - [ ] Tests are included.
    - [ ] Documentation change and updates are provided for the [Apache Jena website](https://github.com/apache/jena-site/)
    - [ ] Commits have been squashed to remove intermediate development commit messages.
    - [ ] Key commit messages start with the issue number (GH-xxxx or JENA-xxxx)
   
   By submitting this pull request, I acknowledge that I am making a contribution to the Apache Software Foundation under the terms and conditions of the [Contributor's Agreement](https://www.apache.org/licenses/contributor-agreements.html).
   
   ----
   
   See the [Apache Jena "Contributing" guide](https://github.com/apache/jena/blob/main/CONTRIBUTING.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@jena.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@jena.apache.org
For additional commands, e-mail: pr-help@jena.apache.org


[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r968034827


##########
jena-arq/src/main/java/org/apache/jena/riot/system/EltStreamRDFImpl.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.riot.out.NodeFmtLib;
+import org.apache.jena.sparql.core.Quad;
+
+/** An item of a StreamRDF, including exceptions. Internal API. */
+public class EltStreamRDFImpl
+    implements EltStreamRDF, Serializable
+{
+    private static final long serialVersionUID = 1L;
+
+    Triple triple = null;
+    Quad quad = null;
+    String prefix = null; // Null implies "base".
+    String iri = null;
+    Throwable exception = null;
+
+    public EltStreamRDFImpl() {}
+
+    /* Prefer static constructors in EltStreamRDF */
+
+    public EltStreamRDFImpl(Triple triple) { this.triple = triple; }
+    public EltStreamRDFImpl(Quad quad) { this.quad = quad; }
+    public EltStreamRDFImpl(String prefix, String iri) { this.prefix = prefix; this.iri = iri; }
+    public EltStreamRDFImpl(String iri) { this.iri = iri; }
+    public EltStreamRDFImpl(Throwable exception) { this.exception = exception; }
+
+    @Override public boolean   isTriple()     { return triple != null; }
+    @Override public Triple    getTriple()    { return triple; }
+    @Override public boolean   isQuad()       { return quad != null; }
+    @Override public Quad      getQuad()      { return quad; }
+    @Override public boolean   isPrefix()     { return prefix != null; }
+    @Override public String    getPrefix()    { return prefix; }
+    @Override public boolean   isBase()       { return prefix == null && iri != null; }
+    @Override public String    getIri()       { return iri; }
+    @Override public boolean   isException()  { return exception != null; }
+    @Override public Throwable getException() { return exception; }
+
+    @Override
+    public EltStreamRDFType getType() {

Review Comment:
   > There are no setters because records are value-based/immutable.
   
   Whoops, I mentioned setters just out of habit.
   
   I replaced the EltStreamRDF interface with the implementation and made it 'record-like' for future-proofing, i.e. `field()` rather than `getField()`. I implemented all your suggestions.
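For illustration, here is a minimal sketch of the 'record-like' convention. It uses simplified hypothetical types (`PrefixElt`, `PrefixRecord`, `CallSites`) with `String` fields rather than the real Jena classes. The point is that call sites written against `prefix()`/`iri()` would not need to change if the class were later turned into a Java `record` (Java 16+).

```java
/** Hypothetical element class written "record-like": final fields, field() accessors. */
final class PrefixElt {
    private final String prefix;
    private final String iri;

    PrefixElt(String prefix, String iri) {
        this.prefix = prefix;
        this.iri = iri;
    }

    String prefix() { return prefix; }   // rather than getPrefix()
    String iri()    { return iri; }      // rather than getIri()
}

/** The same shape as a record (Java 16+): the generated accessors have the same names. */
record PrefixRecord(String prefix, String iri) {}

final class CallSites {
    // Client code reads the same against either representation.
    static String describe(PrefixElt e)    { return e.prefix() + ": <" + e.iri() + ">"; }
    static String describe(PrefixRecord r) { return r.prefix() + ": <" + r.iri() + ">"; }
}
```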



##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op
             if ( LOG1.isDebugEnabled() )
                 LOG1.debug("Start parsing");
-            try {
-                for ( RDFParserBuilder parser : parserBuilders ) {
-                    parser.errorHandler(errhandler).parse(generatorStream);
+
+            int n = parserBuilders.size();
+
+            // Under normal operation the loop below runs once more after
+            // processing all (possibly zero!) parsers in order to place
+            // the end-marker on the queue
+
+            // If an error occurs then all parser are invoked anyway because any
+            // resources they own need yet to be closed.
+            // At this point, however, any further errors will be suppressed.
+            boolean errorEncountered = false;
+            for (int i = 0; i <= n; ++i) {
+                RDFParserBuilder parser = i < n ? parserBuilders.get(i) : null;
+
+                if (parser != null) {
+                    try {
+                        parser.parse(generatorStream);
+                    } catch (RuntimeException ex) {
+                        Throwable cause = ex.getCause();
+                        if (errorEncountered) {
+                            LOG1.debug("Suppressed exception", ex);
+                        } else {
+                            if (cause instanceof InterruptedException) {
+                                LOG1.debug("Parsing was interrupted");
+                            } else {
+                                // Parse error.
+                                EltStreamRDFImpl elt = new EltStreamRDFImpl();
+                                elt.exception = ex;
+                                batcher.accept(elt);
+                            }
+                            errorEncountered = true;
+                        }
+                    } catch (Throwable cause) {
+                        if (errorEncountered) {
+                            LOG1.debug("Suppressed exception", cause);

Review Comment:
   done





[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965025837


##########
jena-arq/src/main/java/org/apache/jena/riot/system/EltStreamRDF.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.sparql.core.Quad;
+
+/**
+ * Interface for streamRDF parsing elements.
+ */
+public interface EltStreamRDF {

Review Comment:
   Back in [JENA-2309](https://issues.apache.org/jira/browse/JENA-2309) I mentioned the use case of getting individual elements from the parser in order to scan the head of a file for prefixes, stopping once a configurable number of triples/quads has been seen.
   
   There is `Stream<EltStreamRDF> stream = AsyncParser.of(filename).streamElements();` which uses the interface.
   I did not add shorthands such as the various `AsyncParser.asyncParseElements` methods because I think the builder covers this quite well, but I could add them for consistency.
   
   The intent of the interface is to provide a public API for accessing elements, whereas `EltStreamRDFImpl` is a rather AsyncParser-specific implementation: for example, the parser sets the (package-visible) attributes directly rather than going through getters/setters or constructors. I am not sure whether this is actually faster, but it is one reasonable way to implement it. The public API, however, shouldn't be tied to that.
   Also, the interface abstracts over whether the element type is derived from which attributes are non-null (as it is now) or stored in a dedicated attribute.
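A sketch of the head-scanning use case follows. It assumes a hypothetical simplified element type (`PrefixScan.Elt`, with `String` payloads standing in for `Triple`/`Quad`), so it is an illustration rather than the PR's code. Elements are pulled lazily, and the scan stops after `maxData` data elements. With the real API the input would presumably come from `AsyncParser.of(filename).streamElements()`, and the caller should close that stream so the background parser can stop.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Stream;

final class PrefixScan {
    /** Hypothetical minimal element: either a prefix declaration or a data element (triple/quad). */
    record Elt(String prefix, String iri, String data) {
        static Elt prefix(String p, String iri) { return new Elt(p, iri, null); }
        static Elt data(String d)               { return new Elt(null, null, d); }
        boolean isPrefix() { return prefix != null; }
    }

    /**
     * Collects prefix declarations until maxData data elements have been seen.
     * Pulling through an iterator means nothing beyond that point is consumed.
     */
    static Map<String, String> headPrefixes(Stream<Elt> elements, long maxData) {
        Map<String, String> prefixes = new LinkedHashMap<>();
        long seen = 0;
        Iterator<Elt> it = elements.iterator();
        while (it.hasNext() && seen < maxData) {
            Elt e = it.next();
            if (e.isPrefix())
                prefixes.put(e.prefix(), e.iri());
            else
                seen++;
        }
        return prefixes;
    }
}
```

Pulling through `Stream.iterator()` rather than a terminal operation such as `collect` is what lets the scan stop early, even on an unbounded stream; closing the stream (e.g. with try-with-resources) remains the caller's job.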
   
   





[GitHub] [jena] afs commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
afs commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r964953691


##########
jena-arq/src/main/java/org/apache/jena/riot/system/EltStreamRDF.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.sparql.core.Quad;
+
+/**
+ * Interface for streamRDF parsing elements.
+ */
+public interface EltStreamRDF {

Review Comment:
   Could you explain why this has been moved out and made an interface? It seems to have only one implementation, and the Impl is hardwired into the code.
   
   `StreamToElements`, which is local to AsyncParser, uses the `Impl`,
   as does `IteratorCloseable<EltStreamRDFImpl>` in `AsyncParserBuilder`.
   



##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -199,7 +244,7 @@ protected X moveToNext() {
                         return null;
                     }
                     return x;
-                } catch (InterruptedException e) {
+                } catch (@SuppressWarnings("unused") InterruptedException e) {

Review Comment:
   This `SuppressWarnings` causes a warning (an "info").
   
   "At least one of the problems in category 'unused' is not analysed due to a compiler option being ignored"
   
   The rest of the code does not use "unused".
   
   Please remove.
   
   



##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op
             if ( LOG1.isDebugEnabled() )
                 LOG1.debug("Start parsing");
-            try {
-                for ( RDFParserBuilder parser : parserBuilders ) {
-                    parser.errorHandler(errhandler).parse(generatorStream);
+
+            int n = parserBuilders.size();
+
+            // Under normal operation the loop below runs once more after
+            // processing all (possibly zero!) parsers in order to place
+            // the end-marker on the queue
+
+            // If an error occurs then all parser are invoked anyway because any
+            // resources they own need yet to be closed.
+            // At this point, however, any further errors will be suppressed.
+            boolean errorEncountered = false;
+            for (int i = 0; i <= n; ++i) {
+                RDFParserBuilder parser = i < n ? parserBuilders.get(i) : null;
+
+                if (parser != null) {
+                    try {
+                        parser.parse(generatorStream);
+                    } catch (RuntimeException ex) {
+                        Throwable cause = ex.getCause();
+                        if (errorEncountered) {
+                            LOG1.debug("Suppressed exception", ex);
+                        } else {
+                            if (cause instanceof InterruptedException) {
+                                LOG1.debug("Parsing was interrupted");
+                            } else {
+                                // Parse error.
+                                EltStreamRDFImpl elt = new EltStreamRDFImpl();
+                                elt.exception = ex;
+                                batcher.accept(elt);
+                            }
+                            errorEncountered = true;
+                        }
+                    } catch (Throwable cause) {
+                        if (errorEncountered) {
+                            LOG1.debug("Suppressed exception", cause);

Review Comment:
   Style point: the other debug calls are guarded with `if ( LOG1.isDebugEnabled() )`.
   
   Even if unnecessary, a common style helps.
   



##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   startBatching and finishBatching no longer line up.
   finishBatching is conditional. Can it ever fail to be called? Comments needed.
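One way to make the pairing explicit is sketched below. It uses a hypothetical `Batcher`, not Jena's `EltStreamBatcher`. `startBatching()` and `finishBatching()` sit at the same level, and a `finally` block guarantees that the partial chunk is handed on even when parsing throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical batcher: buffers items and hands them on in chunks of chunkSize. */
final class Batcher<T> implements Consumer<T> {
    private final Consumer<List<T>> destination;
    private final int chunkSize;
    private List<T> chunk = new ArrayList<>();

    Batcher(Consumer<List<T>> destination, int chunkSize) {
        this.destination = destination;
        this.chunkSize = chunkSize;
    }

    /** A no-op today, but still paired with finishBatching() so the structure is visible. */
    void startBatching() {}

    @Override public void accept(T item) {
        chunk.add(item);
        if (chunk.size() >= chunkSize)
            flush();
    }

    /** Sends any partial chunk; intended to run on every exit path. */
    void finishBatching() {
        if (!chunk.isEmpty())
            flush();
    }

    private void flush() {
        destination.accept(chunk);
        chunk = new ArrayList<>();
    }
}

final class ParseTask {
    /** start/finish line up textually; finally guarantees finish even if "parsing" throws. */
    static void run(Batcher<String> batcher, List<String> input) {
        batcher.startBatching();
        try {
            for (String s : input) {
                if (s.isEmpty())
                    throw new IllegalStateException("simulated parse error");
                batcher.accept(s);
            }
        } finally {
            batcher.finishBatching();
        }
    }
}
```

If the destination itself can throw (for example once the consumer has aborted), `finishBatching()` would need to tolerate that; otherwise its exception replaces the original one. That is the kind of case a comment at this point could spell out.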
   



##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -51,143 +56,183 @@
  * <p>
  * There are overheads, so this is only beneficial in some situations. Delivery to
  * the StreamRDF has an initial latency while the first batch of work is accumulated.
+ * Using the {@link Builder} gives control over the chunk size such that initial latency

Review Comment:
   Bad javadoc.
   
   s/Builder/AsyncParserBuilder/



##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   Personally, this seems odd. It is a no-op currently but implementation may change. What is the comment recording?
   





[GitHub] [jena] Aklakan commented on pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on PR #1478:
URL: https://github.com/apache/jena/pull/1478#issuecomment-1237918861

   > There are too many factors [...]
   
   Yes; the only change I made to the processing code is the addition of an AtomicBoolean that is checked once per chunk to decide whether to terminate the parser. So there is one extra check per chunk, but its overhead should be negligible compared with the work of turning bytes into elements.
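A sketch of a per-chunk check follows, using hypothetical names (`ChunkedProducer.produce`) and `Integer` items in place of stream elements. The flag is read only when a chunk fills, so the per-element path is unchanged. In a real setup a consumer on another thread would set `isAborted`; here it is passed in directly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical producer loop: the abort flag is consulted once per chunk, not per element. */
final class ChunkedProducer {
    /** Returns the number of chunks delivered. chunkSize must be positive. */
    static int produce(Iterable<Integer> source, int chunkSize,
                       BlockingQueue<List<Integer>> queue, AtomicBoolean isAborted) {
        int delivered = 0;
        List<Integer> chunk = new ArrayList<>(chunkSize);
        for (Integer item : source) {
            chunk.add(item);                     // per-element work: no flag check
            if (chunk.size() == chunkSize) {
                if (isAborted.get())             // one read per chunk
                    return delivered;
                put(queue, chunk);
                delivered++;
                chunk = new ArrayList<>(chunkSize);
            }
        }
        if (!chunk.isEmpty() && !isAborted.get()) {
            put(queue, chunk);                   // trailing partial chunk
            delivered++;
        }
        return delivered;
    }

    private static void put(BlockingQueue<List<Integer>> queue, List<Integer> chunk) {
        try {
            queue.put(chunk);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();  // preserve the interrupt status
            throw new RuntimeException(ex);
        }
    }
}
```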
   
   A current limitation is that, with large chunk sizes, a consumer that takes only the first few elements still has to wait until the first chunk is complete. At least the chunk size can now be controlled.
   
   In principle, a future PR could extend the builder API to improve this, for example by adding support for a custom `Predicate<EltStreamRDF>` that stops the parser and dispatches the current chunk immediately.
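A possible shape for that extension, as a hypothetical sketch only (`StoppingBatcher` and its `stopWhen` parameter are not part of the PR): when the predicate matches, the partial chunk is dispatched at once instead of waiting for `chunkSize` elements, and the producer is told to stop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
 * Hypothetical batcher: when stopWhen matches an element, the current partial chunk
 * (including that element) is dispatched immediately and the producer is told to stop.
 */
final class StoppingBatcher<T> {
    private final Consumer<List<T>> destination;
    private final int chunkSize;
    private final Predicate<? super T> stopWhen;
    private List<T> chunk = new ArrayList<>();
    private boolean stopped = false;

    StoppingBatcher(Consumer<List<T>> destination, int chunkSize, Predicate<? super T> stopWhen) {
        this.destination = destination;
        this.chunkSize = chunkSize;
        this.stopWhen = stopWhen;
    }

    /** Buffers the item; returns false once the producer should stop feeding elements. */
    boolean accept(T item) {
        if (stopped)
            return false;
        chunk.add(item);
        if (stopWhen.test(item)) {
            stopped = true;
            flush();                      // early dispatch of a partial chunk
            return false;
        }
        if (chunk.size() >= chunkSize)
            flush();
        return true;
    }

    /** Dispatches any remaining partial chunk at normal end of input. */
    void finish() {
        if (!chunk.isEmpty())
            flush();
    }

    private void flush() {
        destination.accept(chunk);
        chunk = new ArrayList<>();
    }
}
```

The producer loop would check the return value, e.g. `if (!batcher.accept(elt)) break;`, so a consumer waiting for a few elements is not held up by a large `chunkSize`.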
   




[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965133599


##########
jena-arq/src/main/java/org/apache/jena/riot/system/EltStreamRDFImpl.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.riot.out.NodeFmtLib;
+import org.apache.jena.sparql.core.Quad;
+
+/** An item of a StreamRDF, including exceptions. Internal API. */
+public class EltStreamRDFImpl
+    implements EltStreamRDF, Serializable
+{
+    private static final long serialVersionUID = 1L;
+
+    Triple triple = null;
+    Quad quad = null;
+    String prefix = null; // Null implies "base".
+    String iri = null;
+    Throwable exception = null;
+
+    public EltStreamRDFImpl() {}
+
+    /* Prefer static constructors in EltStreamRDF */
+
+    public EltStreamRDFImpl(Triple triple) { this.triple = triple; }
+    public EltStreamRDFImpl(Quad quad) { this.quad = quad; }
+    public EltStreamRDFImpl(String prefix, String iri) { this.prefix = prefix; this.iri = iri; }
+    public EltStreamRDFImpl(String iri) { this.iri = iri; }
+    public EltStreamRDFImpl(Throwable exception) { this.exception = exception; }
+
+    @Override public boolean   isTriple()     { return triple != null; }
+    @Override public Triple    getTriple()    { return triple; }
+    @Override public boolean   isQuad()       { return quad != null; }
+    @Override public Quad      getQuad()      { return quad; }
+    @Override public boolean   isPrefix()     { return prefix != null; }
+    @Override public String    getPrefix()    { return prefix; }
+    @Override public boolean   isBase()       { return prefix == null && iri != null; }
+    @Override public String    getIri()       { return iri; }
+    @Override public boolean   isException()  { return exception != null; }
+    @Override public Throwable getException() { return exception; }
+
+    @Override
+    public EltStreamRDFType getType() {

Review Comment:
   > which also means it can't extends an interface
   
   I guess I have to try that out myself - so far I haven't. From the snippets I find on the web I would have thought something like this might work:
   ```
   interface Foo { String name(); }
   record Bar(String name) implements Foo {}
   ```
   It's not clear to me whether bean properties (methods with set/get/is prefixes) would still auto-map to the record fields.
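For reference, here is a sketch of how records interact with interfaces and bean-style names (Java 16+; `Foo`, `Bar` and `Plain` are illustrative names). A record may implement an interface, and its generated public accessor `name()` satisfies `Foo.name()`. The compiler does not generate `getName()`; a bean-style getter has to be declared by hand.

```java
/** Records can implement interfaces; the generated accessor name() is public. */
interface Foo {
    String name();
}

record Bar(String name) implements Foo {
    // Not generated by the compiler: a bean-style getter must be declared explicitly.
    public String getName() { return name; }
}

/** A plain record for comparison: it has name() but no getName(). */
record Plain(String name) {}
```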
   
   In any case, I can add your suggestions which would make EltStreamRDFImpl "clean" to use publicly.
   There is then still the question of whether the constructors should be public, or whether construction should only be possible via the named static methods such as `EltStreamRDF.triple(t)` rather than `new EltStreamRDF(t)`.
   
   The intent of the interface was to decouple client code from these implementation considerations so that the implementation can evolve independently, but I am fine with removing it if it turns out that a single implementation can effectively be authoritative.
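A sketch of the decoupling described here follows. It uses hypothetical names (`Element`, `ElementImpl`) and `String` in place of `Triple`. Clients construct elements only through the interface's static factories, so the package-private implementation can change its fields and constructors freely.

```java
import java.util.Objects;

/** Hypothetical public element API: clients see only the interface and its static factories. */
interface Element {
    boolean isTriple();
    boolean isBase();
    String triple();   // stand-in for Triple
    String iri();

    static Element triple(String t) { return new ElementImpl(Objects.requireNonNull(t), null); }
    static Element base(String iri) { return new ElementImpl(null, Objects.requireNonNull(iri)); }
}

/** Package-private implementation; its representation can change without touching callers. */
final class ElementImpl implements Element {
    private final String triple;
    private final String iri;

    ElementImpl(String triple, String iri) {
        this.triple = triple;
        this.iri = iri;
    }

    @Override public boolean isTriple() { return triple != null; }
    @Override public boolean isBase()   { return triple == null && iri != null; }
    @Override public String triple()    { return triple; }
    @Override public String iri()       { return iri; }
}
```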





[GitHub] [jena] afs commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
afs commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965044562


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   But does it do any harm?
   
   Thinking long term: clear code such as "start-finish" pairing, "start-finish|abort", or "start-shutdown (current "finish")-finish" is useful for the next person (who may be yourself a few PRs from now!)
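As an illustration of the "start-finish|abort" shape, here is a hypothetical `BatchLifecycle` guard (single-threaded, not Jena code). It allows exactly one of `finish()` or `abort()` after `start()` and fails fast on any other sequence.

```java
/** Hypothetical lifecycle guard: start, then exactly one of finish or abort. Not thread-safe. */
final class BatchLifecycle {
    enum State { CREATED, STARTED, FINISHED, ABORTED }

    private State state = State.CREATED;

    void start()  { transition(State.CREATED, State.STARTED); }
    void finish() { transition(State.STARTED, State.FINISHED); }
    void abort()  { transition(State.STARTED, State.ABORTED); }

    State state() { return state; }

    /** Runs body between start() and exactly one of finish()/abort(). */
    void run(Runnable body) {
        start();
        try {
            body.run();
        } catch (RuntimeException ex) {
            abort();
            throw ex;
        }
        finish();
    }

    private void transition(State expected, State next) {
        if (state != expected)
            throw new IllegalStateException(state + " -> " + next + " not allowed");
        state = next;
    }
}
```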





+    @Override public boolean   isQuad()       { return quad != null; }
+    @Override public Quad      getQuad()      { return quad; }
+    @Override public boolean   isPrefix()     { return prefix != null; }
+    @Override public String    getPrefix()    { return prefix; }
+    @Override public boolean   isBase()       { return prefix == null && iri != null; }
+    @Override public String    getIri()       { return iri; }
+    @Override public boolean   isException()  { return exception != null; }
+    @Override public Throwable getException() { return exception; }
+
+    @Override
+    public EltStreamRDFType getType() {

Review Comment:
   > which also means it can't extends an interface
   
   I guess I have to try that out myself - so far I haven't. From the snippets I find on the web I would have thought something like this might work:
   ```
   interface Foo { String name(); }
   record Bar(String name) implements Foo {}
   ```
   It's not clear to me whether bean-style accessors (methods with get/set/is prefixes) would still map automatically to the record fields.
   
   In any case, I can adopt your suggestions, which would make EltStreamRDFImpl "clean" to use publicly.
   There is then still the question of whether the ctors should be public, or whether construction should only be possible via the named static methods such as `EltStreamRDF.triple(t)` rather than `new EltStreamRDFImpl(t)`.
   
   The intent of the interface was to decouple client code from these implementation considerations so that the implementation can evolve independently - but I am fine with removing it if it turns out that one implementation can be effectively authoritative.
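   For reference, a minimal sketch (Java 16+) answering the record question: a record can implement an interface, and the accessor the compiler generates for a component is named after the component (`name()`, not `getName()`), so it satisfies `String name()` directly. Bean-style `getX()`/`isX()` methods are not generated and have to be written by hand if an interface requires them. `Foo`/`Bar` are the illustrative names from the comment; `Named`/`Baz` are hypothetical additions.

   ```java
   // A record component `name` generates a public accessor `name()`,
   // which implements Foo.name() without any extra code.
   interface Foo {
       String name();
   }

   // A record needs a body, even if it is empty.
   record Bar(String name) implements Foo {}

   // Bean-style methods are NOT generated for records.
   interface Named {
       String getName();
       boolean isEmpty();
   }

   record Baz(String name) implements Named {
       public String getName() { return name; }           // hand-written bean getter
       public boolean isEmpty() { return name.isEmpty(); } // hand-written "is" method
   }
   ```

   Records are implicitly final and immutable and get `equals`/`hashCode`/`toString` for free, which is why they fit value-like classes.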





[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965149754


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   > But does it do any harm?
   
   Right now the chunk destination raises an exception when it is in the aborted state; this is intended to forcefully kill the parser thread the next time it sends out a chunk.
   Calling finishBatching with isAborted=true would attempt to dispatch the remaining batch and the poison to the destination, which is guaranteed to raise an exception.
   Maybe in order to improve the structure, the `Runnable task = ()` would have to be factored out into its own class (that implements Runnable), so that parts of the process can be put into their own methods.
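   A minimal sketch of the behaviour described, assuming a simplified batcher whose finish step flushes the remainder and then an empty "poison" chunk. `SimpleBatcher` and `AbortingDestination` are hypothetical stand-ins, not the PR's `EltStreamBatcher` or its destination; the point is only that once the destination throws on abort, any finish step that dispatches anything must fail.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.function.Consumer;

   // Hypothetical batcher: dispatches full chunks; finish() dispatches the
   // remainder followed by a poison chunk that marks end-of-stream.
   class SimpleBatcher {
       static final List<String> POISON = List.of();
       private final Consumer<List<String>> destination;
       private final int chunkSize;
       private List<String> current = new ArrayList<>();

       SimpleBatcher(Consumer<List<String>> destination, int chunkSize) {
           this.destination = destination;
           this.chunkSize = chunkSize;
       }

       void add(String item) {
           current.add(item);
           if (current.size() >= chunkSize) {
               destination.accept(current);
               current = new ArrayList<>();
           }
       }

       void finish() {
           if (!current.isEmpty()) destination.accept(current);
           destination.accept(POISON);
       }
   }

   class AbortingDestination {
       // Mirrors the lambda in the diff: refuse (throw) once aborted, else enqueue.
       static Consumer<List<String>> of(BlockingQueue<List<String>> queue, AtomicBoolean isAborted) {
           return batch -> {
               if (isAborted.get())
                   throw new RuntimeException(new InterruptedException());
               try {
                   queue.put(batch);
               } catch (InterruptedException ex) {
                   Thread.currentThread().interrupt(); // preserve interrupt status
                   throw new RuntimeException(ex);
               }
           };
       }
   }
   ```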
   





[GitHub] [jena] Aklakan commented on pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on PR #1478:
URL: https://github.com/apache/jena/pull/1478#issuecomment-1225813806

   I searched jena-site for AsyncParser and extended the section about it: https://github.com/apache/jena-site/pull/115
   
   Today I tested the `tdb2.tdbloader` tool built with this PR on a 10 million triple dataset and it was functional.
   Tomorrow I will do a final performance comparison with the 4.5.0 loader to make sure I didn't accidentally mess something up.
   This should then conclude the PR.
   
   (Also, the commit messages now use the capital `GH-` prefix and the commits are squashed.)
   




[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965025837


##########
jena-arq/src/main/java/org/apache/jena/riot/system/EltStreamRDF.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.sparql.core.Quad;
+
+/**
+ * Interface for streamRDF parsing elements.
+ */
+public interface EltStreamRDF {

Review Comment:
   Back in [JENA-2309](https://issues.apache.org/jira/browse/JENA-2309) I mentioned the use case of getting individual elements from the parser in order to scan the head of a file for prefixes, stopping once a configurable number of triples/quads has been seen.
   
   There is `Stream<EltStreamRDF> stream = AsyncParser.of(filename).streamElements();` and the iterator-based version, both of which use the interface.
   I did not add the various shorthands for `AsyncParser.asyncParseElements(filename | inputstream | source | etc.)`, because the builder already covers this uniformly, but I could add them for consistency if desired.
   
   The intent of the interface is to provide a public API to access elements, whereas `EltStreamRDFImpl` is a rather AsyncParser-specific implementation: for example, the parser directly sets the (package-visible) attributes without going through any getters/setters or ctors. I'm not sure whether this is actually faster, but it is one sensible way to implement it. The public API, however, shouldn't be tied to that.
   Also, the interface abstracts whether the element type is conditionally determined based on the non-null attributes (as it is done now) or whether it is stored in a dedicated attribute.
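   A minimal sketch of the prefix-scanning use case: consume elements from the head of the stream, collect prefix declarations, and stop after a configurable number of triples/quads. `Elt` is a hypothetical stand-in for `EltStreamRDF` so the example is self-contained; with the real API the iterator would come from something like `AsyncParser.of(filename).streamElements().iterator()`, and the stream should presumably be closed once scanning stops so the background parser can be shut down, as discussed in the close()/abort comments of this thread.

   ```java
   import java.util.Iterator;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical stand-in for EltStreamRDF: only what this example needs.
   record Elt(boolean isPrefix, String prefix, String iri) {
       static Elt ofPrefix(String p, String iri) { return new Elt(true, p, iri); }
       static Elt ofData() { return new Elt(false, null, null); } // a triple or quad
   }

   class PrefixScanner {
       // Collect prefixes from the head of an element stream, stopping once
       // maxData triples/quads have been seen.
       static Map<String, String> scan(Iterator<Elt> it, int maxData) {
           Map<String, String> prefixes = new LinkedHashMap<>();
           int seen = 0;
           while (it.hasNext() && seen < maxData) {
               Elt e = it.next();
               if (e.isPrefix()) prefixes.put(e.prefix(), e.iri());
               else seen++;
           }
           return prefixes;
       }
   }
   ```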
   
   





[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r968036093


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   I factored out `Runnable task = () -> {...};` into its own `Task` class. There the run method has a clearer structure making use of private helper methods.
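   A rough sketch of what such a factored-out task can look like, with `run()` reduced to a readable sequence of private helpers. This `Task` is hypothetical and not the PR's class: parsing is simulated by a list of inputs (a `null` entry simulates a parse error), and the "events" list stands in for dispatching to the queue. It shows the finish step being skipped when the consumer has already aborted.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Objects;
   import java.util.concurrent.atomic.AtomicBoolean;

   class Task implements Runnable {
       private final List<String> inputs;
       private final AtomicBoolean isAborted;
       final List<String> events = new ArrayList<>(); // stand-in for queue output

       Task(List<String> inputs, AtomicBoolean isAborted) {
           this.inputs = inputs;
           this.isAborted = isAborted;
       }

       @Override
       public void run() {
           start();
           try {
               parseAll();
           } catch (RuntimeException ex) {
               handleError(ex);
               return;
           }
           finish();
       }

       private void start() { events.add("start"); }

       private void parseAll() {
           for (String in : inputs) {
               if (isAborted.get()) return; // consumer closed: stop early
               events.add("parsed:" + Objects.requireNonNull(in, "parse error"));
           }
       }

       private void handleError(RuntimeException ex) {
           events.add("error:" + ex.getMessage());
       }

       // Dispatch the final chunk + poison only if the consumer has not aborted.
       private void finish() {
           if (!isAborted.get()) events.add("finish");
       }
   }
   ```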





[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r968037629


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -51,143 +56,183 @@
  * <p>
  * There are overheads, so this is only beneficial in some situations. Delivery to
  * the StreamRDF has an initial latency while the first batch of work is accumulated.
+ * Using the {@link Builder} gives control over the chunk size such that initial latency

Review Comment:
   fixed





[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965025837


##########
jena-arq/src/main/java/org/apache/jena/riot/system/EltStreamRDF.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.sparql.core.Quad;
+
+/**
+ * Interface for streamRDF parsing elements.
+ */
+public interface EltStreamRDF {

Review Comment:
   Back in [JENA-2309](https://issues.apache.org/jira/browse/JENA-2309) I mentioned the use case of getting individual elements from the parser in order to scan the head of a file for prefixes, stopping once a configurable number of triples/quads has been seen.
   
   There is `Stream<EltStreamRDF> stream = AsyncParser.of(filename).streamElements();`, which uses the interface.
   
   The intent of the interface is to provide a public API to access elements, whereas `EltStreamRDFImpl` is a rather AsyncParser-specific implementation: for example, the parser directly sets the (package-visible) attributes without going through any getters/setters or ctors. I'm not sure whether this is actually faster, but it is one sensible way to implement it. The public API, however, shouldn't be tied to that.
   Also, the interface abstracts whether the element type is conditionally determined based on the non-null attributes (as it is done now) or whether it is stored in a dedicated attribute.
   
   





[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965149754


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   > But does it do any harm?
   
   Right now the chunk destination raises an exception when it is in the aborted state; this is intended to forcefully kill the parser thread the next time it triggers sending out a chunk.
   Calling finishBatching with isAborted=true would attempt to dispatch the remaining batch and the poison to the destination, which is guaranteed to raise an exception.
   Maybe in order to improve the structure, the `Runnable task = ()` would have to be factored out into its own class (that implements Runnable), so that parts of the process can be put into their own methods.
   





[GitHub] [jena] afs commented on pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
afs commented on PR #1478:
URL: https://github.com/apache/jena/pull/1478#issuecomment-1237799506

   There are too many factors (e.g. JIT effects) to get really stable performance figures without very long warm-up and timing runs. Even when the rest of the machine is "quiet", there are things going on outside the JVM.
   
   I don't think anything changed between 4.5.0 and 4.6.1 that directly impacts this area. But seemingly unrelated changes elsewhere can affect when the JIT decides to kick in and can make for visible differences.
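   To illustrate the warm-up point, a rough sketch of the "warm up untimed, then time several runs and take the median" pattern. `WarmupBench` and the workload are hypothetical placeholders, not the loader; this does not remove the noise sources described (GC, other processes, JIT timing), and a dedicated harness such as JMH is the usual tool for serious comparisons.

   ```java
   import java.util.Arrays;
   import java.util.function.LongSupplier;

   class WarmupBench {
       // Results are written here so the JIT cannot treat the workload as dead code.
       static volatile long blackhole;

       // Runs `warmup` untimed iterations (giving the JIT a chance to compile),
       // then `runs` timed ones; returns the median (upper median if `runs` is even).
       static long medianNanos(LongSupplier workload, int warmup, int runs) {
           for (int i = 0; i < warmup; i++) {
               blackhole = workload.getAsLong();
           }
           long[] times = new long[runs];
           for (int i = 0; i < runs; i++) {
               long t0 = System.nanoTime();
               blackhole = workload.getAsLong();
               times[i] = System.nanoTime() - t0;
           }
           Arrays.sort(times);
           return times[runs / 2]; // less sensitive to outliers than the mean
       }
   }
   ```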
   




[GitHub] [jena] afs commented on pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
afs commented on PR #1478:
URL: https://github.com/apache/jena/pull/1478#issuecomment-1239262950

   Not a worry here about cost versus benefit, because it is per chunk, but ... `AtomicBoolean` is not so negligible. It's a non-local memory load, and they take forever.
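   A minimal sketch of why the cost stays acceptable here: the per-element path touches only thread-local state, and the shared `AtomicBoolean` is read once per chunk, when the chunk is handed over. `ChunkingSink` is hypothetical; the `flagReads` counter exists only to make the amortisation visible.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.function.Consumer;

   class ChunkingSink {
       private final AtomicBoolean isAborted;
       private final Consumer<List<Integer>> downstream;
       private final int chunkSize;
       private List<Integer> buffer = new ArrayList<>();
       int flagReads = 0; // number of reads of the shared flag (illustration only)

       ChunkingSink(AtomicBoolean isAborted, Consumer<List<Integer>> downstream, int chunkSize) {
           this.isAborted = isAborted;
           this.downstream = downstream;
           this.chunkSize = chunkSize;
       }

       // Per element: no access to the shared flag.
       void accept(int item) {
           buffer.add(item);
           if (buffer.size() == chunkSize) flush();
       }

       // Per chunk: one read of the shared flag.
       void flush() {
           flagReads++;
           if (isAborted.get()) throw new IllegalStateException("aborted");
           if (!buffer.isEmpty()) downstream.accept(buffer);
           buffer = new ArrayList<>();
       }
   }
   ```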
   
   
   




[GitHub] [jena] Aklakan commented on pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on PR #1478:
URL: https://github.com/apache/jena/pull/1478#issuecomment-1239766289

   I just realized that the atomic boolean is not sufficient to synchronize between
   ```
   Iterator.close() { if (!isAborted.get()) { isAborted.set(true); queue.clear(); queue.add(POISON); } }
   Destination.receive(chunk) { if (!isAborted.get()) { queue.add(chunk); } }
   ```
   
   When close() is called, the control flow in the destination may already be past the abort check but not yet at `queue.add`.
   So the queue can be cleared by close(), then one more chunk is added, and only then is the poison added.
   This can effectively lead to garbage data being returned. This needs a synchronized block.
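   A minimal sketch of the synchronized variant, assuming both sides share one lock so that "check aborted, then add" in `receive()` cannot interleave with "set aborted, clear, add poison" in `close()`. `GuardedQueue` is hypothetical. Caveat: it uses an unbounded deque on purpose; with a bounded `BlockingQueue`, holding the lock during a blocking `put()` could deadlock (the producer waits for space while `close()` waits for the lock), so a real fix would need e.g. a timed `offer` or a different hand-off.

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.List;

   class GuardedQueue {
       static final List<String> POISON = List.of("<poison>");
       private final Object lock = new Object();
       private final Deque<List<String>> queue = new ArrayDeque<>();
       private boolean isAborted = false; // guarded by lock

       // Parser side: returns false if the consumer has already closed.
       boolean receive(List<String> chunk) {
           synchronized (lock) {
               if (isAborted) return false;
               queue.add(chunk);
               return true;
           }
       }

       // Consumer side, e.g. Iterator.close(): idempotent.
       void close() {
           synchronized (lock) {
               if (isAborted) return;
               isAborted = true;
               queue.clear();
               queue.add(POISON);
           }
       }

       List<List<String>> snapshot() {
           synchronized (lock) {
               return new ArrayList<>(queue);
           }
       }
   }
   ```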
   
   




[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965038559


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   If close() is invoked while isAborted is false, then isAborted is atomically set to true. The close() logic then proceeds to clear the queue and place the poison on it.
   In essence, if isAborted is true, then the poison is (about to be) placed on the queue - and therefore finishBatching is no longer needed.
   
   In that case finishBatching is suppressed, because it would redundantly try to place another poison on the queue.
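   A minimal sketch of the "exactly one poison" rule, assuming both close() and the normal finish path use `compareAndSet(false, true)` on the shared flag (the diff's comment describes the flag as covering both normal and exceptional termination). Whichever side wins the CAS places the poison; the other side does nothing. `PoisonOnce` is hypothetical, and on its own this does not address the receive/close race raised elsewhere in this thread.

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.atomic.AtomicBoolean;

   class PoisonOnce {
       static final String POISON = "<poison>";
       final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
       private final AtomicBoolean isAborted = new AtomicBoolean(false);

       // Consumer side (Iterator.close()): discard pending chunks, then poison.
       void close() {
           if (isAborted.compareAndSet(false, true)) {
               queue.clear();
               queue.add(POISON);
           }
       }

       // Parser side (normal end of input): suppressed if close() already won.
       void finishBatching() {
           if (isAborted.compareAndSet(false, true)) {
               queue.add(POISON);
           }
       }
   }
   ```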



##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   If close() is invoked while isAborted is false, then isAborted is atomically set to true. The close() logic then proceeds to clear the queue and place the poison on it.
   In essence, if isAborted is true, then the poison is (about to be) placed on the queue.
   
   In that case finishBatching is suppressed, because it would redundantly try to place another poison on the queue.





[GitHub] [jena] afs commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
afs commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965087736


##########
jena-arq/src/main/java/org/apache/jena/riot/system/EltStreamRDFImpl.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.riot.out.NodeFmtLib;
+import org.apache.jena.sparql.core.Quad;
+
+/** An item of a StreamRDF, including exceptions. Internal API. */
+public class EltStreamRDFImpl
+    implements EltStreamRDF, Serializable
+{
+    private static final long serialVersionUID = 1L;
+
+    Triple triple = null;
+    Quad quad = null;
+    String prefix = null; // Null implies "base".
+    String iri = null;
+    Throwable exception = null;
+
+    public EltStreamRDFImpl() {}
+
+    /* Prefer static constructors in EltStreamRDF */
+
+    public EltStreamRDFImpl(Triple triple) { this.triple = triple; }
+    public EltStreamRDFImpl(Quad quad) { this.quad = quad; }
+    public EltStreamRDFImpl(String prefix, String iri) { this.prefix = prefix; this.iri = iri; }
+    public EltStreamRDFImpl(String iri) { this.iri = iri; }
+    public EltStreamRDFImpl(Throwable exception) { this.exception = exception; }
+
+    @Override public boolean   isTriple()     { return triple != null; }
+    @Override public Triple    getTriple()    { return triple; }
+    @Override public boolean   isQuad()       { return quad != null; }
+    @Override public Quad      getQuad()      { return quad; }
+    @Override public boolean   isPrefix()     { return prefix != null; }
+    @Override public String    getPrefix()    { return prefix; }
+    @Override public boolean   isBase()       { return prefix == null && iri != null; }
+    @Override public String    getIri()       { return iri; }
+    @Override public boolean   isException()  { return exception != null; }
+    @Override public Throwable getException() { return exception; }
+
+    @Override
+    public EltStreamRDFType getType() {

Review Comment:
   This class is really a tagged union value (immutable).
   
   Suggestions:
   * Have a slot for the type and set the type when constructed.
   * Make all fields final.
   * Use `Objects.requireNonNull` in constructors to catch problems early. Then there is a guarantee that one and only one slot is non-null.
   
   It's tedious writing out the duplication, but it is not visible.
   
   And maybe it will be a Java `record` at Java 17 (which also means it can't be an interface - see the implementation-of-one comment).
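A minimal sketch of the three suggestions above, assuming hypothetical names (`Elt`, `EltType`) and small stand-in `Triple`/`Quad` classes in place of the Jena types so that it compiles on its own. Note that the prefix case fills two slots (prefix and IRI), so it is the explicit type slot, not a null check, that makes the variant unambiguous.

```java
import java.util.Objects;

// Stand-ins for org.apache.jena.graph.Triple and org.apache.jena.sparql.core.Quad,
// so this sketch compiles without Jena on the classpath.
final class Triple { final String text; Triple(String text) { this.text = text; } }
final class Quad   { final String text; Quad(String text)   { this.text = text; } }

enum EltType { TRIPLE, QUAD, PREFIX, BASE, EXCEPTION }

/** Immutable tagged union: the type slot is set once, at construction. */
final class Elt {
    private final EltType type;
    private final Triple triple;
    private final Quad quad;
    private final String prefix;
    private final String iri;
    private final Throwable exception;

    // Private: callers go through the static factories below.
    private Elt(EltType type, Triple triple, Quad quad,
                String prefix, String iri, Throwable exception) {
        this.type = type;
        this.triple = triple;
        this.quad = quad;
        this.prefix = prefix;
        this.iri = iri;
        this.exception = exception;
    }

    // requireNonNull fails fast at construction instead of at first use.
    static Elt triple(Triple t) {
        return new Elt(EltType.TRIPLE, Objects.requireNonNull(t), null, null, null, null);
    }
    static Elt quad(Quad q) {
        return new Elt(EltType.QUAD, null, Objects.requireNonNull(q), null, null, null);
    }
    static Elt prefix(String prefix, String iri) {
        return new Elt(EltType.PREFIX, null, null,
                       Objects.requireNonNull(prefix), Objects.requireNonNull(iri), null);
    }
    static Elt base(String iri) {
        return new Elt(EltType.BASE, null, null, null, Objects.requireNonNull(iri), null);
    }
    static Elt exception(Throwable ex) {
        return new Elt(EltType.EXCEPTION, null, null, null, null, Objects.requireNonNull(ex));
    }

    // Type tests read the tag rather than inspecting which fields are null.
    EltType getType()        { return type; }
    boolean isTriple()       { return type == EltType.TRIPLE; }
    boolean isQuad()         { return type == EltType.QUAD; }
    boolean isPrefix()       { return type == EltType.PREFIX; }
    boolean isBase()         { return type == EltType.BASE; }
    boolean isException()    { return type == EltType.EXCEPTION; }
    Triple getTriple()       { return triple; }
    Quad getQuad()           { return quad; }
    String getPrefix()       { return prefix; }
    String getIri()          { return iri; }
    Throwable getException() { return exception; }
}
```

The private constructor plus static factories is one way to keep the "exactly one variant" invariant in a single place.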
   
   
   
   



[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r968036931


##########
jena-arq/src/main/java/org/apache/jena/riot/system/EltStreamRDF.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.sparql.core.Quad;
+
+/**
+ * Interface for streamRDF parsing elements.
+ */
+public interface EltStreamRDF {

Review Comment:
   Replaced the EltStreamRDF interface with the implementation.



[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965025837


##########
jena-arq/src/main/java/org/apache/jena/riot/system/EltStreamRDF.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.sparql.core.Quad;
+
+/**
+ * Interface for streamRDF parsing elements.
+ */
+public interface EltStreamRDF {

Review Comment:
   Back in [JENA-2309](https://issues.apache.org/jira/browse/JENA-2309) I mentioned the use case of getting individual elements from the parser in order to scan the head of a file for prefixes, stopping once a configurable number of triples/quads has been seen.
   
   The intent of the interface is to provide a public API for accessing elements, whereas `EltStreamRDFImpl` is a rather AsyncParser-specific implementation: for example, the parser directly sets the (package-visible) attributes without going through any getters/setters or constructors. I am not sure whether this is actually faster, but it is one sensible way to implement it. The public API, however, shouldn't be tied to that.
   Also, the interface abstracts over whether the element type is determined from which attributes are non-null (as is done now) or stored in a dedicated attribute.
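To illustrate the last point, here is a sketch (hypothetical names `Element`, `DerivedElement`, `TaggedElement`; a string stands in for a triple) of two implementations that callers cannot tell apart through the interface: one derives the type from non-null fields, as `EltStreamRDFImpl` does now, the other stores it in a dedicated field.

```java
/** The three element kinds this sketch needs. */
enum Kind { TRIPLE, PREFIX, BASE }

/** Hypothetical public view of a parser element. */
interface Element {
    Kind getType();
    String getIri();
}

/** Type derived from which fields are non-null, as EltStreamRDFImpl does now.
 *  Fields are package-visible so a parser could set them directly. */
final class DerivedElement implements Element {
    String tripleText;   // stand-in for a Triple
    String prefix;
    String iri;

    @Override public Kind getType() {
        if (tripleText != null) return Kind.TRIPLE;
        if (prefix != null)     return Kind.PREFIX;
        if (iri != null)        return Kind.BASE;
        throw new IllegalStateException("element has no content");
    }
    @Override public String getIri() { return iri; }
}

/** Type stored in a dedicated, final field. */
final class TaggedElement implements Element {
    private final Kind kind;
    private final String iri;

    TaggedElement(Kind kind, String iri) { this.kind = kind; this.iri = iri; }

    @Override public Kind getType() { return kind; }
    @Override public String getIri() { return iri; }
}
```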
   
   



[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965038559


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   If close() is invoked while isAborted is false, then isAborted is atomically set to true. The close() logic then proceeds to clear the queue and place the poison on it; the clear ensures that there is space for the poison. The atomic operation prevents further chunks from being placed on the queue in the meantime.
   In essence, if isAborted is true, then the poison has been (or is just about to be) placed on the queue.
   
   In that case finishBatching is suppressed, because it would redundantly try to place another poison on the queue.
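A sketch of the close()/finishBatching() interplay as described, assuming a hypothetical `ParserHandle` class with `List<String>` batches standing in for `List<EltStreamRDFImpl>` and a bounded queue of capacity at least 1 (e.g. `ArrayBlockingQueue`). `compareAndSet` lets exactly one caller win. Unlike the description, the sketch retries the offer in a loop, because a producer that passed the `isAborted` check just before the flip can still insert one batch after the clear.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of the close()/finishBatching() interplay described above. */
final class ParserHandle {
    // End-of-stream marker, compared by identity; never mutated.
    static final List<String> POISON = new ArrayList<>();

    private final BlockingQueue<List<String>> queue;   // assumed bounded, capacity >= 1
    private final AtomicBoolean isAborted = new AtomicBoolean(false);

    ParserHandle(BlockingQueue<List<String>> queue) { this.queue = queue; }

    /** Producer side: refuse new batches once parsing is considered over. */
    void put(List<String> batch) {
        if (isAborted.get())
            throw new RuntimeException(new InterruptedException("parser aborted"));
        try {
            queue.put(batch);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
    }

    /** Consumer side: only the first caller wins the compareAndSet. */
    void close() {
        if (isAborted.compareAndSet(false, true)) {
            // Clear to make room for the poison; retry in case a racing
            // producer slipped one more batch in after the clear.
            do {
                queue.clear();
            } while (!queue.offer(POISON));
        }
    }

    /** Normal end: claims the flag too, so a later close() is a no-op and keeps
     *  unread batches. Suppressed if close() got there first.
     *  (Flushing the last partial batch is omitted here.) */
    void finishBatching() {
        if (!isAborted.compareAndSet(false, true))
            return;
        try {
            queue.put(POISON);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
    }
}
```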



[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965038559


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   If close() is invoked while isAborted is false, then isAborted is atomically set to true. The close() logic then proceeds to clear the queue and place the poison on it; the clear ensures that there is space for the poison.
   In essence, if isAborted is true, then the poison has been (or is just about to be) placed on the queue.
   
   In that case finishBatching is suppressed, because it would redundantly try to place another poison on the queue.



[GitHub] [jena] afs commented on pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
afs commented on PR #1478:
URL: https://github.com/apache/jena/pull/1478#issuecomment-1249267696

   There are a few changes going on. What's the status?
   


[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965025837


##########
jena-arq/src/main/java/org/apache/jena/riot/system/EltStreamRDF.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.sparql.core.Quad;
+
+/**
+ * Interface for streamRDF parsing elements.
+ */
+public interface EltStreamRDF {

Review Comment:
   Back in [JENA-2309](https://issues.apache.org/jira/browse/JENA-2309) I mentioned the use case of getting individual elements from the parser in order to scan the head of a file for prefixes, stopping once a configurable number of triples/quads has been seen.
   
   There is `Stream<EltStreamRDF> stream = AsyncParser.of(filename).streamElements();`, which uses the interface.
   I did not add the various shorthands for `AsyncParser.asyncParseElements(filename | inputstream | source | etc.)` because the builder already covers this uniformly, but I could add them for consistency if desired.
   
   The intent of the interface is to provide a public API for accessing elements, whereas `EltStreamRDFImpl` is a rather AsyncParser-specific implementation: for example, the parser directly sets the (package-visible) attributes without going through any getters/setters or constructors. I am not sure whether this is actually faster, but it is one sensible way to implement it. The public API, however, shouldn't be tied to that.
   Also, the interface abstracts over whether the element type is determined from which attributes are non-null (as is done now) or stored in a dedicated attribute.
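A stdlib-only sketch of the prefix-scanning use case, assuming a hypothetical `Item` type in place of `EltStreamRDF` and an `Iterator` as the source. Pulling from an iterator means reading stops as soon as the limit is reached. With the `streamElements()` call quoted above, one would presumably pass `stream.iterator()` and close the stream afterwards; that wiring is an assumption and is not shown.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for EltStreamRDF: a prefix declaration or a data element. */
final class Item {
    final String prefix;   // non-null only for prefix declarations
    final String iri;

    private Item(String prefix, String iri) { this.prefix = prefix; this.iri = iri; }

    static Item prefix(String prefix, String iri) { return new Item(prefix, iri); }
    static Item data() { return new Item(null, null); }   // a triple or quad

    boolean isPrefix() { return prefix != null; }
}

final class PrefixScanner {
    /**
     * Collects prefix declarations until maxData triples/quads have been seen,
     * then stops pulling from the source (the rest of the input is never read).
     */
    static Map<String, String> scanPrefixes(Iterator<Item> elements, int maxData) {
        Map<String, String> prefixes = new LinkedHashMap<>();
        int seen = 0;
        while (seen < maxData && elements.hasNext()) {
            Item e = elements.next();
            if (e.isPrefix())
                prefixes.put(e.prefix, e.iri);
            else
                seen++;
        }
        return prefixes;
    }

    /** Convenience overload for an in-memory list. */
    static Map<String, String> scanPrefixes(List<Item> elements, int maxData) {
        return scanPrefixes(elements.iterator(), maxData);
    }
}
```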
   
   



[GitHub] [jena] Aklakan commented on pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on PR #1478:
URL: https://github.com/apache/jena/pull/1478#issuecomment-1220883920

   I just added a fix for a bug where the last batch was not dispatched if a parse error was encountered.
   
   I missed that error before because it did not occur when I was using small chunk sizes.
   I updated our Hadoop/Spark code to use small chunk sizes when probing for record offsets and large ones when parsing complete fragments, and the test suite is now green.
   
   I can add another test case for the described situation, but functionality-wise I'd say this PR is complete.
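A sketch of the fixed behaviour, assuming a hypothetical `Batcher` (loosely playing the role of `EltStreamBatcher`) and a toy `ParseDriver` that treats any input starting with `!` as a parse error. The key part is the `finally` block: the pending partial batch, including the error element, is dispatched even when parsing fails. With a chunk size of 1, every `add` flushes immediately and the partial batch is always empty, which would explain why small chunk sizes hid the problem.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical batcher: groups items into chunks and hands each chunk to a destination. */
final class Batcher<T> {
    private final Consumer<List<T>> destination;
    private final int chunkSize;
    private List<T> current;

    Batcher(Consumer<List<T>> destination, int chunkSize) {
        if (chunkSize <= 0)
            throw new IllegalArgumentException("chunkSize must be positive");
        this.destination = destination;
        this.chunkSize = chunkSize;
        this.current = new ArrayList<>(chunkSize);
    }

    void add(T item) {
        current.add(item);
        if (current.size() >= chunkSize)
            flush();
    }

    /** Dispatches the pending partial batch, if there is one. */
    void flush() {
        if (!current.isEmpty()) {
            destination.accept(current);
            current = new ArrayList<>(chunkSize);
        }
    }
}

/** Toy driver: any input starting with "!" is treated as a parse error. */
final class ParseDriver {
    static void run(List<String> input, Batcher<String> batcher) {
        try {
            for (String s : input) {
                if (s.startsWith("!"))
                    throw new IllegalStateException("parse error at " + s);
                batcher.add(s);
            }
        } catch (IllegalStateException ex) {
            batcher.add("ERROR: " + ex.getMessage());   // report the error as an element
        } finally {
            batcher.flush();   // dispatch the last partial batch on success and on error
        }
    }
}
```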
   


[GitHub] [jena] afs commented on pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
afs commented on PR #1478:
URL: https://github.com/apache/jena/pull/1478#issuecomment-1220898332

   Please add the test.
   The async parser is critical to TDB2 loading.
   I'm finding it difficult to know what things are ready or not.
   
   It would help me when trying to jump between several different PRs and issues (including RDF Delta):
   
   1. Use "GH-" (capitals) in commits - having a common style makes grepping the commit logs easier.
   2. Tidy the commits to what is important when looking back at this.
   You may think these are trivial - but for me, juggling multiple PRs and issues, common forms help.
   


[GitHub] [jena] afs commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
afs commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r966977118


##########
jena-arq/src/main/java/org/apache/jena/riot/system/EltStreamRDFImpl.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.riot.out.NodeFmtLib;
+import org.apache.jena.sparql.core.Quad;
+
+/** An item of a StreamRDF, including exceptions. Internal API. */
+public class EltStreamRDFImpl
+    implements EltStreamRDF, Serializable
+{
+    private static final long serialVersionUID = 1L;
+
+    Triple triple = null;
+    Quad quad = null;
+    String prefix = null; // Null implies "base".
+    String iri = null;
+    Throwable exception = null;
+
+    public EltStreamRDFImpl() {}
+
+    /* Prefer static constructors in EltStreamRDF */
+
+    public EltStreamRDFImpl(Triple triple) { this.triple = triple; }
+    public EltStreamRDFImpl(Quad quad) { this.quad = quad; }
+    public EltStreamRDFImpl(String prefix, String iri) { this.prefix = prefix; this.iri = iri; }
+    public EltStreamRDFImpl(String iri) { this.iri = iri; }
+    public EltStreamRDFImpl(Throwable exception) { this.exception = exception; }
+
+    @Override public boolean   isTriple()     { return triple != null; }
+    @Override public Triple    getTriple()    { return triple; }
+    @Override public boolean   isQuad()       { return quad != null; }
+    @Override public Quad      getQuad()      { return quad; }
+    @Override public boolean   isPrefix()     { return prefix != null; }
+    @Override public String    getPrefix()    { return prefix; }
+    @Override public boolean   isBase()       { return prefix == null && iri != null; }
+    @Override public String    getIri()       { return iri; }
+    @Override public boolean   isException()  { return exception != null; }
+    @Override public Throwable getException() { return exception; }
+
+    @Override
+    public EltStreamRDFType getType() {

Review Comment:
   The getters for records are `field()` style, with no `get` prefix. There are no setters because records are value-based/immutable.
   
   I was wrong - records can implement interfaces; they can't extend anything.
   
   Yes - this does seem to be authoritative, as it is the output of the AsyncParser.
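A small illustration of those points (Java 16+), with hypothetical names `PrefixElt` and `PrefixRecord`: the record implements an interface, exposes `prefix()`/`iri()` accessors, has no setters, and validates in a compact constructor. A record cannot have an `extends` clause because it implicitly extends `java.lang.Record`.

```java
import java.util.Objects;

/** Hypothetical public view of a prefix element. */
interface PrefixElt {
    String prefix();
    String iri();
}

/**
 * Records get private final fields, a canonical constructor, field()-style
 * accessors (no "get"), equals/hashCode/toString, and no setters.
 * The implicit accessors satisfy the interface methods.
 */
record PrefixRecord(String prefix, String iri) implements PrefixElt {
    // Compact canonical constructor: validates before the fields are assigned.
    PrefixRecord {
        Objects.requireNonNull(prefix, "prefix");
        Objects.requireNonNull(iri, "iri");
    }
}
```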
   



[GitHub] [jena] afs commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
afs commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r966992811


##########
jena-arq/src/main/java/org/apache/jena/riot/system/EltStreamRDF.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.sparql.core.Quad;
+
+/**
+ * Interface for streamRDF parsing elements.
+ */
+public interface EltStreamRDF {

Review Comment:
   That is an argument for a public top-level thing, not an interface.
   
   `EltStreamRDF` is effectively specific to AsyncParser.
   
   Java generics don't help here - the interaction of generics and subclassing/implementing interfaces doesn't make things easy (using casts is IMO not nice!)
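One way the generics problem shows up, sketched with hypothetical `StreamElt`/`StreamEltImpl` types: Java generics are invariant, so a `List<StreamEltImpl>` is not a `List<StreamElt>`. An upper-bounded wildcard avoids the cast for read-only consumers, though code that adds to the list still needs the concrete type.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical public interface and its AsyncParser-style implementation. */
interface StreamElt {
    boolean isTriple();
}

final class StreamEltImpl implements StreamElt {
    final boolean triple;
    StreamEltImpl(boolean triple) { this.triple = triple; }
    @Override public boolean isTriple() { return triple; }
}

final class BatchStats {
    // Generics are invariant: a List<StreamEltImpl> cannot be passed here.
    // It would need a double cast through List<?>, which is unchecked.
    static long countTriplesStrict(List<StreamElt> batch) {
        return batch.stream().filter(StreamElt::isTriple).count();
    }

    // An upper-bounded wildcard accepts List<StreamEltImpl> directly.
    // Trade-off: nothing (except null) can be added through this reference.
    static long countTriples(List<? extends StreamElt> batch) {
        return batch.stream().filter(StreamElt::isTriple).count();
    }
}
```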
   



[GitHub] [jena] afs commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
afs commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965087736


##########
jena-arq/src/main/java/org/apache/jena/riot/system/EltStreamRDFImpl.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.riot.out.NodeFmtLib;
+import org.apache.jena.sparql.core.Quad;
+
+/** An item of a StreamRDF, including exceptions. Internal API. */
+public class EltStreamRDFImpl
+    implements EltStreamRDF, Serializable
+{
+    private static final long serialVersionUID = 1L;
+
+    Triple triple = null;
+    Quad quad = null;
+    String prefix = null; // Null implies "base".
+    String iri = null;
+    Throwable exception = null;
+
+    public EltStreamRDFImpl() {}
+
+    /* Prefer static constructors in EltStreamRDF */
+
+    public EltStreamRDFImpl(Triple triple) { this.triple = triple; }
+    public EltStreamRDFImpl(Quad quad) { this.quad = quad; }
+    public EltStreamRDFImpl(String prefix, String iri) { this.prefix = prefix; this.iri = iri; }
+    public EltStreamRDFImpl(String iri) { this.iri = iri; }
+    public EltStreamRDFImpl(Throwable exception) { this.exception = exception; }
+
+    @Override public boolean   isTriple()     { return triple != null; }
+    @Override public Triple    getTriple()    { return triple; }
+    @Override public boolean   isQuad()       { return quad != null; }
+    @Override public Quad      getQuad()      { return quad; }
+    @Override public boolean   isPrefix()     { return prefix != null; }
+    @Override public String    getPrefix()    { return prefix; }
+    @Override public boolean   isBase()       { return prefix == null && iri != null; }
+    @Override public String    getIri()       { return iri; }
+    @Override public boolean   isException()  { return exception != null; }
+    @Override public Throwable getException() { return exception; }
+
+    @Override
+    public EltStreamRDFType getType() {

Review Comment:
   This class is really a tagged union value (immutable).
   
   Suggestions:
   * Have a slot for the type and set the type when constructed.
   * Make all fields final.
   * Use `Objects.requireNonNull` in constructors to catch problems early. Then there is a guarantee that one and only one slot is non-null.
   
   It's tedious writing out the duplication, but it is not visible.
   
   And maybe it will be a Java `record` at Java 17 (which also means it can't extend an interface - see also the implementation-of-one comment).
   
   
   
   



[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965031633


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   If startBatching failed, it would have to be handled in a try/catch block. If it failed with the code as it is now, the resources wouldn't be freed; the poison (end marker) would never be placed on the queue, resulting in an infinite wait.
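A sketch of the try/finally shape this implies, assuming a hypothetical `ParserTask` whose `startBatching` and `parse` steps are passed in as `Runnable`s: whichever step fails, the `finally` block places the end marker, so the consumer is never left waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical parser task: the end marker is placed no matter which step fails. */
final class ParserTask implements Runnable {
    // End-of-stream marker, compared by identity; never mutated.
    static final List<String> POISON = new ArrayList<>();

    private final BlockingQueue<List<String>> queue;
    private final Runnable startBatching;
    private final Runnable parse;
    private volatile Throwable failure;

    ParserTask(BlockingQueue<List<String>> queue, Runnable startBatching, Runnable parse) {
        this.queue = queue;
        this.startBatching = startBatching;
        this.parse = parse;
    }

    Throwable failure() { return failure; }

    @Override public void run() {
        try {
            startBatching.run();   // inside the try: a failure here must not skip the finally
            parse.run();
        } catch (RuntimeException ex) {
            failure = ex;          // kept for the consumer to inspect or rethrow
        } finally {
            try {
                queue.put(POISON); // always signal end-of-stream
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

An unbounded `LinkedBlockingQueue` keeps the final `put` from blocking; with a bounded queue, the consumer must keep draining until it sees the marker.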



[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965025837


##########
jena-arq/src/main/java/org/apache/jena/riot/system/EltStreamRDF.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.sparql.core.Quad;
+
+/**
+ * Interface for streamRDF parsing elements.
+ */
+public interface EltStreamRDF {

Review Comment:
   Back in [JENA-2309](https://issues.apache.org/jira/browse/JENA-2309) I mentioned the use case of getting individual elements from the parser in order to scan the head of a file for prefixes, stopping once a configurable number of triples/quads has been seen.
   
   There is `Stream<EltStreamRDF> stream = AsyncParser.of(filename).streamElements();`, which uses the interface.
   I did not add shorthands such as the various `AsyncParser.asyncParseElements` methods because I think the builder already covers this use case, but I could add them for consistency.
   
   The intent of the interface is to provide a public API for accessing elements, whereas `EltStreamRDFImpl` is a rather AsyncParser-specific implementation; for example, the parser directly sets its (package-visible) attributes without going through getters/setters or constructors. I am not sure whether this is actually faster, but it is one sensible way to implement it. The public API, however, shouldn't be tied to that.
   The interface also hides whether the element type is derived from which attributes are non-null (as is done now) or stored in a dedicated attribute.
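To illustrate the head-scanning use case and the idea of deriving the element type from whichever attribute is non-null, here is a self-contained sketch. `Elt`, `EltImpl`, `EltType` and `PrefixScan` are hypothetical stand-ins (with `String` in place of `Triple`), not Jena's `EltStreamRDF` API. With a real asynchronous parser one would presumably also close the stream after the scan so that background parsing stops.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Stream;

// Hypothetical stand-ins for illustration only; not Jena's EltStreamRDF API.
enum EltType { TRIPLE, PREFIX }

interface Elt {
    EltType type();
    String triple(); // non-null only for TRIPLE elements
    String prefix(); // non-null only for PREFIX elements
    String iri();    // namespace IRI of a PREFIX element
}

// Fields are package-visible and set directly by the producer;
// the type is derived from which field is non-null.
final class EltImpl implements Elt {
    String triple, prefix, iri;

    static EltImpl ofTriple(String t) { EltImpl e = new EltImpl(); e.triple = t; return e; }
    static EltImpl ofPrefix(String p, String i) { EltImpl e = new EltImpl(); e.prefix = p; e.iri = i; return e; }

    @Override public EltType type() { return triple != null ? EltType.TRIPLE : EltType.PREFIX; }
    @Override public String triple() { return triple; }
    @Override public String prefix() { return prefix; }
    @Override public String iri() { return iri; }
}

final class PrefixScan {
    /** Collects prefixes from a sequential stream, stopping once more than maxTriples triples are seen. */
    static Map<String, String> scanHead(Stream<? extends Elt> elts, int maxTriples) {
        Map<String, String> prefixes = new LinkedHashMap<>();
        int[] triplesSeen = {0};
        elts.takeWhile(e -> e.type() != EltType.TRIPLE || ++triplesSeen[0] <= maxTriples)
            .filter(e -> e.type() == EltType.PREFIX)
            .forEach(e -> prefixes.put(e.prefix(), e.iri()));
        return prefixes;
    }
}
```

Keeping the public interface separate from the implementation class means the field-based type check could later be swapped for a dedicated type attribute without changing callers.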
   
   



[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965038559


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   If close() is invoked while isAborted is false, isAborted is atomically set to true, and the close() logic then clears the queue and places the poison on it. The queue is cleared to ensure there is space for the poison. The atomic operation prevents further chunks from being placed on the queue in the meantime.
   In essence, if isAborted is true, the poison has been placed (or is just about to be placed) on the queue.
   
   In that case finishBatching is suppressed, because it would redundantly try to place a second poison on the queue.
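The close() behaviour described above can be sketched as follows, assuming `String` batches and an identity-compared poison batch; `ClosableFeed`, `deliver`, `finish` and `POISON` are hypothetical names. `compareAndSet` guarantees that only the first of close() and the normal finish places the poison, so the other one is suppressed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the close() logic; not the PR's code.
final class ClosableFeed {
    static final List<String> POISON = new ArrayList<>(); // end marker, compared by identity

    final BlockingQueue<List<String>> queue;
    final AtomicBoolean isAborted = new AtomicBoolean(false);

    ClosableFeed(int capacity) { queue = new ArrayBlockingQueue<>(capacity); }

    /** Parser thread: enqueue a chunk unless parsing was aborted. */
    void deliver(List<String> chunk) {
        if (isAborted.get())
            throw new IllegalStateException("Parsing aborted");
        putOrFail(chunk);
    }

    /** Parser thread, normal end: suppressed if close() already placed the poison. */
    void finish() {
        if (isAborted.compareAndSet(false, true))
            putOrFail(POISON);
    }

    /** Consumer thread: only the first caller clears the queue and places the poison. */
    void close() {
        if (isAborted.compareAndSet(false, true)) {
            queue.clear();       // make room for the poison
            queue.offer(POISON); // non-blocking
        }
    }

    private void putOrFail(List<String> batch) {
        try {
            queue.put(batch);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

In this simplified form, a producer that passed the `isAborted` check just before close() may still enqueue one more chunk after the clear, which is why the poison is placed with a non-blocking `offer` here (it may fail in that case); the three-state redesign described further down in this thread moves the clear-and-poison step onto the parser thread instead.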



[GitHub] [jena] Aklakan commented on pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on PR #1478:
URL: https://github.com/apache/jena/pull/1478#issuecomment-1243300154

   I revised the previous comment with line numbers and a better description of the issue.
   In the most recent commits, the AtomicBoolean is replaced with an AtomicInteger representing three states: RUNNING, ABORTING (= abort requested) and ABORTED.
   
   These states determine what happens when the destination receives the next chunk:
   - In ABORTING state, there is an atomic transition into ABORTED state. The chunk is then ignored; instead, the queue is cleared and the poison is placed on it.
   - In ABORTED state, any further chunk raises an exception.
   - In RUNNING state, the chunk is added to the queue as normal.
   
   A concurrent abort() request transitions from RUNNING into ABORTING state and interrupts the parser thread.
   - If the parser was waiting on a full queue, it is interrupted and performs the abort action (clear queue + place poison).
   - Otherwise, the abort action triggers at the latest when the next chunk arrives.
   
   I hope I didn't overlook anything and that this approach is now sound. The "art" here is essentially getting away with a single atomic value that is checked per chunk.
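The three-state scheme above can be sketched as follows, assuming `String` batches and an identity-compared poison batch; `ThreeStateFeed`, `accept`, `abort` and the integer constants are hypothetical names, not the merged Jena code. The fast path is a single `state.get()` per chunk, and only the parser thread performs the clear-and-poison step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the three-state scheme described above; not the merged Jena code.
final class ThreeStateFeed {
    static final int RUNNING = 0, ABORTING = 1, ABORTED = 2;
    static final List<String> POISON = new ArrayList<>(); // end marker, compared by identity

    final BlockingQueue<List<String>> queue;
    final AtomicInteger state = new AtomicInteger(RUNNING);
    volatile Thread parserThread; // must be set before abort() can interrupt it

    ThreeStateFeed(int capacity) { queue = new ArrayBlockingQueue<>(capacity); }

    /** Parser thread: called once per chunk; a single atomic read on the fast path. */
    void accept(List<String> chunk) {
        int s = state.get();
        if (s == RUNNING) {
            try {
                queue.put(chunk); // may block while the queue is full
                return;
            } catch (InterruptedException e) {
                // Woken up by abort() (or another interrupt): fall through to the abort action.
            }
        } else if (s == ABORTED) {
            throw new IllegalStateException("Parser already aborted");
        }
        // ABORTING, or interrupted while blocked: perform the abort action once.
        if (state.compareAndSet(ABORTING, ABORTED)) {
            queue.clear();       // only this thread adds chunks, so this guarantees space
            queue.offer(POISON); // tell the consumer the stream has ended
        }
        throw new IllegalStateException("Parsing aborted");
    }

    /** Any thread: RUNNING -> ABORTING, then interrupt a parser that may be blocked in put(). */
    void abort() {
        if (state.compareAndSet(RUNNING, ABORTING)) {
            Thread t = parserThread;
            if (t != null)
                t.interrupt();
        }
    }
}
```

Handling of stray interrupts while RUNNING and of the normal end of parsing is left out of this sketch.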
   


[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r968036398


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   Revised in the factored out `Task` class.



##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -199,7 +244,7 @@ protected X moveToNext() {
                         return null;
                     }
                     return x;
-                } catch (InterruptedException e) {
+                } catch (@SuppressWarnings("unused") InterruptedException e) {

Review Comment:
   done



[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965137905


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParserBuilder.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import org.apache.jena.atlas.iterator.Iter;
+import org.apache.jena.atlas.iterator.IteratorCloseable;
+import org.apache.jena.graph.Triple;
+import org.apache.jena.riot.RDFParserBuilder;
+import org.apache.jena.sparql.core.Quad;
+import org.slf4j.Logger;
+
+public class AsyncParserBuilder {
+    /* These following two attributes only exist for javadoc references */
+    private static final int dftChunkSize = AsyncParser.dftChunkSize;
+    private static final int dftQueueSize = AsyncParser.dftQueueSize;
+
+    protected int chunkSize;
+    protected int queueSize;
+    protected boolean daemonMode;
+    protected List<RDFParserBuilder> sources;
+
+    public AsyncParserBuilder() {
+        this.chunkSize = dftChunkSize;
+        this.queueSize = dftQueueSize;
+        this.daemonMode = true;
+        this.sources = Collections.emptyList();
+    }
+
+    public AsyncParserBuilder(List<RDFParserBuilder> sources) {
+        this();
+        this.sources = sources;

Review Comment:
   Fine with me.



[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965038559


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   If close() is invoked while isAborted is false, isAborted is atomically set to true. In that case, the close() logic then clears the queue and places the poison on it. The queue is cleared to ensure there is space for the poison. The atomic operation prevents further chunks from being placed on the queue in the meantime, which might otherwise stall it.
   In essence, if isAborted is true, the poison has been placed (or is just about to be placed) on the queue.
   
   In that case finishBatching is suppressed, because it would redundantly try to place a second poison on the queue.



[GitHub] [jena] afs merged pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
afs merged PR #1478:
URL: https://github.com/apache/jena/pull/1478


[GitHub] [jena] Aklakan commented on pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on PR #1478:
URL: https://github.com/apache/jena/pull/1478#issuecomment-1249487015

   ```
   10 iterations of inserting 5,000,000 quads
   4.6.0: 24m32,261s
   4.7.0: 24m56,504s
   
   20 iterations of inserting 5,432,109 quads
   4.6.0: 53m10,034s
   4.7.0: 51m39,403s
   ```
   
   So there is no significant difference; I also found no issues in our Hadoop setup.
   It should be done.
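As a quick check of "no significant difference", the relative change can be computed from the timings above. This is a minimal sketch assuming the `24m32,261s` format means minutes plus seconds with a decimal comma; `Timing` is a hypothetical helper. For the two runs it gives roughly +1.6% (5,000,000 quads) and -2.8% (5,432,109 quads), i.e. differences in opposite directions of a few percent.

```java
// Hypothetical helper for comparing the wall-clock timings reported above.
final class Timing {
    /** Parses e.g. "24m32,261s" (minutes, then seconds with a decimal comma) into seconds. */
    static double seconds(String t) {
        int m = t.indexOf('m');
        double minutes = Double.parseDouble(t.substring(0, m));
        double secs = Double.parseDouble(t.substring(m + 1, t.length() - 1).replace(',', '.'));
        return minutes * 60 + secs;
    }

    /** Relative change from 'before' to 'after', in percent (positive = slower). */
    static double percentChange(String before, String after) {
        double b = seconds(before);
        return 100.0 * (seconds(after) - b) / b;
    }
}
```

For example, `Timing.percentChange("24m32,261s", "24m56,504s")` compares the 4.6.0 and 4.7.0 times of the first run.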


[GitHub] [jena] Aklakan commented on pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on PR #1478:
URL: https://github.com/apache/jena/pull/1478#issuecomment-1237485290

   Here are the recent values from comparing the phased loading strategy between 4.6.0 and 4.7.0-SNAPSHOT with this PR.
   I incorrectly mentioned 4.5.0 in my last post; 4.6.0 is the most recent version without this PR.
   
   An iteration means loading the dataset once.
   I verified that the right number of triples was loaded into the store.
   ```
   # 20 iterations with 13 quads in 2 graphs (a new graph every 10 quads)
   The intention is to check that small data that doesn't fill the queue works.
   jena 4.6.0:           39,355s
   jena 4.7.0-SNAPSHOT:  40,553s
   (with a single quad it's also essentially 40s, which is mostly JVM/Jena startup time rather than actual insertion)
   
   # 20 iterations with 9.876.543 quads in 99 graphs (a new graph every 100K quads)
   jena 4.6.0:          4094s
   jena 4.7.0-SNAPSHOT: 3990s
   
   # 10 iterations with 9.876.543 triples
   jena 4.6.0:          1946s
   jena 4.7.0-SNAPSHOT: 1901s
   ```
   On another day the figures were reversed, with 4.6.0 slightly faster; but now that I have repeated the runs, there doesn't seem to be a significant difference.
   
   So to me it all looks good.


[GitHub] [jena] rvesse commented on pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
rvesse commented on PR #1478:
URL: https://github.com/apache/jena/pull/1478#issuecomment-1239392959

   If it's just used as a flag to check whether to continue or not, you could probably use a `private volatile boolean` instead? It still has to go to main memory, but should have marginally less overhead than an `AtomicBoolean`.


[GitHub] [jena] afs commented on pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
afs commented on PR #1478:
URL: https://github.com/apache/jena/pull/1478#issuecomment-1239418815

   Probably much the same; the object itself is cacheable and optimizable.
   The `AtomicBoolean` is clearer (IMO) and has other useful operations, such as `getPlain` and the other memory-access modes.
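To make the trade-off concrete, here is a small sketch contrasting the two options; `AbortFlags` and its methods are hypothetical. A `volatile boolean` gives visibility, but a check followed by a write is two separate steps, whereas `AtomicBoolean.compareAndSet` performs the false-to-true transition atomically, which is what the "only the first close() places the poison" logic relies on. `getAcquire` and `getPlain` are among the additional access modes `AtomicBoolean` has offered since Java 9.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical comparison of the two flag styles discussed above.
final class AbortFlags {
    // Option 1: visible across threads, but check-then-set is not atomic.
    private volatile boolean abortedVolatile;

    // Option 2: same volatile semantics via get()/set(), plus compareAndSet and other access modes.
    private final AtomicBoolean aborted = new AtomicBoolean(false);

    /** Racy: two threads can both read false and both believe they won. */
    boolean tryAbortVolatile() {
        if (abortedVolatile)
            return false;
        abortedVolatile = true;
        return true;
    }

    /** Atomic: exactly one caller wins the false -> true transition. */
    boolean tryAbortAtomic() {
        return aborted.compareAndSet(false, true);
    }

    boolean isAbortedAcquire() { return aborted.getAcquire(); } // acquire ordering
    boolean isAbortedPlain()   { return aborted.getPlain(); }   // plain read, no ordering guarantees
}
```

For a flag that is only ever read by the parser and written once by another thread, either option works; the atomic variant matters as soon as "check and transition" must happen exactly once.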
   


[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965031633


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   If startBatching failed, the failure would have to be handled in a try/catch block. If it failed in the code as it is now, the resources wouldn't be freed; the poison (end marker) would not even be placed on the queue, resulting in an infinite wait.



[GitHub] [jena] afs commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
afs commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965114900


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParserBuilder.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import org.apache.jena.atlas.iterator.Iter;
+import org.apache.jena.atlas.iterator.IteratorCloseable;
+import org.apache.jena.graph.Triple;
+import org.apache.jena.riot.RDFParserBuilder;
+import org.apache.jena.sparql.core.Quad;
+import org.slf4j.Logger;
+
+public class AsyncParserBuilder {
+    /* These following two attributes only exist for javadoc references */
+    private static final int dftChunkSize = AsyncParser.dftChunkSize;
+    private static final int dftQueueSize = AsyncParser.dftQueueSize;
+
+    protected int chunkSize;
+    protected int queueSize;
+    protected boolean daemonMode;
+    protected List<RDFParserBuilder> sources;
+
+    public AsyncParserBuilder() {
+        this.chunkSize = dftChunkSize;
+        this.queueSize = dftQueueSize;
+        this.daemonMode = true;
+        this.sources = Collections.emptyList();
+    }
+
+    public AsyncParserBuilder(List<RDFParserBuilder> sources) {
+        this();
+        this.sources = sources;

Review Comment:
   This can be isolated with `this.sources = List.copyOf(sources);`; then later changes to the argument have no effect and cause no concurrency issues. WDYT?
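A minimal sketch of the suggested defensive copy, with `String` standing in for `RDFParserBuilder`; `SourcesBuilder` is a hypothetical name. `List.copyOf` (Java 10+) returns an unmodifiable snapshot, so later changes to the caller's list are not seen; note that it also throws `NullPointerException` for a null list or null elements.

```java
import java.util.List;

// Simplified stand-in for the builder under review; String replaces RDFParserBuilder.
final class SourcesBuilder {
    private final List<String> sources;

    SourcesBuilder(List<String> sources) {
        // Unmodifiable snapshot: later changes to the caller's list are not visible here,
        // and the field can be read from other threads without defensive locking.
        this.sources = List.copyOf(sources);
    }

    List<String> sources() { return sources; }
}
```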



[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r968031921


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParserBuilder.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import org.apache.jena.atlas.iterator.Iter;
+import org.apache.jena.atlas.iterator.IteratorCloseable;
+import org.apache.jena.graph.Triple;
+import org.apache.jena.riot.RDFParserBuilder;
+import org.apache.jena.sparql.core.Quad;
+import org.slf4j.Logger;
+
+public class AsyncParserBuilder {
+    /* These following two attributes only exist for javadoc references */
+    private static final int dftChunkSize = AsyncParser.dftChunkSize;
+    private static final int dftQueueSize = AsyncParser.dftQueueSize;
+
+    protected int chunkSize;
+    protected int queueSize;
+    protected boolean daemonMode;
+    protected List<RDFParserBuilder> sources;
+
+    public AsyncParserBuilder() {
+        this.chunkSize = dftChunkSize;
+        this.queueSize = dftQueueSize;
+        this.daemonMode = true;
+        this.sources = Collections.emptyList();
+    }
+
+    public AsyncParserBuilder(List<RDFParserBuilder> sources) {
+        this();
+        this.sources = sources;

Review Comment:
   done



[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965025837


##########
jena-arq/src/main/java/org/apache/jena/riot/system/EltStreamRDF.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.system;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.sparql.core.Quad;
+
+/**
+ * Interface for streamRDF parsing elements.
+ */
+public interface EltStreamRDF {

Review Comment:
   Back in [JENA-2309](https://issues.apache.org/jira/browse/JENA-2309) I mentioned the use case of getting individual elements from the parser in order to scan the head of a file for prefixes, stopping once a configurable number of triples/quads has been seen.
   
   The intent of the interface is to provide a public API for accessing elements, whereas EltStreamRDFImpl is a rather AsyncParser-specific implementation: for example, the parser sets the (package-visible) attributes directly, without going through any getters/setters or constructors. I'm not sure this is actually faster, but it is one sensible way to implement it. The public API, however, shouldn't be tied to that.
   The interface also abstracts over whether the element type is determined conditionally from the non-null attributes (as is done now) or stored in a dedicated attribute.
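A minimal Java sketch of the separation described above. All names here (`Elt`, `EltImpl`, `PrefixScanner.scanPrefixes`) are hypothetical, and plain strings stand in for Jena's `Triple`/`Quad` so that it compiles on its own; it is not the PR's actual `EltStreamRDF` API. The implementation derives the element type from which package-visible field is non-null, callers only see the interface, and the prefix scan stops after a configurable number of triples/quads.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical public view of a parser element (stand-in for the interface under review). */
interface Elt {
    boolean isPrefix();
    boolean isTriple();
    boolean isQuad();
    String prefix();
    String iri();
}

/** Hypothetical parser-specific implementation: the type is derived from which field is non-null. */
final class EltImpl implements Elt {
    // Package-visible fields that the producer sets directly, without setters or constructors.
    String prefix;
    String iri;
    String triple; // placeholder for a parsed triple
    String quad;   // placeholder for a parsed quad

    @Override public boolean isPrefix() { return prefix != null; }
    @Override public boolean isTriple() { return triple != null; }
    @Override public boolean isQuad()   { return quad != null; }
    @Override public String prefix()    { return prefix; }
    @Override public String iri()       { return iri; }

    static EltImpl ofPrefix(String prefix, String iri) {
        EltImpl e = new EltImpl();
        e.prefix = prefix;
        e.iri = iri;
        return e;
    }

    static EltImpl ofTriple(String triple) {
        EltImpl e = new EltImpl();
        e.triple = triple;
        return e;
    }
}

final class PrefixScanner {
    /** Collects prefix declarations from the head of a stream, stopping once maxData triples/quads have been seen. */
    static Map<String, String> scanPrefixes(Iterable<? extends Elt> elements, int maxData) {
        Map<String, String> prefixes = new LinkedHashMap<>();
        int seen = 0;
        for (Elt e : elements) {
            if (e.isPrefix()) {
                prefixes.put(e.prefix(), e.iri());
            } else if (e.isTriple() || e.isQuad()) {
                seen++;
                if (seen >= maxData) {
                    break; // stop early; with a lazy Iterable the rest of the input is not consumed
                }
            }
        }
        return prefixes;
    }
}
```

Because `scanPrefixes` only depends on the interface, it would work unchanged if the element type were later stored in a dedicated field instead of being inferred from non-null attributes.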


[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965038559


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   If close() is invoked while isAborted is false, then isAborted is atomically set to true. The close() logic then proceeds to clear the queue and place the poison on it.
   In essence, if isAborted is true, the poison is on the queue (or is just about to be placed there).
   
   In that case finishBatching is suppressed, because it would redundantly try to place a second poison on the queue.


[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965031633


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   The comment records that if startBatching could fail, the failure would have to be handled in a try/catch block. If it failed in the code as it is now, the resources wouldn't be freed, and the poison (end marker) would never be placed on the queue, so the consumer would wait forever.
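One way to make that guarantee explicit is a try/finally around the task body, sketched below under assumptions: `ParserTaskSketch`, the `startBatching`/`parse` callbacks and the identity-compared `POISON` list are hypothetical stand-ins, not the code in this PR. Whatever happens during setup or parsing, the end marker is enqueued, so a consumer blocked in `take()` is released. If the queue is bounded and the consumer has stopped draining it, the final `put` can still block; the sketch does not address that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

final class ParserTaskSketch {
    // Unique end marker, compared by identity. A fresh ArrayList is used rather than
    // List.of(), whose empty instance may be shared with other code.
    static final List<String> POISON = new ArrayList<>();

    /** Hypothetical task: the end marker is enqueued on success and on failure alike. */
    static Runnable task(BlockingQueue<List<String>> queue, Runnable startBatching, Runnable parse) {
        return () -> {
            try {
                startBatching.run(); // a no-op today, but could throw in a future version
                parse.run();
            } finally {
                // Runs whether or not an exception escaped above, so the consumer
                // always sees the end marker and never waits forever.
                try {
                    queue.put(POISON);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                }
            }
        };
    }
}
```

Any exception from setup or parsing still propagates out of `run()` after the marker is placed, so it is not silently swallowed.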


[GitHub] [jena] Aklakan commented on a diff in pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
Aklakan commented on code in PR #1478:
URL: https://github.com/apache/jena/pull/1478#discussion_r965038559


##########
jena-arq/src/main/java/org/apache/jena/riot/system/AsyncParser.java:
##########
@@ -239,40 +278,143 @@ public void fatal(String message, long line, long col) {
                 throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
             }
         };
+    }
+
+    /** Returns a runnable for stopping the parse process */
+    static Runnable startParserThread(
+            Logger LOG1, List<RDFParserBuilder> parserBuilders,
+            BlockingQueue<List<EltStreamRDFImpl>> queue,
+            int chunkSize,
+            boolean daemonMode) {
+
+        // A flag for whether parsing is considered aborted - regardless
+        // whether due to normal or exceptional conditions
+        AtomicBoolean isAborted = new AtomicBoolean(false);
+
+        // -- Parser thread setup
+        Consumer<List<EltStreamRDFImpl>> destination = batch -> {
+            if (!isAborted.get()) {
+                try {
+                    queue.put(batch);
+                } catch (InterruptedException ex) {
+                    // FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
+                    throw new RuntimeException(ex);
+                }
+            } else {
+                throw new RuntimeException(new InterruptedException());
+            }
+        };
+        EltStreamBatcher batcher = new EltStreamBatcher(destination, chunkSize);
+        StreamRDF generatorStream = new StreamToElements(batcher);
 
         // Parser thread
-        Runnable task = ()->{
-            batcher.startBatching();
+        Runnable task = () -> {
+            batcher.startBatching(); // No try/catch block currently needed because this is a no-op

Review Comment:
   If close() is invoked while isAborted is false, then isAborted is atomically set to true. In that case the close() logic proceeds to clear the queue and place the poison on it. The queue is cleared to ensure that there is space for the poison. The atomic operation prevents further chunks from being placed on the queue in the meantime.
   In essence, if isAborted is true, the poison is on the queue (or is just about to be placed there).
   
   In that case finishBatching is suppressed, because it would redundantly try to place a second poison on the queue.
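The protocol described above can be sketched as a simplified, hypothetical model (`AbortableChannel` and its methods are illustrative names, not the PR's code). Whichever side wins `compareAndSet(false, true)` is the only one that places the poison, so `close()` and the normal finish (the `finishBatching` equivalent) never both enqueue it. In this sketch the `isAborted.get()` check in `put` and the following `queue.put` are not one atomic step: a batch that passed the check just before `close()` may still be enqueued around the clear. The retry loop keeps the poison from being lost, but such a stray batch can still reach the consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model of the abort protocol; not the PR's implementation. */
final class AbortableChannel<T> {
    private final List<T> poison = new ArrayList<>(); // unique end marker, compared by identity
    private final BlockingQueue<List<T>> queue;
    private final AtomicBoolean isAborted = new AtomicBoolean(false);

    AbortableChannel(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Producer side: refuse new batches once aborted (the PR likewise signals an InterruptedException). */
    void put(List<T> batch) throws InterruptedException {
        if (isAborted.get()) {
            throw new InterruptedException("parsing aborted");
        }
        queue.put(batch);
    }

    /** Producer side, normal end (finishBatching): suppressed if close() already flipped the flag. */
    void finish() throws InterruptedException {
        if (isAborted.compareAndSet(false, true)) {
            queue.put(poison);
        }
    }

    /** Consumer side: only the caller that flips the flag clears the queue and places the poison. */
    void close() {
        if (isAborted.compareAndSet(false, true)) {
            // Clear to make room; retry in case a producer that had already passed
            // the isAborted check refilled the freed slot.
            do {
                queue.clear();
            } while (!queue.offer(poison));
        }
    }

    /** Returns the next batch, or null once the end marker is reached. */
    List<T> take() throws InterruptedException {
        List<T> batch = queue.take();
        return batch == poison ? null : batch;
    }

    int pending() {
        return queue.size();
    }
}
```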


[GitHub] [jena] afs commented on pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
afs commented on PR #1478:
URL: https://github.com/apache/jena/pull/1478#issuecomment-1241886005

   Not sure I fully understand that. Could you give a concrete timeline diagram?
   
   Is the problem having two sides calling "add" (both consumer and producer)? We already have the AtomicBoolean as a back channel. A bit of extra work in what should be an error situation, so as not to impact normal processing, is a reasonable tradeoff.
   

[GitHub] [jena] afs commented on pull request #1478: GH-1477: Improved AsyncParser

Posted by GitBox <gi...@apache.org>.
afs commented on PR #1478:
URL: https://github.com/apache/jena/pull/1478#issuecomment-1250332211

   @Aklakan 
   FYI: after this is merged, the code reorganization PR #1539 will go in, which is a small bump for forked repos.
