Posted to commits@daffodil.apache.org by GitBox <gi...@apache.org> on 2020/11/10 14:57:47 UTC

[GitHub] [incubator-daffodil] stevedlawrence commented on a change in pull request #453: Add SAX Unparse Event Batching

stevedlawrence commented on a change in pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#discussion_r520581742



##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && batchedInfosetEvents.lift(nextIndex).nonEmpty
+      && !batchedInfosetEvents(nextIndex).isEmpty) {
+      // if the next element exists and is nonEmpty

Review comment:
       In what case can the event at nextIndex be empty? It seems like the ContentHandler will either fill up the entire array with events and not resume the coroutine until it does so, or it will stop at an EndDocument. I wonder if we can assume throughout this logic that there are never any empty events, as long as we never try to read past EndDocument?
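   If that assumption holds, the invariant can be written as a simple check, which could serve as a debug-only assertion while the batching logic settles. This is a minimal sketch. `Event`, `EventType`, `StartElement` and `EndDocument` here are hypothetical simplified stand-ins, not Daffodil's real infoset event classes.
   ```scala
   object NoEmptyEventsSketch {
     // Hypothetical, simplified stand-ins for Daffodil's infoset event types
     sealed trait EventType
     case object StartElement extends EventType
     case object EndDocument extends EventType

     final class Event(var eventType: Option[EventType] = None) {
       def isEmpty: Boolean = eventType.isEmpty
     }

     // True if every slot up to and including the first EndDocument is populated.
     // Slots after EndDocument are ignored because the inputter never reads past it.
     def satisfiesInvariant(batch: Array[Event]): Boolean = {
       val end = batch.indexWhere(_.eventType.contains(EndDocument))
       val readable = if (end >= 0) batch.take(end + 1) else batch
       readable.forall(e => !e.isEmpty)
     }
   }
   ```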

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && batchedInfosetEvents.lift(nextIndex).nonEmpty
+      && !batchedInfosetEvents(nextIndex).isEmpty) {
+      // if the next element exists and is nonEmpty
+      true
+    }  else  {
+      // there is no nextEvent or it was empty, but we still have no EndDocument. So load the next
+      // batch from the contentHandler
+      returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+      batchedInfosetEvents = this.resume(unparseContentHandler, returnedInfosetEvent)
+      // we reset the index once we receive a new batch, but we don't use the first element from
+      // this point on
+      currentIndex = 0

Review comment:
       I think this comment could be a little confusing. It may imply that we're doing something wasteful. The reason this must be set to 0 is that hasNext isn't allowed to change which event we are looking at. Calling resume took the event we were last looking at and made it the first element in the array, so currentIndex must now point at the zeroth element. Only when next() is called are we allowed to increment currentIndex so that we are actually looking at the next element.
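   To make the hasNext/next contract concrete, here is a minimal single-threaded model. It is a sketch, not the real SAXInfosetInputter. Events are plain strings. `produce` is a hypothetical function standing in for `resume()`: it receives the event currently being looked at and returns a new batch with that event in slot 0. `batchingProducer` is likewise hypothetical.
   ```scala
   object IndexResetSketch {
     // Hypothetical model: events are plain strings and "EndDocument" ends the stream.
     // produce(current) stands in for resume(): it returns a new batch whose slot 0
     // holds the event the inputter was looking at when it asked for more.
     final class Inputter(produce: String => Array[String]) {
       private var batch: Array[String] = produce(null) // slot 0 of the first batch is unused
       private var currentIndex = 0
       private var endDocumentReceived = false

       def current: String = batch(currentIndex)

       def hasNext: Boolean =
         if (endDocumentReceived) false
         else if (currentIndex + 1 < batch.length) true
         else {
           batch = produce(batch(currentIndex))
           currentIndex = 0 // same event as before, now at slot 0
           true
         }

       def next(): Unit =
         if (hasNext) {
           currentIndex += 1 // only next() moves on to a new event
           if (batch(currentIndex) == "EndDocument") endDocumentReceived = true
         }
     }

     // Hypothetical producer: copies `last` into slot 0, then fills the rest from `events`
     def batchingProducer(events: Iterator[String], size: Int): String => Array[String] =
       last => {
         val batch = new Array[String](size)
         batch(0) = last
         var i = 1
         while (i < size && events.hasNext) { batch(i) = events.next(); i += 1 }
         batch
       }
   }
   ```
   Calling hasNext at a batch boundary swaps in a new array but leaves `current` unchanged. That is exactly why currentIndex has to go back to 0.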

##########
File path: daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXParseUnparseAPI.scala
##########
@@ -55,8 +55,13 @@ object TestSAXParseUnparseAPI {
 
   lazy val dp: DataProcessor = testDataprocessor(testSchema)
 
-  def testDataprocessor(testSchema: scala.xml.Elem): DataProcessor = {
-    val schemaCompiler = Compiler()
+  def testDataprocessor(testSchema: scala.xml.Elem, tunablesArg: Option[Map[String, String]] = None): DataProcessor = {
+    val schemaCompiler =
+      if (tunablesArg.nonEmpty) {
+        Compiler().withTunables(tunablesArg.get)
+      } else {
+        Compiler()
+      }

Review comment:
       Rather than making tunablesArg an ``Option``, how about making it just a ``Map`` that defaults to ``Map.empty``? The ``withTunables`` function should work fine with an empty map, so this can become
   ```scala
   val schemaCompiler = Compiler().withTunables(tunablesArg)
   ```
   That cleans things up a little.
   
   The only downside is that it will create a copy of the ``Compiler`` instance even if there are no tunables, but that doesn't seem like a big deal to me. If we really cared, we could modify the ``withTunables(Map)`` method to not make the copy when the map is empty, but I'm not sure that's worth it.
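   The following sketch shows both the default-argument pattern and the optional skip-the-copy tweak. `FakeCompiler` is a hypothetical immutable stand-in for Daffodil's ``Compiler``, and `testCompiler` mirrors ``testDataprocessor``. Neither is real Daffodil API.
   ```scala
   object TunablesDefaultSketch {
     // Hypothetical stand-in for an immutable compiler that carries tunables
     final case class FakeCompiler(tunables: Map[String, String] = Map.empty) {
       def withTunables(more: Map[String, String]): FakeCompiler =
         if (more.isEmpty) this // optional: skip the copy when there is nothing to add
         else copy(tunables = tunables ++ more)
     }

     // Test helper with a plain Map parameter defaulting to Map.empty instead of an Option
     def testCompiler(tunablesArg: Map[String, String] = Map.empty): FakeCompiler =
       FakeCompiler().withTunables(tunablesArg)
   }
   ```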

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && batchedInfosetEvents.lift(nextIndex).nonEmpty
+      && !batchedInfosetEvents(nextIndex).isEmpty) {
+      // if the next element exists and is nonEmpty
+      true
+    }  else  {
+      // there is no nextEvent or it was empty, but we still have no EndDocument. So load the next
+      // batch from the contentHandler
+      returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+      batchedInfosetEvents = this.resume(unparseContentHandler, returnedInfosetEvent)
+      // we reset the index once we receive a new batch, but we don't use the first element from
+      // this point on
+      currentIndex = 0
       true
     }
   }
 
   override def next(): Unit = {
     if (hasNext()) {
-      copyEvent(source = Try(nextEvent), dest = currentEvent)
-      nextEvent.clear()
-      if (currentEvent.eventType.contains(EndDocument)) endDocumentReceived = true
+      // clear element at current index as we're done with it, except in the case we just loaded the
+      // new elements, then do nothing
+      batchedInfosetEvents(currentIndex).clear()
+

Review comment:
       Just so it's clear to me: the contract between the ContentHandler and the InfosetInputter is that the ContentHandler fills up the array, and the SAXInfosetInputter clears each event as it finishes with it. So when control returns to the ContentHandler, it can assume that the entire array has been cleared and doesn't need to clear anything itself?

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && batchedInfosetEvents.lift(nextIndex).nonEmpty
+      && !batchedInfosetEvents(nextIndex).isEmpty) {
+      // if the next element exists and is nonEmpty
+      true
+    }  else  {
+      // there is no nextEvent or it was empty, but we still have no EndDocument. So load the next
+      // batch from the contentHandler
+      returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+      batchedInfosetEvents = this.resume(unparseContentHandler, returnedInfosetEvent)
+      // we reset the index once we receive a new batch, but we don't use the first element from
+      // this point on
+      currentIndex = 0
       true
     }
   }
 
   override def next(): Unit = {
     if (hasNext()) {
-      copyEvent(source = Try(nextEvent), dest = currentEvent)
-      nextEvent.clear()
-      if (currentEvent.eventType.contains(EndDocument)) endDocumentReceived = true
+      // clear element at current index as we're done with it, except in the case we just loaded the
+      // new elements, then do nothing
+      batchedInfosetEvents(currentIndex).clear()
+
+      // increment current index to the next index
+      currentIndex += 1
+
+      // check if new current index is EndDocument
+      if (batchedInfosetEvents.lift(currentIndex).nonEmpty
+        && batchedInfosetEvents(currentIndex).eventType.contains(EndDocument)) {
+        endDocumentReceived = true
+      }

Review comment:
       We already called hasNext, which means an event at currentIndex must exist, so I think this lift call can be removed.
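   A minimal sketch of next() without the lift, under simplifying assumptions: a single batch with no refilling, and hypothetical `Event`/`EventType` stand-ins in place of Daffodil's classes. The direct index into `batch(currentIndex)` is safe only because next() runs hasNext first.
   ```scala
   object NextWithoutLiftSketch {
     // Hypothetical, simplified stand-ins for Daffodil's event types
     sealed trait EventType
     case object StartElement extends EventType
     case object EndDocument extends EventType

     final class Event(var eventType: Option[EventType]) {
       def clear(): Unit = eventType = None
     }

     // Single-batch cursor; refilling is omitted to keep the focus on next()
     final class Cursor(batch: Array[Event]) {
       var currentIndex = 0
       var endDocumentReceived = false

       def hasNext: Boolean = !endDocumentReceived && currentIndex + 1 < batch.length

       def next(): Unit =
         if (hasNext) {
           batch(currentIndex).clear()
           currentIndex += 1
           // hasNext already guaranteed this slot exists, so index it directly
           if (batch(currentIndex).eventType.contains(EndDocument)) endDocumentReceived = true
         }
     }
   }
   ```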

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && batchedInfosetEvents.lift(nextIndex).nonEmpty

Review comment:
       We might want to avoid the lift function. It allocates an Option, which we try to avoid where possible. Also, the Scaladoc for Array.lift says:
   
   > This member is added by an implicit conversion from Array[T] to IndexedSeq[T] performed by method copyArrayToImmutableIndexedSeq in scala.LowPriorityImplicits2.
   
   So just to do this lift, some copying is going on. The copy is hopefully efficient, but it's probably best to avoid, especially since this seems to be just testing whether nextIndex exists. I think this can be done instead with something like ``nextIndex < batchedInfosetEvents.length``.
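   A minimal sketch of the bounds check as a small generic helper. `hasSlot` is a hypothetical name. In hasNext it would be used as ``hasSlot(batchedInfosetEvents, nextIndex) && !batchedInfosetEvents(nextIndex).isEmpty``.
   ```scala
   object BoundsCheckSketch {
     // Replaces `batch != null && batch.lift(i).nonEmpty` with a plain bounds check:
     // no Option allocation and no implicit conversion of the array
     def hasSlot[A](batch: Array[A], i: Int): Boolean =
       batch != null && i >= 0 && i < batch.length
   }
   ```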

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -212,31 +224,39 @@ class DaffodilUnparseContentHandler(
   }
 
   private def sendToInputter(): Unit = {
-    val infosetEventWithResponse = this.resume(inputter, Try(infosetEvent))
-    infosetEvent.clear()
-    // if event is wrapped in a Try failure, we will not have an unparseResult, so we only set
-    // unparseResults for events wrapped in Try Success, including those events that have expected
-    // errors
-    if (infosetEventWithResponse.isSuccess && infosetEventWithResponse.get.unparseResult.isDefined) {
-      unparseResult = infosetEventWithResponse.get.unparseResult.get
-    }
-    // the exception from events wrapped in Try failures and events wrapped in Try Successes
-    // (with an unparse error state i.e unparseResult.isError) are collected and thrown to stop
-    // the execution of the xmlReader
-    if (infosetEventWithResponse.isFailure || infosetEventWithResponse.get.isError) {
-      val causeError = if(infosetEventWithResponse.isFailure) {
-        infosetEventWithResponse.failed.get
-      } else {
-        infosetEventWithResponse.get.causeError.get
+    val nextIndex = currentIndex + 1
+    if (nextIndex < SAX_UNPARSE_EVENT_BATCH_SIZE
+      && batchedInfosetEvents(currentIndex).eventType.get != EndDocument) {
+      // if we have room left on the batchedInfosetEvents array and the current element != EndDocument
+      currentIndex += 1
+      // if the new currentElement has any existing content, clear it
+      if (!batchedInfosetEvents(currentIndex).isEmpty) batchedInfosetEvents(currentIndex).clear()
+    } else {
+      // ready to send it off
+      val infosetEventWithResponse = this.resume(inputter, batchedInfosetEvents).head
+      // we only ever return a one element array
+
+      // it is possible for unparseResult to be null, in the case of an DaffodilUnhandledSAXException
+      if (infosetEventWithResponse.unparseResult.isDefined) {
+        unparseResult = infosetEventWithResponse.unparseResult.get
       }
-      causeError match {
-        case unparseError: DaffodilUnparseErrorSAXException =>
-          // although this is an expected error, we need to throw it so we can stop the xmlReader
-          // parse and this thread
-          throw unparseError
-        case unhandled: DaffodilUnhandledSAXException => throw unhandled
-        case unknown => throw new DaffodilUnhandledSAXException("Unknown exception: ", new Exception(unknown))
+      // any exception is collected and thrown to stop the execution of the xmlReader
+      if (infosetEventWithResponse.isError) {
+        val causeError = infosetEventWithResponse.causeError.get
+        causeError match {
+          case unparseError: DaffodilUnparseErrorSAXException =>
+            // although this is an expected error, we need to throw it so we can stop the xmlReader
+            // parse and this thread
+            throw unparseError
+          case unhandled: DaffodilUnhandledSAXException => throw unhandled
+          case unknown => throw new DaffodilUnhandledSAXException("Unknown exception: ",
+            new Exception(unknown))
+        }
       }
+      // clear the last element, then start to load elements starting at the second element since
+      // the first element is used as a buffer by the SAXInfosetInputter when new batches are sent
+      batchedInfosetEvents(currentIndex).clear()

Review comment:
       I was saying that other calls to clear in the ContentHandler might not be needed, but I think this one actually is. The InfosetInputter only clears the current element when it calls next(), and it never calls next() when the current element is the last one in the array. So the InfosetInputter clears every element except the last one. It might be worth adding a comment to that effect.
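   A minimal model of the two sides of that clearing contract, using a hypothetical `Event` stand-in. `inputterConsumes` approximates what the InfosetInputter does to a full batch, and `handlerPrepares` is the one clear the ContentHandler still has to perform. Both function names are illustrative only.
   ```scala
   object ClearLastSlotSketch {
     final class Event(var value: Option[String] = None) {
       def isEmpty: Boolean = value.isEmpty
       def clear(): Unit = value = None
     }

     // Hypothetical inputter behaviour: next() clears the event it leaves, but next() is
     // never called from the last slot (hasNext asks for a new batch instead)
     def inputterConsumes(batch: Array[Event]): Unit =
       for (i <- 0 until batch.length - 1) batch(i).clear()

     // So the handler must clear the last slot itself before it refills the array
     def handlerPrepares(batch: Array[Event]): Unit =
       batch(batch.length - 1).clear()
   }
   ```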

##########
File path: daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXUnparseAPI.scala
##########
@@ -46,6 +49,24 @@ class TestSAXUnparseAPI {
     assertEquals(testData, bao.toString)
   }
 
+  @Test def testUnparseContentHandler_unparse_saxUnparseEventBatchSize_0(): Unit = {
+    val dpT = testDataprocessor(testSchema, Some(Map("saxUnparseEventBatchSize" -> "0")))

Review comment:
       I've also created [DAFFODIL-2434](https://issues.apache.org/jira/browse/DAFFODIL-2432) as a way to more thoroughly ensure our tunables have valid values. That way we can validate them when they are set rather than when they are actually used.
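   As an illustration of validating at set time, here is a hypothetical parser for the ``saxUnparseEventBatchSize`` value that returns an error instead of a number. The minimum of 1 is an assumption made for this sketch, not Daffodil's documented rule, and `parseBatchSize` is not part of Daffodil's API.
   ```scala
   import scala.util.Try

   object TunableValidationSketch {
     // Hypothetical set-time check for saxUnparseEventBatchSize; the minimum of 1 is an
     // assumption for illustration only
     def parseBatchSize(raw: String): Either[String, Int] =
       Try(raw.trim.toInt).toOption match {
         case Some(n) if n >= 1 => Right(n)
         case Some(n)           => Left(s"saxUnparseEventBatchSize must be >= 1, got $n")
         case None              => Left(s"saxUnparseEventBatchSize must be an integer, got '$raw'")
       }
   }
   ```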

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -212,31 +224,39 @@ class DaffodilUnparseContentHandler(
   }
 
   private def sendToInputter(): Unit = {

Review comment:
       Suggest renaming this to something like ``maybeSendToInputter``, since it only sends to the inputter when the batch is full (or when the current event is EndDocument).

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -212,31 +224,39 @@ class DaffodilUnparseContentHandler(
   }
 
   private def sendToInputter(): Unit = {
-    val infosetEventWithResponse = this.resume(inputter, Try(infosetEvent))
-    infosetEvent.clear()
-    // if event is wrapped in a Try failure, we will not have an unparseResult, so we only set
-    // unparseResults for events wrapped in Try Success, including those events that have expected
-    // errors
-    if (infosetEventWithResponse.isSuccess && infosetEventWithResponse.get.unparseResult.isDefined) {
-      unparseResult = infosetEventWithResponse.get.unparseResult.get
-    }
-    // the exception from events wrapped in Try failures and events wrapped in Try Successes
-    // (with an unparse error state i.e unparseResult.isError) are collected and thrown to stop
-    // the execution of the xmlReader
-    if (infosetEventWithResponse.isFailure || infosetEventWithResponse.get.isError) {
-      val causeError = if(infosetEventWithResponse.isFailure) {
-        infosetEventWithResponse.failed.get
-      } else {
-        infosetEventWithResponse.get.causeError.get
+    val nextIndex = currentIndex + 1
+    if (nextIndex < SAX_UNPARSE_EVENT_BATCH_SIZE
+      && batchedInfosetEvents(currentIndex).eventType.get != EndDocument) {
+      // if we have room left on the batchedInfosetEvents array and the current element != EndDocument
+      currentIndex += 1
+      // if the new currentElement has any existing content, clear it
+      if (!batchedInfosetEvents(currentIndex).isEmpty) batchedInfosetEvents(currentIndex).clear()

Review comment:
       I think this shouldn't be necessary, since the InfosetInputter is already clearing everything. Perhaps this should just be ``Assert(batchedInfosetEvents(currentIndex).isEmpty)`` if we want to ensure that the ContentHandler is doing the right thing?
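   A minimal sketch of the assert-instead-of-clear idea. It uses Scala's standard ``assert`` in place of Daffodil's own ``Assert`` utility, and a hypothetical `Event` stand-in. `advance` is an illustrative name for the index bump in the first branch of sendToInputter.
   ```scala
   object AssertInsteadOfClearSketch {
     final class Event(var value: Option[String] = None) {
       def isEmpty: Boolean = value.isEmpty
     }

     // Advances the handler's write index. Rather than clearing the slot, it checks that
     // the inputter already left it empty (assert stands in for Daffodil's Assert helper)
     def advance(batch: Array[Event], currentIndex: Int): Int = {
       val next = currentIndex + 1
       assert(batch(next).isEmpty, s"slot $next should already have been cleared by the inputter")
       next
     }
   }
   ```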

##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -212,31 +224,39 @@ class DaffodilUnparseContentHandler(
   }
 
   private def sendToInputter(): Unit = {
-    val infosetEventWithResponse = this.resume(inputter, Try(infosetEvent))
-    infosetEvent.clear()
-    // if event is wrapped in a Try failure, we will not have an unparseResult, so we only set
-    // unparseResults for events wrapped in Try Success, including those events that have expected
-    // errors
-    if (infosetEventWithResponse.isSuccess && infosetEventWithResponse.get.unparseResult.isDefined) {
-      unparseResult = infosetEventWithResponse.get.unparseResult.get
-    }
-    // the exception from events wrapped in Try failures and events wrapped in Try Successes
-    // (with an unparse error state i.e unparseResult.isError) are collected and thrown to stop
-    // the execution of the xmlReader
-    if (infosetEventWithResponse.isFailure || infosetEventWithResponse.get.isError) {
-      val causeError = if(infosetEventWithResponse.isFailure) {
-        infosetEventWithResponse.failed.get
-      } else {
-        infosetEventWithResponse.get.causeError.get
+    val nextIndex = currentIndex + 1
+    if (nextIndex < SAX_UNPARSE_EVENT_BATCH_SIZE
+      && batchedInfosetEvents(currentIndex).eventType.get != EndDocument) {
+      // if we have room left on the batchedInfosetEvents array and the current element != EndDocument
+      currentIndex += 1
+      // if the new currentElement has any existing content, clear it
+      if (!batchedInfosetEvents(currentIndex).isEmpty) batchedInfosetEvents(currentIndex).clear()
+    } else {
+      // ready to send it off
+      val infosetEventWithResponse = this.resume(inputter, batchedInfosetEvents).head
+      // we only ever return a one element array
+

Review comment:
       I feel like something is missing somewhere after this.
   
   At this point, the InfosetInputter has consumed all of the events in the array, it called hasNext, and we are back in the ContentHandler to get more events. That means the InfosetInputter thinks the current event is the last one in the batch array. When the ContentHandler eventually yields back to the InfosetInputter, the InfosetInputter will expect the event it was looking at to now be at the first index in the array (that's why it resets currentIndex back to zero). So I would expect this to copy the last event from the last index to the first index. In fact, I think the last current event is infosetEventWithResponse, right? So I think you can do something like:
   
   ```scala
   if (infosetEventWithResponse.unparseResult.isEmpty) {
     copyEvent(infosetEventWithResponse, batchedInfosetEvents(0))
   } 
   ```
   Note that I think this technically doesn't matter because anytime we call hasNext we immediately call next(), so the infoset inputter will never actually access this zeroth event after hasNext is called. However, someday we might, and so we want to ensure things work. 
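   A minimal sketch of that hand-off, using a hypothetical `Event` stand-in and a hypothetical in-place ``copyEvent``. It copies from the last slot of the array rather than from ``infosetEventWithResponse``, assuming (as suggested above) that the two are the same event. It then clears the last slot so the array can be refilled from index 1.
   ```scala
   object CopyToSlotZeroSketch {
     final class Event(var value: Option[String] = None) {
       def isEmpty: Boolean = value.isEmpty
       def clear(): Unit = value = None
     }

     // Hypothetical copyEvent that copies source's contents into dest in place
     def copyEvent(source: Event, dest: Event): Unit = dest.value = source.value

     // Before refilling, move the event the inputter was last looking at (the final slot)
     // into slot 0, where the inputter expects it after resetting currentIndex to 0,
     // then clear the final slot so it can be reused
     def prepareNextBatch(batch: Array[Event]): Unit = {
       require(batch.length >= 2, "need a buffer slot plus at least one event slot")
       val last = batch.length - 1
       copyEvent(batch(last), batch(0))
       batch(last).clear()
     }
   }
   ```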




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org