You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by "cgivre (via GitHub)" <gi...@apache.org> on 2024/01/02 03:37:20 UTC

Re: [PR] DRILL-2835: Daffodil Feature for Drill (drill)

cgivre commented on code in PR #2836:
URL: https://github.com/apache/drill/pull/2836#discussion_r1439055155


##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
+  private final DaffodilFormatConfig dafConfig;
+  private final RowSetLoader rowSetLoader;
+  private final CustomErrorContext errorContext;
+  private final DaffodilMessageParser dafParser;
+  private final InputStream dataInputStream;
+
+  static class DaffodilReaderConfig {
+    final DaffodilFormatPlugin plugin;
+    DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
+
+    errorContext = negotiator.parentErrorContext();
+    this.dafConfig = readerConfig.plugin.getConfig();
+
+    String schemaURIString = dafConfig.getSchemaURI(); // "schema/complexArray1.dfdl.xsd";
+    String rootName = dafConfig.getRootName();
+    String rootNamespace = dafConfig.getRootNamespace();
+    boolean validationMode = dafConfig.getValidationMode();
+
+    URI dfdlSchemaURI;
+    try {
+      dfdlSchemaURI = new URI(schemaURIString);
+    } catch (URISyntaxException e) {
+      throw UserException.validationError(e)
+          .build(logger);
+    }
+
+    FileDescrip file = negotiator.file();
+    DrillFileSystem fs = file.fileSystem();
+    URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+    DataProcessor dp;
+    try {
+      dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to get Daffodil DFDL processor for: %s", fsSchemaURI))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // Create the corresponding Drill schema.
+    // Note: this could be a very large schema. Think of a large complex RDBMS schema,
+    // all of it, hundreds of tables, but all part of the same metadata tree.
+    TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+    // Inform Drill about the schema
+    negotiator.tableSchema(drillSchema, true);
+
+    //
+    // DATA TIME: Next we construct the runtime objects, and open files.
+    //
+    // We get the DaffodilMessageParser, which is a stateful driver for daffodil that
+    // actually does the parsing.
+    rowSetLoader = negotiator.build().writer();
+
+    // We construct the Daffodil InfosetOutputter which the daffodil parser uses to
+    // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+    DaffodilDrillInfosetOutputter outputter = new DaffodilDrillInfosetOutputter(rowSetLoader);
+
+    // Now we can setup the dafParser with the outputter it will drive with
+    // the parser-produced infoset.
+    dafParser = new DaffodilMessageParser(dp); // needs further initialization after this.
+    dafParser.setInfosetOutputter(outputter);
+
+    Path dataPath = file.split().getPath();
+    // Lastly, we open the data stream
+    try {
+      dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to open input file: %s", dataPath.toString()))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // And lastly,... tell daffodil the input data stream.
+    dafParser.setInputStream(dataInputStream);
+  }
+
+
+  /**
+   * This is the core of actual processing - data movement from Daffodil to Drill.
+   * <p>
+   * If there is space in the batch, and there is data available to parse
+   * then this calls the daffodil parser, which parses data, delivering it to the rowWriter
+   * by way of the infoset outputter.
+   * <p>
+   * Repeats until the rowWriter is full (a batch is full), or there is no more data, or
+   * a parse error ends execution with a throw.
+   * <p>
+   * Validation errors and other warnings are not errors and are logged but do not cause
+   * parsing to fail/throw.
+   * @return true if there are rows retrieved, false if no rows were retrieved, which means
+   * no more will ever be retrieved (end of data).
+   * @throws RuntimeException on parse errors.
+   */
+  @Override
+  public boolean next() {
+    // Check assumed invariants
+    // We don't know if there is data or not. This could be called on an empty data file.
+    // We DO know that this won't be called if there is no space in the batch for even 1
+    // row.
+    if (dafParser.isEOF()) {
+      return false; // return without even checking for more rows or trying to parse.
+    }
+    while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip this loop.
+      // the predicate is always true once.
+      try {
+        dafParser.parse();
+        if (dafParser.isProcessingError()) {
+          assert(Objects.nonNull(dafParser.getDiagnostics()));
+          throw UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
+              .addContext(errorContext).build(logger);
+        }
+        if (dafParser.isValidationError()) {
+          logger.warn(dafParser.getDiagnosticsAsString());
+          // Note that even if daffodil is set to not validate, validation errors may still occur
+          // from DFDL's "recoverableError" assertions.
+        }
+      } catch (Exception e) {

Review Comment:
   Same comment here.  Do we know what kind(s) of exceptions we may encounter here?



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
+  private final DaffodilFormatConfig dafConfig;
+  private final RowSetLoader rowSetLoader;
+  private final CustomErrorContext errorContext;
+  private final DaffodilMessageParser dafParser;
+  private final InputStream dataInputStream;
+
+  static class DaffodilReaderConfig {
+    final DaffodilFormatPlugin plugin;
+    DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
+
+    errorContext = negotiator.parentErrorContext();
+    this.dafConfig = readerConfig.plugin.getConfig();
+
+    String schemaURIString = dafConfig.getSchemaURI(); // "schema/complexArray1.dfdl.xsd";
+    String rootName = dafConfig.getRootName();
+    String rootNamespace = dafConfig.getRootNamespace();
+    boolean validationMode = dafConfig.getValidationMode();
+
+    URI dfdlSchemaURI;
+    try {
+      dfdlSchemaURI = new URI(schemaURIString);
+    } catch (URISyntaxException e) {
+      throw UserException.validationError(e)
+          .build(logger);
+    }
+
+    FileDescrip file = negotiator.file();
+    DrillFileSystem fs = file.fileSystem();
+    URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+    DataProcessor dp;
+    try {
+      dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace);
+    } catch (Exception e) {

Review Comment:
   Can we be more specific here?



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+    extends InfosetOutputter {
+
+  private boolean isOriginalRoot() {
+    boolean result = currentTupleWriter() == rowSetWriter;
+    if (result)
+      assert(tupleWriterStack.size() == 1);
+    return result;
+  }
+
+  /**
+   * True if the next startComplex call will be for the
+   * DFDL infoset root element whose children are the columns of
+   * the row set.
+   */
+  private boolean isRootElement = true;
+
+  /**
+   * Stack that is used only if we have sub-structures that are not
+   * simple-type fields of the row.
+   */
+  private final Stack<TupleWriter> tupleWriterStack = new Stack<>();
+
+  private final Stack<ArrayWriter> arrayWriterStack = new Stack<>();
+
+  private TupleWriter currentTupleWriter() {
+    return tupleWriterStack.peek();
+  }
+
+  private ArrayWriter currentArrayWriter() {
+    return arrayWriterStack.peek();
+  }
+
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+  private DaffodilDrillInfosetOutputter() {} // no default constructor
+
+  private RowSetLoader rowSetWriter;
+
+  public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+    this.rowSetWriter = writer;
+    this.tupleWriterStack.push(writer);
+  }
+
+  @Override
+  public void reset() {
+    tupleWriterStack.clear();
+    tupleWriterStack.push(rowSetWriter);
+    arrayWriterStack.clear();
+    this.isRootElement = true;
+    checkCleanState();
+  }
+
+  private void checkCleanState() {
+    assert(isOriginalRoot());
+    assert(arrayWriterStack.isEmpty());
+    assert(isRootElement);
+  }
+
+  @Override
+  public void startDocument() {
+    checkCleanState();
+  }
+
+  @Override
+  public void endDocument() {
+    checkCleanState();
+  }
+
+  private String colName(ElementMetadata md) {
+    return DrillDaffodilSchemaVisitor.makeColumnName(md);
+  }
+
+  @Override
+  public void startSimple(InfosetSimpleElement ise) {
+    assert (!isRootElement);
+    ElementMetadata md = ise.metadata();
+    String colName = colName(md);
+    ColumnWriter cw;
+    if (md.isArray()) {
+      // A simple type array
+      assert(!arrayWriterStack.isEmpty());
+      cw = currentArrayWriter().scalar();
+    } else {
+      // A simple element within a map
+      // Note the map itself might be an array
+      // but we don't care about that here.
+      cw = currentTupleWriter().column(colName);
+    }
+    ColumnMetadata cm = cw.schema();
+    assert(cm.isScalar());
+    if (md.isNillable() && ise.isNilled()) {
+      assert cm.isNullable();
+      cw.setNull();
+    } else {
+      convertDaffodilValueToDrillValue(ise, cm, cw);
+    }
+  }
+
+  @Override
+  public void endSimple(InfosetSimpleElement diSimple) {
+    assert (!isRootElement);
+    // do nothing
+  }
+
+  @Override
+  public void startComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    String colName = colName(ce.metadata());
+    if (isRootElement) {
+      assert(isOriginalRoot());
+      // This complex element's corresponds to the root element of the
+      // DFDL schema. We don't treat this as a column of the row set.
+      // Rather, it's children are the columns of the row set.
+      //
+      // If we do nothing at all here, then we'll start getting
+      // even calls for the children.
+      isRootElement = false;
+      return;
+    }
+    if (md.isArray()) {
+      assert(!arrayWriterStack.isEmpty());
+      // FIXME: is this the way to add a complex array child item (i.e., each array item is a map)
+      tupleWriterStack.push(currentArrayWriter().tuple());
+    } else {
+      tupleWriterStack.push(currentTupleWriter().tuple(colName));
+    }
+  }
+
+  @Override
+  public void endComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    if (isOriginalRoot()) {
+      isRootElement = true;
+      // do nothing else. The row gets closed-out in the DaffodilBatchReader.next() method.
+    } else {
+      // it's a map.
+      // We seem to not need to do anything to end the map. No action taken here works.
+      if (md.isArray()) {
+        assert (!arrayWriterStack.isEmpty());
+        currentArrayWriter().save(); // required for map array entries.
+      }
+      tupleWriterStack.pop();
+    }
+  }
+
+  @Override
+  public void startArray(InfosetArray diArray) {
+    ElementMetadata md = diArray.metadata();
+    assert (md.isArray());
+    // DFDL has no notion of an array directly within another array. A named field (map) is necessary
+    // before you can have another array.
+    assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a map, or the top level row.
+    String colName = colName(md);
+    TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+    ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+    arrayWriterStack.push(aw);
+  }
+
+  @Override
+  public void endArray(InfosetArray ia) {
+    ElementMetadata md = ia.metadata();
+    assert (md.isArray());
+    assert (!arrayWriterStack.empty());
+    // FIXME: How do we end/close-out an array?
+    // note that each array instance, when the instance is a map, must have
+    // save called after it is written to the array but that happens
+    // in endComplex events since it must be called not once per array, but
+    // once per array item.
+    arrayWriterStack.pop();
+  }
+
+  private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, ColumnMetadata cm, ColumnWriter cw) {
+    PrimitiveType dafType = ise.metadata().primitiveType();
+    TypeProtos.MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+    assert(drillType == cm.type());
+    switch (drillType) {
+    case INT: {
+      //
+      // FIXME: Javadoc for setObject says "primarily for testing"
+      // So how are we supposed to assign the column value then?
+      // Is there a way to get from a ColumnWriter to a typed scalar writer (downcast perhaps?)
+      cw.setObject(ise.getInt());
+      break;
+    }
+    case BIGINT: {
+      cw.setObject(ise.getLong());
+      break;
+    }
+    case SMALLINT: {
+      cw.setObject(ise.getShort());
+      break;
+    }
+    case TINYINT: {
+      cw.setObject(ise.getByte());
+      break;
+    }
+//        .put("UNSIGNEDLONG", TypeProtos.MinorType.UINT8)

Review Comment:
   Please remove... 



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.daffodil.japi.Diagnostic;
+import org.apache.daffodil.japi.ParseResult;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.japi.io.InputSourceDataInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * DFDL Daffodil Streaming message parser
+ * <br/>
+ * You construct this providing a DataProcessor obtained from the
+ * DaffodilDataProcessorFactory.
+ * The DataProcessor contains the compiled DFDL schema, ready to use, as
+ * well as whether validation while parsing has been requested.
+ * <br/>
+ * The DataProcessor object may be shared/reused by multiple threads each of which
+ * has its own copy of this class.
+ * This object is, however, stateful, and must not be shared by multiple threads.
+ * <br/>
+ * You must call setInputStream, and setInfosetOutputter before
+ * you call parse().
+ * The input stream and the InfosetOutputter objects are also private to one thread and are stateful
+ * and owned by this object.
+ * Once you have called setInputStream, you should view the input stream as the private property of
+ * this object.
+ * The parse() will invoke the InfosetOutputter's methods to deliver
+ * parsed data, and it may optionally create diagnostics (obtained via getDiagnostics)
+ * indicating which kind they are via the getIsProcessingError, getIsValidationError.
+ * <br/>
+ * Note that the InfosetOutputter may be called many times before a processing error is detected,
+ * as Daffodil delivers result data incrementally.
+ * <br/>
+ * Validation errors do not affect the InfosetOutputter output calls, but indicate that data was
+ * detected that is invalid.
+ * <br/>
+ * When parse() returns, the parse has ended and one can check for errors/diagnostics.
+ * One can call parse() again if there is still data to consume, which is checked via the
+ * isEOF() method.
+ * <br/>
+ * There are no guarantees about where the input stream is positioned between parse() calls.
+ * In particular, it may not be positioned at the start of the next message, as Daffodil may
+ * have pre-fetched additional bytes from the input stream which it found are not part of the
+ * current infoset, but the next one.
+ * The positioning of the input stream may in fact be somewhere in the middle of a byte,
+ * as Daffodil does not require messages to be of lengths that are in whole byte units.
+ * Hence, once you give the input stream to this object via setInputStream, that input stream is
+ * owned privately by this class for ever after.
+ */
+public class DaffodilMessageParser {
+
+  /**
+   * Constructs the parser using a DataProcessor obtained from
+   * a DaffodilDataProcessorFactory.
+   * @param dp
+   */
+  DaffodilMessageParser(DataProcessor dp) {
+    this.dp = dp;
+  }
+
+  /**
+   * Provide the input stream from which data is to be parsed.
+   * <br/>
+   * This input stream is then owned by this object and becomes part of its state.
+   * <br/>
+   * It is; however, the responsibility of the caller to close this
+   * input stream after the completion of all parse calls.
+   * In particular, if a parse error is considered fatal, then
+   * the caller should close the input stream.
+   * There are advanced error-recovery techniques that may attempt to find
+   * data that can be parsed later in the data stream.
+   * In those cases the input stream would not be closed after a processing error,
+   * but such usage is beyond the scope of this javadoc.
+   * @param inputStream
+   */
+  public void setInputStream(InputStream inputStream) {
+    dis = new InputSourceDataInputStream(inputStream);
+  }
+
+  /**
+   * Provides the InfosetOutputter which will be called to deliver
+   * the Infoset via calls to its methods.
+   * @param outputter
+   */
+  public void setInfosetOutputter(InfosetOutputter outputter) {
+    this.outputter = outputter;
+  }
+
+  /**
+   * Called to pull messages from the data stream.
+   * The message 'Infoset' is delivered by way of calls to the InfosetOutputter's methods.
+   * <br/>
+   * After calling this, one may call getIsProcessingError, getIsValiationError, isEOF, and
+   * getDiagnostics.
+   */
+  public void parse() {
+    if (dis == null)
+      throw new IllegalStateException("Input stream must be provided by setInputStream() call.");
+    if (outputter == null)
+      throw new IllegalStateException("InfosetOutputter must be provided by setInfosetOutputter() call.");
+
+    reset();
+    ParseResult res = dp.parse(dis, outputter);
+    isProcessingError = res.isProcessingError();
+    isValidationError = res.isValidationError();
+    diagnostics = res.getDiagnostics();
+  }
+
+  /**
+   * True if the input stream is known to contain no more data.
+   * If the input stream is a true stream, not a file, then temporary unavailability of data
+   * may cause this call to block until the stream is closed from the other end, or data becomes
+   * available.
+   * <br/>
+   * False if the input stream is at EOF, and no more data can be obtained.
+   * It is an error to call parse() after isEOF has returned true.
+   * @return
+   */
+  public boolean isEOF() {
+    return !dis.hasData();
+  }
+
+  /**
+   * True if the parse() call failed with a processing error.
+   * This indicates that the data was not well-formed and could not be
+   * parsed successfully.
+   * <br/>
+   * It is possible for isProcessingError and isValidationError to both be true.
+   * @return
+   */
+  public boolean isProcessingError() { return isProcessingError; }
+
+  /**
+   * True if a validation error occurred during parsing.
+   * Subsequently to a validation error occurring, parsing may succeed or fail.
+   * after the validation error was detected.
+   * @return
+   */
+  public boolean isValidationError() { return isValidationError; }
+
+  /**
+   * After a parse() call this returns null or a list of 1 or more diagnostics.
+   * <br/>
+   * If isProcessingError or isValidationError are true, then this will contain at least 1
+   * diagnostic.
+   * If both are true this will contain at least 2 diagnostics.
+   * @return
+   */
+  public List<Diagnostic> getDiagnostics() { return diagnostics;  }
+  public String getDiagnosticsAsString() {
+    String result = diagnostics.stream()
+        .map(Diagnostic::getMessage)
+        .collect(Collectors.joining("\n"));
+    return result;
+  }
+
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilMessageParser.class);

Review Comment:
   See above comment about Drill style for classes.



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
+  private final DaffodilFormatConfig dafConfig;
+  private final RowSetLoader rowSetLoader;
+  private final CustomErrorContext errorContext;
+  private final DaffodilMessageParser dafParser;
+  private final InputStream dataInputStream;
+
+  static class DaffodilReaderConfig {
+    final DaffodilFormatPlugin plugin;
+    DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
+
+    errorContext = negotiator.parentErrorContext();
+    this.dafConfig = readerConfig.plugin.getConfig();
+
+    String schemaURIString = dafConfig.getSchemaURI(); // "schema/complexArray1.dfdl.xsd";
+    String rootName = dafConfig.getRootName();
+    String rootNamespace = dafConfig.getRootNamespace();
+    boolean validationMode = dafConfig.getValidationMode();
+
+    URI dfdlSchemaURI;
+    try {
+      dfdlSchemaURI = new URI(schemaURIString);
+    } catch (URISyntaxException e) {
+      throw UserException.validationError(e)
+          .build(logger);
+    }
+
+    FileDescrip file = negotiator.file();
+    DrillFileSystem fs = file.fileSystem();
+    URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+    DataProcessor dp;
+    try {
+      dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to get Daffodil DFDL processor for: %s", fsSchemaURI))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // Create the corresponding Drill schema.
+    // Note: this could be a very large schema. Think of a large complex RDBMS schema,
+    // all of it, hundreds of tables, but all part of the same metadata tree.
+    TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+    // Inform Drill about the schema
+    negotiator.tableSchema(drillSchema, true);
+
+    //
+    // DATA TIME: Next we construct the runtime objects, and open files.
+    //
+    // We get the DaffodilMessageParser, which is a stateful driver for daffodil that
+    // actually does the parsing.
+    rowSetLoader = negotiator.build().writer();
+
+    // We construct the Daffodil InfosetOutputter which the daffodil parser uses to
+    // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+    DaffodilDrillInfosetOutputter outputter = new DaffodilDrillInfosetOutputter(rowSetLoader);
+
+    // Now we can setup the dafParser with the outputter it will drive with
+    // the parser-produced infoset.
+    dafParser = new DaffodilMessageParser(dp); // needs further initialization after this.
+    dafParser.setInfosetOutputter(outputter);
+
+    Path dataPath = file.split().getPath();
+    // Lastly, we open the data stream
+    try {
+      dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to open input file: %s", dataPath.toString()))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // And lastly,... tell daffodil the input data stream.
+    dafParser.setInputStream(dataInputStream);
+  }
+
+
+  /**
+   * This is the core of actual processing - data movement from Daffodil to Drill.
+   * <p>
+   * If there is space in the batch, and there is data available to parse
+   * then this calls the daffodil parser, which parses data, delivering it to the rowWriter
+   * by way of the infoset outputter.
+   * <p>
+   * Repeats until the rowWriter is full (a batch is full), or there is no more data, or
+   * a parse error ends execution with a throw.
+   * <p>
+   * Validation errors and other warnings are not errors and are logged but do not cause
+   * parsing to fail/throw.
+   * @return true if there are rows retrieved, false if no rows were retrieved, which means
+   * no more will ever be retrieved (end of data).
+   * @throws RuntimeException on parse errors.
+   */
+  @Override
+  public boolean next() {
+    // Check assumed invariants
+    // We don't know if there is data or not. This could be called on an empty data file.
+    // We DO know that this won't be called if there is no space in the batch for even 1
+    // row.
+    if (dafParser.isEOF()) {
+      return false; // return without even checking for more rows or trying to parse.
+    }
+    while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip this loop.
+      // the predicate is always true once.
+      try {
+        dafParser.parse();
+        if (dafParser.isProcessingError()) {
+          assert(Objects.nonNull(dafParser.getDiagnostics()));
+          throw UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
+              .addContext(errorContext).build(logger);
+        }
+        if (dafParser.isValidationError()) {
+          logger.warn(dafParser.getDiagnosticsAsString());
+          // Note that even if daffodil is set to not validate, validation errors may still occur
+          // from DFDL's "recoverableError" assertions.
+        }
+      } catch (Exception e) {
+        throw UserException.dataReadError(e).message("Error parsing file: " + e.getMessage())
+            .addContext(errorContext).build(logger);
+      }
+      rowSetLoader.save();
+    }
+    int nRows = rowSetLoader.rowCount();

Review Comment:
   Do we need this logic here?



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
+  private final DaffodilFormatConfig dafConfig;
+  private final RowSetLoader rowSetLoader;
+  private final CustomErrorContext errorContext;
+  private final DaffodilMessageParser dafParser;
+  private final InputStream dataInputStream;
+
+  static class DaffodilReaderConfig {
+    final DaffodilFormatPlugin plugin;
+    DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
+
+    errorContext = negotiator.parentErrorContext();
+    this.dafConfig = readerConfig.plugin.getConfig();
+
+    String schemaURIString = dafConfig.getSchemaURI(); // "schema/complexArray1.dfdl.xsd";
+    String rootName = dafConfig.getRootName();
+    String rootNamespace = dafConfig.getRootNamespace();
+    boolean validationMode = dafConfig.getValidationMode();
+
+    URI dfdlSchemaURI;
+    try {
+      dfdlSchemaURI = new URI(schemaURIString);
+    } catch (URISyntaxException e) {
+      throw UserException.validationError(e)
+          .build(logger);
+    }
+
+    FileDescrip file = negotiator.file();
+    DrillFileSystem fs = file.fileSystem();
+    URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+    DataProcessor dp;
+    try {
+      dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to get Daffodil DFDL processor for: %s", fsSchemaURI))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // Create the corresponding Drill schema.
+    // Note: this could be a very large schema. Think of a large complex RDBMS schema,
+    // all of it, hundreds of tables, but all part of the same metadata tree.
+    TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+    // Inform Drill about the schema
+    negotiator.tableSchema(drillSchema, true);
+
+    //
+    // DATA TIME: Next we construct the runtime objects, and open files.
+    //
+    // We get the DaffodilMessageParser, which is a stateful driver for daffodil that
+    // actually does the parsing.
+    rowSetLoader = negotiator.build().writer();
+
+    // We construct the Daffodil InfosetOutputter which the daffodil parser uses to
+    // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+    DaffodilDrillInfosetOutputter outputter = new DaffodilDrillInfosetOutputter(rowSetLoader);
+
+    // Now we can setup the dafParser with the outputter it will drive with
+    // the parser-produced infoset.
+    dafParser = new DaffodilMessageParser(dp); // needs further initialization after this.
+    dafParser.setInfosetOutputter(outputter);
+
+    Path dataPath = file.split().getPath();
+    // Lastly, we open the data stream
+    try {
+      dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to open input file: %s", dataPath.toString()))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // And lastly,... tell daffodil the input data stream.
+    dafParser.setInputStream(dataInputStream);
+  }
+
+
+  /**
+   * This is the core of actual processing - data movement from Daffodil to Drill.
+   * <p>
+   * If there is space in the batch, and there is data available to parse
+   * then this calls the daffodil parser, which parses data, delivering it to the rowWriter
+   * by way of the infoset outputter.
+   * <p>
+   * Repeats until the rowWriter is full (a batch is full), or there is no more data, or
+   * a parse error ends execution with a throw.
+   * <p>
+   * Validation errors and other warnings are not errors and are logged but do not cause
+   * parsing to fail/throw.
+   * @return true if there are rows retrieved, false if no rows were retrieved, which means
+   * no more will ever be retrieved (end of data).
+   * @throws RuntimeException on parse errors.
+   */
+  @Override
+  public boolean next() {
+    // Check assumed invariants
+    // We don't know if there is data or not. This could be called on an empty data file.
+    // We DO know that this won't be called if there is no space in the batch for even 1
+    // row.
+    if (dafParser.isEOF()) {
+      return false; // return without even checking for more rows or trying to parse.
+    }
+    while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip this loop.
+      // the predicate is always true once.
+      try {
+        dafParser.parse();
+        if (dafParser.isProcessingError()) {
+          assert(Objects.nonNull(dafParser.getDiagnostics()));
+          throw UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
+              .addContext(errorContext).build(logger);
+        }
+        if (dafParser.isValidationError()) {
+          logger.warn(dafParser.getDiagnosticsAsString());
+          // Note that even if daffodil is set to not validate, validation errors may still occur
+          // from DFDL's "recoverableError" assertions.
+        }
+      } catch (Exception e) {
+        throw UserException.dataReadError(e).message("Error parsing file: " + e.getMessage())
+            .addContext(errorContext).build(logger);
+      }
+      rowSetLoader.save();
+    }
+    int nRows = rowSetLoader.rowCount();
+    assert nRows > 0; // This cannot be zero. If the parse failed we will have already thrown out of here.
+    return true;
+  }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(dataInputStream);

Review Comment:
   Do we need to close the Daffodil parser, or is it ok to leave that?



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+    extends InfosetOutputter {
+
+  private boolean isOriginalRoot() {
+    boolean result = currentTupleWriter() == rowSetWriter;
+    if (result)
+      assert(tupleWriterStack.size() == 1);
+    return result;
+  }
+
+  /**
+   * True if the next startComplex call will be for the
+   * DFDL infoset root element whose children are the columns of
+   * the row set.
+   */
+  private boolean isRootElement = true;
+
+  /**
+   * Stack that is used only if we have sub-structures that are not
+   * simple-type fields of the row.
+   */
+  private final Stack<TupleWriter> tupleWriterStack = new Stack<>();
+
+  private final Stack<ArrayWriter> arrayWriterStack = new Stack<>();
+
+  private TupleWriter currentTupleWriter() {
+    return tupleWriterStack.peek();
+  }
+
+  private ArrayWriter currentArrayWriter() {
+    return arrayWriterStack.peek();
+  }
+
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+  private DaffodilDrillInfosetOutputter() {} // no default constructor
+
+  private RowSetLoader rowSetWriter;
+
+  public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+    this.rowSetWriter = writer;
+    this.tupleWriterStack.push(writer);
+  }
+
+  @Override
+  public void reset() {
+    tupleWriterStack.clear();
+    tupleWriterStack.push(rowSetWriter);
+    arrayWriterStack.clear();
+    this.isRootElement = true;
+    checkCleanState();
+  }
+
+  private void checkCleanState() {
+    assert(isOriginalRoot());
+    assert(arrayWriterStack.isEmpty());
+    assert(isRootElement);
+  }
+
+  @Override
+  public void startDocument() {
+    checkCleanState();
+  }
+
+  @Override
+  public void endDocument() {
+    checkCleanState();
+  }
+
+  private String colName(ElementMetadata md) {
+    return DrillDaffodilSchemaVisitor.makeColumnName(md);
+  }
+
+  @Override
+  public void startSimple(InfosetSimpleElement ise) {
+    assert (!isRootElement);
+    ElementMetadata md = ise.metadata();
+    String colName = colName(md);
+    ColumnWriter cw;
+    if (md.isArray()) {
+      // A simple type array
+      assert(!arrayWriterStack.isEmpty());
+      cw = currentArrayWriter().scalar();
+    } else {
+      // A simple element within a map
+      // Note the map itself might be an array
+      // but we don't care about that here.
+      cw = currentTupleWriter().column(colName);
+    }
+    ColumnMetadata cm = cw.schema();
+    assert(cm.isScalar());
+    if (md.isNillable() && ise.isNilled()) {
+      assert cm.isNullable();
+      cw.setNull();
+    } else {
+      convertDaffodilValueToDrillValue(ise, cm, cw);
+    }
+  }
+
+  @Override
+  public void endSimple(InfosetSimpleElement diSimple) {
+    assert (!isRootElement);
+    // do nothing
+  }
+
+  @Override
+  public void startComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    String colName = colName(ce.metadata());
+    if (isRootElement) {
+      assert(isOriginalRoot());
+      // This complex element's corresponds to the root element of the
+      // DFDL schema. We don't treat this as a column of the row set.
+      // Rather, it's children are the columns of the row set.
+      //
+      // If we do nothing at all here, then we'll start getting
+      // even calls for the children.
+      isRootElement = false;
+      return;
+    }
+    if (md.isArray()) {
+      assert(!arrayWriterStack.isEmpty());
+      // FIXME: is this the way to add a complex array child item (i.e., each array item is a map)
+      tupleWriterStack.push(currentArrayWriter().tuple());
+    } else {
+      tupleWriterStack.push(currentTupleWriter().tuple(colName));
+    }
+  }
+
+  @Override
+  public void endComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    if (isOriginalRoot()) {
+      isRootElement = true;
+      // do nothing else. The row gets closed-out in the DaffodilBatchReader.next() method.
+    } else {
+      // it's a map.
+      // We seem to not need to do anything to end the map. No action taken here works.
+      if (md.isArray()) {
+        assert (!arrayWriterStack.isEmpty());
+        currentArrayWriter().save(); // required for map array entries.
+      }
+      tupleWriterStack.pop();
+    }
+  }
+
+  @Override
+  public void startArray(InfosetArray diArray) {
+    ElementMetadata md = diArray.metadata();
+    assert (md.isArray());
+    // DFDL has no notion of an array directly within another array. A named field (map) is necessary
+    // before you can have another array.
+    assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a map, or the top level row.
+    String colName = colName(md);
+    TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+    ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+    arrayWriterStack.push(aw);
+  }
+
+  @Override
+  public void endArray(InfosetArray ia) {
+    ElementMetadata md = ia.metadata();
+    assert (md.isArray());
+    assert (!arrayWriterStack.empty());
+    // FIXME: How do we end/close-out an array?
+    // note that each array instance, when the instance is a map, must have
+    // save called after it is written to the array but that happens
+    // in endComplex events since it must be called not once per array, but
+    // once per array item.
+    arrayWriterStack.pop();
+  }
+
+  private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, ColumnMetadata cm, ColumnWriter cw) {
+    PrimitiveType dafType = ise.metadata().primitiveType();
+    TypeProtos.MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+    assert(drillType == cm.type());
+    switch (drillType) {
+    case INT: {
+      //
+      // FIXME: Javadoc for setObject says "primarily for testing"
+      // So how are we supposed to assign the column value then?
+      // Is there a way to get from a ColumnWriter to a typed scalar writer (downcast perhaps?)
+      cw.setObject(ise.getInt());

Review Comment:
   For these methods here, I seem to recall that Drill has dedicated set methods for each data type.  Is there a reason why you chose the `setObject()`?  I suspect that if there are methods for each data type, we should probably use them since setObject probably does a type check anyway.



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+    extends InfosetOutputter {
+
+  private boolean isOriginalRoot() {
+    boolean result = currentTupleWriter() == rowSetWriter;

Review Comment:
   Nit:  Please use formatting consistent with Drill's style for this class.  Usually we put the logger at the beginning, then constructors. etc.. 



##########
contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.drill.categories.RowSetTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+@Category(RowSetTest.class)
+public class TestDaffodilReader extends ClusterTest {
+
+  String schemaURIRoot = "file:///opt/drill/contrib/format-daffodil/src/test/resources/";
+  @BeforeClass
+  public static void setup() throws Exception {
+    // boilerplate call to start test rig
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    DaffodilFormatConfig formatConfig =
+        new DaffodilFormatConfig(null,
+            "",
+            "",
+            "",
+            false);
+
+    cluster.defineFormat("dfs", "daffodil", formatConfig);
+
+    // Needed to test against compressed files.
+    // Copies data from src/test/resources to the dfs root.
+    dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("schema/"));
+  }
+
+  private String selectRow(String schema, String file) {
+    return "SELECT * FROM table(dfs.`data/" + file + "` " +
+        " (type => 'daffodil'," +
+        " validationMode => 'true', " +
+        " schemaURI => '" + schemaURIRoot + "schema/" + schema + ".dfdl.xsd'," +
+        " rootName => 'row'," +
+        " rootNamespace => null " +
+        "))";
+  }
+
+  /**
+   * This unit test tests a simple data file
+   *
+   * @throws Exception Throw exception if anything goes wrong
+   */
+  @Test
+  public void testSimpleQuery1() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("simple", "data01Int.dat.gz"));
+    RowSet results = query.rowSet();
+    results.print();
+    assertEquals(1, results.rowCount());
+
+    // create the expected metadata and data for this test
+    // metadata first
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("col", MinorType.INT)
+        .buildSchema();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow(0x00000101) // aka 257
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSimpleQuery2() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("simple","data06Int.dat"));
+    RowSet results = query.rowSet();
+    results.print();
+    assertEquals(6, results.rowCount());
+
+    // create the expected metadata and data for this test
+    // metadata first
+    TupleMetadata expectedSchema = new SchemaBuilder()
+            .add("col", MinorType.INT)
+            .buildSchema();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+            .addRow(0x00000101)
+            .addRow(0x00000102)
+            .addRow(0x00000103)
+            .addRow(0x00000104)
+            .addRow(0x00000105)
+            .addRow(0x00000106)
+            .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testComplexQuery1() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("complex1", "data02Int.dat"));
+    RowSet results = query.rowSet();
+    results.print();
+    assertEquals(1, results.rowCount());
+
+    RowSetReader rdr = results.reader();
+    rdr.next();
+    String col = rdr.getAsString();
+    assertEquals("{257, 258}", col);
+    assertFalse(rdr.next());
+    results.clear();
+  }
+
+  @Test
+  public void testComplexQuery2() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("complex1", "data06Int.dat"));
+    RowSet results = query.rowSet();
+    results.print();

Review Comment:
   Please remove output from unit tests when they are ready.



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+    extends InfosetOutputter {
+
+  private boolean isOriginalRoot() {
+    boolean result = currentTupleWriter() == rowSetWriter;
+    if (result)
+      assert(tupleWriterStack.size() == 1);
+    return result;
+  }
+
+  /**
+   * True if the next startComplex call will be for the
+   * DFDL infoset root element whose children are the columns of
+   * the row set.
+   */
+  private boolean isRootElement = true;
+
+  /**
+   * Stack that is used only if we have sub-structures that are not
+   * simple-type fields of the row.
+   */
+  private final Stack<TupleWriter> tupleWriterStack = new Stack<>();
+
+  private final Stack<ArrayWriter> arrayWriterStack = new Stack<>();
+
+  private TupleWriter currentTupleWriter() {
+    return tupleWriterStack.peek();
+  }
+
+  private ArrayWriter currentArrayWriter() {
+    return arrayWriterStack.peek();
+  }
+
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+  private DaffodilDrillInfosetOutputter() {} // no default constructor
+
+  private RowSetLoader rowSetWriter;
+
+  public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+    this.rowSetWriter = writer;
+    this.tupleWriterStack.push(writer);
+  }
+
+  @Override
+  public void reset() {
+    tupleWriterStack.clear();
+    tupleWriterStack.push(rowSetWriter);
+    arrayWriterStack.clear();
+    this.isRootElement = true;
+    checkCleanState();
+  }
+
+  private void checkCleanState() {
+    assert(isOriginalRoot());
+    assert(arrayWriterStack.isEmpty());
+    assert(isRootElement);
+  }
+
+  @Override
+  public void startDocument() {
+    checkCleanState();
+  }
+
+  @Override
+  public void endDocument() {
+    checkCleanState();
+  }
+
+  private String colName(ElementMetadata md) {
+    return DrillDaffodilSchemaVisitor.makeColumnName(md);
+  }
+
+  @Override
+  public void startSimple(InfosetSimpleElement ise) {
+    assert (!isRootElement);
+    ElementMetadata md = ise.metadata();
+    String colName = colName(md);
+    ColumnWriter cw;
+    if (md.isArray()) {
+      // A simple type array
+      assert(!arrayWriterStack.isEmpty());
+      cw = currentArrayWriter().scalar();
+    } else {
+      // A simple element within a map
+      // Note the map itself might be an array
+      // but we don't care about that here.
+      cw = currentTupleWriter().column(colName);
+    }
+    ColumnMetadata cm = cw.schema();
+    assert(cm.isScalar());
+    if (md.isNillable() && ise.isNilled()) {
+      assert cm.isNullable();
+      cw.setNull();
+    } else {
+      convertDaffodilValueToDrillValue(ise, cm, cw);
+    }
+  }
+
+  @Override
+  public void endSimple(InfosetSimpleElement diSimple) {
+    assert (!isRootElement);
+    // do nothing
+  }
+
+  @Override
+  public void startComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    String colName = colName(ce.metadata());
+    if (isRootElement) {
+      assert(isOriginalRoot());
+      // This complex element's corresponds to the root element of the
+      // DFDL schema. We don't treat this as a column of the row set.
+      // Rather, it's children are the columns of the row set.
+      //
+      // If we do nothing at all here, then we'll start getting
+      // even calls for the children.
+      isRootElement = false;
+      return;
+    }
+    if (md.isArray()) {
+      assert(!arrayWriterStack.isEmpty());
+      // FIXME: is this the way to add a complex array child item (i.e., each array item is a map)
+      tupleWriterStack.push(currentArrayWriter().tuple());
+    } else {
+      tupleWriterStack.push(currentTupleWriter().tuple(colName));
+    }
+  }
+
+  @Override
+  public void endComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    if (isOriginalRoot()) {
+      isRootElement = true;
+      // do nothing else. The row gets closed-out in the DaffodilBatchReader.next() method.
+    } else {
+      // it's a map.
+      // We seem to not need to do anything to end the map. No action taken here works.
+      if (md.isArray()) {
+        assert (!arrayWriterStack.isEmpty());
+        currentArrayWriter().save(); // required for map array entries.
+      }
+      tupleWriterStack.pop();
+    }
+  }
+
+  @Override
+  public void startArray(InfosetArray diArray) {
+    ElementMetadata md = diArray.metadata();
+    assert (md.isArray());
+    // DFDL has no notion of an array directly within another array. A named field (map) is necessary
+    // before you can have another array.
+    assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a map, or the top level row.
+    String colName = colName(md);
+    TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+    ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+    arrayWriterStack.push(aw);
+  }
+
+  @Override
+  public void endArray(InfosetArray ia) {
+    ElementMetadata md = ia.metadata();
+    assert (md.isArray());
+    assert (!arrayWriterStack.empty());
+    // FIXME: How do we end/close-out an array?
+    // note that each array instance, when the instance is a map, must have
+    // save called after it is written to the array but that happens
+    // in endComplex events since it must be called not once per array, but
+    // once per array item.
+    arrayWriterStack.pop();
+  }
+
+  private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, ColumnMetadata cm, ColumnWriter cw) {
+    PrimitiveType dafType = ise.metadata().primitiveType();
+    TypeProtos.MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+    assert(drillType == cm.type());
+    switch (drillType) {
+    case INT: {
+      //
+      // FIXME: Javadoc for setObject says "primarily for testing"
+      // So how are we supposed to assign the column value then?
+      // Is there a way to get from a ColumnWriter to a typed scalar writer (downcast perhaps?)
+      cw.setObject(ise.getInt());
+      break;
+    }
+    case BIGINT: {
+      cw.setObject(ise.getLong());
+      break;
+    }
+    case SMALLINT: {
+      cw.setObject(ise.getShort());
+      break;
+    }
+    case TINYINT: {
+      cw.setObject(ise.getByte());
+      break;
+    }
+//        .put("UNSIGNEDLONG", TypeProtos.MinorType.UINT8)
+//        .put("UNSIGNEDINT", TypeProtos.MinorType.UINT4)
+//        .put("UNSIGNEDSHORT", TypeProtos.MinorType.UINT2)
+//        .put("UNSIGNEDBYTE", TypeProtos.MinorType.UINT1)
+//        .put("INTEGER", TypeProtos.MinorType.BIGINT)
+//        .put("NONNEGATIVEINTEGER", TypeProtos.MinorType.BIGINT)
+    case BIT: {
+      cw.setObject(ise.getBoolean());
+      break;
+    }
+//        .put("DATE", TypeProtos.MinorType.DATE) // requires conversion
+//        .put("DATETIME", TypeProtos.MinorType.TIMESTAMP) // requires conversion
+//        .put("DECIMAL", TypeProtos.MinorType.VARDECIMAL) // requires conversion (maybe)
+    case FLOAT8: {
+      cw.setObject(ise.getDouble());
+      break;
+    }
+    case FLOAT4: {
+      cw.setObject(ise.getFloat());
+      break;
+    }
+    case VARBINARY: {
+      cw.setObject(ise.getHexBinary());
+      break;
+    }
+    case VARCHAR: {
+      //
+      // FIXME: VARCHAR is defined in drill as utf8 string.
+      // Is Drill expecting something other than a Java string in this setObject call?
+      // Should we be mapping Daffodil strings to Drill VAR16CHAR type?

Review Comment:
   I believe the answer to this question is yes.  We should map Daffodil strings to Drill `VARCHAR` types.  I wouldn't use the `VAR16CHAR` however.



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+    extends InfosetOutputter {
+
+  private boolean isOriginalRoot() {
+    boolean result = currentTupleWriter() == rowSetWriter;
+    if (result)
+      assert(tupleWriterStack.size() == 1);
+    return result;
+  }
+
+  /**
+   * True if the next startComplex call will be for the
+   * DFDL infoset root element whose children are the columns of
+   * the row set.
+   */
+  private boolean isRootElement = true;
+
+  /**
+   * Stack that is used only if we have sub-structures that are not
+   * simple-type fields of the row.
+   */
+  private final Stack<TupleWriter> tupleWriterStack = new Stack<>();
+
+  private final Stack<ArrayWriter> arrayWriterStack = new Stack<>();
+
+  private TupleWriter currentTupleWriter() {
+    return tupleWriterStack.peek();
+  }
+
+  private ArrayWriter currentArrayWriter() {
+    return arrayWriterStack.peek();
+  }
+
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+  private DaffodilDrillInfosetOutputter() {} // no default constructor
+
+  private RowSetLoader rowSetWriter;
+
+  public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+    this.rowSetWriter = writer;
+    this.tupleWriterStack.push(writer);
+  }
+
+  @Override
+  public void reset() {
+    tupleWriterStack.clear();
+    tupleWriterStack.push(rowSetWriter);
+    arrayWriterStack.clear();
+    this.isRootElement = true;
+    checkCleanState();
+  }
+
+  private void checkCleanState() {
+    assert(isOriginalRoot());
+    assert(arrayWriterStack.isEmpty());
+    assert(isRootElement);
+  }
+
+  @Override
+  public void startDocument() {
+    checkCleanState();
+  }
+
+  @Override
+  public void endDocument() {
+    checkCleanState();
+  }
+
+  private String colName(ElementMetadata md) {
+    return DrillDaffodilSchemaVisitor.makeColumnName(md);
+  }
+
+  @Override
+  public void startSimple(InfosetSimpleElement ise) {
+    assert (!isRootElement);
+    ElementMetadata md = ise.metadata();
+    String colName = colName(md);
+    ColumnWriter cw;
+    if (md.isArray()) {
+      // A simple type array
+      assert(!arrayWriterStack.isEmpty());
+      cw = currentArrayWriter().scalar();
+    } else {
+      // A simple element within a map
+      // Note the map itself might be an array
+      // but we don't care about that here.
+      cw = currentTupleWriter().column(colName);
+    }
+    ColumnMetadata cm = cw.schema();
+    assert(cm.isScalar());
+    if (md.isNillable() && ise.isNilled()) {
+      assert cm.isNullable();
+      cw.setNull();
+    } else {
+      convertDaffodilValueToDrillValue(ise, cm, cw);
+    }
+  }
+
+  @Override
+  public void endSimple(InfosetSimpleElement diSimple) {
+    assert (!isRootElement);
+    // do nothing
+  }
+
+  @Override
+  public void startComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    String colName = colName(ce.metadata());
+    if (isRootElement) {
+      assert(isOriginalRoot());
+      // This complex element's corresponds to the root element of the
+      // DFDL schema. We don't treat this as a column of the row set.
+      // Rather, it's children are the columns of the row set.
+      //
+      // If we do nothing at all here, then we'll start getting
+      // even calls for the children.
+      isRootElement = false;
+      return;
+    }
+    if (md.isArray()) {
+      assert(!arrayWriterStack.isEmpty());
+      // FIXME: is this the way to add a complex array child item (i.e., each array item is a map)
+      tupleWriterStack.push(currentArrayWriter().tuple());
+    } else {
+      tupleWriterStack.push(currentTupleWriter().tuple(colName));
+    }
+  }
+
+  @Override
+  public void endComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    if (isOriginalRoot()) {
+      isRootElement = true;
+      // do nothing else. The row gets closed-out in the DaffodilBatchReader.next() method.
+    } else {
+      // it's a map.
+      // We seem to not need to do anything to end the map. No action taken here works.
+      if (md.isArray()) {
+        assert (!arrayWriterStack.isEmpty());
+        currentArrayWriter().save(); // required for map array entries.
+      }
+      tupleWriterStack.pop();
+    }
+  }
+
+  @Override
+  public void startArray(InfosetArray diArray) {
+    ElementMetadata md = diArray.metadata();
+    assert (md.isArray());
+    // DFDL has no notion of an array directly within another array. A named field (map) is necessary
+    // before you can have another array.
+    assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a map, or the top level row.
+    String colName = colName(md);
+    TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+    ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+    arrayWriterStack.push(aw);
+  }
+
+  @Override
+  public void endArray(InfosetArray ia) {
+    ElementMetadata md = ia.metadata();
+    assert (md.isArray());
+    assert (!arrayWriterStack.empty());
+    // FIXME: How do we end/close-out an array?
+    // note that each array instance, when the instance is a map, must have
+    // save called after it is written to the array but that happens
+    // in endComplex events since it must be called not once per array, but
+    // once per array item.
+    arrayWriterStack.pop();
+  }
+
+  private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, ColumnMetadata cm, ColumnWriter cw) {
+    PrimitiveType dafType = ise.metadata().primitiveType();
+    TypeProtos.MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+    assert(drillType == cm.type());
+    switch (drillType) {
+    case INT: {
+      //
+      // FIXME: Javadoc for setObject says "primarily for testing"
+      // So how are we supposed to assign the column value then?
+      // Is there a way to get from a ColumnWriter to a typed scalar writer (downcast perhaps?)
+      cw.setObject(ise.getInt());
+      break;
+    }
+    case BIGINT: {
+      cw.setObject(ise.getLong());
+      break;
+    }
+    case SMALLINT: {
+      cw.setObject(ise.getShort());
+      break;
+    }
+    case TINYINT: {
+      cw.setObject(ise.getByte());
+      break;
+    }
+//        .put("UNSIGNEDLONG", TypeProtos.MinorType.UINT8)
+//        .put("UNSIGNEDINT", TypeProtos.MinorType.UINT4)
+//        .put("UNSIGNEDSHORT", TypeProtos.MinorType.UINT2)
+//        .put("UNSIGNEDBYTE", TypeProtos.MinorType.UINT1)
+//        .put("INTEGER", TypeProtos.MinorType.BIGINT)
+//        .put("NONNEGATIVEINTEGER", TypeProtos.MinorType.BIGINT)
+    case BIT: {
+      cw.setObject(ise.getBoolean());
+      break;
+    }
+//        .put("DATE", TypeProtos.MinorType.DATE) // requires conversion
+//        .put("DATETIME", TypeProtos.MinorType.TIMESTAMP) // requires conversion
+//        .put("DECIMAL", TypeProtos.MinorType.VARDECIMAL) // requires conversion (maybe)
+    case FLOAT8: {
+      cw.setObject(ise.getDouble());
+      break;
+    }
+    case FLOAT4: {
+      cw.setObject(ise.getFloat());
+      break;
+    }
+    case VARBINARY: {
+      cw.setObject(ise.getHexBinary());
+      break;
+    }
+    case VARCHAR: {
+      //
+      // FIXME: VARCHAR is defined in drill as utf8 string.
+      // Is Drill expecting something other than a Java string in this setObject call?
+      // Should we be mapping Daffodil strings to Drill VAR16CHAR type?
+      //
+      String s = ise.getString();
+      cw.setObject(s);
+      break;
+    }
+//        .put("TIME", TypeProtos.MinorType.TIME) // requires conversion

Review Comment:
   Does Daffodil support date, time, timestamp and interval types?  If so we should include those as well. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org