Posted to commits@daffodil.apache.org by sl...@apache.org on 2018/07/17 12:44:17 UTC

[incubator-daffodil] branch master updated: Support the --stream option with the unparse subcommand

This is an automated email from the ASF dual-hosted git repository.

slawrence pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-daffodil.git


The following commit(s) were added to refs/heads/master by this push:
     new ab1a1a0  Support the --stream option with the unparse subcommand
ab1a1a0 is described below

commit ab1a1a0d8cdc1fe0b0963f35edd256f2b28472b8
Author: Steve Lawrence <sl...@apache.org>
AuthorDate: Mon Jul 16 15:05:52 2018 -0400

    Support the --stream option with the unparse subcommand
    
    - When the --stream option is provided to the parse command, output a
      NUL character (\u0000) after each infoset as a separator. The NUL
      character is not allowed in either XML or JSON output, so it can
      never be confused with infoset data. This is also consistent with
      common Linux tools like 'find' and 'xargs', which can use the NUL
      character for data separation.
    - When the --stream option is provided to the unparse command, the
      input data is split on the NUL character. Each chunk is converted to
      an InfosetInputter and unparsed to the same output channel (see the
      sketch after this list). We need to do this split because all of the
      XML parsing libraries we use treat the stream as invalid XML, since
      there is not a single root element.
    - Fix a bug in DirectOrBufferedDataOutputStream where we incorrectly
      closed the underlying JavaOutputStream provided by the user, which
      caused successive unparse writes to fail. We do still need to close
      JavaOutputStreams related to layering, so add a flag that defines
      which DOSs are for layering and close those when appropriate. But if
      a DOS is not for layering, do not close it; the user is responsible
      for cleanup.
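    
    A minimal standalone sketch of the NUL-splitting approach from the second
    bullet above (illustrative only; the actual Main.scala change appears in
    the diff further below):
    
        import java.util.Scanner
    
        object NulSplitSketch {
          def main(args: Array[String]): Unit = {
            // Split stdin on the NUL separator that parse --stream emits.
            val scanner = new Scanner(System.in)
            scanner.useDelimiter("\u0000")
            while (scanner.hasNext()) {
              val chunk = scanner.next()
              // In the real CLI, each chunk becomes an InfosetInputter and
              // is unparsed to the shared output channel.
              println(s"chunk of ${chunk.length} characters")
            }
          }
        }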
    
    These changes enable piping the output of parse --stream to unparse
    --stream, acting as a streaming round trip.
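    
    For example, a round trip might look like this (the schema and data file
    names here are illustrative only, not part of this commit):
    
        daffodil parse --stream -s schema.dfdl.xsd data.bin | \
          daffodil unparse --stream -s schema.dfdl.xsd -o roundtrip.bin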
    
    DAFFODIL-1967
---
 .../org/apache/daffodil/CLI/input/input19.txt      | Bin 0 -> 308 bytes
 .../daffodil/unparsing/TestCLIUnparsing.scala      |  17 +++++++
 .../src/main/scala/org/apache/daffodil/Main.scala  |  55 ++++++++++++++++++---
 .../io/DirectOrBufferedDataOutputStream.scala      |  26 +++++++---
 .../apache/daffodil/layers/LayerTransformer.scala  |   2 +-
 project/Rat.scala                                  |   1 +
 6 files changed, 85 insertions(+), 16 deletions(-)

diff --git a/daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input19.txt b/daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input19.txt
new file mode 100644
index 0000000..7b62ed6
Binary files /dev/null and b/daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input19.txt differ
diff --git a/daffodil-cli/src/it/scala/org/apache/daffodil/unparsing/TestCLIUnparsing.scala b/daffodil-cli/src/it/scala/org/apache/daffodil/unparsing/TestCLIUnparsing.scala
index 3c12f9d..a3328b5 100644
--- a/daffodil-cli/src/it/scala/org/apache/daffodil/unparsing/TestCLIUnparsing.scala
+++ b/daffodil-cli/src/it/scala/org/apache/daffodil/unparsing/TestCLIUnparsing.scala
@@ -410,4 +410,21 @@ class TestCLIunparsing {
     }
   }
 
+  @Test def test_XXX_CLI_Unparsing_Stream_01() {
+    val schemaFile = Util.daffodilPath("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/cli_schema_02.dfdl.xsd")
+    val inputFile = Util.daffodilPath("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input19.txt")
+    val (testSchemaFile, testInputFile) = if (Util.isWindows) (Util.cmdConvert(schemaFile), Util.cmdConvert(inputFile)) else (schemaFile, inputFile)
+
+    val shell = Util.start("")
+
+    try {
+      val cmd = String.format("%s unparse --stream -s %s %s", Util.binPath, testSchemaFile, testInputFile)
+      shell.sendLine(cmd)
+      shell.expect(contains("123"))
+      shell.send("exit\n")
+    } finally {
+      shell.close()
+    }
+  }
+
 }
diff --git a/daffodil-cli/src/main/scala/org/apache/daffodil/Main.scala b/daffodil-cli/src/main/scala/org/apache/daffodil/Main.scala
index f12476e..451cdc1 100644
--- a/daffodil-cli/src/main/scala/org/apache/daffodil/Main.scala
+++ b/daffodil-cli/src/main/scala/org/apache/daffodil/Main.scala
@@ -29,6 +29,8 @@ import java.io.ByteArrayInputStream
 import java.io.File
 import java.nio.channels.Channels
 import java.net.URI
+import java.util.Scanner
+
 import scala.xml.SAXParseException
 import org.rogach.scallop
 import org.apache.daffodil.debugger.{ InteractiveDebugger, TraceDebuggerRunner, CLIDebuggerRunner }
@@ -341,7 +343,7 @@ class CLIConf(arguments: Array[String]) extends scallop.ScallopConf(arguments)
     val tunables = props[String]('T', keyName = "tunable", valueName = "value", descr = "daffodil tunable to be used when parsing.")
     val config = opt[String](short = 'c', argName = "file", descr = "path to file containing configuration items.")
     val infosetType = opt[String](short = 'I', argName = "infoset_type", descr = "infoset type to output. Must be one of 'xml', 'scala-xml', 'json', 'jdom', 'w3cdom', or 'null'.", default = Some("xml")).map { _.toLowerCase }
-    val stream = toggle(noshort = true, default = Some(false), descrYes = "when left over data exists, parse again with remaining data", descrNo = "stop after the first parse, even if left over data exists")
+    val stream = toggle(noshort = true, default = Some(false), descrYes = "when left over data exists, parse again with remaining data, separating infosets by a NUL character", descrNo = "stop after the first parse, even if left over data exists")
     val infile = trailArg[String](required = false, descr = "input file to parse. If not specified, or a value of -, reads from stdin.")
 
     validateOpt(debug, infile) {
@@ -434,6 +436,7 @@ class CLIConf(arguments: Array[String]) extends scallop.ScallopConf(arguments)
               |                        [--validate [mode]]
               |                        [-D[{namespace}]<variable>=<value>...] [-c <file>]
               |                        [-I <infoset_type>]
+              |                        [--stream]
               |                        [-o <output>] [infile]
               |
               |Unparse an infoset file, using either a DFDL schema or a saved parser
@@ -453,6 +456,7 @@ class CLIConf(arguments: Array[String]) extends scallop.ScallopConf(arguments)
     val tunables = props[String]('T', keyName = "tunable", valueName = "value", descr = "daffodil tunable to be used when parsing.")
     val config = opt[String](short = 'c', argName = "file", descr = "path to file containing configuration items.")
     val infosetType = opt[String](short = 'I', argName = "infoset_type", descr = "infoset type to unparse. Must be one of 'xml', 'scala-xml', 'json', 'jdom', or 'w3cdom'.", default = Some("xml")).map { _.toLowerCase }
+    val stream = toggle(noshort = true, default = Some(false), descrYes = "split the input data on the NUL character, and unparse each chunk separately", descrNo = "treat the entire input data as one infoset")
     val infile = trailArg[String](required = false, descr = "input file to unparse. If not specified, or a value of -, reads from stdin.")
 
     validateOpt(debug, infile) {
@@ -906,6 +910,7 @@ object Main extends Logging {
                       lastParseBitPosition = loc.bitPos0b
                       keepParsing = true
                       error = false
+                      writer.write("\u0000")
                     }
                   } else {
                     // not streaming, show left over data warning
@@ -1100,15 +1105,49 @@ object Main extends Logging {
           case None => 1
           case Some(processor) => {
             setupDebugOrTrace(processor.asInstanceOf[DataProcessor], conf)
-            val data = IOUtils.toByteArray(is)
-            val inputterData = infosetDataToInputterData(unparseOpts.infosetType.toOption.get, data)
-            val inputter = getInfosetInputter(unparseOpts.infosetType.toOption.get, inputterData)
-            val unparseResult = Timer.getResult("unparsing", processor.unparse(inputter, outChannel))
-            output.close()
-            displayDiagnostics(unparseResult)
-            if (unparseResult.isError) 1 else 0
+
+            val maybeScanner =
+              if (unparseOpts.stream.toOption.get) {
+                val scnr = new Scanner(is)
+                scnr.useDelimiter("\u0000")
+                Some(scnr)
+              } else {
+                None
+              }
+
+            var keepUnparsing = maybeScanner.isEmpty || maybeScanner.get.hasNext
+            var error = false
+
+            while (keepUnparsing) {
+
+              val data =
+                if (maybeScanner.isDefined) {
+                  maybeScanner.get.next().getBytes()
+                } else {
+                  IOUtils.toByteArray(is)
+                }
+
+              val inputterData = infosetDataToInputterData(unparseOpts.infosetType.toOption.get, data)
+              val inputter = getInfosetInputter(unparseOpts.infosetType.toOption.get, inputterData)
+              val unparseResult = Timer.getResult("unparsing", processor.unparse(inputter, outChannel))
+              displayDiagnostics(unparseResult)
+
+              if (unparseResult.isError) {
+                keepUnparsing = false
+                error = true
+              } else {
+                keepUnparsing = maybeScanner.isDefined && maybeScanner.get.hasNext
+                error = false
+              }
+            }
+
+            if (error) 1 else 0
           }
         }
+
+        is.close()
+        outChannel.close()
+
         rc
       }
 
diff --git a/daffodil-io/src/main/scala/org/apache/daffodil/io/DirectOrBufferedDataOutputStream.scala b/daffodil-io/src/main/scala/org/apache/daffodil/io/DirectOrBufferedDataOutputStream.scala
index b07c0ef..077db6b 100644
--- a/daffodil-io/src/main/scala/org/apache/daffodil/io/DirectOrBufferedDataOutputStream.scala
+++ b/daffodil-io/src/main/scala/org/apache/daffodil/io/DirectOrBufferedDataOutputStream.scala
@@ -81,8 +81,15 @@ private[io] class ByteArrayOutputStreamWithGetBuf() extends java.io.ByteArrayOut
  * Has two modes of operation, buffering or direct. When buffering, all output goes into a
  * buffer. When direct, all output goes into a "real" DataOutputStream.
  *
+ * The isLayer parameter defines whether or not this instance originated
+ * from a layer. This matters because this class is responsible for closing
+ * the associated Java OutputStream that the underlying DataOutputStream
+ * ultimately writes to. However, if the DataOutputStream is not related to
+ * a layer, the associated Java OutputStream came from the user, and it is
+ * the user's responsibility to close it. The isLayer flag indicates which
+ * streams should be closed and which should not.
  */
-final class DirectOrBufferedDataOutputStream private[io] (var splitFrom: DirectOrBufferedDataOutputStream)
+final class DirectOrBufferedDataOutputStream private[io] (var splitFrom: DirectOrBufferedDataOutputStream, val isLayer: Boolean = false)
   extends DataOutputStreamImplMixin {
   type ThisType = DirectOrBufferedDataOutputStream
 
@@ -212,7 +219,7 @@ final class DirectOrBufferedDataOutputStream private[io] (var splitFrom: DirectO
    */
   def addBuffered: DirectOrBufferedDataOutputStream = {
     Assert.usage(_following.isEmpty)
-    val newBufStr = new DirectOrBufferedDataOutputStream(this)
+    val newBufStr = new DirectOrBufferedDataOutputStream(this, isLayer)
     _following = One(newBufStr)
     //
     // TODO: PERFORMANCE: This is very pessimistic. It's making a complete clone of the state
@@ -324,9 +331,14 @@ final class DirectOrBufferedDataOutputStream private[io] (var splitFrom: DirectO
             // zero out so we don't end up thinking it is still there
             directStream.cst.setFragmentLastByte(0, 0)
           }
-          // now flush/close the whole data output stream
-          // propagate the closed-ness by closing the underlying java stream.
-          directStream.getJavaOutputStream().close()
+          // Now flush the whole data output stream. Note that we only want to
+          // close the java output stream if it was one we created for
+          // layering. If it was not from a layer, then it is the underlying
+          // OutputStream from a user and they are responsible for closing it.
+          directStream.getJavaOutputStream().flush()
+          if (directStream.isLayer) {
+            directStream.getJavaOutputStream().close()
+          }
           directStream.setDOSState(Uninitialized) // not just finished. We're dead now.
         } else {
           // the last stream we merged forward into was not finished.
@@ -805,8 +817,8 @@ object DirectOrBufferedDataOutputStream {
    * Factory for creating new ones/
    * Passing creator as null indicates no other stream created this one.
    */
-  def apply(jos: java.io.OutputStream, creator: DirectOrBufferedDataOutputStream) = {
-    val dbdos = new DirectOrBufferedDataOutputStream(creator)
+  def apply(jos: java.io.OutputStream, creator: DirectOrBufferedDataOutputStream, isLayer: Boolean = false) = {
+    val dbdos = new DirectOrBufferedDataOutputStream(creator, isLayer)
     dbdos.setJavaOutputStream(jos)
 
     if (creator eq null) {
diff --git a/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala b/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala
index 7cc6afd..3687b5a 100644
--- a/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala
+++ b/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala
@@ -153,7 +153,7 @@ abstract class LayerTransformer()
     val jos = wrapJavaOutputStream(s, state)
     val limitedJOS = wrapLimitingStream(jos, state)
     val encodedOutputStream = wrapLayerEncoder(limitedJOS)
-    val newDOS = DirectOrBufferedDataOutputStream(encodedOutputStream, null)
+    val newDOS = DirectOrBufferedDataOutputStream(encodedOutputStream, null, isLayer = true)
     newDOS.setPriorBitOrder(BitOrder.MostSignificantBitFirst)
     newDOS.setAbsStartingBitPos0b(ULong(0L))
     newDOS.setDebugging(s.areDebugging)
diff --git a/project/Rat.scala b/project/Rat.scala
index 888517e..1ec9df4 100644
--- a/project/Rat.scala
+++ b/project/Rat.scala
@@ -62,6 +62,7 @@ object Rat {
     file("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input15.txt"),
     file("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input16.txt"),
     file("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input18.txt"),
+    file("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input19.txt"),
     file("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input1.txt"),
     file("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input2.txt"),
     file("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input3.txt"),