You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@daffodil.apache.org by GitBox <gi...@apache.org> on 2020/11/06 22:34:08 UTC

[GitHub] [incubator-daffodil] olabusayoT opened a new pull request #453: Add SAX Unparse Event Batching

olabusayoT opened a new pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453


   We use a SAXInfosetEvent array of tunable saxUnparseEventBatchSize size and pass the same array back and forth between the inputter and the unparseContentHandler, reusing its elements. The hope is this will reduce the context switching 
   and result in a constant memory footprint as opposed to the having to create a new SAXInfosetEvent object for each event. 
   
   - add saxUnparseEventBatchSize tunable with default 100
   - update coroutine to not expect a generic type wrapped in a Try, but any generic event; the implementation can then pass in whatever they wish
   - Add tests for tunables and batching tests
   
   DAFFODIL-2383


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] olabusayoT merged pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
olabusayoT merged pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] olabusayoT commented on pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
olabusayoT commented on pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#issuecomment-725467639


   Sounds good! I'll squash for now and wait till the commit hold is liftted.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] olabusayoT commented on a change in pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
olabusayoT commented on a change in pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#discussion_r520795379



##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && batchedInfosetEvents.lift(nextIndex).nonEmpty
+      && !batchedInfosetEvents(nextIndex).isEmpty) {
+      // if the next element exists and is nonEmpty

Review comment:
       Yes, I think we can. The way sendToInputter is written, we won't context switch till the array is full or the EndDocument is received. Updated!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] stevedlawrence commented on pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
stevedlawrence commented on pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#issuecomment-725416614


   +1 :+1: 
   
   Nice change and the numbers look great! Overhead of SAX now seems much smaller. I guess we've learned a good lesson, that context switching between threads is extremely expensive. We probably want to avoid use of the coroutine in the future unless 1) we are batching like this change does or 2) the coroutines naturally don't need to context switch very often.
   
   3.0 will hopefully be release soon once we get some votes from IPMC, so let's hold off on merge this until then. If we need an rc2, this might be a good candidate to add to that.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] stevedlawrence commented on pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
stevedlawrence commented on pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#issuecomment-724679404


   Have you run any tests, maybe with the daffodil CLI ``performance --unparse`` command, to see how this change affects performance?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] olabusayoT edited a comment on pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
olabusayoT edited a comment on pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#issuecomment-724944482


   @stevedlawrence As far as performance, see below. I tested it with a baseline of ebd1af445b9c9892481d50bdd2f094eec4f503db with 3 runs for each setting.
   
   It was run against a CSV file with 1000 entires and the CSV/src/main/resources/com/tresys/csv/xsd/csvHeaderEnforced.dfdl.xsd  schemas. N was set to 1000 for each run.
   
   No Batching (SAX)
   
   total unparse time (sec): 119.045022
   min rate (files/sec): 0.887929
   max rate (files/sec): 10.849576
   avg rate (files/sec): 8.400183
   
   total unparse time (sec): 122.495742
   min rate (files/sec): 0.933206
   max rate (files/sec): 10.572712
   avg rate (files/sec): 8.163549
   
   total unparse time (sec): 115.577783
   min rate (files/sec): 0.887943
   max rate (files/sec): 10.906708
   avg rate (files/sec): 8.652182
   
   BatchSize = 1
   
   total unparse time (sec): 123.355449
   min rate (files/sec): 0.946018
   max rate (files/sec): 11.109970
   avg rate (files/sec): 8.106654
   
   total unparse time (sec): 113.578335
   min rate (files/sec): 0.942423
   max rate (files/sec): 11.725798
   avg rate (files/sec): 8.804496
   
   total unparse time (sec): 118.298859
   min rate (files/sec): 1.013238
   max rate (files/sec): 10.598986
   avg rate (files/sec): 8.453167
   
   BatchSize = 10
   
   total unparse time (sec): 37.720906
   min rate (files/sec): 1.087529
   max rate (files/sec): 41.515119
   avg rate (files/sec): 26.510498
   
   total unparse time (sec): 37.424359
   min rate (files/sec): 1.296234
   max rate (files/sec): 40.985226
   avg rate (files/sec): 26.720564
   
   total unparse time (sec): 36.939181
   min rate (files/sec): 1.272301
   max rate (files/sec): 42.095508
   avg rate (files/sec): 27.071526
   
   Batch Size = 100
   
   total unparse time (sec): 27.516491
   min rate (files/sec): 1.102611
   max rate (files/sec): 55.597572
   avg rate (files/sec): 36.341843
   
   total unparse time (sec): 29.054037
   min rate (files/sec): 1.181798
   max rate (files/sec): 53.005364
   avg rate (files/sec): 34.418625
   
   total unparse time (sec): 27.553790
   min rate (files/sec): 1.364561
   max rate (files/sec): 55.164476
   avg rate (files/sec): 36.292648
   
   Batch Size = 1000
   
   total unparse time (sec): 24.731927
   min rate (files/sec): 1.370080
   max rate (files/sec): 59.091401
   avg rate (files/sec): 40.433565
   
   total unparse time (sec): 25.288054
   min rate (files/sec): 1.389677
   max rate (files/sec): 62.957730
   avg rate (files/sec): 39.544364
   
   total unparse time (sec): 25.620273
   min rate (files/sec): 1.297336
   max rate (files/sec): 59.002985
   avg rate (files/sec): 39.031590
   
   
   
   
    


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] olabusayoT commented on a change in pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
olabusayoT commented on a change in pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#discussion_r520863428



