Posted to commits@daffodil.apache.org by GitBox <gi...@apache.org> on 2022/11/18 20:36:08 UTC

[GitHub] [daffodil] stevedlawrence opened a new pull request, #879: Improve SAX parse/unparse performance

stevedlawrence opened a new pull request, #879:
URL: https://github.com/apache/daffodil/pull/879

   In SAX unparsing, remove coroutines. It's not totally clear why this helps, but coroutines cause a noticeable overhead. Instead, events are stored in a thread-safe FIFO ArrayBlockingQueue that is shared between the DaffodilUnparseContentHandler and the SAXInfosetInputter. As the DaffodilUnparseContentHandler receives SAX events, it put()'s them on the queue. When the SAXInfosetInputter is ready for events, it take()'s them from the queue. A bit more care is needed to ensure there are no deadlocks, but this allows the two threads to work in parallel, which may help speed things up a bit, especially if SAX events come in slowly.
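
A minimal sketch of the queue hand-off described above, assuming simplified event types. `StartDoc`, `StartElem`, `EndElem`, `EndDoc` and `QueueHandoff` are hypothetical names, not Daffodil classes. The producer plays the content handler's role and the consumer plays the inputter's role; the sketch deliberately leaves out the deadlock handling the PR mentions (for example, what happens if the consumer fails and stops calling take()).

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical, simplified stand-ins for SAXInfosetInputterEvent types.
sealed trait Event
case object StartDoc extends Event
final case class StartElem(name: String) extends Event
final case class EndElem(name: String) extends Event
case object EndDoc extends Event

object QueueHandoff {
  // Producer side: roughly what the content handler does as SAX callbacks arrive.
  def produce(queue: ArrayBlockingQueue[Event], names: Seq[String]): Unit = {
    queue.put(StartDoc) // put() blocks while the queue is full
    names.foreach { n =>
      queue.put(StartElem(n))
      queue.put(EndElem(n))
    }
    queue.put(EndDoc)
  }

  // Consumer side: roughly what the inputter does, taking events until EndDoc.
  def consume(queue: ArrayBlockingQueue[Event]): List[Event] = {
    val seen = List.newBuilder[Event]
    var ev: Event = null
    while (ev != EndDoc) {
      ev = queue.take() // take() blocks while the queue is empty
      seen += ev
    }
    seen.result()
  }

  // Runs the consumer on its own thread and the producer on the caller's thread.
  def run(names: Seq[String], capacity: Int): List[Event] = {
    val queue = new ArrayBlockingQueue[Event](capacity)
    var result: List[Event] = Nil
    val consumer = new Thread(() => result = consume(queue))
    consumer.start()
    produce(queue, names)
    consumer.join() // join() makes the consumer's write to result visible here
    result
  }
}
```

Because the queue is bounded, a small capacity forces the two threads to alternate, while a large one lets the producer run ahead of the consumer.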
   
   In addition to not using coroutines, the unparse() call now runs in a Future with an ExecutionContext that uses a cached thread pool. This should allow subsequent SAX unparse() calls to reuse threads, avoiding some of the overhead of thread creation.
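
A sketch of running an unparse-like task in a Future on a cached-thread-pool ExecutionContext, then waiting for it, roughly the pattern the PR's `DaffodilUnparseContentHandler` object sets up. `PooledUnparse` and `fakeUnparse` are hypothetical. Two deliberate differences from the PR's code: the sketch uses `execute` rather than `submit`, and it installs a daemon ThreadFactory (an assumption added so a short program can exit without waiting for the 60-second idle timeout).

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object PooledUnparse {
  // ExecutionContext backed by a cached thread pool: idle threads are reused
  // by later tasks and discarded after 60 seconds of inactivity.
  val executionContext: ExecutionContext = new ExecutionContext {
    private val threadPool = Executors.newCachedThreadPool(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r)
        t.setDaemon(true) // assumption: daemon threads so the JVM can exit promptly
        t
      }
    })
    def execute(runnable: Runnable): Unit = threadPool.execute(runnable)
    def reportFailure(t: Throwable): Unit = t.printStackTrace()
  }

  // Hypothetical stand-in for DataProcessor.unparse(); reports its thread name.
  private def fakeUnparse(): String = Thread.currentThread().getName

  // Starts the "unparse" on a pool thread and blocks until it finishes,
  // much as the content handler waits for the Future at endDocument().
  def runOnce(): String = {
    val task: Future[String] = Future(fakeUnparse())(executionContext)
    Await.result(task, Duration.Inf)
  }
}
```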
   
   In SAX unparsing, keep a reference to the event currently being mutated in a pre-allocated array of events. Not only does this avoid many array index lookups, it also makes the code much cleaner.
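
A sketch of the "current event" reference idea, assuming a simplified mutable event with only two fields. `MutableEvent` and `EventBatch` are hypothetical; the real SAXInfosetInputterEvent has more fields (namespaceURI, nilValue, simpleText, mixedContent). Events are allocated once and cleared for reuse, and callers write through `current` instead of repeating `events(index)`.

```scala
// Hypothetical, simplified mutable event that can be cleared and reused.
final class MutableEvent {
  var kind: String = null
  var localName: String = null
  def clear(): Unit = { kind = null; localName = null }
}

// A pre-allocated batch that keeps a direct reference to the event being filled.
final class EventBatch(size: Int) {
  private val events = Array.fill(size)(new MutableEvent)
  private var index = 0

  // Callers write current.kind = ... instead of events(index).kind = ...
  var current: MutableEvent = events(0)

  // Moves to the next slot; returns false when the batch is full.
  def advance(): Boolean = {
    index += 1
    if (index < events.length) { current = events(index); true }
    else false
  }

  // Clears and reuses every event instead of allocating new ones.
  def reset(): Unit = {
    events.foreach(_.clear())
    index = 0
    current = events(0)
  }

  // The (kind, localName) pairs filled so far.
  def filled: Seq[(String, String)] =
    events.take(index).toSeq.map(e => (e.kind, e.localName))
}
```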
   
   In SAX unparsing, initialize prefixMapping to TopScope instead of null. With a null value, getURI() could throw a NullPointerException when there is no prefix mapping, which is common when an element has no prefix and there is no default namespace mapping. Previously we caught and handled the exception, but we've seen that exceptions can have noticeable overhead. With TopScope there is no exception; getURI() simply returns null if no mapping is found.
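
A self-contained model of the sentinel-instead-of-null idea. `Binding`, `Top`, `Bind` and `SentinelDemo` are hypothetical and only mimic the shape of scala.xml's NamespaceBinding chain, so the sketch runs without the scala-xml module. (The real TopScope also resolves the reserved `xml` prefix; this model ignores that.)

```scala
// Hypothetical model of a namespace-binding chain (not scala.xml's classes).
// Top plays the role TopScope plays in the PR: an empty sentinel scope.
sealed abstract class Binding {
  def getURI(prefix: String): String
}

case object Top extends Binding {
  // No mapping found: return null instead of failing.
  def getURI(prefix: String): String = null
}

final case class Bind(prefix: String, uri: String, parent: Binding) extends Binding {
  // Walks outward through enclosing scopes until a match or the sentinel.
  def getURI(p: String): String = if (p == prefix) uri else parent.getURI(p)
}

object SentinelDemo {
  // Starting from null forces every lookup to guard against (or catch) an NPE.
  def lookupFromNull(p: String): String = {
    val start: Binding = null
    try start.getURI(p)
    catch { case _: NullPointerException => null }
  }

  // Starting from the sentinel needs no special case and throws nothing.
  def lookupFromTop(p: String): String = Top.getURI(p)
}
```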
   
   In SAX unparsing, avoid .split(), which compiles and evaluates a slow regular expression behind the scenes. We only need to split on a single char, which we can do much faster using indexOf() and substring().
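
A sketch of splitting a qualified name on its first `:` with indexOf()/substring(). `QNameSplit.splitQName` is a hypothetical helper, not the PR's code. Unlike split(), it splits only at the first colon, which is all a QName needs since it has at most one.

```scala
object QNameSplit {
  // Splits "prefix:localName" at the first ':' using indexOf()/substring(),
  // with no regular-expression machinery. Prefix is null when there is no ':'.
  def splitQName(qName: String): (String, String) = {
    val i = qName.indexOf(':')
    if (i < 0) (null, qName)
    else (qName.substring(0, i), qName.substring(i + 1))
  }
}
```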
   
   In SAX parsing, store the computed prefix/prefixedName in a val instead of a def. Profiling showed the def to be a noticeable hotspot, which this change removes.
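
A sketch of why a val can help here, assuming the prefix and local name don't change after construction. `PrefixedNameDemo`, `WithDef` and `WithVal` are hypothetical. A counter makes the difference visible: the def rebuilds the string on every access, while the val builds it once.

```scala
object PrefixedNameDemo {
  // Counts how often the name is built, to make the difference visible.
  var computations = 0

  private def build(prefix: String, local: String): String = {
    computations += 1
    if (prefix == null || prefix.isEmpty) local else prefix + ":" + local
  }

  // def: the concatenation runs again on every access.
  final class WithDef(prefix: String, local: String) {
    def prefixedName: String = build(prefix, local)
  }

  // val: computed once at construction, then just a field read.
  final class WithVal(prefix: String, local: String) {
    val prefixedName: String = build(prefix, local)
  }
}
```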
   
   Also included are a handful of refactorings, such as moving SAXInfosetInputterEvents out of the DFDLParserUnparse API file (since they aren't really API related), moving functions around to improve logical grouping, and renaming variables/functions to be consistent between the DaffodilUnparseContentHandler and SAXInfosetInputter.
   
   DAFFODIL-2400


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@daffodil.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [daffodil] stevedlawrence commented on pull request #879: Improve SAX parse/unparse performance

Posted by GitBox <gi...@apache.org>.
stevedlawrence commented on PR #879:
URL: https://github.com/apache/daffodil/pull/879#issuecomment-1369716275

   > please resolve the conflict and merge the PR or rework/close the PR if you don't want to merge it in its current state.
   
   Based on @mbeckerle's comment, I've done a bit more testing and profiling of each individual change in this PR, and I think removing the coroutines actually makes things a bit slower. I'll convert this PR to a draft and update it once I have a better grasp of which changes really help performance and which don't.




[GitHub] [daffodil] mbeckerle commented on a diff in pull request #879: Improve SAX parse/unparse performance

mbeckerle commented on code in PR #879:
URL: https://github.com/apache/daffodil/pull/879#discussion_r1035064494


##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * Dataprocessor unparse() can use. The SAXInfosetInputterEvents are built base
+ * on information that is passed to this ContentHandler from an XMLReader.
+ *
+ * This class runs the DataProcessor unparse() method in a separate thread,
+ * with a shared ArrayBlockigQueue to provide SAXInfosetInputerEvents. This
+ * queue has a maximum size defined by the saxUnaprseEventBatchSize tunble

Review Comment:
   tunble > tunable



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * Dataprocessor unparse() can use. The SAXInfosetInputterEvents are built base
+ * on information that is passed to this ContentHandler from an XMLReader.
+ *
+ * This class runs the DataProcessor unparse() method in a separate thread,
+ * with a shared ArrayBlockigQueue to provide SAXInfosetInputerEvents. This
+ * queue has a maximum size defined by the saxUnaprseEventBatchSize tunble

Review Comment:
   Can this be set small enough to enforce sequential behavior, i.e., no parallelism between the caller of the content handler and the unparser? 
   
   I continue to be of the opinion that overlap parallelism here is not an advantage. It just muddies the waters about timing and overhead of unparsing. 
   
   To reduce overhead, we need to enqueue many events before context switching to let the unparser run. Arguably, we should just queue up events until we reach some maximum count or get endDocument. For small messages we would then get exactly one context switch per message. 
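
A sketch of the batching policy suggested above, assuming events are plain strings and that `"END"` stands for endDocument. `BatchedHandoff.Batcher` is hypothetical. Events accumulate locally and are handed to the queue only when the batch is full or the document ends, so a message smaller than the batch size costs exactly one hand-off.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

object BatchedHandoff {
  // Hypothetical: events are plain strings and "END" stands for endDocument.
  final class Batcher(maxBatch: Int, queue: ArrayBlockingQueue[Vector[String]]) {
    private val pending = ArrayBuffer.empty[String]

    def add(event: String): Unit = {
      pending += event
      // Hand off only when the batch is full or the document has ended.
      if (pending.size >= maxBatch || event == "END") flush()
    }

    private def flush(): Unit = {
      queue.put(pending.toVector) // one hand-off per batch, not per event
      pending.clear()
    }
  }
}
```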





[GitHub] [daffodil] stevedlawrence commented on a diff in pull request #879: Improve SAX parse/unparse performance

stevedlawrence commented on code in PR #879:
URL: https://github.com/apache/daffodil/pull/879#discussion_r1063602691


##########
daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala:
##########
@@ -20,10 +20,25 @@
  import org.apache.daffodil.exceptions.UnsuppressableException
 
  import java.util.concurrent.ArrayBlockingQueue
+ import java.util.concurrent.Executors
+
+ import scala.concurrent.ExecutionContext
+ import scala.concurrent.Future
  import scala.util.Failure
  import scala.util.Success
  import scala.util.Try
 
+ object Coroutine {
+   val executionContext = new ExecutionContext {
+     private val threadPool = Executors.newCachedThreadPool()
+     def execute(runnable: Runnable): Unit = threadPool.submit(runnable)

Review Comment:
   > How is the thread pool eventually closed, or does this executionContext stay around as long as the program keeps running?
   
   Yep, the JavaDoc mentions the same 60-second inactivity timeout before a thread is discarded, so in low-activity environments the threads are automatically cleaned up. We lose the performance gains, but in a low-activity environment I would think lower performance is fine.
   
   >  How does Daffodil guarantee that only a small number of coroutine threads are used at any time?
   
   Is this a guarantee we want to make?
   
   There is only ever going to be one thread per active call to SAX unparse. So the only way to hit the INT_MAX limit is to invoke INT_MAX SAX unparses at the same time in parallel. I imagine trying to do that is going to cripple the system regardless of whether we're using coroutines/SAX or not.
   
   I think we just rely on the user to limit how many parallel unparse calls they think are reasonable, and our thread pool will spawn one thread for each of those. If users don't limit it in any way and allow INT_MAX parallel unparses, then that's on them and we'll do our best to keep up.
   
   If we wanted, we could allow callers to pass an execution context to the `newContentHandlerInstance` function, which would give them control over how threads are spawned. I'm not against this, but I'm not sure people would ever really use it...
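
For reference, here is a sketch of the pool configuration under discussion. The first method mirrors the configuration OpenJDK's `Executors.newCachedThreadPool()` uses: no core threads, a maximum of `Integer.MAX_VALUE` (the INT_MAX limit above), and a 60-second keep-alive. The second is a hypothetical bounded variant of the kind a caller might supply if an execution-context parameter existed. It could be wrapped with `ExecutionContext.fromExecutorService`, and once `maxThreads` tasks are running, further submissions are rejected with RejectedExecutionException. `PoolConfigs` is a hypothetical name.

```scala
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object PoolConfigs {
  // Same settings as Executors.newCachedThreadPool(): no core threads,
  // an effectively unbounded maximum, and a 60 s idle timeout per thread.
  def cachedPool(): ThreadPoolExecutor =
    new ThreadPoolExecutor(0, Int.MaxValue, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())

  // Hypothetical bounded alternative a caller could choose to supply:
  // at most maxThreads concurrent tasks; extra submissions are rejected.
  def boundedPool(maxThreads: Int): ThreadPoolExecutor =
    new ThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())
}
```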





[GitHub] [daffodil] stevedlawrence commented on pull request #879: Improve SAX parse/unparse performance

stevedlawrence commented on PR #879:
URL: https://github.com/apache/daffodil/pull/879#issuecomment-1373673833

   I've updated this PR with just the changes that I've found make a noticeable improvement. The biggest improvement comes from using a thread pool so we reuse our coroutine threads where possible, but the other three commits did make a slight improvement on average. I've also updated the PR description.




[GitHub] [daffodil] mbeckerle commented on pull request #879: Improve SAX parse/unparse performance

mbeckerle commented on PR #879:
URL: https://github.com/apache/daffodil/pull/879#issuecomment-1331234450

   
   > I think one potential advantage of this parallel approach is if the incoming sax events are sporadic or relatively slow (e.g. serialized over a network/diode). With the coroutine approach, we won't attempt to do any unparsing until we get a full batch of events (or we reach the endDoc event). And if the batch size is set to something large to avoid context switching, it might mean a lot of waiting doing nothing until we get those events. 
   
   While some applications (large data streaming) may be able to benefit from this, I know of at least one Daffodil cyberian implementation where the XML will be presented to Daffodil all at once, as a single buffer of XML data, to be unparsed.  In that case, any context switching is effectively just added overhead unless there are idle CPUs and idle memory bandwidth on the system (which I consider unlikely). 




[GitHub] [daffodil] tuxji commented on a diff in pull request #879: Improve SAX parse/unparse performance

tuxji commented on code in PR #879:
URL: https://github.com/apache/daffodil/pull/879#discussion_r1063639451


##########
daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala:
##########
@@ -20,10 +20,25 @@
  import org.apache.daffodil.exceptions.UnsuppressableException
 
  import java.util.concurrent.ArrayBlockingQueue
+ import java.util.concurrent.Executors
+
+ import scala.concurrent.ExecutionContext
+ import scala.concurrent.Future
  import scala.util.Failure
  import scala.util.Success
  import scala.util.Try
 
+ object Coroutine {
+   val executionContext = new ExecutionContext {
+     private val threadPool = Executors.newCachedThreadPool()
+     def execute(runnable: Runnable): Unit = threadPool.submit(runnable)

Review Comment:
   I didn't realize we add only 1 extra thread per SAX unparse call anyway, which at most doubles the total number of threads rather than increasing it exponentially. No, we don't need to allow callers to pass in an execution context. I agree it's really on the users to limit their calls if they use Daffodil as a server and make as many SAX unparse calls as there are connections from client machines.  





[GitHub] [daffodil] mbeckerle commented on pull request #879: Improve SAX parse/unparse performance

mbeckerle commented on PR #879:
URL: https://github.com/apache/daffodil/pull/879#issuecomment-1373954062

   I don't have access to my computer today, but I wanted to look this over.
   
   I don't understand at all why a thread pool should make any difference, and
   if using a thread pool does matter, it suggests we are doing something
   terribly wrong. We should create the coroutine thread exactly once.
   
   
   
   On Fri, Jan 6, 2023, 12:36 PM John Interrante ***@***.***>
   wrote:
   
   > ***@***.**** commented on this pull request.
   > ------------------------------
   >
   > In daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala
   > <https://github.com/apache/daffodil/pull/879#discussion_r1063639451>:
   >
   > >   import scala.util.Failure
   >   import scala.util.Success
   >   import scala.util.Try
   >
   > + object Coroutine {
   > +   val executionContext = new ExecutionContext {
   > +     private val threadPool = Executors.newCachedThreadPool()
   > +     def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
   >
   > I didn't know we add only 1 extra thread per SAX unparse call anyway,
   > which is only going to double the total number of threads, not increase the
   > number of threads exponentially. No, we don't need to allow callers to pass
   > in an execution context. I agree it's really on the users to limit their
   > calls if they use Daffodil as a server and make as many SAX unparse calls
   > as there are connections from client machines.
   




[GitHub] [daffodil] tuxji commented on a diff in pull request #879: Improve SAX parse/unparse performance

tuxji commented on code in PR #879:
URL: https://github.com/apache/daffodil/pull/879#discussion_r1027144850


##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala:
##########
@@ -19,98 +19,88 @@ package org.apache.daffodil.infoset
 
 import java.net.URI
 import java.net.URISyntaxException
+import java.util.concurrent.ArrayBlockingQueue
 
-import org.apache.daffodil.api.DFDL
-import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
-import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.dpath.NodeInfo
 import org.apache.daffodil.exceptions.Assert
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
-import org.apache.daffodil.util.Maybe.One
+import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
+import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
+import org.apache.daffodil.util.Maybe
+import org.apache.daffodil.util.Maybe.Nope
 import org.apache.daffodil.util.MaybeBoolean
 import org.apache.daffodil.util.Misc
 import org.apache.daffodil.xml.XMLUtils
 
 /**
- * The SAXInfosetInputter consumes SAXInfosetEvent objects from the DaffodilUnparseContentHandler
- * class and converts them to events that the DataProcessor unparse can use. This class contains an
- * array of batched SAXInfosetEvent objects that it receives from the contentHandler and the index
- * of the current element being processed.
+ * The SAXInfosetInputter consumes SAXInfosetInputterEvent objects from the
+ * DaffodilUnparseContentHandler class and converts them to events that the
+ * DataProcessor unparse can use. When created, the DaffodilUnaprseContentHandler
+ * passes in a thread-safe array backed FIFO queue where events are added.
+ * However, there is really no special logic in this class regarding those
+ * evens. All the complexity of the received SAX events and dealing with thread
+ * and deadlocks is all encompassed in the DaffodilUnparseContentHandler--this
+ * just needs to take() the events from the queue as needed and provide the to

Review Comment:
   "the to" -> "them to", and please end the next line with a period.



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * Dataprocessor unparse() can use. The SAXInfosetInputterEvents are built base
+ * on information that is passed to this ContentHandler from an XMLReader.
+ *
+ * This class runs the DataProcessor unparse() method in a separate thread,
+ * with a shared ArrayBlockigQueue to provide SAXInfosetInputerEvents. This

Review Comment:
   ArrayBlockigQueue -> ArrayBlockingQueue, SAXInfosetInputerEvents -> SAXInfosetInputterEvents



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala:
##########
@@ -192,26 +160,29 @@ class SAXInfosetInputter(
   override def fini(): Unit = {
     // do nothing
   }
+}
+
+class SAXInfosetInputterEvent() {
+  var eventType: Maybe[InfosetInputterEventType] = Nope
+  var localName: Maybe[String] = Nope
+  var namespaceURI: Maybe[String] = Nope
+  var nilValue: Maybe[String] = Nope
+  var simpleText: Maybe[String] = Nope
+  var mixedContent: Maybe[String] = Nope
+
+  def clear(): Unit = {
+    eventType = Nope
+    localName = Nope
+    namespaceURI = Nope
+    nilValue = Nope
+    simpleText = Nope
+    mixedContent = Nope
+  }

Review Comment:
   How does SAXInfosetInputterEvent ever get its fields initialized to values other than Nope? I've searched this pull request and I don't see a field assignment anywhere. I also don't see any callers of the clear() method in the newly added green text, only in the removed red text. It's as if I'm not seeing all the changes I should be seeing. Oh, the GitHub UI was hiding DaffodilUnparseContentHandler.scala because the diff was so large. I had to make it display the diff before I could find the field assignments and clear() calls I was looking for.



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * Dataprocessor unparse() can use. The SAXInfosetInputterEvents are built base
+ * on information that is passed to this ContentHandler from an XMLReader.
+ *
+ * This class runs the DataProcessor unparse() method in a separate thread,
+ * with a shared ArrayBlockigQueue to provide SAXInfosetInputerEvents. This
+ * queue has a maximum size defined by the saxUnaprseEventBatchSize tunble
  *
- * This class, together with the SAXInfosetInputter, uses coroutines to ensure that a batch of events
- * (based on the tunable saxUnparseEventBatchSize) can be passed from the former to the latter.
  * The following is the general process:
  *
- * - an external call is made to parse an XML Document
- * - this class receives a StartDocument call, which is the first SAXInfosetEvent that should be
- * sent to the SAXInfosetInputter. That event is put onto an array of SAXInfosetEvents of size the
- * saxUnparseEventBatchSize tunable. Once the array is full, it is put on the inputter's queue,
- * this thread is paused, and that inputter's thread is run
- * - when the SAXInfosetInputter is done processing that batch and is ready for a new batch, it
- * sends a 1 element array with the last completed event via the coroutine system, which loads it on
- * the contentHandler's queue, which restarts this thread and pauses that one. In the expected case,
- * the single element array will contain no new information until the unparse complete. In the case of
- * an unexpected error though, it will contain error information
- * - this process continues until the EndDocument SAXInfosetEvent is loaded into the batch.
- * Once that SAXInfosetEvent is processed by the SAXInfosetInputter, it signals the end of batched
- * events coming from the contentHandler. This ends the unparseProcess and returns the event with
- * the unparseResult and/or any error
- * information
+ * - An external call is made to parse from an XMLReader
+ * - This class receives a startDocument() event from the XMLReader. This
+ *   put()'s a StartDocument event on the eventQueue and starts a Future that
+ *   runs the DataProcessor unparse() method with a SAXInfosetInputter in a
+ *   thread
+ * - As additional ContentHandler functions are called from the XMLReader,
+ *   events are created and added to the eventQueue
+ * - This continues until the endDocument() SAX event is reached, at which
+ *   point we create an EndDocument event, put() it on the eventQueue, wait for
+ *   the Future to complete, and determine the result
+ * - While this is all going on, the SAXInfosetInputter take()'s from the
+ *   eventQueue and provides the infoset information necessary to unparse
  *
- * @param dp dataprocessor object that will be used to call the parse
- * @param output outputChannel of choice where the unparsed data is stored
+ * @param dp DataProcessor used to call the unparse() method
+ * @param output Output to write unparsed data
  */
 class DaffodilUnparseContentHandler(
   dp: DFDL.DataProcessor,
   output: DFDL.Output)
   extends DFDL.DaffodilUnparseContentHandler {
-  private lazy val inputter = new SAXInfosetInputter(this, dp, output)
-  private var unparseResult: DFDL.UnparseResult = _
-  private lazy val characterData = new StringBuilder
-  private var prefixMapping: NamespaceBinding = _
-  private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
 
-  private lazy val tunablesBatchSize = dp.getTunables().saxUnparseEventBatchSize
+  /**
+   * The Future thread running the DataProcessor.unparse() method
+   */
+  private var unparseTask: Future[Unit] = _
+  
+  /**
+   * The result of the unparseTask. This is None as long as the Future thread
+   * is still running. Once the unparseTask completes, this is set to a
+   * Right[UnparseResult] if the unparse completed without error. If this
+   * completed but UnparseResult.isError is true or an unexpected exception was
+   * thrown during unparse, this is set to a Left[SAXException].
+   */
+  private var unparseResult: Option[Either[SAXException, DFDL.UnparseResult]] = None
+ 
+  /**
+   * Maximum number of SAXInfosetInputterEvents that this ContentHandler can
+   * put() in the eventQueue at a time. If this queue is full, the
+   * ContentHandler thread is blocked until the unparseTask thread take()'s
+   * from the queue.
+   */
+  private val eventQueueSize = dp.getTunables().saxUnparseEventBatchSize
+
+  /**
+   * A thread-safe array-backed FIFO blocking queue to store events to be used
+   * by the SAXInfosetInputter. This ContentHandler put()'s to this queue as it
+   * receives SAX events, and the unparseTask take()'s from this queue to
+   * unparse().
+   */
+  private val eventQueue = new ArrayBlockingQueue[SAXInfosetInputterEvent](eventQueueSize)
 
   /**
-   * we always have an extra buffer in the array that we use for the inputter.hasNext call. For each
-   * element, we need to know if it has a viable next, if it doesn't, it will triggers the context
-   * switch to DaffodilUnparseContentHandler. So for example, if the user provides 1 as the
-   * batchSize, under the hood we'll batch [event1, event2].
-   *
-   * - DataProcessor.unparse will call hasNext and getEventType for the initialization call
-   * - hasNext will check if nextIndex (i.e currentIndex + 1) is non-empty. Since currentIndex is 0,
-   * it will return true since event2 exists.
-   * - getEventType (which signifies our processing step) is called for the event at currentIndex
-   * - After the initialization step, subsequent calls will be a loop of next(), ...some processing
-   * of the current event ..., and hasNext()
-   * - For our scenario, next() will clear out the contents at currentIndex, increment the currentIndex,
-   * and our event2 will be processed, then hasNext will check if there is a viable index 2, as
-   * there is not, it will perform the context switch so DaffodilUnparseContentHandler can batch
-   * more events
-   * - DaffodilUnparseContentHandler copies the last event into the first so the currentEvent stays
-   * the same for the inputter until it decides to change it so we end up with [event2, event3]
-   * - When we context switch back to inputter.hasNext, it resets the currentIndex to 0, and our loop
-   * begins again with a call to next
-   *
-   * Without us having the extra buffer, things would happen like this:
-   * user provides 1 as the batchSize, under the hood we'll have [event1] batched.
-   *
-   * DataProcessor.unparse will call hasNext and getEventType for the initialization call, and that
-   * hasNext will check if cnextIndex (i.e currentIndex + 1) is non-empty. As currentIndex is 0, and
-   * it is the maximum index, there is no index 1. It will context switch to get a new batched event,
-   * which, would overwrite event1 before we get to process it.
+   * To avoid allocating many SAXInfosetInputterEvents, we instead pre-allocate
+   * events and clear/mutate each event as needed. Note that we need 2 more
+   * pre-allocated events than the queue size. This is because while
+   * processing, there could be a most eventQueueSize events already on the
+   * eventQueue, plus another event that the SAXInfosetInputter may have from
+   * the most recent take(), plus the event that this ContentHandler is mutating
+   * in preparation to put() on the queue.
    */
-  private lazy val actualBatchSize = tunablesBatchSize + 1
-  private lazy val batchedInfosetEvents: Array[SAXInfosetEvent] = {
-    Assert.invariant(tunablesBatchSize > 0, "invalid saxUnparseEventBatchSize; minimum value is 1")
-    Array.fill[SAXInfosetEvent](actualBatchSize)(new SAXInfosetEvent)
+  private val preAllocatedEvents = {
+    Array.fill(eventQueueSize + 2)(new SAXInfosetInputterEvent)
   }
-  private var currentIndex: Int = 0
 
   /**
-   * This is a flag that is set to true when startPrefixMapping is called. When true, we make
-   * the assumption that we don't need to use the Attributes parameter from startElement to get the
-   * namespaceURI information and will solely rely on start/endPrefixMapping. If false, we will use
-   * Attributes to get the namespaceURI info.
+   * Index into the pre-allocated event array that the content handler is
+   * currently mutating and preparing to put() on the eventQueue. When all
+   * information for this event is gathered, we put() it on the eventQueue,
+   * increment the index into the pre-allocated events array, and start mutating
+   * the next event.
    */
-  private var contentHandlerPrefixMappingUsed = false
+  private var currentIndex = 0
 
   /**
-   * returns null in the case of an DaffodilUnhandledSAXException
+   * Variable to make it easier to access the pre-allocated event at
+   * currentIndex. This is updated each time currentIndex is incremented so
+   * that methods in this class should never need to access currentIndex or the
+   * preAllocated events array directly.
    */
-  def getUnparseResult: DFDL.UnparseResult = unparseResult
+  private var currentEvent = preAllocatedEvents(currentIndex)
 
-  def enableInputterResolutionOfRelativeInfosetBlobURIs(): Unit = inputter.enableResolutionOfRelativeInfosetBlobURIs()
+  /**
+   * Buffer to accumulate characters received from the SAX characters() function
+   */
+  private val characterData = new StringBuilder
 
-  override def setDocumentLocator(locator: Locator): Unit = {
-    // do nothing
-  }
+  /**
+   * Used to store the current in-scope prefix namespace mappings, determined
+   * by the start/endPrefixMapping functions, or from xmlns Attributes in the
+   * startElement function.
+   */
+  private var prefixMapping: NamespaceBinding = TopScope
 
-  override def startDocument(): Unit = {
-    batchedInfosetEvents(currentIndex).eventType = One(StartDocument)
-    maybeSendToInputter()
+  /**
+   * If the XMLReader doesn't use the start/endPrefixMapping() functions to
+   * pass along namespaceMapping information, we must extract the information
+   * from the Attributes parameter in the startElement() function. Doing so can
+   * potentially add multiple namespace mappings that must be removed in the
+   * associated endElement() function. This stack keeps track of which mappings
+   * are added in startElement() so they can easily be removed in endElement()
+   * by calling the pop function.
+   */
+  private val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
+
+  /**
+   * Flag to keep track of whether the XMLReader is using start/endPrefixMapping() for
+   * namespace mappings or if mappings should be gathered from Attributes in
+   * startElement(). This is set to true for the former and false for the
+   * latter. If true, the prefixMappingTrackingStack is unused.
+   */
+  private var contentHandlerPrefixMappingUsed = false
+
+  /**
+   * Add the current event to the eventQueue. If the eventQueue is full, this
+   * blocks until the SAXInfosetInputter take()'s an event and frees up a slot
+   * in the queue. Once the event is added, we update state to point to the
+   * next SAXInfosetInputterEvent that we should mutate as we receive more SAX
+   * events.
+   */
+  private def addCurrentEventToQueue(): Unit = {
+    // before we put() an event, check to make sure there hasn't been an error
+    // from the SAXInfosetInputter. If there is an error, checkUnparseResult()
+    // throws and exception which bubbles up to the XMLReader to handle. Note

Review Comment:
   and -> an
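The addCurrentEventToQueue() comment describes checking for an error from the SAXInfosetInputter before each put(). In general, a check followed by a plain put() can still block indefinitely if the consumer fails while the producer is waiting on a full ArrayBlockingQueue; how the PR handles that case is not shown in this excerpt. The Scala sketch below shows one way to close that gap, assuming the consumer runs in a Future: it retries offer() with a timeout and inspects Future.value between attempts. It is an illustration, not the PR's code. CheckedPut, result, putChecked and ConsumerFailed are hypothetical names, and the 50 ms poll interval is arbitrary.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.concurrent.Future
import scala.util.{Failure, Success}

object CheckedPut {
  // Hypothetical wrapper for whatever the consumer thread threw
  final class ConsumerFailed(cause: Throwable) extends Exception("consumer failed", cause)

  // None while the task is running, Some(Right(r)) on success, Some(Left(e))
  // if the task threw; similar in spirit to an Option[Either[...]] result field
  def result[R](task: Future[R]): Option[Either[Exception, R]] =
    task.value.map {
      case Success(r) => Right(r)
      case Failure(t) => Left(new ConsumerFailed(t))
    }

  // Put ev on the queue without ever blocking indefinitely: while the queue
  // stays full, wake up periodically and check whether the consumer is gone
  def putChecked[E, R](queue: ArrayBlockingQueue[E], ev: E, task: Future[R]): Unit = {
    var added = false
    while (!added) {
      result(task) match {
        case Some(Left(e)) => throw e
        case Some(Right(_)) =>
          throw new IllegalStateException("consumer finished before all events were queued")
        case None =>
          added = queue.offer(ev, 50, TimeUnit.MILLISECONDS)
      }
    }
  }
}
```

Polling trades a small periodic wake-up for the guarantee that the producer eventually notices a dead consumer. An alternative design is for the consumer to clear the queue when it fails, so that a blocked put() can return and the following check reports the error.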



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala:
##########
@@ -19,98 +19,88 @@ package org.apache.daffodil.infoset
 
 import java.net.URI
 import java.net.URISyntaxException
+import java.util.concurrent.ArrayBlockingQueue
 
-import org.apache.daffodil.api.DFDL
-import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
-import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.dpath.NodeInfo
 import org.apache.daffodil.exceptions.Assert
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
-import org.apache.daffodil.util.Maybe.One
+import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
+import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
+import org.apache.daffodil.util.Maybe
+import org.apache.daffodil.util.Maybe.Nope
 import org.apache.daffodil.util.MaybeBoolean
 import org.apache.daffodil.util.Misc
 import org.apache.daffodil.xml.XMLUtils
 
 /**
- * The SAXInfosetInputter consumes SAXInfosetEvent objects from the DaffodilUnparseContentHandler
- * class and converts them to events that the DataProcessor unparse can use. This class contains an
- * array of batched SAXInfosetEvent objects that it receives from the contentHandler and the index
- * of the current element being processed.
+ * The SAXInfosetInputter consumes SAXInfosetInputterEvent objects from the
+ * DaffodilUnparseContentHandler class and converts them to events that the
+ * DataProcessor unparse can use. When created, the DaffodilUnparseContentHandler
+ * passes in a thread-safe, array-backed FIFO queue where events are added.
+ * However, there is really no special logic in this class regarding those
+ * events. All the complexity of the received SAX events and dealing with threads
+ * and deadlocks is encompassed in the DaffodilUnparseContentHandler; this
+ * just needs to take() the events from the queue as needed and provide them to
+ * the unparse() function.
  *
- * This class, together with the SAXInfosetInputter, uses coroutines to ensure that a batch of events
- * (based on the tunable saxUnparseEventBatchSize) can be passed from the former to the latter.
- * The following is the general process:
- *
- * - the run method is called, with the first batch of events, starting with the StartDocument event,
- * already loaded on the inputter's queue.
- * This is collected and stored in the batchedInfosetEvents member, and the currentIndex is set to 0
- * - The dp.unparse method is called, and it calls hasNext to make sure an event exists to be
- * processed and then queries the event at currentIndex. The hasNext call also checks that there is
- * a next event to be processed (currentIndex+1), and if not, queues the next batch of events by
- * transferring control to the contentHandler so it can load them.
- * - After it is done with the current event, it calls inputter.next to get the next event, and that
- * increments the currentIndex and cleans out the event at the previous index
- * - This process continues until the event at currentIndex either contains an EndDocument event or
- * the currentIndex is the last in the batch. If it is the former, the endDocumentReceived flag is
- * set to true and hasNext will return false. If it is the latter, the next batch of events will be
- * queued by transferring control to the contentHandler so it can load them.
- * - This ends the unparse process, and the unparseResult and/or any Errors are set on a single element
- * array containing response events. We call resumeFinal passing along that array, terminating this
- * thread and resuming the contentHandler for the last time.
- *
- * @param unparseContentHandler producer coroutine that sends the SAXInfosetEvent to this class
- * @param dp dataprocessor that we use to kickstart the unparse process and that consumes the
- *           currentEvent
- * @param output  outputChannel of choice where the unparsed data is stored
+ * @param eventQueue queue of SAXInfosetInputterEvents to guide the unparse
  */
 class SAXInfosetInputter(
-  unparseContentHandler: DFDL.DaffodilUnparseContentHandler,
-  dp: DFDL.DataProcessor,
-  output: DFDL.Output)
-  extends InfosetInputter with DFDL.ConsumerCoroutine {
+  eventQueue: ArrayBlockingQueue[SAXInfosetInputterEvent])
+  extends InfosetInputter {
 
   /**
-   * allows support for converting relative URIs in Blobs to absolute URIs, this is necessary
-   * for running TDML tests as they allow relative URIs. Because Daffodil proper only uses
-   * absolute URIs, we hide this functionality behind this flag. It can be set to true by calling
-   * the unparseContentHandler.enableInputterResolutionOfRelativeInfosetBlobURIs(), which calls the
-   * inputter's enableResolutionOfRelativeInfosetBlobURIs() function to set the below variable to true
+   * Flag to store if relative blob URI resolution should be enabled in the
+   * infoset inputter. This is set to true when
+   * enableResolutionOfRelativeInfosetBlobURIs() is called.
    */
-  private var resolveRelativeInfosetBlobURIs: Boolean = false
+  private var enableRelativeBlobURIs = false
 
-  private var endDocumentReceived = false
-  private var currentIndex: Int = 0
-  private var batchedInfosetEvents: Array[SAXInfosetEvent] = _
-  private lazy val returnedInfosetEvent: Array[SAXInfosetEvent] = new Array[SAXInfosetEvent](1)
+  /**
+   * Enable the resolution of relative infoset blob URIs. This should only be
+   * used when running TDML tests where relative blob URIs are often used. In
+   * production, blob URIs should always be absolute and this should not be
+   * needed.
+   */
+  def enableResolutionOfRelativeInfosetBlobURIs(): Unit = enableRelativeBlobURIs = true
+
+  /**
+   * Keeps track of the current event that SAXInfosetInputter returns
+   * information about. This must be initialized to a StartDocument event
+   */
+  private var currentEvent: SAXInfosetInputterEvent = {
+    val ev = eventQueue.take()
+    Assert.invariant(ev.eventType.contains(StartDocument))
+    ev
+  }

Review Comment:
   Oh, I had to look twice before I understood how this invariant works.  I initially misread and thought I was looking at def currentEvent, but now I realize this is var currentEvent and the invariant is being checked only after the first time eventQueue.take() is called.  
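The distinction the reviewer draws can be shown in isolation. In Scala, a var (or val) with a block initializer evaluates that block once, when the constructor runs, so the first take() and the StartDocument check happen exactly once; a def with the same body would call take() again on every access. The sketch below is a simplified, hypothetical consumer (QueueInputter, Kind, StartDoc, Elem and EndDoc are illustrative names, not Daffodil types) that initializes its current event this way and blocks in next() until the producer supplies another one.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical event kinds standing in for InfosetInputterEventType
sealed trait Kind
case object StartDoc extends Kind
case object Elem extends Kind
case object EndDoc extends Kind

// Hypothetical consumer with roughly the same shape as the new SAXInfosetInputter
final class QueueInputter(queue: ArrayBlockingQueue[Kind]) {
  // Block initializer: runs once during construction, so exactly one take()
  // and one check happen here. A def with this body would take() on each call.
  private var current: Kind = {
    val ev = queue.take()
    require(ev == StartDoc, s"first event must be StartDoc, got $ev")
    ev
  }

  def getEventType: Kind = current
  def hasNext: Boolean = current != EndDoc

  // Blocks until the producer has put() another event
  def next(): Unit = { current = queue.take() }
}
```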



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * Dataprocessor unparse() can use. The SAXInfosetInputterEvents are built base
+ * on information that is passed to this ContentHandler from an XMLReader.
+ *
+ * This class runs the DataProcessor unparse() method in a separate thread,
+ * with a shared ArrayBlockingQueue to provide SAXInfosetInputterEvents. This
+ * queue has a maximum size defined by the saxUnaprseEventBatchSize tunable.

Review Comment:
   saxUnaprseEventBatchSize -> saxUnparseEventBatchSize 
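The companion object in this hunk gives Futures a cached thread pool so that repeated SAX unparse() calls can reuse idle threads instead of creating new ones. The following sketch combines that idea with the flow described in the class comment: startDocument() puts a start event and launches the consumer in a Future, intermediate callbacks put() events, and endDocument() puts an end event and waits with Await.result. It is a simplified model under stated assumptions: events are plain Strings, the consumer just collects them instead of calling dp.unparse(), PipelineSketch and Handler are hypothetical names, and it uses the standard ExecutionContext.fromExecutorService wrapper rather than the hand-written ExecutionContext in the PR. The daemon-thread factory is our addition, so that idle pool threads do not delay exit of a short demo JVM.

```scala
import java.util.concurrent.{ArrayBlockingQueue, Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object PipelineSketch {
  // Cached pool: idle threads are kept and reused by later tasks
  private val pool = Executors.newCachedThreadPool(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r)
      t.setDaemon(true) // demo choice, not taken from the PR
      t
    }
  })
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  // Hypothetical handler; String events stand in for SAXInfosetInputterEvents
  final class Handler(queueSize: Int) {
    private val queue = new ArrayBlockingQueue[String](queueSize)
    private var task: Future[List[String]] = _

    def startDocument(): Unit = {
      queue.put("START")
      // The consumer stands in for dp.unparse(inputter, output)
      task = Future {
        require(queue.take() == "START")
        val seen = List.newBuilder[String]
        var ev = queue.take()
        while (ev != "END") { seen += ev; ev = queue.take() }
        seen.result()
      }
    }

    def element(name: String): Unit = queue.put(name) // blocks while full

    def endDocument(): List[String] = {
      queue.put("END")
      Await.result(task, Duration.Inf) // rethrows if the consumer failed
    }
  }
}
```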



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * Dataprocessor unparse() can use. The SAXInfosetInputterEvents are built base

Review Comment:
   base -> based
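The prefixMapping field in the first DaffodilUnparseContentHandler hunk is initialized to TopScope rather than null; per the PR description, a null-terminated chain could make getURI() throw a NullPointerException when no mapping exists. The sketch below models that choice with a hypothetical Binding type whose chains end in a sentinel (EmptyBinding) that returns null for any unknown prefix, plus a hypothetical ScopeTracker that saves the scope in startElement() and restores it in endElement(), as is needed when namespaces arrive as xmlns attributes. It saves and restores the whole scope instead of popping individual mappings, a simplification relative to prefixMappingTrackingStack.

```scala
// Hypothetical stand-in for scala.xml.NamespaceBinding
sealed abstract class Binding {
  def getURI(prefix: String): String
}

// Sentinel ending every chain, playing the role TopScope plays in the PR:
// an unknown prefix yields null instead of dereferencing a null parent
case object EmptyBinding extends Binding {
  def getURI(prefix: String): String = null
}

final case class PrefixBinding(prefix: String, uri: String, parent: Binding) extends Binding {
  def getURI(p: String): String = if (p == prefix) uri else parent.getURI(p)
}

// Hypothetical bookkeeping for mappings that come from xmlns attributes
final class ScopeTracker {
  private var scope: Binding = EmptyBinding
  private var saved: List[Binding] = Nil // stack of scopes to restore

  def startElement(xmlns: Seq[(String, String)]): Unit = {
    saved = scope :: saved
    xmlns.foreach { case (p, u) => scope = PrefixBinding(p, u, scope) }
  }

  def endElement(): Unit = {
    scope = saved.head
    saved = saved.tail
  }

  // Wrap the null-for-missing convention at the API boundary
  def uriFor(prefix: String): Option[String] = Option(scope.getURI(prefix))
}
```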



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 class DaffodilUnparseContentHandler(
   dp: DFDL.DataProcessor,
   output: DFDL.Output)
   extends DFDL.DaffodilUnparseContentHandler {
   /**
+   * To avoid allocating many SAXInfosetInputterEvents, we instead pre-allocate
+   * events and clear/mutate each event as needed. Note that we need 2 more
+   * pre-allocated events than the queue size. This is because while
+   * processing, there could be a most eventQueueSize events already on the

Review Comment:
   a -> at
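
The sizing argument in this comment (eventQueueSize + 2 pre-allocated events: up to eventQueueSize on the queue, one held by the consumer from its last take(), and one being mutated by the producer) can be checked with a small standalone sketch. This is not code from the PR. `Event`, `Producer` and `runPipeline` are hypothetical stand-ins for SAXInfosetInputterEvent, DaffodilUnparseContentHandler and the unparse thread. The sketch assumes the consumer is finished with an event before it calls take() again.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical stand-in for SAXInfosetInputterEvent: mutable and reusable
final class Event {
  var value: Int = -1
  var isEnd: Boolean = false
  def clear(): Unit = { value = -1; isEnd = false }
}

// Hypothetical producer that reuses a ring of queueSize + 2 events
final class Producer(queue: ArrayBlockingQueue[Event], queueSize: Int) {
  private val events = Array.fill(queueSize + 2)(new Event)
  private var index = 0
  private var current = events(index)

  private def addCurrentEventToQueue(): Unit = {
    queue.put(current)                  // blocks while the queue is full
    index = (index + 1) % events.length // advance around the ring
    current = events(index)
    // Safe to clear: this slot is neither on the queue nor still in use by
    // the consumer, because the ring has two more slots than the queue
    current.clear()
  }

  def send(v: Int): Unit = { current.value = v; addCurrentEventToQueue() }
  def end(): Unit = { current.isEnd = true; addCurrentEventToQueue() }
}

// Runs one producer/consumer pair and returns the values the consumer saw
def runPipeline(queueSize: Int, count: Int): Vector[Int] = {
  val queue = new ArrayBlockingQueue[Event](queueSize)
  val received = Vector.newBuilder[Int]
  val consumer = new Thread(() => {
    var e = queue.take()
    while (!e.isEnd) {
      received += e.value // finish with this event before the next take()
      e = queue.take()
    }
  })
  consumer.start()
  val producer = new Producer(queue, queueSize)
  (1 to count).foreach(producer.send)
  producer.end()
  consumer.join() // join() makes the consumer's writes visible here
  received.result()
}
```

For example, `runPipeline(1, 1000)` should return the values 1 through 1000 in order. Every slot the producer clears has already been released by the consumer.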



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * DataProcessor unparse() can use. The SAXInfosetInputterEvents are built based
+ * on information that is passed to this ContentHandler from an XMLReader.
+ *
+ * This class runs the DataProcessor unparse() method in a separate thread,
+ * with a shared ArrayBlockingQueue to provide SAXInfosetInputterEvents. This
+ * queue has a maximum size defined by the saxUnparseEventBatchSize tunable.
  *
- * This class, together with the SAXInfosetInputter, uses coroutines to ensure that a batch of events
- * (based on the tunable saxUnparseEventBatchSize) can be passed from the former to the latter.
  * The following is the general process:
  *
- * - an external call is made to parse an XML Document
- * - this class receives a StartDocument call, which is the first SAXInfosetEvent that should be
- * sent to the SAXInfosetInputter. That event is put onto an array of SAXInfosetEvents of size the
- * saxUnparseEventBatchSize tunable. Once the array is full, it is put on the inputter's queue,
- * this thread is paused, and that inputter's thread is run
- * - when the SAXInfosetInputter is done processing that batch and is ready for a new batch, it
- * sends a 1 element array with the last completed event via the coroutine system, which loads it on
- * the contentHandler's queue, which restarts this thread and pauses that one. In the expected case,
- * the single element array will contain no new information until the unparse complete. In the case of
- * an unexpected error though, it will contain error information
- * - this process continues until the EndDocument SAXInfosetEvent is loaded into the batch.
- * Once that SAXInfosetEvent is processed by the SAXInfosetInputter, it signals the end of batched
- * events coming from the contentHandler. This ends the unparseProcess and returns the event with
- * the unparseResult and/or any error
- * information
+ * - An external call is made to parse from an XMLReader
+ * - This class receives a startDocument() event from the XMLReader. This
+ *   put()'s a StartDocument event on the eventQueue and starts a Future that
+ *   runs the DataProcessor unparse() method with a SAXInfosetInputter in a
+ *   thread
+ * - As additional ContentHandler functions are called from the XMLReader,
+ *   events are created and added to the eventQueue
+ * - This continues until the endDocument() SAX event is reached, at which
+ *   point we create an EndDocument event, put() it on the eventQueue, wait for
+ *   the Future to complete, and determine the result
+ * - While this is all going on, the SAXInfosetInputter take()'s from the
+ *   eventQueue and provides the infoset information necessary to unparse
  *
- * @param dp dataprocessor object that will be used to call the parse
- * @param output outputChannel of choice where the unparsed data is stored
+ * @param dp DataProcessor used to call the unparse() method
+ * @param output Output to write unparsed data
  */
 class DaffodilUnparseContentHandler(
   dp: DFDL.DataProcessor,
   output: DFDL.Output)
   extends DFDL.DaffodilUnparseContentHandler {
-  private lazy val inputter = new SAXInfosetInputter(this, dp, output)
-  private var unparseResult: DFDL.UnparseResult = _
-  private lazy val characterData = new StringBuilder
-  private var prefixMapping: NamespaceBinding = _
-  private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
 
-  private lazy val tunablesBatchSize = dp.getTunables().saxUnparseEventBatchSize
+  /**
+   * The Future thread running the DataProcessor.unparse() method
+   */
+  private var unparseTask: Future[Unit] = _
+  
+  /**
+   * The result of the unparseTask. This is None as long as the Future thread
+   * is still running. Once the unparseTask completes, this is set to a
+   * Right[UnparseResult] if the unparse completed without error. If this
+   * completed but UnparseResult.isError is true or an unexpected exception was
+   * thrown during unparse, this is set to a Left[SAXException].
+   */
+  private var unparseResult: Option[Either[SAXException, DFDL.UnparseResult]] = None
+ 
+  /**
+   * Maximum number of SAXInfosetInputterEvents that this ContentHandler can
+   * put() in the eventQueue at a time. If this queue is full, the
+   * ContentHandler thread is blocked until the unparseTask thread take()'s
+   * from the queue.
+   */
+  private val eventQueueSize = dp.getTunables().saxUnparseEventBatchSize
+
+  /**
+   * A thread-safe array-backed FIFO blocking queue to store events to be used
+   * by the SAXInfosetInputter. This ContentHandler put()'s to this queue as it
+   * receives SAX events, and the unparseTask take()'s from this queue to
+   * unparse().
+   */
+  private val eventQueue = new ArrayBlockingQueue[SAXInfosetInputterEvent](eventQueueSize)
 
   /**
-   * we always have an extra buffer in the array that we use for the inputter.hasNext call. For each
-   * element, we need to know if it has a viable next, if it doesn't, it will triggers the context
-   * switch to DaffodilUnparseContentHandler. So for example, if the user provides 1 as the
-   * batchSize, under the hood we'll batch [event1, event2].
-   *
-   * - DataProcessor.unparse will call hasNext and getEventType for the initialization call
-   * - hasNext will check if nextIndex (i.e currentIndex + 1) is non-empty. Since currentIndex is 0,
-   * it will return true since event2 exists.
-   * - getEventType (which signifies our processing step) is called for the event at currentIndex
-   * - After the initialization step, subsequent calls will be a loop of next(), ...some processing
-   * of the current event ..., and hasNext()
-   * - For our scenario, next() will clear out the contents at currentIndex, increment the currentIndex,
-   * and our event2 will be processed, then hasNext will check if there is a viable index 2, as
-   * there is not, it will perform the context switch so DaffodilUnparseContentHandler can batch
-   * more events
-   * - DaffodilUnparseContentHandler copies the last event into the first so the currentEvent stays
-   * the same for the inputter until it decides to change it so we end up with [event2, event3]
-   * - When we context switch back to inputter.hasNext, it resets the currentIndex to 0, and our loop
-   * begins again with a call to next
-   *
-   * Without us having the extra buffer, things would happen like this:
-   * user provides 1 as the batchSize, under the hood we'll have [event1] batched.
-   *
-   * DataProcessor.unparse will call hasNext and getEventType for the initialization call, and that
-   * hasNext will check if cnextIndex (i.e currentIndex + 1) is non-empty. As currentIndex is 0, and
-   * it is the maximum index, there is no index 1. It will context switch to get a new batched event,
-   * which, would overwrite event1 before we get to process it.
+   * To avoid allocating many SAXInfosetInputterEvents, we instead pre-allocate
+   * events and clear/mutate each event as needed. Note that we need 2 more
+   * pre-allocated events than the queue size. This is because while
+   * processing, there could be a most eventQueueSize events already on the
+   * eventQueue, plus another event that the SAXInfosetInputter may have from
+   * the most recent take(), plus the event that this ContentHandler is mutating
+   * in preparation to put() on the queue.
    */
-  private lazy val actualBatchSize = tunablesBatchSize + 1
-  private lazy val batchedInfosetEvents: Array[SAXInfosetEvent] = {
-    Assert.invariant(tunablesBatchSize > 0, "invalid saxUnparseEventBatchSize; minimum value is 1")
-    Array.fill[SAXInfosetEvent](actualBatchSize)(new SAXInfosetEvent)
+  private val preAllocatedEvents = {
+    Array.fill(eventQueueSize + 2)(new SAXInfosetInputterEvent)
   }
-  private var currentIndex: Int = 0
 
   /**
-   * This is a flag that is set to true when startPrefixMapping is called. When true, we make
-   * the assumption that we don't need to use the Attributes parameter from startElement to get the
-   * namespaceURI information and will solely rely on start/endPrefixMapping. If false, we will use
-   * Attributes to get the namespaceURI info.
+   * Index into the pre-allocated event array that the content handler is
+   * currently mutating and preparing to put() on the eventQueue. When all
+   * information for this event is gathered, we put() it on the eventQueue and
+   * increment the index into the pre-allocated events array and start muating

Review Comment:
   muating -> mutating
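
The class comment above describes running unparse() in a Future on a cached thread pool, fed by a shared bounded queue. The PR description notes that "a bit more care is needed to ensure there are no deadlocks". A minimal sketch of that pattern follows; it is not Daffodil code. `Pipeline`, `send`, `finish` and `failAt` are hypothetical. The worker sums integers in place of dp.unparse(), and a negative value stands in for EndDocument. The sketch makes several of its own choices:

- It uses `ExecutionContext.fromExecutorService` instead of the anonymous ExecutionContext in the diff.
- It uses daemon threads so the demo JVM can exit.
- It marks the result field `@volatile` so the producer thread reliably sees the worker's write.

```scala
import java.util.concurrent.{ArrayBlockingQueue, Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Cached thread pool so later Futures can reuse idle worker threads.
// Daemon threads are a choice for this demo, not taken from the PR.
val pool = Executors.newCachedThreadPool(new ThreadFactory {
  def newThread(r: Runnable): Thread = {
    val t = new Thread(r)
    t.setDaemon(true)
    t
  }
})
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

// Hypothetical producer/worker pair; failAt makes the worker fail on a value
final class Pipeline(queueSize: Int, failAt: Int) {
  private val queue = new ArrayBlockingQueue[Integer](queueSize)
  // Written by the worker thread, read by the producer thread
  @volatile private var result: Option[Either[Exception, Int]] = None

  // Worker standing in for dp.unparse(): never lets an exception escape
  private val task: Future[Unit] = Future {
    try {
      var sum = 0
      var n = queue.take().intValue()
      while (n >= 0) { // a negative value marks the end of the input
        if (n == failAt) throw new IllegalStateException(s"bad event $n")
        sum += n
        n = queue.take().intValue()
      }
      result = Some(Right(sum))
    } catch {
      case e: Exception => result = Some(Left(e))
    }
    // The worker is done: unblock a producer that may be stuck in put()
    queue.clear()
  }

  // Rethrow a worker failure before putting, so put() cannot block forever
  private def checkResult(): Unit = result match {
    case Some(Left(e)) => throw e
    case _ =>
  }

  def send(n: Int): Unit = { checkResult(); queue.put(Integer.valueOf(n)) }

  def finish(): Either[Exception, Int] = {
    send(-1) // end-of-input marker
    Await.result(task, Duration.Inf)
    result.get
  }
}
```

The producer learns about a worker failure through `checkResult()` before its next `put()`, which plays the role of checkUnparseResult(). The worker's final `queue.clear()` covers the case where the producer is already blocked in `put()`. After the clear, the producer completes at most one more `put()` before `checkResult()` throws.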



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * DataProcessor unparse() can use. The SAXInfosetInputterEvents are built based
+ * on information that is passed to this ContentHandler from an XMLReader.
+ *
+ * This class runs the DataProcessor unparse() method in a separate thread,
+ * with a shared ArrayBlockingQueue to provide SAXInfosetInputterEvents. This
+ * queue has a maximum size defined by the saxUnparseEventBatchSize tunable.
  *
- * This class, together with the SAXInfosetInputter, uses coroutines to ensure that a batch of events
- * (based on the tunable saxUnparseEventBatchSize) can be passed from the former to the latter.
  * The following is the general process:
  *
- * - an external call is made to parse an XML Document
- * - this class receives a StartDocument call, which is the first SAXInfosetEvent that should be
- * sent to the SAXInfosetInputter. That event is put onto an array of SAXInfosetEvents of size the
- * saxUnparseEventBatchSize tunable. Once the array is full, it is put on the inputter's queue,
- * this thread is paused, and that inputter's thread is run
- * - when the SAXInfosetInputter is done processing that batch and is ready for a new batch, it
- * sends a 1 element array with the last completed event via the coroutine system, which loads it on
- * the contentHandler's queue, which restarts this thread and pauses that one. In the expected case,
- * the single element array will contain no new information until the unparse complete. In the case of
- * an unexpected error though, it will contain error information
- * - this process continues until the EndDocument SAXInfosetEvent is loaded into the batch.
- * Once that SAXInfosetEvent is processed by the SAXInfosetInputter, it signals the end of batched
- * events coming from the contentHandler. This ends the unparseProcess and returns the event with
- * the unparseResult and/or any error
- * information
+ * - An external call is made to parse from an XMLReader
+ * - This class receives a startDocument() event from the XMLReader. This
+ *   put()'s a StartDocument event on the eventQueue and starts a Future that
+ *   runs the DataProcessor unparse() method with a SAXInfosetInputter in a
+ *   thread
+ * - As additional ContentHandler functions are called from the XMLReader,
+ *   events are created and added to the eventQueue
+ * - This continues until the endDocument() SAX event is reached, at which
+ *   point we create an EndDocument event, put() it on the eventQueue, wait for
+ *   the Future to complete, and determine the result
+ * - While this is all going on, the SAXInfosetInputter take()'s from the
+ *   eventQueue and provides the infoset information necessary to unparse
  *
- * @param dp dataprocessor object that will be used to call the parse
- * @param output outputChannel of choice where the unparsed data is stored
+ * @param dp DataProcessor used to call the unparse() method
+ * @param output Output to write unparsed data
  */
 class DaffodilUnparseContentHandler(
   dp: DFDL.DataProcessor,
   output: DFDL.Output)
   extends DFDL.DaffodilUnparseContentHandler {
-  private lazy val inputter = new SAXInfosetInputter(this, dp, output)
-  private var unparseResult: DFDL.UnparseResult = _
-  private lazy val characterData = new StringBuilder
-  private var prefixMapping: NamespaceBinding = _
-  private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
 
-  private lazy val tunablesBatchSize = dp.getTunables().saxUnparseEventBatchSize
+  /**
+   * The Future thread running the DataProcessor.unparse() method
+   */
+  private var unparseTask: Future[Unit] = _
+  
+  /**
+   * The result of the unparseTask. This is None as long as the Future thread
+   * is still running. Once the unparseTask completes, this is set to a
+   * Right[UnparseResult] if the unparse completed without error. If this
+   * completed but UnparseResult.isError is true or an unexpected exception was
+   * thrown during unparse, this is set to a Left[SAXException].
+   */
+  private var unparseResult: Option[Either[SAXException, DFDL.UnparseResult]] = None
+ 
+  /**
+   * Maximum number of SAXInfosetInputterEvents that this ContentHandler can
+   * put() in the eventQueue at a time. If this queue is full, the
+   * ContentHandler thread is blocked until the unparseTask thread take()'s
+   * from the queue.
+   */
+  private val eventQueueSize = dp.getTunables().saxUnparseEventBatchSize
+
+  /**
+   * A thread-safe array-backed FIFO blocking queue to store events to be used
+   * by the SAXInfosetInputter. This ContentHandler put()'s to this queue as it
+   * receives SAX events, and the unparseTask take()'s from this queue to
+   * unparse().
+   */
+  private val eventQueue = new ArrayBlockingQueue[SAXInfosetInputterEvent](eventQueueSize)
 
   /**
-   * we always have an extra buffer in the array that we use for the inputter.hasNext call. For each
-   * element, we need to know if it has a viable next, if it doesn't, it will triggers the context
-   * switch to DaffodilUnparseContentHandler. So for example, if the user provides 1 as the
-   * batchSize, under the hood we'll batch [event1, event2].
-   *
-   * - DataProcessor.unparse will call hasNext and getEventType for the initialization call
-   * - hasNext will check if nextIndex (i.e currentIndex + 1) is non-empty. Since currentIndex is 0,
-   * it will return true since event2 exists.
-   * - getEventType (which signifies our processing step) is called for the event at currentIndex
-   * - After the initialization step, subsequent calls will be a loop of next(), ...some processing
-   * of the current event ..., and hasNext()
-   * - For our scenario, next() will clear out the contents at currentIndex, increment the currentIndex,
-   * and our event2 will be processed, then hasNext will check if there is a viable index 2, as
-   * there is not, it will perform the context switch so DaffodilUnparseContentHandler can batch
-   * more events
-   * - DaffodilUnparseContentHandler copies the last event into the first so the currentEvent stays
-   * the same for the inputter until it decides to change it so we end up with [event2, event3]
-   * - When we context switch back to inputter.hasNext, it resets the currentIndex to 0, and our loop
-   * begins again with a call to next
-   *
-   * Without us having the extra buffer, things would happen like this:
-   * user provides 1 as the batchSize, under the hood we'll have [event1] batched.
-   *
-   * DataProcessor.unparse will call hasNext and getEventType for the initialization call, and that
-   * hasNext will check if cnextIndex (i.e currentIndex + 1) is non-empty. As currentIndex is 0, and
-   * it is the maximum index, there is no index 1. It will context switch to get a new batched event,
-   * which, would overwrite event1 before we get to process it.
+   * To avoid allocating many SAXInfosetInputterEvents, we instead pre-allocate
+   * events and clear/mutate each event as needed. Note that we need 2 more
+   * pre-allocated events than the queue size. This is because while
+   * processing, there could be a most eventQueueSize events already on the
+   * eventQueue, plus another event that the SAXInfosetInputter may have from
+   * the most recent take(), plus the event that this ContentHandler is mutating
+   * in preparation to put() on the queue.
    */
-  private lazy val actualBatchSize = tunablesBatchSize + 1
-  private lazy val batchedInfosetEvents: Array[SAXInfosetEvent] = {
-    Assert.invariant(tunablesBatchSize > 0, "invalid saxUnparseEventBatchSize; minimum value is 1")
-    Array.fill[SAXInfosetEvent](actualBatchSize)(new SAXInfosetEvent)
+  private val preAllocatedEvents = {
+    Array.fill(eventQueueSize + 2)(new SAXInfosetInputterEvent)
   }
-  private var currentIndex: Int = 0
 
   /**
-   * This is a flag that is set to true when startPrefixMapping is called. When true, we make
-   * the assumption that we don't need to use the Attributes parameter from startElement to get the
-   * namespaceURI information and will solely rely on start/endPrefixMapping. If false, we will use
-   * Attributes to get the namespaceURI info.
+   * Index into the pre-allocated event array that the content handler is
+   * currently mutating and preparing to put() on the eventQueue. When all
+   * information for this event is gathered, we put() it on the eventQueue and
+   * increment the index into the pre-allocated events array and start muating
+   * the next event.
    */
-  private var contentHandlerPrefixMappingUsed = false
+  private var currentIndex = 0
 
   /**
-   * returns null in the case of an DaffodilUnhandledSAXException
+   * Variable to make it easier to access the pre-allocated event at
+   * currentIndex. This is updated each time currentIndex is incremented so
+   * that methods in this class should never need to access currentIndex or the
+   * preAllocated events array directly.
    */
-  def getUnparseResult: DFDL.UnparseResult = unparseResult
+  private var currentEvent = preAllocatedEvents(currentIndex)
 
-  def enableInputterResolutionOfRelativeInfosetBlobURIs(): Unit = inputter.enableResolutionOfRelativeInfosetBlobURIs()
+  /**
+   * Buffer to accumulate characters received from the SAX characters() function
+   */
+  private val characterData = new StringBuilder
 
-  override def setDocumentLocator(locator: Locator): Unit = {
-    // do nothing
-  }
+  /**
+   * Used to store the current in-scope prefix namespace mappings, determined
+   * by the start/endPrefixMapping functions, or from xmlns Attributes in the
+   * startElement function.
+   */
+  private var prefixMapping: NamespaceBinding = TopScope
 
-  override def startDocument(): Unit = {
-    batchedInfosetEvents(currentIndex).eventType = One(StartDocument)
-    maybeSendToInputter()
+  /**
+   * If the XMLReader doesn't use the start/endPrefixMapping() functions to
+   * pass along namespaceMapping information, we must extract the information
+   * from the Attributes parameter in the startElement() function. Doing so can
+   * potentially add multiple namespace mappings that must be removed in the
+   * associated endElement() function. This stack keeps track of which mappings
+   * are added in startElement() so they can easily be removed in endElement()
+   * by calling the pop function.
+   */
+  private val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
+
+  /**
+   * Flag to keep track if the XMLReader is using start/endPrefixMapping() for
+   * namespace mappings or if mappings should be gathered from Attributes in
+   * startElement(). This is set to true for the former and false for the
+   * latter. If true, the prefixMappingTrackingStack is unused.
+   */
+  private var contentHandlerPrefixMappingUsed = false
+
+  /**
+   * Add the current event to the eventQueue. If the eventQueue is full, this
+   * blocks until the SAXInfosetInputter take()'s an event and frees up a slot
+   * in the queue. Once the event is added, we update state to point to the
+   * next SAXInfosetInputterEvent that we should mutate as we receive more SAX
+   * events.
+   */
+  private def addCurrentEventToQueue(): Unit = {
+    // before we put() an event, check to make sure there hasn't been an error
+    // from the SAXInfosetInputter. If there is an error, checkUnparseResult()
+    // throws an exception which bubbles up to the XMLReader to handle. Note
+    // that doing this here avoids a potential deadlock since the
+    // SAXInfosetInputter will no longer take() events, so the queue could
+    // potentially be full and the put() would block forever. By checking for
+    // an error first, we should never reach a deadlocking call to put()
+    checkUnparseResult()
+
+    eventQueue.put(currentEvent)
+    currentIndex = (currentIndex + 1) % preAllocatedEvents.length
+    currentEvent = preAllocatedEvents(currentIndex)
+    currentEvent.clear()
   }
 
-  override def endDocument(): Unit = {
-    batchedInfosetEvents(currentIndex).eventType = One(EndDocument)
-    maybeSendToInputter()
+  /**
+   * Flag to store if relative blob URI resolution should be enabled in the
+   * infoset inputter. This is set to true when
+   * enableResolutionOfRelativeInfosetBlobURIs() is called.
+   */
+  private var enableRelativeBlobURIs = false
+
+  /**
+   * Enable the resolution of relative infoset blob URIs. This should only be
+   * used when running TDML tests where relative blob URIs are often used. In
+   * non-test usages, blob URIs should always be absolute and this should not
+   * be needed. This must be called before the startDocument() function is
+   * called for it to have an effect.
+   */
+  def enableResolutionOfRelativeInfosetBlobURIs(): Unit = enableRelativeBlobURIs = true
+
+  /**
+   * Start the unparse() thread, potentially reusing a thread from our cached
+   * thread pool, which should help to minimize the overhead with creating new
+   * threads. Note that the Future does not return a value or expect to capture any
+   * exceptions. We instead mutate the unparseResult variable depending on the
+   * unparse result. This ensures we can always clear the eventQueue to prevent
+   * deadlocks.
+   */
+  private def startUnparse(): Unit = {
+    unparseTask = Future[Unit] {
+      try {
+        // It is important to create the SAXInfosetInputter in this Future
+        // because during construction it reads from the queue waiting for the
+        // StartDocument event. If we create it outside of the future and in
+        // the same thread as the ContentHandler, then we may end up blocked
+        // since this ContentHandler would not be able to put() the
+        // StartDocument event for the SAXInfosetInputter construction to
+        // take().
+        val input = new SAXInfosetInputter(eventQueue)
+        if (enableRelativeBlobURIs) input.enableResolutionOfRelativeInfosetBlobURIs()
+
+        val res = dp.unparse(input, output)
+        if (res.isError) {
+          unparseResult = Some(Left(new DaffodilUnparseErrorSAXException(res)))
+        } else {
+          unparseResult = Some(Right(res))
+        }
+      } catch {
+        //$COVERAGE-OFF$
+        case e: Exception => {
+          unparseResult = Some(Left(new DaffodilUnhandledSAXException(e.getMessage, e)))
+        }
+        //$COVERAGE-ON$
+      }
+
+      // We are finished unparsing. At this point, it is possible that the
+      // ContentHandler thread is blocked trying to put() an event on the
+      // eventQueue, but the SAXInfosetInputter thread could have failed in a
+      // way where it won't ever take() an item from the queue, leaving the
+      // ContentHandler deadlocked. Now that unparseResult is defined, we can
+      // clear eventQueue which unblocks the ContentHandler. And since
+      // unparseResult is set, the ContentHandler should not attempt any more
+      // put()'s and so should never get re-blocked.
+      eventQueue.clear()
+    }(DaffodilUnparseContentHandler.executionContext)
   }
 
-  override def startPrefixMapping(prefix: String, uri: String): Unit = {
-    if (!contentHandlerPrefixMappingUsed) contentHandlerPrefixMappingUsed = true
-    val pre = if (prefix == "") null else prefix
-    prefixMapping = NamespaceBinding(pre, uri, prefixMapping)
+  /**
+   * Throw an exception if the unparseTask has finished and signified there was
+   * an error, either from a failed UnparseResult or an unexpected thrown
+   * exception. The XMLReader is expected to catch the thrown SAXException and
+   * handle it accordingly.
+   */
+  private def checkUnparseResult(): Unit = {
+    unparseResult match {
+      case Some(Left(e)) => throw e
+      case _ =>
+    }
   }
 
   /**
-   * XMLReader does not guarantee the order of the prefixes called for this function, but it does
-   * guarantee that this method is called after its corresponding endElement, which means we can
-   * can just take off the top mappings, because the element that might have cared about the order
-   * is already done using the prefixMappings
+   * Return the UnparseResult of the ContentHandler if Daffodil finished
+   * without throwing a DaffodilUnhandledSAXException. Returns null if there is
+   * no unparse result yet (i.e. Daffodil is still unparsing) or if the
+   * Daffodil unparse() function terminated due to an unhandled exception. This
+   * can return an UnparseResult where isError is true, though that is also
+   * thrown as a DaffodilUnparseErrorSAXException.
    */
-  override def endPrefixMapping(prefix: String): Unit = {
-    prefixMapping = if (prefixMapping == null) prefixMapping else prefixMapping.parent
+  def getUnparseResult: DFDL.UnparseResult = unparseResult match {
+    case Some(Right(ur)) => ur
+    case Some(Left(DaffodilUnparseErrorSAXException(ur))) => ur
+    case _ => null
   }
 
   /**
-   * Uses Attributes, which is passed in to the startElement callback, to extract prefix mappings and
-   * populate the global prefixMapping
+   * Gather prefix namespace mappings from the Attributes passed to the
+   * startElement() SAX event. This should only be called if we have detected
+   * that start/endPrefixMapping() functions are not used.
    */
-  def mapPrefixMappingFromAttributesImpl(atts:Attributes): Unit = {
+  private def addPrefixMappingsFromAttributesImpl(atts: Attributes): Unit = {
     var i = 0
-    while (i < atts.getLength) {
+    val numAtts = atts.getLength
+    while (i < numAtts) {
       val qName = atts.getQName(i)
-      val uri =  atts.getValue(i)
       if (qName == "xmlns") {
-        prefixMapping = NamespaceBinding(null, uri, prefixMapping)
+        prefixMapping = NamespaceBinding(null, atts.getValue(i), prefixMapping)
       } else if (qName.startsWith("xmlns:")) {
         val prefix = qName.substring(6)
-        prefixMapping = NamespaceBinding(prefix, uri, prefixMapping)
+        prefixMapping = NamespaceBinding(prefix, atts.getValue(i), prefixMapping)
       } else {
         // do nothing, not a namespace mapping attribute
       }
       i += 1
     }
   }
 
-  override def startElement(uri: String, localName: String, qName: String, atts: Attributes): Unit = {
-    // we need to check if the characters data is all whitespace, if it is we drop the whitespace
-    // data, if it is not, it is an error as starting a new element with actual characterData means
-    // we haven't hit an endElement yet, which means we're in a complexElement and a complexElement
-    // cannot have character content
-    if (characterData.nonEmpty && !Misc.isAllWhitespace(characterData)) {
-      throw new IllegalContentWhereEventExpected("Non-whitespace characters in complex " +
-        "Element: " + characterData.toString
-      )
-    } else {
-      // reset since it was whitespace only
+  /**
+   * Helper function for the start/endElement() SAX events to determine the
+   * localName and namespaceURI values for the current event.
+   */
+  private def setLocalNameAndNamespaceUri(uri: String, localName: String, qName: String): Unit = {
+    lazy val prefixColonIndex = qName.indexOf(':')
+
+    currentEvent.localName =
+      if (localName.nonEmpty) {
+        One(localName)
+      } else {
+        Assert.invariant(qName.nonEmpty)
+        if (prefixColonIndex > 0) {
+          One(qName.substring(prefixColonIndex + 1))
+        } else {
+          One(qName)
+        }
+      }
+
+    currentEvent.namespaceURI =
+      if (uri.nonEmpty) {
+        One(uri)
+      } else {
+        Assert.invariant(qName.nonEmpty)
+        val qNamePrefix =
+          if (prefixColonIndex > 0) {
+            qName.substring(0, prefixColonIndex)
+          } else {
+            // if there is no prefix in the qName, then we want the namespace
+            // associated with the "" prefix, which NamespaceBinding represents
+            // as null
+            null
+          }
+        val uri = prefixMapping.getURI(qNamePrefix)
+        Maybe(uri)
+      }
+  }
+
+  /**
+   * This function should be called whenever mixed content (i.e. non-whitespace
+   * characters) should be checked for and, since it isn't allowed in a DFDL
+   * infoset, state mutated to indicate the error.
+   *
+   * Note that if mixed content is detected we cannot simply throw a
+   * SAXException. This is because if this ContentHandler thread finishes, the
+   * unparseTask is going to eventually consume all events from the eventQueue
+   * and block trying to take() an event that will never come, leaving a
+   * hanging thread. Instead, we set the mixedContent value of the current
+   * event to the characterData we have accumulated, and the SAXInfosetInputer

Review Comment:
   SAXInfosetInputer -> SAXInfosetInputter
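
For readers following the namespace changes in this hunk, here is a minimal Scala sketch of the same idea. It takes the prefix and local name from a qName with indexOf()/substring() instead of split(), and resolves the prefix against a binding chain that ends in a terminal scope returning null, rather than a null parent that would throw a NullPointerException. `Scope`, `Top`, `Binding`, and `QNameSketch` are hypothetical, simplified stand-ins for scala.xml's NamespaceBinding/TopScope; this is not the Daffodil implementation.

```scala
// Simplified, hypothetical stand-ins for scala.xml.NamespaceBinding and TopScope.
sealed trait Scope { def getURI(prefix: String): String }

// Terminal scope: an unbound prefix resolves to null instead of throwing.
case object Top extends Scope { def getURI(prefix: String): String = null }

// A null prefix stands for the default ("") namespace, as in NamespaceBinding.
final case class Binding(prefix: String, uri: String, parent: Scope) extends Scope {
  def getURI(p: String): String = if (p == prefix) uri else parent.getURI(p)
}

object QNameSketch {
  /** Split "prefix:local" at the first ':' without split() or a regex.
    * The prefix is null when the qName has none. */
  def splitQName(qName: String): (String, String) = {
    val i = qName.indexOf(':')
    if (i > 0) (qName.substring(0, i), qName.substring(i + 1)) else (null, qName)
  }

  /** Resolve a qName to (localName, namespaceURI); the URI is null if unbound. */
  def resolve(qName: String, scope: Scope): (String, String) = {
    val (prefix, local) = splitQName(qName)
    (local, scope.getURI(prefix))
  }

  /** Push a binding for each xmlns or xmlns:p attribute, given as
    * (qName, value) pairs, similar to addPrefixMappingsFromAttributesImpl. */
  def withXmlnsAttributes(atts: Seq[(String, String)], scope: Scope): Scope =
    atts.foldLeft(scope) { case (s, (qName, value)) =>
      if (qName == "xmlns") Binding(null, value, s)
      else if (qName.startsWith("xmlns:")) Binding(qName.substring(6), value, s)
      else s // not a namespace declaration
    }
}
```

Looking up an element with no prefix and no default mapping simply yields a null URI here, which is the behavior the PR gets by starting from TopScope instead of null.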



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * DataProcessor unparse() can use. The SAXInfosetInputterEvents are built based
+ * on information that is passed to this ContentHandler from an XMLReader.
+ *
+ * This class runs the DataProcessor unparse() method in a separate thread,
+ * with a shared ArrayBlockingQueue to provide SAXInfosetInputterEvents. This
+ * queue has a maximum size defined by the saxUnparseEventBatchSize tunable.
  *
- * This class, together with the SAXInfosetInputter, uses coroutines to ensure that a batch of events
- * (based on the tunable saxUnparseEventBatchSize) can be passed from the former to the latter.
  * The following is the general process:
  *
- * - an external call is made to parse an XML Document
- * - this class receives a StartDocument call, which is the first SAXInfosetEvent that should be
- * sent to the SAXInfosetInputter. That event is put onto an array of SAXInfosetEvents of size the
- * saxUnparseEventBatchSize tunable. Once the array is full, it is put on the inputter's queue,
- * this thread is paused, and that inputter's thread is run
- * - when the SAXInfosetInputter is done processing that batch and is ready for a new batch, it
- * sends a 1 element array with the last completed event via the coroutine system, which loads it on
- * the contentHandler's queue, which restarts this thread and pauses that one. In the expected case,
- * the single element array will contain no new information until the unparse complete. In the case of
- * an unexpected error though, it will contain error information
- * - this process continues until the EndDocument SAXInfosetEvent is loaded into the batch.
- * Once that SAXInfosetEvent is processed by the SAXInfosetInputter, it signals the end of batched
- * events coming from the contentHandler. This ends the unparseProcess and returns the event with
- * the unparseResult and/or any error
- * information
+ * - An external call is made to parse from an XMLReader
+ * - This class receives a startDocument() event from the XMLReader. This
+ *   put()'s a StartDocument event on the eventQueue and starts a Future that
+ *   runs the DataProcessor unparse() method with a SAXInfosetInputter in a
+ *   thread
+ * - As additional ContentHandler functions are called from the XMLReader,
+ *   events are created and added to the eventQueue
+ * - This continues until the endDocument() SAX event is reached, at which
+ *   point we create an EndDocument event, put() it on the eventQueue, wait for
+ *   the Future to complete, and determine the result
+ * - While this is all going on, the SAXInfosetInputter take()'s from the
+ *   eventQueue and provides the infoset information necessary to unparse
  *
- * @param dp dataprocessor object that will be used to call the parse
- * @param output outputChannel of choice where the unparsed data is stored
+ * @param dp DataProcessor used to call the unparse() method
+ * @param output Output to write unparsed data
  */
 class DaffodilUnparseContentHandler(
   dp: DFDL.DataProcessor,
   output: DFDL.Output)
   extends DFDL.DaffodilUnparseContentHandler {
-  private lazy val inputter = new SAXInfosetInputter(this, dp, output)
-  private var unparseResult: DFDL.UnparseResult = _
-  private lazy val characterData = new StringBuilder
-  private var prefixMapping: NamespaceBinding = _
-  private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
 
-  private lazy val tunablesBatchSize = dp.getTunables().saxUnparseEventBatchSize
+  /**
+   * The Future thread running the DataProcessor.unparse() method
+   */
+  private var unparseTask: Future[Unit] = _
+  
+  /**
+   * The result of the unparseTask. This is None as long as the Future thread
+   * is still running. Once the unparseTask completes, this is set to a
+   * Right[UnparseResult] if the unparse completed without error. If this
+   * completed but UnparseResult.isError is true or an unexpected exception was
+   * thrown during unparse, this is set to a Left[SAXException].
+   */
+  private var unparseResult: Option[Either[SAXException, DFDL.UnparseResult]] = None
+ 
+  /**
+   * Maximum number of SAXInfosetInputterEvents that this ContentHandler can
+   * put() in the eventQueue at a time. If this queue is full, the
+   * ContentHandler thread is blocked until the unparseTask thread take()'s
+   * from the queue.
+   */
+  private val eventQueueSize = dp.getTunables().saxUnparseEventBatchSize
+
+  /**
+   * A thread-safe array-backed FIFO blocking queue to store events to be used
+   * by the SAXInfosetInputter. This ContentHandler put()'s to this queue as it
+   * receives SAX events, and the unparseTask take()'s from this queue to
+   * unparse().
+   */
+  private val eventQueue = new ArrayBlockingQueue[SAXInfosetInputterEvent](eventQueueSize)
 
   /**
-   * we always have an extra buffer in the array that we use for the inputter.hasNext call. For each
-   * element, we need to know if it has a viable next, if it doesn't, it will triggers the context
-   * switch to DaffodilUnparseContentHandler. So for example, if the user provides 1 as the
-   * batchSize, under the hood we'll batch [event1, event2].
-   *
-   * - DataProcessor.unparse will call hasNext and getEventType for the initialization call
-   * - hasNext will check if nextIndex (i.e currentIndex + 1) is non-empty. Since currentIndex is 0,
-   * it will return true since event2 exists.
-   * - getEventType (which signifies our processing step) is called for the event at currentIndex
-   * - After the initialization step, subsequent calls will be a loop of next(), ...some processing
-   * of the current event ..., and hasNext()
-   * - For our scenario, next() will clear out the contents at currentIndex, increment the currentIndex,
-   * and our event2 will be processed, then hasNext will check if there is a viable index 2, as
-   * there is not, it will perform the context switch so DaffodilUnparseContentHandler can batch
-   * more events
-   * - DaffodilUnparseContentHandler copies the last event into the first so the currentEvent stays
-   * the same for the inputter until it decides to change it so we end up with [event2, event3]
-   * - When we context switch back to inputter.hasNext, it resets the currentIndex to 0, and our loop
-   * begins again with a call to next
-   *
-   * Without us having the extra buffer, things would happen like this:
-   * user provides 1 as the batchSize, under the hood we'll have [event1] batched.
-   *
-   * DataProcessor.unparse will call hasNext and getEventType for the initialization call, and that
-   * hasNext will check if cnextIndex (i.e currentIndex + 1) is non-empty. As currentIndex is 0, and
-   * it is the maximum index, there is no index 1. It will context switch to get a new batched event,
-   * which, would overwrite event1 before we get to process it.
+   * To avoid allocating many SAXInfosetInputterEvents, we instead pre-allocate
+   * events and clear/mutate each event as needed. Note that we need 2 more
+   * pre-allocated events than the queue size. This is because while
+   * processing, there could be at most eventQueueSize events already on the
+   * eventQueue, plus another event that the SAXInfosetInputter may have from
+   * the most recent take(), plus the event that this ContentHandler is mutating
+   * in preparation to put() on the queue.
    */
-  private lazy val actualBatchSize = tunablesBatchSize + 1
-  private lazy val batchedInfosetEvents: Array[SAXInfosetEvent] = {
-    Assert.invariant(tunablesBatchSize > 0, "invalid saxUnparseEventBatchSize; minimum value is 1")
-    Array.fill[SAXInfosetEvent](actualBatchSize)(new SAXInfosetEvent)
+  private val preAllocatedEvents = {
+    Array.fill(eventQueueSize + 2)(new SAXInfosetInputterEvent)
   }
-  private var currentIndex: Int = 0
 
   /**
-   * This is a flag that is set to true when startPrefixMapping is called. When true, we make
-   * the assumption that we don't need to use the Attributes parameter from startElement to get the
-   * namespaceURI information and will solely rely on start/endPrefixMapping. If false, we will use
-   * Attributes to get the namespaceURI info.
+   * Index into the pre-allocated event array that the content handler is
+   * currently mutating and preparing to put() on the eventQueue. When all
+   * information for this event is gathered, we put() it on the eventQueue and
+   * increment the index into the pre-allocated events array and start mutating
+   * the next event.
    */
-  private var contentHandlerPrefixMappingUsed = false
+  private var currentIndex = 0
 
   /**
-   * returns null in the case of an DaffodilUnhandledSAXException
+   * Variable to make it easier to access the pre-allocated event at
+   * currentIndex. This is updated each time currentIndex is incremented so
+   * that methods in this class should never need to access currentIndex or the
+   * preAllocated events array directly.
    */
-  def getUnparseResult: DFDL.UnparseResult = unparseResult
+  private var currentEvent = preAllocatedEvents(currentIndex)
 
-  def enableInputterResolutionOfRelativeInfosetBlobURIs(): Unit = inputter.enableResolutionOfRelativeInfosetBlobURIs()
+  /**
+   * Buffer to accumulate characters received from the SAX characters() function
+   */
+  private val characterData = new StringBuilder
 
-  override def setDocumentLocator(locator: Locator): Unit = {
-    // do nothing
-  }
+  /**
+   * Used to store the current in-scope prefix namespace mappings, determined
+   * by the start/endPrefixMapping functions, or from xmlns Attributes in the
+   * startElement function.
+   */
+  private var prefixMapping: NamespaceBinding = TopScope
 
-  override def startDocument(): Unit = {
-    batchedInfosetEvents(currentIndex).eventType = One(StartDocument)
-    maybeSendToInputter()
+  /**
+   * If the XMLReader doesn't use the start/endPrefixMapping() functions to
+   * pass along namespaceMapping information, we must extract the information
+   * from the Attributes parameter in the startElement() function. Doing so can
+   * potentially add multiple namespace mappings that must be removed in the
+   * associated endElement() function. This stack keeps track of which mappings
+   * are added in startElement() so they can easily be removed in endElement()
+   * by calling the pop function.
+   */
+  private val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
+
+  /**
+   * Flag to keep track if the XMLReader is using start/endPrefixMapping() for
+   * namespace mappings or if mappings should be gathered from Attributes in
+   * startElement(). This is set to true for the former and false for the
+   * latter. If true, the prefixMappingTrackingStack is unused.
+   */
+  private var contentHandlerPrefixMappingUsed = false
+
+  /**
+   * Add the current event to the eventQueue. If the eventQueue is full, this
+   * blocks until the SAXInfosetInputter take()'s an event and frees up a slot
+   * in the queue. Once the event is added, we update state to point to the
+   * next SAXInfosetInputterEvent that we should mutate as we receive more SAX
+   * events.
+   */
+  private def addCurrentEventToQueue(): Unit = {
+    // before we put() an event, check to make sure there hasn't been an error
+    // from the SAXInfosetInputter. If there is an error, checkUnparseResult()
+    // throws an exception which bubbles up to the XMLReader to handle. Note
+    // that doing this here avoids a potential deadlock since the
+    // SAXInfosetInputter will no longer take() events, so the queue could
+    // potentially be full and the put() would block forever. By checking for
+    // an error first, we should never reach a deadlocking call to put()
+    checkUnparseResult()
+
+    eventQueue.put(currentEvent)
+    currentIndex = (currentIndex + 1) % preAllocatedEvents.length
+    currentEvent = preAllocatedEvents(currentIndex)
+    currentEvent.clear()
   }
 
-  override def endDocument(): Unit = {
-    batchedInfosetEvents(currentIndex).eventType = One(EndDocument)
-    maybeSendToInputter()
+  /**
+   * Flag to store if relative blob URI resolution should be enabled in the
+   * infoset inputter. This is set to true when
+   * enableResolutionOfRelativeInfosetBlobURIs() is called.
+   */
+  private var enableRelativeBlobURIs = false
+
+  /**
+   * Enable the resolution of relative infoset blob URIs. This should only be
+   * used when running TDML tests where relative blob URIs are often used. In
+   * non test usages, blob URIs should always be absolute and this should not
+   * be needed. This must be called before the startDocument() function is
+   * called for it to have an effect.
+   */
+  def enableResolutionOfRelativeInfosetBlobURIs(): Unit = enableRelativeBlobURIs = true
+
+  /**
+   * Start the unparse() thread, potentially reusing a thread from our cached
+   * thread pool, which should help to minimize the overhead with creating new
+   * threads. Note that the Future does not return anything or expect to capture any
+   * exceptions. We instead mutate the unparseResult variable depending on the
+   * unparse result. This ensures we can always clear the eventQueue to prevent
+   * deadlocks.
+   */
+  private def startUnparse(): Unit = {
+    unparseTask = Future[Unit] {
+      try {
+        // It is important to create the SAXInfosetInputter in this Future
+        // because during construction it reads from the queue waiting for the
+        // StartDocument event. If we create it outside of the future and in
+        // the same thread as the ContentHandler, then we may end up blocked
+        // since this ContentHandler would not be able to put() the
+        // StartDocument event for the SAXInfosetInputter construction to
+        // take().
+        val input = new SAXInfosetInputter(eventQueue)
+        if (enableRelativeBlobURIs) input.enableResolutionOfRelativeInfosetBlobURIs()
+
+        val res = dp.unparse(input, output)
+        if (res.isError) {
+          unparseResult = Some(Left(new DaffodilUnparseErrorSAXException(res)))
+        } else {
+          unparseResult = Some(Right(res))
+        }
+      } catch {
+        //$COVERAGE-OFF$
+        case e: Exception => {
+          unparseResult = Some(Left(new DaffodilUnhandledSAXException(e.getMessage, e)))
+        }
+        //$COVERAGE-ON$
+      }
+
+      // We are finished unparsing. At this point, it is possible that the
+      // ContentHandler thread is blocked trying to put() an event on the
+      // eventQueue, but the SAXInfosetInputter thread could have failed in a
+      // way where it won't ever take() an item from the queue, leaving the
+      // ContentHandler deadlocked. Now that unparseResult is defined, we can
+      // clear eventQueue which unblocks the ContentHandler. And since
+      // unparseResult is set, the ContentHandler should not attempt anymore

Review Comment:
   anymore -> any more
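
The queue-based design described in the class comment above can be modeled in a few dozen lines of Scala. This is a hedged, simplified sketch, not Daffodil code: `Event` is a hypothetical stand-in for SAXInfosetInputterEvent, the consumer Future just sums integers instead of calling unparse(), and a negative value simulates an unparse failure. It shows the bounded ArrayBlockingQueue, a cached thread pool (with daemon threads, an assumption made so the sketch does not keep the JVM alive), the ring of queueSize + 2 reused events, and the deadlock avoidance: the producer checks for a result before each put(), and the consumer clears the queue once it has recorded one.

```scala
import java.util.concurrent.{ArrayBlockingQueue, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical stand-in for SAXInfosetInputterEvent: mutable and reused.
final class Event {
  var isEnd: Boolean = false
  var value: Int = 0
  def clear(): Unit = { isEnd = false; value = 0 }
}

object QueueSketch {
  // Cached pool: a thread freed by a finished Future can run the next one, and
  // idle threads are reclaimed after 60 seconds. Daemon threads (an assumption
  // for this sketch) keep the pool from holding the JVM open.
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(
    Executors.newCachedThreadPool(new ThreadFactory {
      def newThread(r: Runnable): Thread = { val t = new Thread(r); t.setDaemon(true); t }
    }))

  /** Push `values` to a consumer Future that sums them; a negative value
    * simulates an unparse failure. */
  def run(values: Seq[Int], queueSize: Int): Either[String, Int] = {
    val queue = new ArrayBlockingQueue[Event](queueSize)
    // queueSize events may be queued, the consumer holds at most one more, and
    // the producer mutates one more.
    val events = Array.fill(queueSize + 2)(new Event)
    var index = 0
    var current = events(index)
    val result = new AtomicReference[Option[Either[String, Int]]](None)

    val consumer = Future {
      try {
        var sum = 0
        var e = queue.take() // blocks until the producer put()s an event
        while (!e.isEnd) {
          if (e.value < 0) throw new IllegalArgumentException("negative value")
          sum += e.value
          e = queue.take()
        }
        result.set(Some(Right(sum)))
      } catch {
        case ex: Exception => result.set(Some(Left(ex.getMessage)))
      }
      // No more take()s will happen, so free any producer blocked in put().
      queue.clear()
    }

    // Check for a result before put(): once the consumer is done, a put() on a
    // full queue would otherwise block forever.
    def putCurrent(): Boolean =
      if (result.get.isDefined) false
      else {
        queue.put(current)
        index = (index + 1) % events.length
        current = events(index)
        current.clear()
        true
      }

    val it = values.iterator
    var ok = true
    while (ok && it.hasNext) { current.value = it.next(); ok = putCurrent() }
    if (ok) { current.isEnd = true; putCurrent() }
    Await.result(consumer, Duration.Inf)
    result.get.get
  }
}
```

Sizing the ring at queueSize + 2 is what makes reuse safe in this sketch: an event is only mutated again after the consumer has taken a later one, and the consumer never looks back at an event once it has taken the next.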



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@daffodil.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [daffodil] stevedlawrence commented on pull request #879: Improve SAX parse/unparse performance

Posted by GitBox <gi...@apache.org>.
stevedlawrence commented on PR #879:
URL: https://github.com/apache/daffodil/pull/879#issuecomment-1375637505

   > I thought that when an event was queued for the peer co-routine, the main thread then would read from the reply queue, hence blocking itself until the peer replies back to it.
   >
   > That was the notion for how the two coroutines never overlap in doing work.
   >
   > Is it not functioning that way now?
   
   You are correct, this is how our coroutine implementation works.
   
   But because Java doesn't support native coroutines, the non-main coroutine executes in a separate thread. So even though there is no real parallelism, we do still have to create threads. This PR attempts to reduce the overhead of creating those threads by reusing them from a thread pool.
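
To make the distinction concrete, here is a minimal Scala sketch of a strict hand-off between two threads using a pair of capacity-1 ArrayBlockingQueues, which is the general pattern described above: the main side sends a value and immediately blocks on the reply queue, so the peer runs on its own thread but the two sides never do real work at the same time. `HandoffSketch` is hypothetical and is not Daffodil's Coroutine class; it uses ExecutionContext.global purely for brevity.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object HandoffSketch {
  private implicit val ec: ExecutionContext = ExecutionContext.global

  /** Main resumes the peer with each value and blocks until the peer replies
    * with its running total. Returns the replies main saw and the peer's
    * final total. */
  def run(values: Seq[Int]): (List[Int], Int) = {
    val toPeer = new ArrayBlockingQueue[Option[Int]](1) // None means "finished"
    val toMain = new ArrayBlockingQueue[Int](1)

    // The peer has its own thread, but does its work only between taking a
    // message and replying, while main is blocked in toMain.take().
    val peer = Future {
      var total = 0
      var msg = toPeer.take()
      while (msg.isDefined) {
        total += msg.get
        toMain.put(total)   // yield back to main
        msg = toPeer.take() // wait to be resumed
      }
      total
    }

    val replies = values.toList.map { v =>
      toPeer.put(Some(v)) // resume the peer...
      toMain.take()       // ...and block until it yields back
    }
    toPeer.put(None)
    (replies, Await.result(peer, Duration.Inf))
  }
}
```

Dropping the reply queue and letting the producer keep put()ting into a bounded queue, as this PR does, is what allows the two sides to overlap.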




[GitHub] [daffodil] stevedlawrence commented on pull request #879: Improve SAX parse/unparse performance

Posted by GitBox <gi...@apache.org>.
stevedlawrence commented on PR #879:
URL: https://github.com/apache/daffodil/pull/879#issuecomment-1331224033

   > Can this be set small enough as to enforce sequential behavior, i.e., no parallelism between the caller of the content handler and the unparser?
   
   With this approach, no. Even with the batch size tunable set to 1, the ContentHandler and SAXInfosetInputter can (and likely will) both do work at the same time. The ContentHandler will be preparing the next event while the SAXInfosetInputter is unparsing using the current event.
   
   > To reduce overhead, we need to enqueue many events before context switching and allowing the unparser to run. Arguably, we should just queue up events to some max count, or until we get endDocument. For small messages we would then get exactly one context switch per message.
   
   Some of the changes here weren't specific to the ArrayBlockingQueue approach (e.g. thread pool reuse, split() removal). I can apply them to the current coroutine approach and see how it compares. These changes definitely had a big speed up, but I'm not sure which individual changes had the biggest effect.
   
   > I continue to be of the opinion that overlap parallelism here is not an advantage. It just muddies the waters about timing and overhead of unparsing.
   
   I think one potential advantage of this parallel approach is if the incoming sax events are sporadic or relatively slow (e.g. serialized over a network/diode). With the coroutine approach, we won't attempt to do any unparsing until we get a full batch of events (or we reach the endDoc event). And if the batch size is set to something large to avoid context switching, it might mean a lot of waiting doing nothing until we get those events. With this parallel approach, we can at least start unparsing immediately and do work in the time waiting for those SAX events. Though, I'm not sure how likely that is with SAX so maybe it's not worth really considering. And maybe if that is the case, tuning the batch size to a small number is the right approach.




[GitHub] [daffodil] stevedlawrence merged pull request #879: Improve SAX parse/unparse performance

Posted by GitBox <gi...@apache.org>.
stevedlawrence merged PR #879:
URL: https://github.com/apache/daffodil/pull/879




[GitHub] [daffodil] tuxji commented on a diff in pull request #879: Improve SAX parse/unparse performance

Posted by GitBox <gi...@apache.org>.
tuxji commented on code in PR #879:
URL: https://github.com/apache/daffodil/pull/879#discussion_r1063553569


##########
daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala:
##########
@@ -20,10 +20,25 @@
  import org.apache.daffodil.exceptions.UnsuppressableException
 
  import java.util.concurrent.ArrayBlockingQueue
+ import java.util.concurrent.Executors
+
+ import scala.concurrent.ExecutionContext
+ import scala.concurrent.Future
  import scala.util.Failure
  import scala.util.Success
  import scala.util.Try
 
+ object Coroutine {
+   val executionContext = new ExecutionContext {
+     private val threadPool = Executors.newCachedThreadPool()
+     def execute(runnable: Runnable): Unit = threadPool.submit(runnable)

Review Comment:
   How is the thread pool eventually closed, or does this executionContext stay around as long as the program keeps running?  What happens to threads after they finish running?  I googled and found this good discussion:
   
   https://stackoverflow.com/questions/949355/executors-newcachedthreadpool-versus-executors-newfixedthreadpool
   
   The cached thread pool closes inactive threads after 60 seconds, so it will consume no resources when it is not being used.  However, the pool has no protection against denial-of-service attacks since its maximum pool size is set to Integer.MAX_VALUE.  How does Daffodil guarantee that only a small number of coroutine threads are used at any time?
   
   If safeguards against denial-of-service attacks are needed, please add comments to key places so that maintainers will be aware of the need to preserve these safeguards or add new ones.
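
For reference, in the JDK Executors.newCachedThreadPool() is constructed as a ThreadPoolExecutor with zero core threads, a maximum of Integer.MAX_VALUE, a 60-second keep-alive, and a SynchronousQueue, which is why its size is effectively unbounded. One possible safeguard, sketched below as a hypothetical and not something this PR does, is the same construction with a finite maximum; under the default AbortPolicy, submissions beyond that limit throw RejectedExecutionException instead of creating more threads.

```scala
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object PoolSketch {
  // Same parameters as Executors.newCachedThreadPool(): no core threads,
  // unbounded maximum, idle threads reclaimed after 60 seconds, and a
  // direct hand-off queue that never buffers tasks.
  def cachedPool(): ThreadPoolExecutor =
    new ThreadPoolExecutor(0, Int.MaxValue, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())

  // Hypothetical bounded variant: threads are still reused and reclaimed, but
  // at most maxThreads exist at once. With the default AbortPolicy, a task
  // submitted while all threads are busy is rejected with a
  // RejectedExecutionException rather than spawning a new thread.
  def boundedCachedPool(maxThreads: Int): ThreadPoolExecutor =
    new ThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())
}
```

Note that a CallerRunsPolicy fallback would not suit this design: the unparse task must run on a different thread than the ContentHandler, otherwise the ContentHandler would block waiting on itself.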





[GitHub] [daffodil] tuxji commented on pull request #879: Improve SAX parse/unparse performance

Posted by GitBox <gi...@apache.org>.
tuxji commented on PR #879:
URL: https://github.com/apache/daffodil/pull/879#issuecomment-1366160229

   This PR has received 2 approvals and passed all CI checks but now has a conflicting file.  @stevedlawrence, please resolve the conflict and merge the PR or rework/close the PR if you don't want to merge it in its current state.




[GitHub] [daffodil] stevedlawrence commented on pull request #879: Improve SAX parse/unparse performance

Posted by GitBox <gi...@apache.org>.
stevedlawrence commented on PR #879:
URL: https://github.com/apache/daffodil/pull/879#issuecomment-1373987770

   > We should create the coroutine thread exactly once.
   
   Every new instance of the DaffodilUnparseContentHandler is a new coroutine and so will create a new thread. This is needed since a user could make different SAX unparse calls in parallel, and so would need multiple DaffodilUnparseContentHandlers, each with its own thread.
   
   And in the CLI performance command, each call to unparse creates a new DaffodilUnparseContentHandler, and thus each unparse will create a new thread.




[GitHub] [daffodil] mbeckerle commented on pull request #879: Improve SAX parse/unparse performance

Posted by GitBox <gi...@apache.org>.
mbeckerle commented on PR #879:
URL: https://github.com/apache/daffodil/pull/879#issuecomment-1374326315

   
   > > Can this be set small enough as to enforce sequential behavior, i.e., no parallelism between the caller of the content handler and the unparser?
   > 
   > With this approach, no. Even with the batch size tunable set to 1, the ContentHandler and SAXInfosetInputter can (and likely will) both do work at the same time. The ContentHandler will be preparing the next event while the SAXInfosetInputter is unparsing using the current event.
   
   Re-studying this PR briefly. I thought that when an event was queued for the peer co-routine, the main thread then would read from the reply queue, hence blocking itself until the peer replies back to it. 
   
   That was the notion for how the two coroutines never overlap in doing work. 
   
   Is it not functioning that way now? 
   
   As I have tried to emphasize before, the goal, in fact the whole point of coroutines, is that it is a "call" that includes changing stacks too, but there is NO parallelism any more than there is when you call a procedure. Coroutines are a means of doing an inversion-of-control from push-to-pull or vice versa, but are not supposed to have any thread-safety implications, so both main and peer coroutines may run non-thread-safe code (if they want to) because there is not any concurrency. 
   
   From other things I have read online, it seems there is quite high overhead to Java thread switching. People have ventured opinions like the whole point of co-routines is lost if you have to use kernel interfaces to implement it. 
   
   But how else are you supposed to do this control inversion from push to pull without it? 
   
   Note that once we get to Java 19, there is a new system of non-kernel (virtual) threads for Java, which could substantially reduce the context-switching overhead.
   https://blogs.oracle.com/javamagazine/post/java-loom-virtual-threads-platform-threads. 
   
   




[GitHub] [daffodil] stevedlawrence commented on pull request #879: Improve SAX parse/unparse performance

Posted by GitBox <gi...@apache.org>.
stevedlawrence commented on PR #879:
URL: https://github.com/apache/daffodil/pull/879#issuecomment-1320499306

   Note that PR #873 needs to be merged before we can really get accurate numbers about how much this improves things.
   
   I think there is still a lot of overhead related to the threads needed to connect the SAX push-style API with our Daffodil unparse pull-style API, but this seems to have reduced it, so SAX unparse performance is a bit closer to that of the other infoset inputters.

