You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2019/06/07 10:38:52 UTC

[drill] 03/05: DRILL-7279: Enable provided schema for text files without headers

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 20ae96a4d0de23d329de1b5534683fdaac8816a7
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Sun May 26 23:32:40 2019 -0700

    DRILL-7279: Enable provided schema for text files without headers
    
    * Allows a provided schema for text files without headers. The
      provided schema columns replace the `columns` column that is
      normally used.
    * Allows customizing text format properties using table properties.
      The table properties "override" properties set in the plugin config.
    * Added unit tests for the newly supported use cases.
    * Fixed bug in quote escape handling.
    
    closes #1798
---
 .../impl/scan/project/ScanLevelProjection.java     |   2 +-
 .../impl/scan/project/ScanSchemaOrchestrator.java  |   2 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java      | 140 ++++++-
 .../exec/store/easy/text/TextFormatPlugin.java     | 117 +++++-
 .../compliant/v3/CompliantTextBatchReader.java     |  17 +-
 .../store/easy/text/compliant/v3/TextInput.java    |   2 -
 .../text/compliant/v3/TextParsingSettingsV3.java   | 282 ++++++-------
 .../store/easy/text/compliant/v3/TextReader.java   |  43 +-
 .../store/easy/text/compliant/BaseCsvTest.java     |  25 ++
 .../easy/text/compliant/TestCsvIgnoreHeaders.java  |  11 +-
 .../text/compliant/TestCsvTableProperties.java     | 451 +++++++++++++++++++++
 .../easy/text/compliant/TestCsvWithSchema.java     |  27 +-
 .../easy/text/compliant/TestCsvWithoutHeaders.java |  10 +-
 .../exec/record/metadata/AbstractPropertied.java   |  13 +-
 .../drill/exec/record/metadata/Propertied.java     |   3 +-
 15 files changed, 917 insertions(+), 228 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
index 7718119..4a02b33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
@@ -456,7 +456,7 @@ public class ScanLevelProjection {
 
     if (hasOutputSchema()) {
       projectionType =
-          outputSchema.getBooleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP)
+          outputSchema.booleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP)
           ? ScanProjectionType.STRICT_SCHEMA_WILDCARD
           : ScanProjectionType.SCHEMA_WILDCARD;
     } else if (wildcardPosn != -1) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
index a315a3f..37f7c75 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -322,7 +322,7 @@ public class ScanSchemaOrchestrator {
         // Use only implicit conversions
         schemaTransformer = new SchemaTransformerImpl(
             builder.outputSchema, builder.conversionProps);
-        if (builder.outputSchema.getBooleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP)) {
+        if (builder.outputSchema.booleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP)) {
           allowRequiredNulls = true;
         }
       } else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index a170122..3c79aea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -147,6 +147,142 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
           .addContext(errorContext())
           .build(logger);
       }
+
+      return new ScanBatch(context, oContext, readers, implicitColumns);
+    }
+
+    /**
+     * Create a record reader given a file system, a file description and other
+     * information. For backward compatibility, calls the plugin method by
+     * default.
+     *
+     * @param plugin
+     *          the plugin creating the scan
+     * @param context
+     *          fragment context for the fragment running the scan
+     * @param dfs
+     *          Drill's distributed file system facade
+     * @param fileWork
+     *          description of the file to scan
+     * @param columns
+     *          list of columns to project
+     * @param userName
+     *          the name of the user performing the scan
+     * @return a record reader for the given file
+     * @throws ExecutionSetupException
+     *           if anything goes wrong
+     */
+
+    public RecordReader getRecordReader(EasyFormatPlugin<? extends FormatPluginConfig> plugin,
+        FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
+        List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+      return plugin.getRecordReader(context, dfs, fileWork, columns, userName);
+    }
+  }
+
+  /**
+   * Revised scanner based on the revised {@link org.apache.drill.exec.physical.rowSet.ResultSetLoader}
+   * and {@link org.apache.drill.exec.physical.impl.scan.RowBatchReader} classes.
+   * Handles most projection tasks automatically. Able to limit
+   * vector and batch sizes. Use this for new format plugins.
+   */
+
+  public abstract static class ScanFrameworkCreator
+      implements ScanBatchCreator {
+
+    protected EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+
+    public ScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+      this.plugin = plugin;
+    }
+
+    /**
+     * Builds the revised {@link FileBatchReader}-based scan batch.
+     *
+     * @param context
+     * @param scan
+     * @return
+     * @throws ExecutionSetupException
+     */
+
+    @Override
+    public CloseableRecordBatch buildScan(
+        final FragmentContext context,
+        final EasySubScan scan) throws ExecutionSetupException {
+
+      // Assemble the scan operator and its wrapper.
+
+      try {
+        final FileScanBuilder builder = frameworkBuilder(context.getOptions(), scan);
+        builder.setProjection(scan.getColumns());
+        builder.setFiles(scan.getWorkUnits());
+        builder.setConfig(plugin.easyConfig().fsConf);
+
+        // The text readers use required Varchar columns to represent null columns.
+
+        builder.allowRequiredNullColumns(true);
+        final Path selectionRoot = scan.getSelectionRoot();
+        if (selectionRoot != null) {
+          builder.metadataOptions().setSelectionRoot(selectionRoot);
+          builder.metadataOptions().setPartitionDepth(scan.getPartitionDepth());
+        }
+        FileScanFramework framework = builder.buildFileFramework();
+        return new OperatorRecordBatch(
+            context, scan,
+            new ScanOperatorExec(
+                framework));
+      } catch (final UserException e) {
+        // Rethrow user exceptions directly
+        throw e;
+      } catch (final Throwable e) {
+        // Wrap all others
+        throw new ExecutionSetupException(e);
+      }
+    }
+
+    /**
+     * Create the plugin-specific framework that manages the scan. The framework
+     * creates batch readers one by one for each file or block. It defines semantic
+     * rules for projection. It handles "early" or "late" schema readers. A typical
+     * framework builds on standardized frameworks for files in general or text
+     * files in particular.
+     *
+     * @param options system/session options which can be used to control or
+     * customize the scan framework
+     * @param scan the physical operation definition for the scan operation. Contains
+     * one or more files to read. (The Easy format plugin works only for files.)
+     * @return the scan framework which orchestrates the scan operation across
+     * potentially many files
+     * @throws ExecutionSetupException for all setup failures
+     */
+    protected abstract FileScanBuilder frameworkBuilder(
+        OptionManager options, EasySubScan scan) throws ExecutionSetupException;
+  }
+
+  /**
+   * Generic framework creator for files that just use the basic file
+   * support: metadata, etc. Specialized use cases (special "columns"
+   * column, say) will require a specialized implementation.
+   */
+
+  public abstract static class FileScanFrameworkCreator extends ScanFrameworkCreator {
+
+    private final FileReaderFactory readerCreator;
+
+    public FileScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin,
+        FileReaderFactory readerCreator) {
+      super(plugin);
+      this.readerCreator = readerCreator;
+    }
+
+    @Override
+    protected FileScanBuilder frameworkBuilder(
+        OptionManager options, EasySubScan scan) throws ExecutionSetupException {
+
+      // Basic file scan: the only customization is the reader factory given to this creator.
+      FileScanBuilder builder = new FileScanBuilder();
+      builder.setReaderFactory(readerCreator);
+      return builder;
     }
   }
 