##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -183,22 +193,23 @@ class SAXInfosetInputter(
 
   override protected def run(): Unit = {
     try {
-      // startDocument kicks off this entire process, so it should be on the queue so the
-      // waitForResume call can grab it. That is set to our current event, so when hasNext is called
-      // the nextEvent after the StartDocument can be queued
-      copyEvent(source = this.waitForResume(), dest = currentEvent)
+      // startDocument kicks off this entire process, so the first batch of events starting with it
+      // should be on the queue so the waitForResume call can grab it. This populates the
+      // batchedInfosetEvents global var for use by the Inputter
+      batchedInfosetEvents = this.waitForResume()
       val unparseResult = dp.unparse(this, output)
-      currentEvent.unparseResult = One(unparseResult)
+      batchedInfosetEvents(currentIndex).unparseResult = One(unparseResult)
       if (unparseResult.isError) {
         // unparseError is contained within unparseResult
-        currentEvent.causeError = One(new DaffodilUnparseErrorSAXException(unparseResult))
+        batchedInfosetEvents(currentIndex).causeError = One(new DaffodilUnparseErrorSAXException(unparseResult))
       }
     } catch {
       case e: Exception => {
-        currentEvent.causeError = One(new DaffodilUnhandledSAXException(e.getMessage, e))
+        batchedInfosetEvents(currentIndex).causeError = One(new DaffodilUnhandledSAXException(e.getMessage, e))

Review comment:
       Created [DAFFODIL-2433 ](https://issues.apache.org/jira/browse/DAFFODIL-2433) to take care of this




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] olabusayoT commented on a change in pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
olabusayoT commented on a change in pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#discussion_r520792181



##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && batchedInfosetEvents.lift(nextIndex).nonEmpty
+      && !batchedInfosetEvents(nextIndex).isEmpty) {
+      // if the next element exists and is nonEmpty
+      true
+    }  else  {
+      // there is no nextEvent or it was empty, but we still have no EndDocument. So load the next
+      // batch from the contentHandler
+      returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+      batchedInfosetEvents = this.resume(unparseContentHandler, returnedInfosetEvent)
+      // we reset the index once we receive a new batch, but we don't use the first element from
+      // this point on
+      currentIndex = 0
       true
     }
   }
 
   override def next(): Unit = {
     if (hasNext()) {
-      copyEvent(source = Try(nextEvent), dest = currentEvent)
-      nextEvent.clear()
-      if (currentEvent.eventType.contains(EndDocument)) endDocumentReceived = true
+      // clear element at current index as we're done with it, except in the case we just loaded the
+      // new elements, then do nothing
+      batchedInfosetEvents(currentIndex).clear()
+

Review comment:
       Correct, we do the clearing at the start of the inputter.next call, cos that's when we can guarantee that the inputter is truly done with the event.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] olabusayoT commented on a change in pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
olabusayoT commented on a change in pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#discussion_r520059244



##########
File path: daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXUnparseAPI.scala
##########
@@ -46,6 +49,24 @@ class TestSAXUnparseAPI {
     assertEquals(testData, bao.toString)
   }
 
+  @Test def testUnparseContentHandler_unparse_saxUnparseEventBatchSize_0(): Unit = {
+    val dpT = testDataprocessor(testSchema, Some(Map("saxUnparseEventBatchSize" -> "0")))

Review comment:
       Correct the minimum batch size should always be 1, so this test ensures there would be an error should the user provide 0 as the batchSize. I'll add a comment to say as much




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] olabusayoT commented on a change in pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
olabusayoT commented on a change in pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#discussion_r520067783



##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -70,10 +74,18 @@ class DaffodilUnparseContentHandler(
   extends DFDL.DaffodilUnparseContentHandler {
   private lazy val inputter = new SAXInfosetInputter(this, dp, output)
   private var unparseResult: DFDL.UnparseResult = _
-  private lazy val infosetEvent: DFDL.SAXInfosetEvent = new DFDL.SAXInfosetEvent
   private lazy val characterData = new StringBuilder
   private var prefixMapping: NamespaceBinding = _
   private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
+
+  private lazy val tunablesBatchSize = dp.getTunables().saxUnparseEventBatchSize
+  private lazy val SAX_UNPARSE_EVENT_BATCH_SIZE = tunablesBatchSize + 1

Review comment:
       So the tunablesBatchSixe + 1 is actually an implementation detail. 
   
   Under the hood, we always have an extra buffer in the array that we use for the hasNext call. For each element, we need to know if it has a viable next, if it doesn't, it triggers the context switch. 
   So for example, if the user provides 1 as the batchSize, under the hood we'll have [event1, event2] batched.
   - Daffodil unparse will call hasNext and getEventType for the initialization call, and that hasNext will check if currentIndex (0) + 1 is non-empty. Then it call getEventType for the event at currentIndex
   - Subsequent calls will be next(), ...some processing of the current event ..., hasNext()
   For out scenario, next() will update the currentIndex to 1, and event2 will be processed, then hasNext will check if there is a viable index 2, as there is not, it will perform the context switch
   
   Without us having the extra buffer, things would happen like this:
   user provides 1 as the batchSize, under the hood we'll have [event1] batched.
   - Daffodil unparse will call hasNext and getEventType for the initialization call, and that hasNext will check if currentIndex (0) + 1 is non-empty. As there is no index 1. It will context switch to get a new batched event, which would overwrite event1




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] olabusayoT commented on a change in pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
olabusayoT commented on a change in pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#discussion_r520795195



##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && batchedInfosetEvents.lift(nextIndex).nonEmpty
+      && !batchedInfosetEvents(nextIndex).isEmpty) {
+      // if the next element exists and is nonEmpty
+      true
+    }  else  {
+      // there is no nextEvent or it was empty, but we still have no EndDocument. So load the next
+      // batch from the contentHandler
+      returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+      batchedInfosetEvents = this.resume(unparseContentHandler, returnedInfosetEvent)
+      // we reset the index once we receive a new batch, but we don't use the first element from
+      // this point on
+      currentIndex = 0

Review comment:
       That makes sense. Updated!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] olabusayoT commented on pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
olabusayoT commented on pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#issuecomment-724944482


   @stevedlawrence As far as performance, see below. I tested it with a baseline of ebd1af445b9c9892481d50bdd2f094eec4f503db with 3 runs for each setting.
   
   It was run against a CSV file with 1000 entires and the CSV/src/main/resources/com/tresys/csv/xsd/csvHeaderEnforced.dfdl.xsd  schemas. N was set to 1000 for each run.
   
   No Batching
   
   total unparse time (sec): 119.045022
   min rate (files/sec): 0.887929
   max rate (files/sec): 10.849576
   avg rate (files/sec): 8.400183
   
   total unparse time (sec): 122.495742
   min rate (files/sec): 0.933206
   max rate (files/sec): 10.572712
   avg rate (files/sec): 8.163549
   
   total unparse time (sec): 115.577783
   min rate (files/sec): 0.887943
   max rate (files/sec): 10.906708
   avg rate (files/sec): 8.652182
   
   BatchSize = 1
   
   total unparse time (sec): 123.355449
   min rate (files/sec): 0.946018
   max rate (files/sec): 11.109970
   avg rate (files/sec): 8.106654
   
   total unparse time (sec): 113.578335
   min rate (files/sec): 0.942423
   max rate (files/sec): 11.725798
   avg rate (files/sec): 8.804496
   
   total unparse time (sec): 118.298859
   min rate (files/sec): 1.013238
   max rate (files/sec): 10.598986
   avg rate (files/sec): 8.453167
   
   BatchSize = 10
   
   total unparse time (sec): 37.720906
   min rate (files/sec): 1.087529
   max rate (files/sec): 41.515119
   avg rate (files/sec): 26.510498
   
   total unparse time (sec): 37.424359
   min rate (files/sec): 1.296234
   max rate (files/sec): 40.985226
   avg rate (files/sec): 26.720564
   
   total unparse time (sec): 36.939181
   min rate (files/sec): 1.272301
   max rate (files/sec): 42.095508
   avg rate (files/sec): 27.071526
   
   Batch Size = 100
   
   total unparse time (sec): 27.516491
   min rate (files/sec): 1.102611
   max rate (files/sec): 55.597572
   avg rate (files/sec): 36.341843
   
   total unparse time (sec): 29.054037
   min rate (files/sec): 1.181798
   max rate (files/sec): 53.005364
   avg rate (files/sec): 34.418625
   
   total unparse time (sec): 27.553790
   min rate (files/sec): 1.364561
   max rate (files/sec): 55.164476
   avg rate (files/sec): 36.292648
   
   Batch Size = 1000
   
   total unparse time (sec): 24.731927
   min rate (files/sec): 1.370080
   max rate (files/sec): 59.091401
   avg rate (files/sec): 40.433565
   
   total unparse time (sec): 25.288054
   min rate (files/sec): 1.389677
   max rate (files/sec): 62.957730
   avg rate (files/sec): 39.544364
   
   total unparse time (sec): 25.620273
   min rate (files/sec): 1.297336
   max rate (files/sec): 59.002985
   avg rate (files/sec): 39.031590
   
   
   
   
    


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] olabusayoT commented on pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
olabusayoT commented on pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#issuecomment-724950338


   For reference, here is JDOM and the Daffodil default.
   
   JDOM
   
   total unparse time (sec): 21.297043
   min rate (files/sec): 1.687186
   max rate (files/sec): 64.920116
   avg rate (files/sec): 46.954875
   
   total unparse time (sec): 21.446842
   min rate (files/sec): 1.628061
   max rate (files/sec): 68.206050
   avg rate (files/sec): 46.626911
   
   total unparse time (sec): 22.870573
   min rate (files/sec): 1.625854
   max rate (files/sec): 66.413622
   avg rate (files/sec): 43.724310
   
   ScalaXML
   
   total unparse time (sec): 22.080367
   min rate (files/sec): 1.560714
   max rate (files/sec): 67.526345
   avg rate (files/sec): 45.289102
   
   total unparse time (sec): 23.218616
   min rate (files/sec): 1.315569
   max rate (files/sec): 63.884942
   avg rate (files/sec): 43.068890
   
   total unparse time (sec): 23.303886
   min rate (files/sec): 1.375595
   max rate (files/sec): 70.265265
   avg rate (files/sec): 42.911298
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] mbeckerle commented on a change in pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
mbeckerle commented on a change in pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#discussion_r520106705



