Posted to users@jena.apache.org by Abika Chitra <Ab...@marklogic.com> on 2023/06/26 18:26:37 UTC

Need suggestions on handling latency on asyncparser

Hi there,

This is Abika from MarkLogic. I work on a project called MLCP (MarkLogic Content Pump), which is used to bulk load and transfer data to and from MarkLogic Server.

Our MLCP project uses the Jena framework to process RDF. We previously built this project with Jena 2.13 and are now transitioning to Jena 4.8.0 (the latest available). Given the time between those releases, parsing in Jena has changed a lot. Right now we are using AsyncParser (following a suggestion from the Jena Javadocs). I notice some lag when using AsyncParser, which is also mentioned in the Javadoc<https://javadoc.io/doc/org.apache.jena/jena-arq/latest/org.apache.jena.arq/org/apache/jena/riot/system/AsyncParser.html>.

In our codebase, to parse a set of files in a zip/archive, we create a runnable parser for each file and submit them to an executor service. With this implementation, I see some of the files being skipped because of the latency in result delivery from the AsyncParser API calls. For now we are considering either implementing a wait on the results from AsyncParser or no longer creating parallel threads to process the files in a zip/archive.

I would like your suggestions on robust ways to handle this latency.

Here's the code of run() in the runnable parser class. While debugging, I see the latency when the API calls to RiotParsers and AsyncParser don't return data right after the call.


public void run() {
    ErrorHandler handler = new ParserErrorHandler(fsname);
    ParserProfile prof = RiotLib.profile(lang, fsname, handler);
    try {
        if (lang == Lang.TRIG) {
            rdfInputStream = AsyncParser.of(in, lang, origFn).streamQuads();
            rdfIter = rdfInputStream.iterator();
        } else if (lang == Lang.NTRIPLES) {
            rdfIter = RiotParsers.createIteratorNTriples(in, prof);
            System.out.println("2else ntriples async run ");
        } else if (lang == Lang.NQUADS) {
            rdfIter = RiotParsers.createIteratorNQuads(in, prof);
        } else {
            rdfInputStream = AsyncParser.of(in, lang, fsname).setChunkSize(10).streamTriples();
            rdfIter = rdfInputStream.iterator();
        }
    } catch (Exception ex) {
        failed = true;
        LOG.error("Parse error in RDF document(please check intactness and encoding); processing partial document:" + origFn + " " + ex.getMessage());
        ex.printStackTrace();
    }
}



pool = Executors.newFixedThreadPool(1);

RunnableParser jenaStreamingParser = new RunnableParser(origFn, fsname, in, lang);

pool.submit(jenaStreamingParser);



Regards,
Abika

This message and any attached documents contain information of MarkLogic and/or its customers that may be confidential and/or privileged. If you are not the intended recipient, you may not read, copy, distribute, or use this information. If you have received this transmission in error, please notify the sender immediately by reply e-mail and then delete this message. This email may contain pricing or other suggested contract terms related to MarkLogic software or services. Any such terms are not binding on MarkLogic unless and until they are included in a definitive agreement executed by MarkLogic.

Re: Need suggestions on handling latency on asyncparser

Posted by Andy Seaborne <an...@apache.org>.

On 27/06/2023 09:27, Rob @ DNR wrote:
> Can you characterise what you mean by latency here?  i.e. are you talking about a measurable delay or something else?
> 
> Your code sample looks incomplete because you get the iterator BUT you never consume the iterator.  If you don’t consume the iterator then you aren’t ever going to get any data from the parser.
> 
> Also, in at least one case you set the chunk size to 10 which is very small and means the parser can only read a tiny amount ahead on its background thread.  You are better to have a larger chunk size so that the parser and the consuming thread can overlap computation better and cache more data in memory.

The code for "run" runs on a separate thread via the Executor. The async
parser itself creates a thread for the parsing process and delivers
parsed items on the caller thread as a StreamRDF.

Is there a reason why you are using async parser rather than parsing and 
processing to a StreamRDF?

    StreamRDF destination = ...
    RDFParser....output(destination);

StreamRDF is the base interface for delivering parser output.

The normal chunk size is 100K, as used for the bulk loader (xloader).
The async parser imposes some overhead when used with small chunks, and
there are also going to be issues with the scheduling of threads. For
large chunks this is noise, but when it takes 1-2 microseconds to parse
a triple, a small chunk is going to be inefficient. If you are reading
an external source, that may be tolerable.
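
To make the chunk-size point concrete, here is a rough model of a background producer handing chunks to a consumer over a bounded queue. It is a plain-JDK sketch and not how AsyncParser is implemented internally; the class name ChunkedHandoff, the Stats type, and the queue capacity of 4 are assumptions made for illustration. It only counts how many cross-thread handoffs a given chunk size needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model: a producer thread batches items into chunks and
// hands each chunk to the consumer through a bounded queue.
class ChunkedHandoff {
    // Result of one run: items received and queue handoffs needed.
    static final class Stats {
        final long items;
        final long handoffs;
        Stats(long items, long handoffs) {
            this.items = items;
            this.handoffs = handoffs;
        }
    }

    // Sentinel chunk marking end of input; compared by identity.
    private static final List<Integer> END = new ArrayList<>();

    static Stats run(int totalItems, int chunkSize) {
        // Capacity 4 is an arbitrary choice for this sketch.
        BlockingQueue<List<Integer>> queue = new ArrayBlockingQueue<>(4);

        Thread producer = new Thread(() -> {
            try {
                List<Integer> chunk = new ArrayList<>(chunkSize);
                for (int i = 0; i < totalItems; i++) {
                    chunk.add(i);
                    if (chunk.size() == chunkSize) {
                        queue.put(chunk);               // one handoff per full chunk
                        chunk = new ArrayList<>(chunkSize);
                    }
                }
                if (!chunk.isEmpty()) {
                    queue.put(chunk);                   // final partial chunk
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();

        long items = 0;
        long handoffs = 0;
        try {
            while (true) {
                List<Integer> chunk = queue.take();     // blocks until a chunk arrives
                if (chunk == END) {
                    break;
                }
                handoffs++;
                items += chunk.size();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while consuming", e);
        }
        return new Stats(items, handoffs);
    }
}
```

In this model, 1,000,000 items need 100,000 handoffs at a chunk size of 10 but only 10 handoffs at a chunk size of 100,000. Each handoff carries a fixed synchronisation and scheduling cost, which is why small chunks dominate when the per-item work is tiny.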

     Andy


Re: Need suggestions on handling latency on asyncparser

Posted by "Rob @ DNR" <rv...@dotnetrdf.org>.
Can you characterise what you mean by latency here?  i.e. are you talking about a measurable delay or something else?

Your code sample looks incomplete because you get the iterator BUT you never consume the iterator.  If you don’t consume the iterator then you aren’t ever going to get any data from the parser.
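
As a rough illustration of consuming the iterator inside the task and waiting for every task to finish, here is a plain-JDK sketch. It makes no Jena calls: the Iterator<String> stands in for rdfIter, and the names DrainingTask, DrainAll, and countAll are hypothetical. The idea is that each task drains its own iterator and the caller blocks on the returned Future, so a file cannot be skipped just because its results were not ready yet.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the per-file parser task. The iterator plays
// the role of rdfIter; in real code it would come from the parser.
class DrainingTask implements Callable<Integer> {
    private final Iterator<String> items;

    DrainingTask(Iterator<String> items) {
        this.items = items;
    }

    @Override
    public Integer call() {
        int count = 0;
        // Consume the iterator inside the task: nothing is processed
        // until hasNext()/next() are actually called.
        while (items.hasNext()) {
            items.next();   // real code would pass the triple/quad onward here
            count++;
        }
        return count;
    }
}

class DrainAll {
    // Runs one task per "file" and blocks until every task has finished.
    static int countAll(List<List<String>> files, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> pending = new ArrayList<>();
            for (List<String> file : files) {
                pending.add(pool.submit(new DrainingTask(file.iterator())));
            }
            int total = 0;
            for (Future<Integer> f : pending) {
                total += f.get();   // waits for that task to complete
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A parse task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling Future.get() also surfaces any exception thrown inside a task, which a bare pool.submit(runnable) with an ignored return value would hide.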

Also, in at least one case you set the chunk size to 10 which is very small and means the parser can only read a tiny amount ahead on its background thread.  You are better to have a larger chunk size so that the parser and the consuming thread can overlap computation better and cache more data in memory.

Rob