@@ -182,11 +318,11 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
    * @param name name of the plugin
    * @param config configuration options for this plugin which determine
    * developer-defined runtime behavior
-   * @param context the global server-wide drillbit context
+   * @param context the global server-wide Drillbit context
    * @param storageConfig the configuration for the storage plugin that owns this
    * format plugin
    * @param formatConfig the Jackson-serialized format configuration as created
-   * by the user in the Drill web console. Holds user-defined options.
+   * by the user in the Drill web console. Holds user-defined options
    */
 
   protected EasyFormatPlugin(String name, EasyFormatConfig config, DrillbitContext context,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 3d122ed..7c3d950 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -44,6 +44,7 @@ import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.RecordReader;
@@ -74,9 +75,38 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
+/**
+ * Text format plugin for CSV and other delimited text formats.
+ * Allows use of a "provided schema", including using table properties
+ * on that schema to override "static" (or "default") properties
+ * defined in the plugin config. Allows, say, having ".csv" files
+ * in which some have no headers (the default) and some do have
+ * headers (as specified via table properties in the provided schema.)
+ * <p>
+ * Makes use of the scan framework and the result set loader mechanism
+ * to allow tight control of the size of produced batches (as well
+ * as to support provided schema.)
+ */
+
 public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
   private final static String PLUGIN_NAME = "text";
 
+  // Provided schema table properties unique to this plugin. If specified
+  // in the provided schema, they override the corresponding property in
+  // the plugin config. Names here match the field names in the format config.
+  // The "text." intermediate name avoids potential conflicts with other
+  // uses of these names and denotes that the names work only for the text
+  // format plugin.
+
+  public static final String TEXT_PREFIX = TupleMetadata.DRILL_PROP_PREFIX + PLUGIN_NAME + ".";
+  public static final String HAS_HEADERS_PROP = TEXT_PREFIX + "extractHeader";
+  public static final String SKIP_FIRST_LINE_PROP = TEXT_PREFIX + "skipFirstLine";
+  public static final String DELIMITER_PROP = TEXT_PREFIX + "fieldDelimiter";
+  public static final String COMMENT_CHAR_PROP = TEXT_PREFIX + "comment";
+  public static final String QUOTE_PROP = TEXT_PREFIX + "quote";
+  public static final String QUOTE_ESCAPE_PROP = TEXT_PREFIX + "escape";
+  public static final String LINE_DELIM_PROP = TEXT_PREFIX + "lineDelimiter";
+
   @JsonTypeName(PLUGIN_NAME)
   @JsonInclude(Include.NON_DEFAULT)
   public static class TextFormatConfig implements FormatPluginConfig {
@@ -182,25 +212,90 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
    */
   private static class ColumnsReaderFactory extends FileReaderFactory {
 
-    private final TextFormatPlugin plugin;
+    private final TextParsingSettingsV3 settings;
 
-    public ColumnsReaderFactory(TextFormatPlugin plugin) {
-      this.plugin = plugin;
+    public ColumnsReaderFactory(TextParsingSettingsV3 settings) {
+      this.settings = settings;
     }
 
     @Override
     public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-      TextParsingSettingsV3 settings = new TextParsingSettingsV3();
-      settings.set(plugin.getConfig());
-      return new CompliantTextBatchReader(settings);
+       return new CompliantTextBatchReader(settings);
+    }
+  }
+
+  /**
+   * Builds the V3 text scan operator.
+   */
+  private static class TextScanBatchCreator extends ScanFrameworkCreator {
+
+    private final TextFormatPlugin textPlugin;
+
+    public TextScanBatchCreator(TextFormatPlugin plugin) {
+      super(plugin);
+      textPlugin = plugin;
+    }
+
+    @Override
+    protected FileScanBuilder frameworkBuilder(
+        OptionManager options,
+        EasySubScan scan) throws ExecutionSetupException {
+      ColumnsScanBuilder builder = new ColumnsScanBuilder();
+      TextParsingSettingsV3 settings = new TextParsingSettingsV3(textPlugin.getConfig(), scan, options);
+      builder.setReaderFactory(new ColumnsReaderFactory(settings));
+
+      // Provide custom error context
+      builder.setContext(
+          new CustomErrorContext() {
+            @Override
+            public void addContext(UserException.Builder builder) {
+              builder.addContext("Format plugin:", PLUGIN_NAME);
+              builder.addContext("Plugin config name:", textPlugin.getName());
+              builder.addContext("Extract headers:",
+                  Boolean.toString(settings.isHeaderExtractionEnabled()));
+              builder.addContext("Skip first line:",
+                  Boolean.toString(settings.isSkipFirstLine()));
+            }
+          });
+
+      // If this format has no headers, or wants to skip them,
+      // then we must use the columns column to hold the data.
+
+      builder.requireColumnsArray(settings.isUseRepeatedVarChar());
+
+      // Text files handle nulls in an unusual way. Missing columns
+      // are set to required Varchar and filled with blanks. Yes, this
+      // means that the SQL statement or code cannot differentiate missing
+      // columns from empty columns, but that is how CSV and other text
+      // files have been defined within Drill.
+
+      builder.setNullType(
+          MajorType.newBuilder()
+            .setMinorType(MinorType.VARCHAR)
+            .setMode(DataMode.REQUIRED)
+            .build());
+
+      // Pass along the output schema, if any
+
+      builder.setOutputSchema(scan.getSchema());
+
+      // CSV maps blank columns to nulls (for nullable non-string columns),
+      // or to the default value (for non-nullable non-string columns.)
+
+      builder.setConversionProperty(AbstractConvertFromString.BLANK_ACTION_PROP,
+          AbstractConvertFromString.BLANK_AS_NULL);
+
+      return builder;
     }
   }
 
-  public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
+  public TextFormatPlugin(String name, DrillbitContext context,
+      Configuration fsConf, StoragePluginConfig storageConfig) {
      this(name, context, fsConf, storageConfig, new TextFormatConfig());
   }
 
-  public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
+  public TextFormatPlugin(String name, DrillbitContext context,
+      Configuration fsConf, StoragePluginConfig config,
       TextFormatConfig formatPluginConfig) {
     super(name, easyConfig(fsConf, formatPluginConfig), context, config, formatPluginConfig);
   }
@@ -224,14 +319,16 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
   }
 
   @Override
-  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns, MetadataProviderManager metadataProviderManager)
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+      List<SchemaPath> columns, MetadataProviderManager metadataProviderManager)
       throws IOException {
     return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot, metadataProviderManager);
   }
 
   @Override
   public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