##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -70,10 +74,18 @@ class DaffodilUnparseContentHandler(
   extends DFDL.DaffodilUnparseContentHandler {
   private lazy val inputter = new SAXInfosetInputter(this, dp, output)
   private var unparseResult: DFDL.UnparseResult = _
-  private lazy val infosetEvent: DFDL.SAXInfosetEvent = new DFDL.SAXInfosetEvent
   private lazy val characterData = new StringBuilder
   private var prefixMapping: NamespaceBinding = _
   private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
+
+  private lazy val tunablesBatchSize = dp.getTunables().saxUnparseEventBatchSize
+  private lazy val SAX_UNPARSE_EVENT_BATCH_SIZE = tunablesBatchSize + 1

Review comment:
       Ah, so I missed this in my review, so to the degree this isnt' already in the comments, you should put this discussion into the code base. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] stevedlawrence commented on a change in pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
stevedlawrence commented on a change in pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#discussion_r520734572



##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && batchedInfosetEvents.lift(nextIndex).nonEmpty
+      && !batchedInfosetEvents(nextIndex).isEmpty) {
+      // if the next element exists and is nonEmpty
+      true
+    }  else  {
+      // there is no nextEvent or it was empty, but we still have no EndDocument. So load the next
+      // batch from the contentHandler
+      returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+      batchedInfosetEvents = this.resume(unparseContentHandler, returnedInfosetEvent)
+      // we reset the index once we receive a new batch, but we don't use the first element from
+      // this point on
+      currentIndex = 0

Review comment:
       You're correct in practice, but in theory daffodil should be allowed to call hasNext without immediately calling next. For example, it could call hasNext(), getLocalName(), and then next(). In this case, getLocalName() still must refer the the same event bevor hasNext() was called. We could someday change the daffodil code so that happens. In which case, I think this won't work properly. I think we do need to do a copy in the ContentHandler (see on of my other comments) to allow this potential behavior.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] olabusayoT commented on a change in pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