-      List<SchemaPath> columns, OptionManager options, MetadataProviderManager metadataProviderManager) throws IOException {
+      List<SchemaPath> columns, OptionManager options,
+      MetadataProviderManager metadataProviderManager) throws IOException {
     return new EasyGroupScan(userName, selection, this, columns,
         selection.selectionRoot,
         // Some paths provide a null option manager. In that case, default to a
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java
index 54143ca..19b3dbd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java
@@ -105,10 +105,10 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
     try {
       TextOutput output;
 
-      if (settings.isHeaderExtractionEnabled()) {
-        output = openWithHeaders(schemaNegotiator);
-      } else {
+      if (settings.isUseRepeatedVarChar()) {
         output = openWithoutHeaders(schemaNegotiator);
+      } else {
+        output = openWithHeaders(schemaNegotiator, settings.providedHeaders());
       }
       if (output == null) {
         return false;
@@ -122,10 +122,17 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
 
   /**
    * Extract header and use that to define the reader schema.
+   *
+   * @param schemaNegotiator used to define the reader schema
+   * @param providedHeaders "artificial" headers created from a
+   * provided schema, if any. Used when using a provided schema
+   * with a text file that contains no headers; if null, the
+   * headers are instead extracted from the text file itself
    */
 
-  private TextOutput openWithHeaders(ColumnsSchemaNegotiator schemaNegotiator) throws IOException {
-    final String [] fieldNames = extractHeader();
+  private TextOutput openWithHeaders(ColumnsSchemaNegotiator schemaNegotiator,
+      String[] providedHeaders) throws IOException {
+    final String [] fieldNames = providedHeaders == null ? extractHeader() : providedHeaders;
     if (fieldNames == null) {
       return null;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
index 951bc81..28ddd07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
@@ -147,7 +147,6 @@ final class TextInput {
     }
   }
 
-
   /**
    * Helper method to get the most recent characters consumed since the last record started.
    * May get an incomplete string since we don't support stream rewind.  Returns empty string for now.
@@ -196,7 +195,6 @@ final class TextInput {
     }
   }
 
-
   /**
    * Read more data into the buffer. Will also manage split end conditions.
    *
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java
index 0341b45..12bbf42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java
@@ -17,45 +17,140 @@
  */
 package org.apache.drill.exec.store.easy.text.compliant.v3;
 
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
 import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
-
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 
 // TODO: Remove the "V3" suffix once the V2 version is retired.
 public class TextParsingSettingsV3 {
 
-  public static final TextParsingSettingsV3 DEFAULT = new TextParsingSettingsV3();
+  private final String emptyValue = null;
+  private final boolean parseUnescapedQuotes = true;
+  private final byte quote;
+  private final byte quoteEscape;
+  private final byte delimiter;
+  private final byte comment;
+
+  private final long maxCharsPerColumn = Character.MAX_VALUE;
+  private final byte normalizedNewLine = b('\n');
+  private final byte[] newLineDelimiter;
+  private final boolean ignoreLeadingWhitespaces = false;
+  private final boolean ignoreTrailingWhitespaces = false;
+  private final String lineSeparatorString = "\n";
+  private boolean skipFirstLine;
 
-  private String emptyValue = null;
-  private boolean parseUnescapedQuotes = true;
-  private byte quote = b('"');
-  private byte quoteEscape = b('"');
-  private byte delimiter = b(',');
-  private byte comment = b('#');
+  private final boolean headerExtractionEnabled;
+  private final boolean useRepeatedVarChar;
+  private final String providedHeaders[];
 
-  private long maxCharsPerColumn = Character.MAX_VALUE;
-  private byte normalizedNewLine = b('\n');
-  private byte[] newLineDelimiter = {normalizedNewLine};
-  private boolean ignoreLeadingWhitespaces;
-  private boolean ignoreTrailingWhitespaces;
-  private String lineSeparatorString = "\n";
-  private boolean skipFirstLine;
+  /**
+   * Configure the properties for this one scan based on:
+   * <ul>
+   * <li>The defaults in the plugin config (if properties are not defined
+   * in the config JSON).</li>
+   * <li>The config values from the config JSON as stored in the
+   * plugin config.</li>
+   * <li>Table function settings expressed in the query (and passed
+   * in as part of the plugin config).</li>
+   * <li>Table properties, which override the plugin config values.</li>
+   * </ul>
+   * The user can thus customize a table's behavior via table properties
+   * alone, with no new storage config. For example, the <tt>`csv`</tt> config
+   * has no headers by default; table properties can enable them per table.
+   *
+   * @param config plugin config supplying the base values
+   * @param scan sub-scan; its provided schema, if any, supplies overrides
+   * @param options option manager; not read by this constructor
+   */
+  public TextParsingSettingsV3(TextFormatConfig config,
+      EasySubScan scan, OptionManager options) {
+    TupleMetadata providedSchema = scan.getSchema();
+    boolean extractHeaders = config.isHeaderExtractionEnabled();
+    boolean skipFirst = config.isSkipFirstLine();
+    String[] providedHeaders = null;
+    byte delimChar = bSafe(config.getFieldDelimiter(), "fieldDelimiter");
+    byte commentChar = bSafe(config.getComment(), "comment");
+    byte quoteChar = bSafe(config.getQuote(), "quote");
+    byte quoteEscapeChar = bSafe(config.getEscape(), "escape");
+    byte[] newlineDelim = config.getLineDelimiter().getBytes(Charsets.UTF_8);
+    if (providedSchema != null) {
+      extractHeaders = providedSchema.booleanProperty(
+          TextFormatPlugin.HAS_HEADERS_PROP, extractHeaders);
+      // Default to the plugin config value, not the still-unassigned skipFirstLine field.
+      skipFirst = providedSchema.booleanProperty(TextFormatPlugin.SKIP_FIRST_LINE_PROP, skipFirst);
+      if (!extractHeaders && !providedSchema.isEmpty()) {
+        providedHeaders = new String[providedSchema.size()];
+        for (int i = 0; i < providedHeaders.length; i++) {
+          providedHeaders[i] = providedSchema.metadata(i).name();
+        }
+      }
+      delimChar = overrideChar(providedSchema, TextFormatPlugin.DELIMITER_PROP, delimChar);
+      quoteChar = overrideChar(providedSchema, TextFormatPlugin.QUOTE_PROP, quoteChar);
+      quoteEscapeChar = overrideChar(providedSchema, TextFormatPlugin.QUOTE_ESCAPE_PROP, quoteEscapeChar);
+      newlineDelim = newlineDelimBytes(providedSchema, newlineDelim);
+      commentChar = commentChar(providedSchema, commentChar);
+    }
+    skipFirstLine = !extractHeaders && skipFirst;
+    headerExtractionEnabled = extractHeaders;
+    this.providedHeaders = providedHeaders;
+    useRepeatedVarChar = !extractHeaders && providedHeaders == null;
 
-  private boolean headerExtractionEnabled;
-  private boolean useRepeatedVarChar = true;
+    quote = quoteChar;
+    quoteEscape = quoteEscapeChar;
+    newLineDelimiter = newlineDelim;
+    delimiter = delimChar;
+    comment = commentChar;
+  }
+
+  /**
+   * Parse a one-character setting (delimiter, quote or quote escape) from
+   * table properties. If the property is unset or empty, uses the value from
+   * the plugin config; otherwise uses the first character of the property value.
+   */
+
+  private static byte overrideChar(TupleMetadata providedSchema, String propName, byte configValue) {
+    String value = providedSchema.property(propName);
+    if (value == null || value.isEmpty()) {
+      return configValue;
+    }
+    // Text reader supports only ASCII text and characters.
+    return (byte) value.charAt(0);
+  }
+
+  /**
+   * Parse a comment character from table properties. If the property is unset,
+   * then uses the comment character from the plugin config. If the property value
+   * is empty, then uses ASCII NUL (0) as the comment. This value should never
+   * match anything, and effectively disables the comment feature.
+   * Else, if non-empty, uses the first character of the property value.
+   */
+
+  private static byte commentChar(TupleMetadata providedSchema, byte configValue) {
+    String value = providedSchema.property(TextFormatPlugin.COMMENT_CHAR_PROP);
+    if (value == null) {
+      return configValue;
+    }
+    if (value.isEmpty()) {
+      return 0;
+    }
+    // Text reader supports only ASCII text and characters.
+    return (byte) value.charAt(0);
+  }
 
-  public void set(TextFormatConfig config){
-    this.quote = bSafe(config.getQuote(), "quote");
-    this.quoteEscape = bSafe(config.getEscape(), "escape");
-    this.newLineDelimiter = config.getLineDelimiter().getBytes(Charsets.UTF_8);
-    this.delimiter = bSafe(config.getFieldDelimiter(), "fieldDelimiter");
-    this.comment = bSafe(config.getComment(), "comment");
-    this.skipFirstLine = config.isSkipFirstLine();
-    this.headerExtractionEnabled = config.isHeaderExtractionEnabled();
-    if (this.headerExtractionEnabled) {
-      // In case of header TextRecordReader will use set of VarChar vectors vs RepeatedVarChar
-      this.useRepeatedVarChar = false;
+  /**
+   * Return either the line delimiter from table properties, or the one
+   * provided as a parameter from the plugin config. The line delimiter
+   * can contain multiple bytes; it is encoded as UTF-8, as is the config value.
+   */
+  private static byte[] newlineDelimBytes(TupleMetadata providedSchema, byte[] configValue) {
+    String value = providedSchema.property(TextFormatPlugin.LINE_DELIM_PROP);
+    if (value == null || value.isEmpty()) {
+      return configValue;
     }
+    return value.getBytes(Charsets.UTF_8);
   }
 
   public byte getComment() {
@@ -74,10 +169,6 @@ public class TextParsingSettingsV3 {
     return useRepeatedVarChar;
   }
 
-  public void setUseRepeatedVarChar(boolean useRepeatedVarChar) {
-    this.useRepeatedVarChar = useRepeatedVarChar;
-  }
-
   private static byte bSafe(char c, String name) {
     if (c > Byte.MAX_VALUE) {
       throw new IllegalArgumentException(String.format("Failure validating configuration option %s.  Expected a "
@@ -104,35 +195,11 @@ public class TextParsingSettingsV3 {
     return quote;
   }
 
-  /**
-   * Defines the character used for escaping values where the field delimiter is
-   * part of the value. Defaults to '"'
-   *
-   * @param quote
-   *          the quote character
-   */
-  public void setQuote(byte quote) {
-    this.quote = quote;
-  }
-
   public String getLineSeparatorString() {
     return lineSeparatorString;
   }
 
   /**
-   * Identifies whether or not a given character is used for escaping values
-   * where the field delimiter is part of the value
-   *
-   * @param ch
-   *          the character to be verified
-   * @return true if the given character is the character used for escaping
-   *         values, false otherwise
-   */
-  public boolean isQuote(byte ch) {
-    return this.quote == ch;
-  }
-
-  /**
    * Returns the character used for escaping quotes inside an already quoted value. Defaults to '"'
    * @return the quote escape character
    */
@@ -141,30 +208,6 @@ public class TextParsingSettingsV3 {
   }
 
   /**
-   * Defines the character used for escaping quotes inside an already quoted
-   * value. Defaults to '"'
-   *
-   * @param quoteEscape
-   *          the quote escape character
-   */
-  public void setQuoteEscape(byte quoteEscape) {
-    this.quoteEscape = quoteEscape;
-  }
-
-  /**
-   * Identifies whether or not a given character is used for escaping quotes
-   * inside an already quoted value.
-   *
-   * @param ch
-   *          the character to be verified
-   * @return true if the given character is the quote escape character, false
-   *         otherwise
-   */
-  public boolean isQuoteEscape(byte ch) {
-    return this.quoteEscape == ch;
-  }
-
-  /**
    * Returns the field delimiter character. Defaults to ','
    * @return the field delimiter character
    */
@@ -173,23 +216,6 @@ public class TextParsingSettingsV3 {
   }
 
   /**
-   * Defines the field delimiter character. Defaults to ','
-   * @param delimiter the field delimiter character
-   */
-  public void setDelimiter(byte delimiter) {
-    this.delimiter = delimiter;
-  }
-
-  /**
-   * Identifies whether or not a given character represents a field delimiter
-   * @param ch the character to be verified
-   * @return true if the given character is the field delimiter character, false otherwise
-   */
-  public boolean isDelimiter(byte ch) {
-    return this.delimiter == ch;
-  }
-
-  /**
    * Returns the String representation of an empty value (defaults to null)
    *
    * <p>
@@ -203,20 +229,6 @@ public class TextParsingSettingsV3 {
   }
 
   /**
-   * Sets the String representation of an empty value (defaults to null)
-   *
-   * <p>
-   * When reading, if the parser does not read any character from the input, and
-   * the input is within quotes, the empty is used instead of an empty string
-   *
-   * @param emptyValue
-   *          the String representation of an empty value
-   */
-  public void setEmptyValue(String emptyValue) {
-    this.emptyValue = emptyValue;
-  }
-
-  /**
    * Indicates whether the CSV parser should accept unescaped quotes inside
    * quoted values and parse them normally. Defaults to {@code true}.
    *
@@ -228,21 +240,6 @@ public class TextParsingSettingsV3 {
   }
 
   /**
-   * Configures how to handle unescaped quotes inside quoted values. If set to
-   * {@code true}, the parser will parse the quote normally as part of the
-   * value. If set the {@code false}, a
-   * {@link com.univocity.parsers.common.TextParsingException} will be thrown.
-   * Defaults to {@code true}.
-   *
-   * @param parseUnescapedQuotes
-   *          indicates whether or not the CSV parser should accept unescaped
-   *          quotes inside quoted values.
-   */
-  public void setParseUnescapedQuotes(boolean parseUnescapedQuotes) {
-    this.parseUnescapedQuotes = parseUnescapedQuotes;
-  }
-
-  /**
    * Indicates whether or not the first valid record parsed from the input
    * should be considered as the row containing the names of each column
    *
@@ -254,52 +251,21 @@ public class TextParsingSettingsV3 {
     return headerExtractionEnabled;
   }
 
-  /**
-   * Defines whether or not the first valid record parsed from the input should
-   * be considered as the row containing the names of each column
-   *
-   * @param headerExtractionEnabled
-   *          a flag indicating whether the first valid record parsed from the
-   *          input should be considered as the row containing the names of each
-   *          column
-   */
-  public void setHeaderExtractionEnabled(boolean headerExtractionEnabled) {
-    this.headerExtractionEnabled = headerExtractionEnabled;
-  }
-
   public long getMaxCharsPerColumn() {
     return maxCharsPerColumn;
   }
 
-  public void setMaxCharsPerColumn(long maxCharsPerColumn) {
-    this.maxCharsPerColumn = maxCharsPerColumn;
-  }
-
-  public void setComment(byte comment) {
-    this.comment = comment;
-  }
-
   public byte getNormalizedNewLine() {
     return normalizedNewLine;
   }
 
-  public void setNormalizedNewLine(byte normalizedNewLine) {
-    this.normalizedNewLine = normalizedNewLine;
-  }
-
   public boolean isIgnoreLeadingWhitespaces() {
     return ignoreLeadingWhitespaces;
   }
 
-  public void setIgnoreLeadingWhitespaces(boolean ignoreLeadingWhitespaces) {
-    this.ignoreLeadingWhitespaces = ignoreLeadingWhitespaces;
-  }
-
   public boolean isIgnoreTrailingWhitespaces() {
     return ignoreTrailingWhitespaces;
   }
 
-  public void setIgnoreTrailingWhitespaces(boolean ignoreTrailingWhitespaces) {
-    this.ignoreTrailingWhitespaces = ignoreTrailingWhitespaces;
-  }
+  public String[] providedHeaders() { return providedHeaders; }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java
index 17a076c..78adda0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java
@@ -17,14 +17,14 @@
  */
 package org.apache.drill.exec.store.easy.text.compliant.v3;
 
-import io.netty.buffer.DrillBuf;
-
 import java.io.IOException;
 
 import org.apache.drill.common.exceptions.UserException;
 
 import com.univocity.parsers.common.TextParsingException;
 
+import io.netty.buffer.DrillBuf;
+
 /*******************************************************************************
  * Portions Copyright 2014 uniVocity Software Pty Ltd
  ******************************************************************************/
@@ -46,6 +46,10 @@ public final class TextReader {
 
   private final TextInput input;
   private final TextOutput output;
+
+  // TODO: Remove this; it is a vestige of the "V2" implementation
+  // and appears to be used only for white-space handling, which is
+  // overkill.
   private final DrillBuf workBuf;
 
   private byte ch;
@@ -230,6 +234,15 @@ public final class TextReader {
    * Recursive function invoked when a quote is encountered. Function also
    * handles the case when there are non-white space characters in the field
    * after the quoted value.
+   * <p>
+   * Handles quotes and quote escapes:
+   * <ul>
+   * <li>[escape][quote] - escapes the quote</li>
+   * <li>[escape][! quote] - emits both the escape and
+   * the next char</li>
+   * <li>escape = quote, [quote][quote] - escapes the
+   * quote.</li>
+   * </ul>
    * @param prev  previous byte read
    * @throws IOException for input file read errors
    */
@@ -239,11 +252,22 @@ public final class TextReader {
     final TextOutput output = this.output;
     final TextInput input = this.input;
     final byte quote = this.quote;
+    final byte quoteEscape = this.quoteEscape;
 
     ch = input.nextCharNoNewLineCheck();
 
     while (!(prev == quote && (ch == delimiter || ch == newLine || isWhite(ch)))) {
-      if (ch != quote) {
+      if (ch == quote) {
+        if (prev == quoteEscape) {
+          output.append(ch);
+          prev = NULL_BYTE;
+        } else {
+          prev = ch;
+        }
+      } else {
+        if (prev == quoteEscape) {
+          output.append(prev);
+        }
         if (prev == quote) { // unescaped quote detected
           if (parseUnescapedQuotes) {
             output.append(quote);
@@ -260,13 +284,12 @@ public final class TextReader {
                     + "Cannot parse CSV input.");
           }
         }
-        output.append(ch);
-        prev = ch;
-      } else if (prev == quoteEscape) {
-        output.append(quote);
-        prev = NULL_BYTE;
-      } else {
-        prev = ch;
+        if (ch == quoteEscape) {
+          prev = ch;
+        } else {
+          output.append(ch);
+          prev = ch;
+        }
       }
       ch = input.nextCharNoNewLineCheck();
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
index 2819aa8..1b7efb2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
@@ -120,6 +120,7 @@ public class BaseCsvTest extends ClusterTest {
       }
     }
   }
+
   protected String buildBigColFile(boolean withHeader) throws IOException {
     String fileName = "hugeCol.csv";
     try(PrintWriter out = new PrintWriter(new FileWriter(new File(testDir, fileName)))) {
@@ -138,4 +139,28 @@ public class BaseCsvTest extends ClusterTest {
     }
     return fileName;
   }
+
+  protected static final String FILE_N_NAME = "file%d.csv";
+
+  protected static String buildTable(String tableName, String[]...fileContents) throws IOException {
+    File rootDir = new File(testDir, tableName);
+    rootDir.mkdir();
+    for (int i = 0; i < fileContents.length; i++) {
+      String fileName = String.format(FILE_N_NAME, i);
+      buildFile(new File(rootDir, fileName), fileContents[i]);
+    }
+    return "`dfs.data`.`" + tableName + "`";
+  }
+
+  protected void enableSchemaSupport() {
+    enableV3(true);
+    enableSchema(true);
+  }
+
+  protected void resetSchemaSupport() {
+    resetV3();
+    resetSchema();
+  }
+
+
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java
index d983f87..5a52664 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.easy.text.compliant;
 
 import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+
 import java.io.File;
 import java.io.IOException;
 
@@ -56,19 +57,19 @@ public class TestCsvIgnoreHeaders  extends BaseCsvTest{
 
   @Test
   public void testColumns() throws IOException {
+    String fileName = "simple.csv";
+    buildFile(fileName, withHeaders);
     try {
       enableV3(false);
-      doTestColumns();
+      doTestColumns(fileName);
       enableV3(true);
-      doTestColumns();
+      doTestColumns(fileName);
     } finally {
       resetV3();
     }
   }
 
-  private void doTestColumns() throws IOException {
-    String fileName = "simple.csv";
-    buildFile(fileName, withHeaders);
+  private void doTestColumns(String fileName) throws IOException {
     String sql = "SELECT columns FROM `dfs.data`.`%s`";
     RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet();
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
new file mode 100644
index 0000000..a540694
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test table properties with the compliant text reader. The
+ * table properties override selected properties in the format
+ * plugin config. The tests here start with a "stock" CSV
+ * format plugin config without headers. We then use table
+ * properties to vary the table format: without headers, skip
+ * first row, with headers.
+ * <p>
+ * The tests also verify that, without headers, if a schema
+ * is provided, the text format plugin will create columns
+ * using that schema rather than using the "columns" array
+ * column.
+ */
+
+@Category(RowSetTests.class)
+public class TestCsvTableProperties extends BaseCsvTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    BaseCsvTest.setup(false, false);
+  }
+
+  private static final String COL_SCHEMA = "id int not null, name varchar not null";
+
+  private static final String SCHEMA_SQL =
+    "create schema (%s) " +
+    "for table %s PROPERTIES ('" + TextFormatPlugin.HAS_HEADERS_PROP +
+    "'='%s', '" + TextFormatPlugin.SKIP_FIRST_LINE_PROP + "'='%s')";
+
+  private RowSet expectedSchemaRows() {
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("id", MinorType.INT)
+        .add("name", MinorType.VARCHAR)
+        .buildSchema();
+    return new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow(10, "fred")
+        .addRow(20, "wilma")
+        .build();
+  }
+
+  private RowSet expectedArrayRows() {
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("columns", MinorType.VARCHAR)
+        .buildSchema();
+    return new RowSetBuilder(client.allocator(), expectedSchema)
+        .addSingleCol(strArray("10", "fred"))
+        .addSingleCol(strArray("20", "wilma"))
+        .build();
+  }
+
+  public static String SELECT_ALL = "SELECT * FROM %s";
+
+  private static String noHeaders[] = {
+      "10,fred",
+      "20,wilma"
+  };
+
+  @Test
+  public void testNoHeadersWithSchema() throws Exception {
+    try {
+      enableSchemaSupport();
+      String tablePath = buildTable("noHwS", noHeaders);
+      run(SCHEMA_SQL, COL_SCHEMA, tablePath, false, false);
+      RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+      RowSetUtilities.verify(expectedSchemaRows(), actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  @Test
+  public void testNoHeadersWithoutSchema() throws Exception {
+    try {
+      enableSchemaSupport();
+      String tablePath = buildTable("noHnoS", noHeaders);
+      run(SCHEMA_SQL, "", tablePath, false, false);
+      RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+      RowSetUtilities.verify(expectedArrayRows(), actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  private static String extraCols[] = {
+      "10,fred,23.45",
+      "20,wilma,1234.56,vip"
+  };
+
+  @Test
+  public void testNoHeadersWithSchemaExtraCols() throws Exception {
+    try {
+      enableSchemaSupport();
+      String tablePath = buildTable("extraCols", extraCols);
+      run(SCHEMA_SQL, COL_SCHEMA, tablePath, false, false);
+      RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("id", MinorType.INT)
+          .add("name", MinorType.VARCHAR)
+          .buildSchema();
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(10, "fred")
+          .addRow(20, "wilma")
+          .build();
+      RowSetUtilities.verify(expected, actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  private static String skipHeaders[] = {
+      "ignore,me",
+      "10,fred",
+      "20,wilma"
+  };
+
+  @Test
+  public void testSkipHeadersWithSchema() throws Exception {
+    try {
+      enableSchemaSupport();
+      String tablePath = buildTable("skipHwS", skipHeaders);
+      run(SCHEMA_SQL, COL_SCHEMA, tablePath, false, true);
+      RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+      RowSetUtilities.verify(expectedSchemaRows(), actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  @Test
+  public void testSkipHeadersWithoutSchema() throws Exception {
+    try {
+      enableSchemaSupport();
+      String tablePath = buildTable("skipHnoS", skipHeaders);
+      run(SCHEMA_SQL, "", tablePath, false, true);
+      RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+      RowSetUtilities.verify(expectedArrayRows(), actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  private static String withHeaders[] = {
+      "id, name",
+      "10,fred",
+      "20,wilma"
+  };
+
+  @Test
+  public void testHeadersWithSchema() throws Exception {
+    try {
+      enableSchemaSupport();
+      String tablePath = buildTable("headwS", withHeaders);
+      run(SCHEMA_SQL, COL_SCHEMA, tablePath, true, false);
+      RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+      RowSetUtilities.verify(expectedSchemaRows(), actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  @Test
+  public void testHeadersWithoutSchema() throws Exception {
+    try {
+      enableSchemaSupport();
+      String tablePath = buildTable("headnoS", withHeaders);
+      run(SCHEMA_SQL, "", tablePath, true, false);
+      RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("id", MinorType.VARCHAR)
+          .add("name", MinorType.VARCHAR)
+          .buildSchema();
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow("10", "fred")
+          .addRow("20", "wilma")
+          .build();
+      RowSetUtilities.verify(expected, actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  private static String barDelim[] = {
+      "10|fred",
+      "20|wilma"
+  };
+
+  @Test
+  public void testDelimiter() throws Exception {
+    try {
+      enableSchemaSupport();
+      String tablePath = buildTable("barDelim", barDelim);
+      String sql = String.format(SCHEMA_SQL, COL_SCHEMA, tablePath, false, false);
+      sql = sql.substring(0, sql.length() - 1) +
+          ", '" + TextFormatPlugin.DELIMITER_PROP + "'='|')";
+      run(sql);
+      RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+      RowSetUtilities.verify(expectedSchemaRows(), actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  private static String customCommentChar[] = {
+      "@Comment",
+      "#10,fred",
+      "#20,wilma"
+  };
+
+  private RowSet expectedCommentRows() {
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("columns", MinorType.VARCHAR)
+        .buildSchema();
+    return new RowSetBuilder(client.allocator(), expectedSchema)
+        .addSingleCol(strArray("#10", "fred"))
+        .addSingleCol(strArray("#20", "wilma"))
+        .build();
+  }
+
+  @Test
+  public void testComment() throws Exception {
+    try {
+      enableSchemaSupport();
+      String tablePath = buildTable("customCommentChar", customCommentChar);
+      String sql = String.format(SCHEMA_SQL, "", tablePath, false, false);
+      sql = sql.substring(0, sql.length() - 1) +
+          ", '" + TextFormatPlugin.COMMENT_CHAR_PROP + "'='@')";
+      run(sql);
+      RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+      RowSetUtilities.verify(expectedCommentRows(), actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  private static String noCommentChar[] = {
+      "#10,fred",
+      "#20,wilma"
+  };
+
+  /**
+   * Users have complained about the comment character. We usually
+   * suggest they change it to some other character. This test verifies
+   * that the plugin will choose the ASCII NUL (0) character if the
+   * comment property is set to a blank string. Since NUL never occurs
+   * in the input, the result is to essentially disable comment support.
+   */
+  @Test
+  public void testNoComment() throws Exception {
+    try {
+      enableSchemaSupport();
+      String tablePath = buildTable("noCommentChar", noCommentChar);
+      String sql = String.format(SCHEMA_SQL, "", tablePath, false, false);
+      sql = sql.substring(0, sql.length() - 1) +
+          ", '" + TextFormatPlugin.COMMENT_CHAR_PROP + "'='')";
+      run(sql);
+      RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+      RowSetUtilities.verify(expectedCommentRows(), actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  private static String quotesData[] = {
+    "1,@foo@",
+    "2,@foo~@bar@",
+
+    // Test proper handling of escapes. This was broken in V2.
+
+    "3,@foo~bar@",
+    "4,@foo~~bar@"
+  };
+
+  /**
+   * Test quote and quote escape
+   */
+  @Test
+  public void testQuoteChars() throws Exception {
+    try {
+      enableSchemaSupport();
+      String tablePath = buildTable("customQuotes", quotesData);
+      String sql = "create schema () " +
+          "for table " + tablePath + " PROPERTIES ('" +
+          TextFormatPlugin.HAS_HEADERS_PROP + "'='false', '" +
+          TextFormatPlugin.SKIP_FIRST_LINE_PROP + "'='false', '" +
+          TextFormatPlugin.QUOTE_PROP + "'='@', '" +
+          TextFormatPlugin.QUOTE_ESCAPE_PROP + "'='~')";
+      run(sql);
+      RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .addArray("columns", MinorType.VARCHAR)
+          .buildSchema();
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addSingleCol(strArray("1", "foo"))
+          .addSingleCol(strArray("2", "foo@bar"))
+          .addSingleCol(strArray("3", "foo~bar"))
+          .addSingleCol(strArray("4", "foo~~bar"))
+          .build();
+      RowSetUtilities.verify(expected, actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  private static String doubleQuotesData[] = {
+      "1,@foo@",
+      "2,@foo@@bar@",
+    };
+
+  /**
+   * Test that the quote escape can be the quote character
+   * itself. In this case, &lt;escape>&lt;escape> is the
+   * same as &lt;quote>&lt;quote> and is considered to
+   * be an escaped quote. There is no "orphan" escape
+   * case.
+   */
+  @Test
+  public void testDoubleQuoteChars() throws Exception {
+    try {
+      enableSchemaSupport();
+      String tablePath = buildTable("doubleQuotes", doubleQuotesData);
+      String sql = "create schema () " +
+          "for table " + tablePath + " PROPERTIES ('" +
+          TextFormatPlugin.HAS_HEADERS_PROP + "'='false', '" +
+          TextFormatPlugin.SKIP_FIRST_LINE_PROP + "'='false', '" +
+          TextFormatPlugin.QUOTE_PROP + "'='@', '" +
+          TextFormatPlugin.QUOTE_ESCAPE_PROP + "'='@')";
+      run(sql);
+      RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .addArray("columns", MinorType.VARCHAR)
+          .buildSchema();
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addSingleCol(strArray("1", "foo"))
+          .addSingleCol(strArray("2", "foo@bar"))
+           .build();
+      RowSetUtilities.verify(expected, actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  private static String specialCharsData[] = {
+      "10\u0001'fred'",
+      "20\u0001'wilma'"
+    };
+
+  /**
+   * End-to-end test of special characters for delimiter (a control
+   * character, ASCII 0x01) and quote (same as the SQL quote.)
+   */
+  @Test
+  public void testSpecialChars() throws Exception {
+    try {
+      enableSchemaSupport();
+      String tablePath = buildTable("specialChars", specialCharsData);
+      String sql = String.format("create schema (%s) " +
+          "for table %s PROPERTIES ('" +
+          TextFormatPlugin.HAS_HEADERS_PROP + "'='false', '" +
+          TextFormatPlugin.SKIP_FIRST_LINE_PROP + "'='false', '" +
+          // Obscure Calcite parsing feature. See
+          // SqlParserUtil.checkUnicodeEscapeChar()
+          // See also https://issues.apache.org/jira/browse/CALCITE-2273
+          // \U0001 also seems to work.
+          TextFormatPlugin.DELIMITER_PROP + "'='\01', '" +
+          // Looks like the lexer accepts Java escapes: \n, \r,
+          // presumably \t, though not tested here.
+          TextFormatPlugin.LINE_DELIM_PROP + "'='\n', '" +
+          // See: http://drill.apache.org/docs/lexical-structure/#string
+          TextFormatPlugin.QUOTE_PROP + "'='''')",
+          COL_SCHEMA, tablePath);
+      run(sql);
+      RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+      RowSetUtilities.verify(expectedSchemaRows(), actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  /**
+   * Verify that a custom newline character works, and that the symbol
+   * '\r' can be used in SQL and is stored properly in the schema file.
+   */
+
+  @Test
+  public void testNewlineProp() throws Exception {
+    try {
+      enableSchemaSupport();
+      String tableName = "newline";
+      File rootDir = new File(testDir, tableName);
+      rootDir.mkdir();
+      try(PrintWriter out = new PrintWriter(new FileWriter(new File(rootDir, ROOT_FILE)))) {
+        out.print("1,fred\r2,wilma\r");
+      }
+      String tablePath = "`dfs.data`.`" + tableName + "`";
+      String sql = "create schema () " +
+          "for table " + tablePath + " PROPERTIES ('" +
+          TextFormatPlugin.HAS_HEADERS_PROP + "'='false', '" +
+          TextFormatPlugin.SKIP_FIRST_LINE_PROP + "'='false', '" +
+          TextFormatPlugin.LINE_DELIM_PROP + "'='\r')";
+      run(sql);
+      RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .addArray("columns", MinorType.VARCHAR)
+          .buildSchema();
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addSingleCol(strArray("1", "fred"))
+          .addSingleCol(strArray("2", "wilma"))
+          .build();
+      RowSetUtilities.verify(expected, actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
index 63e0988..955eb3d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
@@ -17,13 +17,11 @@
  */
 package org.apache.drill.exec.store.easy.text.compliant;
 
+import static org.apache.drill.test.rowSet.RowSetUtilities.dec;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.apache.drill.test.rowSet.RowSetUtilities.dec;
 
-import java.io.File;
-import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.drill.categories.RowSetTests;
@@ -56,7 +54,6 @@ import org.junit.experimental.categories.Category;
 public class TestCsvWithSchema extends BaseCsvTest {
 
   protected static final String FILE1_NAME = "file1.csv";
-  protected static final String FILE_N_NAME = "file%d.csv";
 
   private static String basicFileContents[] = {
     "intcol,datecol,str,dub",
@@ -109,26 +106,6 @@ public class TestCsvWithSchema extends BaseCsvTest {
     BaseCsvTest.setup(false,  true);
   }
 
-  private static String buildTable(String tableName, String[]...fileContents) throws IOException {
-    File rootDir = new File(testDir, tableName);
-    rootDir.mkdir();
-    for (int i = 0; i < fileContents.length; i++) {
-      String fileName = String.format(FILE_N_NAME, i);
-      buildFile(new File(rootDir, fileName), fileContents[i]);
-    }
-    return "`dfs.data`.`" + tableName + "`";
-  }
-
-  private void enableSchemaSupport() {
-    enableV3(true);
-    enableSchema(true);
-  }
-
-  private void resetSchemaSupport() {
-    resetV3();
-    resetSchema();
-  }
-
   /**
    * Test the simplest possible case: a table with one file:
    * <ul>
@@ -1407,7 +1384,7 @@ public class TestCsvWithSchema extends BaseCsvTest {
    * value.
    */
   @Test
-  public void testBlankColsWithNDefaultValue() throws Exception {
+  public void testBlankColsWithNoDefaultValue() throws Exception {
     String tableName = "blankColsNullableSchema";
     String tablePath = buildTable(tableName, blankColContents);
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
index 2d68a01..1340d90 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
@@ -259,19 +259,19 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
 
   @Test
   public void testRaggedRows() throws IOException {
+    String fileName = "ragged.csv";
+    buildFile(fileName, raggedRows);
     try {
       enableV3(false);
-      doTestRaggedRows();
+      doTestRaggedRows(fileName);
       enableV3(true);
-      doTestRaggedRows();
+      doTestRaggedRows(fileName);
     } finally {
       resetV3();
     }
   }
 
-  private void doTestRaggedRows() throws IOException {
-    String fileName = "ragged.csv";
-    buildFile(fileName, raggedRows);
+  private void doTestRaggedRows(String fileName) throws IOException {
     String sql = "SELECT columns FROM `dfs.data`.`%s`";
     RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet();
 
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java
index 6b7da7a..78eeefa 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java
@@ -63,7 +63,8 @@ public class AbstractPropertied implements Propertied {
     if (properties == null) {
       return defValue;
     }
-    return properties.get(key);
+    String value = properties.get(key);
+    return value == null ? defValue : value;
   }
 
   @Override
@@ -76,7 +77,13 @@ public class AbstractPropertied implements Propertied {
   }
 
   @Override
-  public boolean getBooleanProperty(String key) {
-    return Boolean.parseBoolean(property(key, Boolean.FALSE.toString()));
+  public boolean booleanProperty(String key) {
+    return booleanProperty(key, false);
+  }
+
+  @Override
+  public boolean booleanProperty(String key, boolean defaultValue) {
+    String value = property(key);
+    return value == null ? defaultValue : Boolean.parseBoolean(value);
   }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java
index 5e2fbf2..fe39338 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java
@@ -44,5 +44,6 @@ public interface Propertied {
   String property(String key);
   String property(String key, String defValue);
   void setProperty(String key, String value);
-  boolean getBooleanProperty(String key);
+  boolean booleanProperty(String key);
+  boolean booleanProperty(String key, boolean defaultValue);
 }