olabusayoT commented on a change in pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#discussion_r520729769



##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && batchedInfosetEvents.lift(nextIndex).nonEmpty
+      && !batchedInfosetEvents(nextIndex).isEmpty) {
+      // if the next element exists and is nonEmpty
+      true
+    }  else  {
+      // there is no nextEvent or it was empty, but we still have no EndDocument. So load the next
+      // batch from the contentHandler
+      returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+      batchedInfosetEvents = this.resume(unparseContentHandler, returnedInfosetEvent)
+      // we reset the index once we receive a new batch, but we don't use the first element from
+      // this point on
+      currentIndex = 0

Review comment:
       I'll update the comment, but I decided not to the copy, since once we context switch back to the inputter, we immediately call next, which clears whatever is at the currentIndex (i.e 0) away.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] olabusayoT commented on a change in pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
olabusayoT commented on a change in pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#discussion_r520067783



##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -70,10 +74,18 @@ class DaffodilUnparseContentHandler(
   extends DFDL.DaffodilUnparseContentHandler {
   private lazy val inputter = new SAXInfosetInputter(this, dp, output)
   private var unparseResult: DFDL.UnparseResult = _
-  private lazy val infosetEvent: DFDL.SAXInfosetEvent = new DFDL.SAXInfosetEvent
   private lazy val characterData = new StringBuilder
   private var prefixMapping: NamespaceBinding = _
   private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
+
+  private lazy val tunablesBatchSize = dp.getTunables().saxUnparseEventBatchSize
+  private lazy val SAX_UNPARSE_EVENT_BATCH_SIZE = tunablesBatchSize + 1

Review comment:
       So this is actually an implementation detail. 
   
   Under the hood, we always have an extra buffer in the array that we use for the hasNext call. For each element, we need to know if it has a viable next, if it doesn't, it triggers the context switch. 
   So for example, if the user provides 1 as the batchSize, under the hood we'll have [event1, event2] batched.
   - Daffodil unparse will call hasNext and getEventType for the initialization call, and that hasNext will check if currentIndex (0) + 1 is non-empty. Then it call getEventType for the event at currentIndex
   - Subsequent calls will be next(), ...some processing of the current event ..., hasNext()
   For out scenario, next() will update the currentIndex to 1, and event2 will be processed, then hasNext will check if there is a viable index 2, as there is not, it will perform the context switch
   
   Without us having the extra buffer, things would happen like this:
   user provides 1 as the batchSize, under the hood we'll have [event1] batched.
   - Daffodil unparse will call hasNext and getEventType for the initialization call, and that hasNext will check if currentIndex (0) + 1 is non-empty. As there is no index 1. It will context switch to get a new batched event, which would overwrite event1




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] stevedlawrence commented on a change in pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
stevedlawrence commented on a change in pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#discussion_r520581742



##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && batchedInfosetEvents.lift(nextIndex).nonEmpty
+      && !batchedInfosetEvents(nextIndex).isEmpty) {
+      // if the next element exists and is nonEmpty

Review comment:
       In what case can the nextIndex be empty? It seems like the ContentHandler will either fill up the entire array with events and not resume the coroutine until it does so, or it will stop and a EndDocument. I wonder if we can make an assumption in all this logic that there are never any empty events, as long as we never try to read past EndDocument?

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && batchedInfosetEvents.lift(nextIndex).nonEmpty
+      && !batchedInfosetEvents(nextIndex).isEmpty) {
+      // if the next element exists and is nonEmpty
+      true
+    }  else  {
+      // there is no nextEvent or it was empty, but we still have no EndDocument. So load the next
+      // batch from the contentHandler
+      returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+      batchedInfosetEvents = this.resume(unparseContentHandler, returnedInfosetEvent)
+      // we reset the index once we receive a new batch, but we don't use the first element from
+      // this point on
+      currentIndex = 0

Review comment:
       I think this comment could be a little confusing. It maybe implies that we're doing something wasteful? The reason this must be set to 0 is because hasNext isn't allowed to change what we are looking at. Calling resume took the thing we were last looking at and made it the first thing in the array. So the currentIndex must now be changed to look at the zeroth thing. Only when next() is called are we allowed to actually increment currentIndex so we are actually looking at the next element.

##########
File path: daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXParseUnparseAPI.scala
##########
@@ -55,8 +55,13 @@ object TestSAXParseUnparseAPI {
 
   lazy val dp: DataProcessor = testDataprocessor(testSchema)
 
-  def testDataprocessor(testSchema: scala.xml.Elem): DataProcessor = {
-    val schemaCompiler = Compiler()
+  def testDataprocessor(testSchema: scala.xml.Elem, tunablesArg: Option[Map[String, String]] = None): DataProcessor = {
+    val schemaCompiler =
+      if (tunablesArg.nonEmpty) {
+        Compiler().withTunables(tunablesArg.get)
+      } else {
+        Compiler()
+      }

Review comment:
       Rather than making tunablesArg an ``Option``, how about it's just a ``Map`` and it defaults to ``Map.empty``. The ``withTunables`` function should work just fine with an empty map, so this can just become
   ```scala
   val schemaCompiler = Compiler.withTuanbles(tunablesArg)
   ```
   So it cleans things up a little bit.
   
   Only downside is It will create a copy of the ``Compiler`` instance even if there are no tunables, but that doesn't seem like too big of a deal to me. If we really cared, we could modify the ``withTunables(Map)`` method to not do the copy if the map is empty, but I'm not sure if that's worth it.

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && batchedInfosetEvents.lift(nextIndex).nonEmpty
+      && !batchedInfosetEvents(nextIndex).isEmpty) {
+      // if the next element exists and is nonEmpty
+      true
+    }  else  {
+      // there is no nextEvent or it was empty, but we still have no EndDocument. So load the next
+      // batch from the contentHandler
+      returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+      batchedInfosetEvents = this.resume(unparseContentHandler, returnedInfosetEvent)
+      // we reset the index once we receive a new batch, but we don't use the first element from
+      // this point on
+      currentIndex = 0
       true
     }
   }
 
   override def next(): Unit = {
     if (hasNext()) {
-      copyEvent(source = Try(nextEvent), dest = currentEvent)
-      nextEvent.clear()
-      if (currentEvent.eventType.contains(EndDocument)) endDocumentReceived = true
+      // clear element at current index as we're done with it, except in the case we just loaded the
+      // new elements, then do nothing
+      batchedInfosetEvents(currentIndex).clear()
+

Review comment:
       Just  so it's clear to me, the contract between the ContenttHandler and the InofsetInputter is that the ContentHandler fills up the array, and as the SAXInfosetInputter is done with each event it clears that event. So when we resume to the infoset inputter it can assume that the entire array has been cleared and doesn't need to clear anything itself?

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && batchedInfosetEvents.lift(nextIndex).nonEmpty
+      && !batchedInfosetEvents(nextIndex).isEmpty) {
+      // if the next element exists and is nonEmpty
+      true
+    }  else  {
+      // there is no nextEvent or it was empty, but we still have no EndDocument. So load the next
+      // batch from the contentHandler
+      returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+      batchedInfosetEvents = this.resume(unparseContentHandler, returnedInfosetEvent)
+      // we reset the index once we receive a new batch, but we don't use the first element from
+      // this point on
+      currentIndex = 0
       true
     }
   }
 
   override def next(): Unit = {
     if (hasNext()) {
-      copyEvent(source = Try(nextEvent), dest = currentEvent)
-      nextEvent.clear()
-      if (currentEvent.eventType.contains(EndDocument)) endDocumentReceived = true
+      // clear element at current index as we're done with it, except in the case we just loaded the
+      // new elements, then do nothing
+      batchedInfosetEvents(currentIndex).clear()
+
+      // increment current index to the next index
+      currentIndex += 1
+
+      // check if new current index is EndDocument
+      if (batchedInfosetEvents.lift(currentIndex).nonEmpty
+        && batchedInfosetEvents(currentIndex).eventType.contains(EndDocument)) {
+        endDocumentReceived = true
+      }

Review comment:
       We called hasNext already, so that should mean currentIndex must exist, so I think this lift call can be removed.

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && batchedInfosetEvents.lift(nextIndex).nonEmpty

Review comment:
       We might want to avoid the lift function. This allocates an Option, which we try to avoid where possible. Also, looking at the definition of Array.lift in scala doc is says:
   
   > This member is added by an implicit conversion from Array[T] toIndexedSeq[T] performed by method copyArrayToImmutableIndexedSeq in scala.LowPriorityImplicits2.
   
   So just do do this lift, there's some copying going on. This copy is hopefully efficient, but probably best to avoid. Especially since it seems to be just testing if there is a nextIndex. I think this can be done instead with something like ``nextIndex < batchedInfosetEvents.length``

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -212,31 +224,39 @@ class DaffodilUnparseContentHandler(
   }
 
   private def sendToInputter(): Unit = {
-    val infosetEventWithResponse = this.resume(inputter, Try(infosetEvent))
-    infosetEvent.clear()
-    // if event is wrapped in a Try failure, we will not have an unparseResult, so we only set
-    // unparseResults for events wrapped in Try Success, including those events that have expected
-    // errors
-    if (infosetEventWithResponse.isSuccess && infosetEventWithResponse.get.unparseResult.isDefined) {
-      unparseResult = infosetEventWithResponse.get.unparseResult.get
-    }
-    // the exception from events wrapped in Try failures and events wrapped in Try Successes
-    // (with an unparse error state i.e unparseResult.isError) are collected and thrown to stop
-    // the execution of the xmlReader
-    if (infosetEventWithResponse.isFailure || infosetEventWithResponse.get.isError) {
-      val causeError = if(infosetEventWithResponse.isFailure) {
-        infosetEventWithResponse.failed.get
-      } else {
-        infosetEventWithResponse.get.causeError.get
+    val nextIndex = currentIndex + 1
+    if (nextIndex < SAX_UNPARSE_EVENT_BATCH_SIZE
+      && batchedInfosetEvents(currentIndex).eventType.get != EndDocument) {
+      // if we have room left on the batchedInfosetEvents array and the current element != EndDocument
+      currentIndex += 1
+      // if the new currentElement has any existing content, clear it
+      if (!batchedInfosetEvents(currentIndex).isEmpty) batchedInfosetEvents(currentIndex).clear()
+    } else {
+      // ready to send it off
+      val infosetEventWithResponse = this.resume(inputter, batchedInfosetEvents).head
+      // we only ever return a one element array
+
+      // it is possible for unparseResult to be null, in the case of an DaffodilUnhandledSAXException
+      if (infosetEventWithResponse.unparseResult.isDefined) {
+        unparseResult = infosetEventWithResponse.unparseResult.get
       }
-      causeError match {
-        case unparseError: DaffodilUnparseErrorSAXException =>
-          // although this is an expected error, we need to throw it so we can stop the xmlReader
-          // parse and this thread
-          throw unparseError
-        case unhandled: DaffodilUnhandledSAXException => throw unhandled
-        case unknown => throw new DaffodilUnhandledSAXException("Unknown exception: ", new Exception(unknown))
+      // any exception is collected and thrown to stop the execution of the xmlReader
+      if (infosetEventWithResponse.isError) {
+        val causeError = infosetEventWithResponse.causeError.get
+        causeError match {
+          case unparseError: DaffodilUnparseErrorSAXException =>
+            // although this is an expected error, we need to throw it so we can stop the xmlReader
+            // parse and this thread
+            throw unparseError
+          case unhandled: DaffodilUnhandledSAXException => throw unhandled
+          case unknown => throw new DaffodilUnhandledSAXException("Unknown exception: ",
+            new Exception(unknown))
+        }
       }
+      // clear the last element, then start to load elements starting at the second element since
+      // the first element is used as a buffer by the SAXInfosetInputter when new batches are sent
+      batchedInfosetEvents(currentIndex).clear()

Review comment:
       I was saying other calls to clear in the ContentHandler maybe weren't needed, but I think this one is actually needed. That's because the InfosetInputter only clears the current element when it calls next(), but it never calls next when the current element is the last one in the array. So the InfosetInputter clears all elements except the last one. Might be worth adding a comment to that affect.

##########
File path: daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXUnparseAPI.scala
##########
@@ -46,6 +49,24 @@ class TestSAXUnparseAPI {
     assertEquals(testData, bao.toString)
   }
 
+  @Test def testUnparseContentHandler_unparse_saxUnparseEventBatchSize_0(): Unit = {
+    val dpT = testDataprocessor(testSchema, Some(Map("saxUnparseEventBatchSize" -> "0")))

Review comment:
       I've also created [DAFFODIL-2434](https://issues.apache.org/jira/browse/DAFFODIL-2432) as a way to more throughly ensure our tunables have valid values. That way we can validate them when they are set rather than when the values are actually used.

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -212,31 +224,39 @@ class DaffodilUnparseContentHandler(
   }
 
   private def sendToInputter(): Unit = {

Review comment:
       Suggest renaming this to something like ``maybeSendToInputter`` since it might not necesarily send to the inputter unless the batch is full?

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -212,31 +224,39 @@ class DaffodilUnparseContentHandler(
   }
 
   private def sendToInputter(): Unit = {
-    val infosetEventWithResponse = this.resume(inputter, Try(infosetEvent))
-    infosetEvent.clear()
-    // if event is wrapped in a Try failure, we will not have an unparseResult, so we only set
-    // unparseResults for events wrapped in Try Success, including those events that have expected
-    // errors
-    if (infosetEventWithResponse.isSuccess && infosetEventWithResponse.get.unparseResult.isDefined) {
-      unparseResult = infosetEventWithResponse.get.unparseResult.get
-    }
-    // the exception from events wrapped in Try failures and events wrapped in Try Successes
-    // (with an unparse error state i.e unparseResult.isError) are collected and thrown to stop
-    // the execution of the xmlReader
-    if (infosetEventWithResponse.isFailure || infosetEventWithResponse.get.isError) {
-      val causeError = if(infosetEventWithResponse.isFailure) {
-        infosetEventWithResponse.failed.get
-      } else {
-        infosetEventWithResponse.get.causeError.get
+    val nextIndex = currentIndex + 1
+    if (nextIndex < SAX_UNPARSE_EVENT_BATCH_SIZE
+      && batchedInfosetEvents(currentIndex).eventType.get != EndDocument) {
+      // if we have room left on the batchedInfosetEvents array and the current element != EndDocument
+      currentIndex += 1
+      // if the new currentElement has any existing content, clear it
+      if (!batchedInfosetEvents(currentIndex).isEmpty) batchedInfosetEvents(currentIndex).clear()

Review comment:
       I think this shouldn't be necessary since the the InfosetInputter is already clearing everything? Perhaps this should just be ``Assert(batchedInfosetEvents(currentIndex).isEmpty)`` if we want to ensure that the ContentHandler is doing the right hthing?

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -212,31 +224,39 @@ class DaffodilUnparseContentHandler(
   }
 
   private def sendToInputter(): Unit = {
-    val infosetEventWithResponse = this.resume(inputter, Try(infosetEvent))
-    infosetEvent.clear()
-    // if event is wrapped in a Try failure, we will not have an unparseResult, so we only set
-    // unparseResults for events wrapped in Try Success, including those events that have expected
-    // errors
-    if (infosetEventWithResponse.isSuccess && infosetEventWithResponse.get.unparseResult.isDefined) {
-      unparseResult = infosetEventWithResponse.get.unparseResult.get
-    }
-    // the exception from events wrapped in Try failures and events wrapped in Try Successes
-    // (with an unparse error state i.e unparseResult.isError) are collected and thrown to stop
-    // the execution of the xmlReader
-    if (infosetEventWithResponse.isFailure || infosetEventWithResponse.get.isError) {
-      val causeError = if(infosetEventWithResponse.isFailure) {
-        infosetEventWithResponse.failed.get
-      } else {
-        infosetEventWithResponse.get.causeError.get
+    val nextIndex = currentIndex + 1
+    if (nextIndex < SAX_UNPARSE_EVENT_BATCH_SIZE
+      && batchedInfosetEvents(currentIndex).eventType.get != EndDocument) {
+      // if we have room left on the batchedInfosetEvents array and the current element != EndDocument
+      currentIndex += 1
+      // if the new currentElement has any existing content, clear it
+      if (!batchedInfosetEvents(currentIndex).isEmpty) batchedInfosetEvents(currentIndex).clear()
+    } else {
+      // ready to send it off
+      val infosetEventWithResponse = this.resume(inputter, batchedInfosetEvents).head
+      // we only ever return a one element array
+

Review comment:
       I feel like something is missing somewhere after this.
   
   At this point, the InfosetInputter has consumed all of the events in the the array, it called hasNext, and we are back at the ContentHandler here to get more events. That means the InfoseInputter thinks the current event is the last one in the batch array. When the ContentHander eventually yeilds back to the InfosetInputter, the InfosetInputter will expect that the event it was looking at to now be at the first index in the array (that's why it resets currentIndex back to zero). So I would expect this to copy that last event from the last index to the first index. And in fact, I think the last current event is the infosetEventWIthRepponse, right? So I think you can do something like:
   
   ```scala
   if (infosetEventWithResponse.unparseResult.isEmpty) {
     copyEvent(infosetEventWithResponse, batchedInfosetEvents(0))
   } 
   ```
   Note that I think this technically doesn't matter because anytime we call hasNext we immediately call next(), so the infoset inputter will never actually access this zeroth event after hasNext is called. However, someday we might, and so we want to ensure things work. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-daffodil] mbeckerle commented on a change in pull request #453: Add SAX Unparse Event Batching

Posted by GitBox <gi...@apache.org>.
mbeckerle commented on a change in pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#discussion_r519950055



##########
File path: daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala
##########
@@ -38,7 +38,7 @@
  trait Coroutine[T] {
 
    private val queueCapacity: Int = 1

Review comment:
       So here the queue size is hard wired to 1. You aren't using the tunable size. That makes sense because if you enlarge this queue, then the producer and consumer threads would run concurrently, which is specifically something we're trying to avoid when using Coroutines. 
   
   Worth it to point this structural issue with coroutines here with a comment that if the context-switching overhead is too large, that passing a larger data structure containing multiple smaller events is the way to fix it, not by enlarging this queue. 
   
   I like the design choice here to make that the responsibility of the calling application of the coroutines. Burying a high-overhead abstraction and trying to buffer it into submission seldom helps. 

##########
File path: daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXUnparseAPI.scala
##########
@@ -46,6 +49,24 @@ class TestSAXUnparseAPI {
     assertEquals(testData, bao.toString)
   }
 
+  @Test def testUnparseContentHandler_unparse_saxUnparseEventBatchSize_0(): Unit = {
+    val dpT = testDataprocessor(testSchema, Some(Map("saxUnparseEventBatchSize" -> "0")))

Review comment:
       Comment: what does batch size 0 test? I would expect min batch size to be 1. 

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -183,22 +193,23 @@ class SAXInfosetInputter(
 
   override protected def run(): Unit = {
     try {
-      // startDocument kicks off this entire process, so it should be on the queue so the
-      // waitForResume call can grab it. That is set to our current event, so when hasNext is called
-      // the nextEvent after the StartDocument can be queued
-      copyEvent(source = this.waitForResume(), dest = currentEvent)
+      // startDocument kicks off this entire process, so the first batch of events starting with it
+      // should be on the queue so the waitForResume call can grab it. This populates the
+      // batchedInfosetEvents global var for use by the Inputter

Review comment:
       Can't be "global" in the sense of a JVM-wide singleton here. It has to be per inputter. Otherwise you can't run multiple SAX parsers/unparsers at the same time in one JVM. We need to be able to do that. 
   
   I assume this isn't really "global", so please update comment and indicate the real scope of this var, which is, I think a private var in this SAXInfosetInputter ??

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -70,10 +74,18 @@ class DaffodilUnparseContentHandler(
   extends DFDL.DaffodilUnparseContentHandler {
   private lazy val inputter = new SAXInfosetInputter(this, dp, output)
   private var unparseResult: DFDL.UnparseResult = _
-  private lazy val infosetEvent: DFDL.SAXInfosetEvent = new DFDL.SAXInfosetEvent
   private lazy val characterData = new StringBuilder
   private var prefixMapping: NamespaceBinding = _
   private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
+
+  private lazy val tunablesBatchSize = dp.getTunables().saxUnparseEventBatchSize
+  private lazy val SAX_UNPARSE_EVENT_BATCH_SIZE = tunablesBatchSize + 1

Review comment:
       Also, why all caps? Seems to me this should just be actualBatchSize. 

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -70,10 +74,18 @@ class DaffodilUnparseContentHandler(
   extends DFDL.DaffodilUnparseContentHandler {
   private lazy val inputter = new SAXInfosetInputter(this, dp, output)
   private var unparseResult: DFDL.UnparseResult = _
-  private lazy val infosetEvent: DFDL.SAXInfosetEvent = new DFDL.SAXInfosetEvent
   private lazy val characterData = new StringBuilder
   private var prefixMapping: NamespaceBinding = _
   private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
+
+  private lazy val tunablesBatchSize = dp.getTunables().saxUnparseEventBatchSize
+  private lazy val SAX_UNPARSE_EVENT_BATCH_SIZE = tunablesBatchSize + 1

Review comment:
       We really do, for testing purposes, want to be able to make this 1, not 2. So suggest this is changed to
   ```
   if (tunablesBatchSize < 1) 1 else tunablesBatchSize
   ```

##########
File path: daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXParseUnparseAPI.scala
##########
@@ -55,8 +55,13 @@ object TestSAXParseUnparseAPI {
 
   lazy val dp: DataProcessor = testDataprocessor(testSchema)
 
-  def testDataprocessor(testSchema: scala.xml.Elem): DataProcessor = {
-    val schemaCompiler = Compiler()
+  def testDataprocessor(testSchema: scala.xml.Elem, tunablesArg: Option[Map[String, String]] = None): DataProcessor = {

Review comment:
       name conventions: testDataProcessor (cap P)

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -35,25 +34,28 @@ import org.apache.daffodil.xml.XMLUtils
 
 /**
  * The SAXInfosetInputter consumes SAXInfosetEvent objects from the DaffodilUnparseContentHandler
- * class and converts them to events that the DataProcessor unparse can use. This class contains two
- * SAXInfosetEvent objects, the current event the unparse method is processing and the next event
- * to be processed later.
+ * class and converts them to events that the DataProcessor unparse can use. This class contains an
+ * array of batched SAXInfosetEvent objects that it receives from the contentHandler and the index
+ * of the current element being processed.
  *
- * This class together with the DaffodilUnparseContentHandler use coroutines to ensure that only one event,
- * at a time, is passed between the two classes. The following is the general process:
+ * This class, together with the SAXInfosetInputter, uses coroutines to ensure that a batch of events
+ * (based on the tunable saxUnparseEventBatchSize) can be passed from the former to the latter.
+ * The following is the general process:
  *
- * - the run method is called, with a StartDocument event already loaded on the inputter's queue.
- * This is collected and stored in the currentEvent member
+ * - the run method is called, with the first batch of events, starting with the StartDocument event,
+ * already loaded on the inputter's queue.
+ * This is collected and stored in the batchedInfosetEvents member, and the currentIndex is set to 0
  * - The dp.unparse method is called, and it calls hasNext to make sure an event exists to be
- * processed and then queries the currentEvent. The hasNext call also queues the nextEvent by
- * transferring control to the contentHandler so it can load the next event.
- * - After it is done with the currentEvent, it calls inputter.next to get the next event, and that
- * copies the queued nextEvent into the currentEvent
- * - This process continues until the currentEvent contains an EndDocument event, at which point, the
- * nextEvent is cleared, endDocumentReceived is set to true and hasNext will return false
- * - This ends the unparse process, and the unparseResult and/or any Errors are set on the event. We
- * call resumeFinal passing along that element, terminating this thread and resuming the
- * contentHandler for the last time.
+ * processed and then queries the event at currentIndex. The hasNext call also checks that there is
+ * a next event to be processed (currentIndex+1), and if not, queues the next batch of events by
+ * transferring control to the contentHandler so it can load them.
+ * - After it is done with the current event, it calls inputter.next to get the next event, and that
+ * increments the currentIndex and cleans out the event at the previous index
+ * - This process continues until the event at currentIndex contains an EndDocument event, at which point, the
+ * endDocumentReceived is set to true and hasNext will return false

Review comment:
       Add comment: This implies that all these arrays are full of events, except the last one that contains the EndDocument event which will be the only "short" array. There is no reason the first array can't also be the last one if all the events from StartDocument to EndDocument fit in a single array. 

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -183,22 +193,23 @@ class SAXInfosetInputter(
 
   override protected def run(): Unit = {
     try {
-      // startDocument kicks off this entire process, so it should be on the queue so the
-      // waitForResume call can grab it. That is set to our current event, so when hasNext is called
-      // the nextEvent after the StartDocument can be queued
-      copyEvent(source = this.waitForResume(), dest = currentEvent)
+      // startDocument kicks off this entire process, so the first batch of events starting with it
+      // should be on the queue so the waitForResume call can grab it. This populates the
+      // batchedInfosetEvents global var for use by the Inputter
+      batchedInfosetEvents = this.waitForResume()
       val unparseResult = dp.unparse(this, output)
-      currentEvent.unparseResult = One(unparseResult)
+      batchedInfosetEvents(currentIndex).unparseResult = One(unparseResult)
       if (unparseResult.isError) {
         // unparseError is contained within unparseResult
-        currentEvent.causeError = One(new DaffodilUnparseErrorSAXException(unparseResult))
+        batchedInfosetEvents(currentIndex).causeError = One(new DaffodilUnparseErrorSAXException(unparseResult))
       }
     } catch {
       case e: Exception => {
-        currentEvent.causeError = One(new DaffodilUnhandledSAXException(e.getMessage, e))
+        batchedInfosetEvents(currentIndex).causeError = One(new DaffodilUnhandledSAXException(e.getMessage, e))

Review comment:
       DaffodilUnhandledSAXException really should take just the 'e' argument. No reason to grab the string and make it construct the message, etc here. All the information is in the 'e' exception object. This is just wrapping it in an exception type we know about. getMesage should be called by code that is planning to display the message somewhere, or put it into a log, perhaps causing internationalized translation of the message to be triggered, etc. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org