Posted to commits@drill.apache.org by cg...@apache.org on 2022/12/21 01:31:24 UTC

[drill] branch master updated: DRILL-8179: Convert LTSV Format Plugin to EVF2 (#2725)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e97b7d0af DRILL-8179: Convert LTSV Format Plugin to EVF2 (#2725)
8e97b7d0af is described below

commit 8e97b7d0af8a9695c0d8e089cce082a4d17e63f3
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Tue Dec 20 20:31:17 2022 -0500

    DRILL-8179: Convert LTSV Format Plugin to EVF2 (#2725)
---
 contrib/format-ltsv/README.md                      |  28 +++
 contrib/format-ltsv/pom.xml                        |   5 +
 .../drill/exec/store/ltsv/LTSVBatchReader.java     | 265 +++++++++++++++++++++
 .../drill/exec/store/ltsv/LTSVFormatPlugin.java    |  82 +++----
 .../exec/store/ltsv/LTSVFormatPluginConfig.java    |  71 +++++-
 .../drill/exec/store/ltsv/LTSVRecordReader.java    | 165 -------------
 .../drill/exec/store/ltsv/TestLTSVQueries.java     | 158 ++++++++++++
 .../exec/store/ltsv/TestLTSVRecordReader.java      |  85 -------
 8 files changed, 559 insertions(+), 300 deletions(-)

diff --git a/contrib/format-ltsv/README.md b/contrib/format-ltsv/README.md
index 05916b08cb..af71cf51ea 100644
--- a/contrib/format-ltsv/README.md
+++ b/contrib/format-ltsv/README.md
@@ -4,6 +4,18 @@ Drill LTSV storage plugin allows you to perform interactive analysis using SQL a
 
 For more information about LTSV, please see [LTSV (Labeled Tab-separated Values)](http://ltsv.org/).
 
+## Configuration
+There are several optional configuration parameters that you can use to modify how LTSV files are read.  In general, it is not necessary to change these from the defaults.  They are:
+
+* `parseMode`: Sets the error tolerance of the LTSV parser.  Possible values are `lenient` and `strict`.  Defaults to `lenient`.
+* `escapeCharacter`: Character used to escape control characters.
+* `kvDelimiter`: Character to delimit key/value pairs.
+* `entryDelimiter`: Character to delimit entries.
+* `lineEnding`: Character to denote line endings.
+* `quoteChar`: Character to denote quoted strings.
+
+With the exception of `parseMode`, all fields accept a single-character string.
+
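+To change an option for a single query, you can use a Drill table function. For example, the following query reads a file in strict mode, so malformed input raises an error instead of being tolerated:
+
+```sql
+SELECT * FROM table(cp.`simple.ltsv` (type => 'ltsv', parseMode => 'strict'))
+```
+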
 ## Example of Querying an LTSV File
 
 ### About the Data
@@ -36,3 +48,19 @@ Issue a SELECT statement to get the second row in the file.
 +-----------------------------+------------------+---------------+-----------------------+---------+-------+----------+-----------------+----------+----------+------------------+
 1 row selected (6.074 seconds)
 ```
+
+### Providing a Schema
+The LTSV reader supports provided schemas.  You can read about Drill's [provided schema functionality here](https://drill.apache.org/docs/plugin-configuration-basics/#specifying-the-schema-as-table-function-parameter).
+
+An example query would be:
+
+```sql
+SELECT * FROM table(cp.`simple.ltsv` (type=> 'ltsv', schema => 
+    'inline=(`referer` VARCHAR, 
+    `vhost` VARCHAR, `size` INT,
+    `forwardedfor` VARCHAR, 
+    `reqtime` DOUBLE, 
+    `apptime` DOUBLE, 
+    `status` INT)'))
+```
+Only scalar types are supported in the LTSV reader.
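+
+For `DATE`, `TIME`, and `TIMESTAMP` columns, the reader also honors a `drill.format` column property in the provided schema, which supplies the pattern used to parse the raw text. A sketch following Drill's inline schema property syntax (the column and pattern here are illustrative):
+
+```sql
+SELECT * FROM table(cp.`simple.ltsv` (type => 'ltsv', schema =>
+    'inline=(`reqtime` TIMESTAMP properties {`drill.format` = `yyyy-MM-dd HH:mm:ss`})'))
+```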
diff --git a/contrib/format-ltsv/pom.xml b/contrib/format-ltsv/pom.xml
index 2a0d244d48..4a33ee5fdc 100644
--- a/contrib/format-ltsv/pom.xml
+++ b/contrib/format-ltsv/pom.xml
@@ -36,6 +36,11 @@
       <artifactId>drill-java-exec</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.github.lonely-lockley</groupId>
+      <artifactId>ltsv-parser</artifactId>
+      <version>1.1.0</version>
+    </dependency>
     <!-- Test dependencies -->
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
diff --git a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVBatchReader.java b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVBatchReader.java
new file mode 100644
index 0000000000..f25e6728bb
--- /dev/null
+++ b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVBatchReader.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.ltsv;
+
+import com.github.lolo.ltsv.LtsvParser;
+import com.github.lolo.ltsv.LtsvParser.Builder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map;
+
+public class LTSVBatchReader implements ManagedReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(LTSVBatchReader.class);
+  private final LTSVFormatPluginConfig config;
+  private final FileDescrip file;
+  private final CustomErrorContext errorContext;
+  private final LtsvParser ltsvParser;
+  private final RowSetLoader rowWriter;
+  private final FileSchemaNegotiator negotiator;
+  private InputStream fsStream;
+  private Iterator<Map<String, String>> rowIterator;
+
+  public LTSVBatchReader(LTSVFormatPluginConfig config, FileSchemaNegotiator negotiator) {
+    this.config = config;
+    this.negotiator = negotiator;
+    file = negotiator.file();
+    errorContext = negotiator.parentErrorContext();
+    ltsvParser = buildParser();
+
+    openFile();
+
+    // If there is a provided schema, import it
+    if (negotiator.providedSchema() != null) {
+      TupleMetadata schema = negotiator.providedSchema();
+      negotiator.tableSchema(schema, false);
+    }
+    ResultSetLoader loader = negotiator.build();
+    rowWriter = loader.writer();
+
+  }
+
+  private void openFile() {
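+    // The underlying file may be compressed; Drill selects a decompression codec from the file extension.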
+    try {
+      fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
+    } catch (IOException e) {
+      throw UserException
+          .dataReadError(e)
+          .message("Unable to open LTSV File %s", file.split().getPath() + " " + e.getMessage())
+          .addContext(errorContext)
+          .build(logger);
+    }
+    rowIterator = ltsvParser.parse(fsStream);
+  }
+
+  @Override
+  public boolean next() {
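+    // Keep reading rows until the batch fills; returning false tells the scan framework there is no more data.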
+    while (!rowWriter.isFull()) {
+      if (!processNextRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private LtsvParser buildParser() {
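+    // Map the plugin configuration onto the parser builder; options left unset keep the library defaults.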
+    Builder builder = LtsvParser.builder();
+    builder.trimKeys();
+    builder.trimValues();
+    builder.skipNullValues();
+
+    if (config.getParseMode().contentEquals("strict")) {
+      builder.strict();
+    } else {
+      builder.lenient();
+    }
+
+    if (StringUtils.isNotEmpty(config.getEscapeCharacter())) {
+      builder.withEscapeChar(config.getEscapeCharacter().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getKvDelimiter())) {
+      builder.withKvDelimiter(config.getKvDelimiter().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getEntryDelimiter())) {
+      builder.withEntryDelimiter(config.getEntryDelimiter().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getLineEnding())) {
+      builder.withLineEnding(config.getLineEnding().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getQuoteChar())) {
+      builder.withQuoteChar(config.getQuoteChar().charAt(0));
+    }
+
+    return builder.build();
+  }
+
+  private boolean processNextRow() {
+    if (!rowIterator.hasNext()) {
+      return false;
+    }
+    String key;
+    String value;
+    int columnIndex;
+    ScalarWriter columnWriter;
+    Map<String, String> row = rowIterator.next();
+
+    // Skip empty lines
+    if (row.isEmpty()) {
+      return true;
+    }
+
+    // Start the row
+    rowWriter.start();
+    for (Map.Entry<String, String> field : row.entrySet()) {
+      key = field.getKey();
+      value = field.getValue();
+      columnIndex = getColumnIndex(key);
+      columnWriter = getColumnWriter(key);
+
+      if (negotiator.providedSchema() != null) {
+        // Check the type. LTSV will only read other data types if a schema is provided.
+        ColumnMetadata columnMetadata = rowWriter.tupleSchema().metadata(columnIndex);
+        MinorType dataType = columnMetadata.type();
+        LocalTime localTime;
+        LocalDate localDate;
+
+        switch (dataType) {
+          case BIT:
+            columnWriter.setBoolean(Boolean.parseBoolean(value));
+            break;
+          case INT:
+          case SMALLINT:
+          case TINYINT:
+            columnWriter.setInt(Integer.parseInt(value));
+            break;
+          case BIGINT:
+            columnWriter.setLong(Long.parseLong(value));
+            break;
+          case FLOAT8:
+          case FLOAT4:
+            columnWriter.setDouble(Double.parseDouble(value));
+            break;
+          case TIME:
+            String dateFormat = columnMetadata.property("drill.format");
+
+            if (Strings.isNullOrEmpty(dateFormat)) {
+              localTime = LocalTime.parse(value);
+            } else {
+              localTime = LocalTime.parse(value, DateTimeFormatter.ofPattern(dateFormat));
+            }
+            columnWriter.setTime(localTime);
+            break;
+          case DATE:
+            dateFormat = columnMetadata.property("drill.format");
+
+            if (Strings.isNullOrEmpty(dateFormat)) {
+              localDate = LocalDate.parse(value);
+            } else {
+              localDate = LocalDate.parse(value, DateTimeFormatter.ofPattern(dateFormat));
+            }
+            columnWriter.setDate(localDate);
+            break;
+          case TIMESTAMP:
+            dateFormat = columnMetadata.property("drill.format");
+            Instant timestamp;
+            if (Strings.isNullOrEmpty(dateFormat)) {
+              timestamp = Instant.parse(value);
+            } else {
+              try {
+                SimpleDateFormat simpleDateFormat = new SimpleDateFormat(dateFormat);
+                Date parsedDate = simpleDateFormat.parse(value);
+                timestamp = Instant.ofEpochMilli(parsedDate.getTime());
+              } catch (ParseException e) {
+                throw UserException.parseError(e)
+                    .message("Cannot parse " + value + " as a timestamp. You can specify a format string in the provided schema to correct this.")
+                    .addContext(errorContext)
+                    .build(logger);
+              }
+            }
+            columnWriter.setTimestamp(timestamp);
+            break;
+          default:
+            columnWriter.setString(value);
+        }
+      } else {
+        columnWriter.setString(value);
+      }
+    }
+    // Finish the row
+    rowWriter.save();
+    return true;
+  }
+
+  @Override
+  public void close() {
+    logger.debug("Closing input stream for LTSV reader.");
+    AutoCloseables.closeSilently(fsStream);
+  }
+
+  private int getColumnIndex(String fieldName) {
+    // Find the TupleWriter object
+    int index = rowWriter.tupleSchema().index(fieldName);
+
+    // Unknown columns are always strings.
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(fieldName, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
+      index = rowWriter.addColumn(colSchema);
+    }
+    return index;
+  }
+
+  private ScalarWriter getColumnWriter(String fieldName) {
+    // Find the TupleWriter object
+    int index = getColumnIndex(fieldName);
+    return rowWriter.scalar(index);
+  }
+}
diff --git a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java
index c28b101366..8b077dbf34 100644
--- a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java
+++ b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java
@@ -17,69 +17,63 @@
  */
 package org.apache.drill.exec.store.ltsv;
 
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
-import java.util.List;
 
 public class LTSVFormatPlugin extends EasyFormatPlugin<LTSVFormatPluginConfig> {
 
-  private static final boolean IS_COMPRESSIBLE = true;
-
   private static final String DEFAULT_NAME = "ltsv";
 
-  public LTSVFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
-    this(name, context, fsConf, storageConfig, new LTSVFormatPluginConfig(null));
-  }
+  private static class LTSVReaderFactory extends FileReaderFactory {
 
-  public LTSVFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, LTSVFormatPluginConfig formatPluginConfig) {
-    super(name, context, fsConf, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
-  }
+    private final LTSVFormatPluginConfig config;
 
-  @Override
-  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns, String userName) {
-    return new LTSVRecordReader(context, fileWork.getPath(), dfs, columns);
-  }
+    public LTSVReaderFactory(LTSVFormatPluginConfig config) {
+      super();
+      this.config = config;
+    }
 
-  @Override
-  public String getWriterOperatorType() {
-    throw new UnsupportedOperationException("Drill doesn't currently support writing to LTSV files.");
+    @Override
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+      return new LTSVBatchReader(config, negotiator);
+    }
   }
 
-  @Override
-  public boolean supportsPushDown() {
-    return true;
+  public LTSVFormatPlugin(String name, DrillbitContext context,
+                         Configuration fsConf, StoragePluginConfig storageConfig,
+                         LTSVFormatPluginConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
   }
 
-  @Override
-  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) {
-    throw new UnsupportedOperationException("Drill doesn't currently support writing to LTSV files.");
-  }
-
-  @Override
-  public boolean supportsStatistics() {
-    return false;
-  }
-
-  @Override
-  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
-    throw new UnsupportedOperationException("unimplemented");
+  private static EasyFormatConfig easyConfig(Configuration fsConf, LTSVFormatPluginConfig pluginConfig) {
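+    // Describe the plugin's capabilities to the Easy framework and opt in to the EVF V2 scan framework.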
+    return EasyFormatConfig.builder()
+        .readable(true)
+        .writable(false)
+        .blockSplittable(true)
+        .compressible(true)
+        .supportsProjectPushdown(true)
+        .extensions(pluginConfig.getExtensions())
+        .fsConf(fsConf)
+        .defaultName(DEFAULT_NAME)
+        .scanVersion(ScanFrameworkVersion.EVF_V2)
+        .supportsLimitPushdown(true)
+        .build();
   }
 
   @Override
-  public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) {
-    throw new UnsupportedOperationException("unimplemented");
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
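+    // Columns that are projected but missing from the data are materialized as nullable VARCHAR.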
+    builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+    builder.readerFactory(new LTSVReaderFactory(getConfig()));
   }
 }
diff --git a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPluginConfig.java b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPluginConfig.java
index 11b0554ca2..79c7f4b0cd 100644
--- a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPluginConfig.java
+++ b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPluginConfig.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
@@ -34,12 +35,58 @@ public class LTSVFormatPluginConfig implements FormatPluginConfig {
   private static final List<String> DEFAULT_EXTS = ImmutableList.of("ltsv");
 
   private final List<String> extensions;
+  private final String parseMode;
+  private final String escapeCharacter;
+  private final String kvDelimiter;
+  private final String entryDelimiter;
+  private final String lineEnding;
+  private final String quoteChar;
 
   @JsonCreator
-  public LTSVFormatPluginConfig(
-      @JsonProperty("extensions") List<String> extensions) {
-    this.extensions = extensions == null ?
-        DEFAULT_EXTS : ImmutableList.copyOf(extensions);
+  public LTSVFormatPluginConfig(@JsonProperty("extensions") List<String> extensions,
+                                @JsonProperty("parseMode") String parseMode,
+                                @JsonProperty("escapeCharacter") String escapeCharacter,
+                                @JsonProperty("kvDelimiter") String kvDelimiter,
+                                @JsonProperty("entryDelimiter") String entryDelimiter,
+                                @JsonProperty("lineEnding") String lineEnding,
+                                @JsonProperty("quoteChar") String quoteChar) {
+    this.extensions = extensions == null ? DEFAULT_EXTS : ImmutableList.copyOf(extensions);
+    this.escapeCharacter = escapeCharacter;
+    this.kvDelimiter = kvDelimiter;
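+    // Default to lenient parsing; "strict" makes malformed input fail the read.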
+    this.parseMode = StringUtils.isEmpty(parseMode) ? "lenient" : parseMode;
+    this.entryDelimiter = entryDelimiter;
+    this.lineEnding = lineEnding;
+    this.quoteChar = quoteChar;
+  }
+
+  @JsonProperty("parseMode")
+  public String getParseMode() {
+    return parseMode;
+  }
+
+  @JsonProperty("escapeCharacter")
+  public String getEscapeCharacter() {
+    return escapeCharacter;
+  }
+
+  @JsonProperty("kvDelimiter")
+  public String getKvDelimiter() {
+    return kvDelimiter;
+  }
+
+  @JsonProperty("entryDelimiter")
+  public String getEntryDelimiter() {
+    return entryDelimiter;
+  }
+
+  @JsonProperty("lineEnding")
+  public String getLineEnding() {
+    return lineEnding;
+  }
+
+  @JsonProperty("quoteChar")
+  public String getQuoteChar() {
+    return quoteChar;
   }
 
   @JsonInclude(JsonInclude.Include.NON_DEFAULT)
@@ -49,7 +96,7 @@ public class LTSVFormatPluginConfig implements FormatPluginConfig {
 
   @Override
   public int hashCode() {
-    return Objects.hash(extensions);
+    return Objects.hash(extensions, parseMode, escapeCharacter, kvDelimiter, entryDelimiter, lineEnding, quoteChar);
   }
 
   @Override
@@ -60,13 +107,25 @@ public class LTSVFormatPluginConfig implements FormatPluginConfig {
       return false;
     }
     LTSVFormatPluginConfig that = (LTSVFormatPluginConfig) obj;
-    return Objects.equals(extensions, that.extensions);
+    return Objects.equals(extensions, that.extensions) &&
+        Objects.equals(parseMode, that.parseMode) &&
+        Objects.equals(escapeCharacter, that.escapeCharacter) &&
+        Objects.equals(entryDelimiter, that.entryDelimiter) &&
+        Objects.equals(lineEnding, that.lineEnding) &&
+        Objects.equals(quoteChar, that.quoteChar) &&
+        Objects.equals(kvDelimiter, that.kvDelimiter);
   }
 
   @Override
   public String toString() {
     return new PlanStringBuilder(this)
         .field("extensions", extensions)
+        .field("parseMode", parseMode)
+        .field("escapeCharacter", escapeCharacter)
+        .field("kvDelimiter", kvDelimiter)
+        .field("lineEnding", lineEnding)
+        .field("quoteChar", quoteChar)
+        .field("entryDelimiter", entryDelimiter)
         .toString();
   }
 }
diff --git a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java
deleted file mode 100644
index 619ceb1b53..0000000000
--- a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.ltsv;
-
-import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-
-public class LTSVRecordReader extends AbstractRecordReader {
-
-  private static final Logger logger = LoggerFactory.getLogger(LTSVRecordReader.class);
-
-  private static final int MAX_RECORDS_PER_BATCH = 8096;
-
-  private final String inputPath;
-
-  private final InputStream fsStream;
-
-  private final BufferedReader reader;
-
-  private DrillBuf buffer;
-
-  private VectorContainerWriter writer;
-
-  public LTSVRecordReader(FragmentContext fragmentContext, Path path, DrillFileSystem fileSystem,
-                          List<SchemaPath> columns) throws OutOfMemoryException {
-    this.inputPath = path.toUri().getPath();
-    try {
-      this.fsStream = fileSystem.openPossiblyCompressedStream(path);
-      this.reader = new BufferedReader(new InputStreamReader(fsStream, StandardCharsets.UTF_8));
-      this.buffer = fragmentContext.getManagedBuffer();
-      setColumns(columns);
-    } catch (IOException e) {
-      throw UserException.dataReadError(e)
-        .message(String.format("Failed to open input file: %s", inputPath))
-        .build(logger);
-    }
-  }
-
-  @Override
-  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projected) {
-    Set<SchemaPath> transformed = new LinkedHashSet<>();
-    if (!isStarQuery()) {
-      transformed.addAll(projected);
-    } else {
-      transformed.add(SchemaPath.STAR_COLUMN);
-    }
-    return transformed;
-  }
-
-  public void setup(final OperatorContext context, final OutputMutator output) {
-    this.writer = new VectorContainerWriter(output);
-  }
-
-  public int next() {
-    this.writer.allocate();
-    this.writer.reset();
-
-    int recordCount = 0;
-
-    try {
-      BaseWriter.MapWriter map = this.writer.rootAsMap();
-      String line;
-
-      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
-        // Skip empty lines
-        if (line.trim().length() == 0) {
-          continue;
-        }
-
-        List<String[]> fields = new ArrayList<>();
-        for (String field : line.split("\t")) {
-          int index = field.indexOf(":");
-          if (index <= 0) {
-            throw new ParseException(String.format("Invalid LTSV format: %s\n%d:%s", inputPath, recordCount + 1, line), 0);
-          }
-
-          String fieldName = field.substring(0, index);
-          String fieldValue = field.substring(index + 1);
-          if (selectedColumn(fieldName)) {
-            fields.add(new String[]{fieldName, fieldValue});
-          }
-        }
-
-        if (fields.size() == 0) {
-          continue;
-        }
-
-        this.writer.setPosition(recordCount);
-        map.start();
-
-        for (String[] field : fields) {
-          byte[] bytes = field[1].getBytes(StandardCharsets.UTF_8);
-          this.buffer = this.buffer.reallocIfNeeded(bytes.length);
-          this.buffer.setBytes(0, bytes, 0, bytes.length);
-          map.varChar(field[0]).writeVarChar(0, bytes.length, buffer);
-        }
-
-        map.end();
-        recordCount++;
-      }
-
-      this.writer.setValueCount(recordCount);
-      return recordCount;
-
-    } catch (final Exception e) {
-      String msg = String.format("Failure while reading messages from %s. Record reader was at record: %d", inputPath, recordCount + 1);
-      throw UserException.dataReadError(e)
-        .message(msg)
-        .build(logger);
-    }
-  }
-
-  private boolean selectedColumn(String fieldName) {
-    for (SchemaPath col : getColumns()) {
-      if (col.equals(SchemaPath.STAR_COLUMN) || col.getRootSegment().getPath().equals(fieldName)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  public void close() throws Exception {
-    AutoCloseables.close(reader, fsStream);
-  }
-}
diff --git a/contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVQueries.java b/contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVQueries.java
new file mode 100644
index 0000000000..8565703087
--- /dev/null
+++ b/contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVQueries.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.ltsv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestLTSVQueries extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+  }
+
+  @Test
+  public void testWildcard() throws Exception {
+    String sql = "SELECT * FROM cp.`simple.ltsv`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("referer", MinorType.VARCHAR)
+        .addNullable("vhost", MinorType.VARCHAR)
+        .addNullable("size", MinorType.VARCHAR)
+        .addNullable("forwardedfor", MinorType.VARCHAR)
+        .addNullable("reqtime", MinorType.VARCHAR)
+        .addNullable("apptime", MinorType.VARCHAR)
+        .addNullable("host", MinorType.VARCHAR)
+        .addNullable("ua", MinorType.VARCHAR)
+        .addNullable("req", MinorType.VARCHAR)
+        .addNullable("status", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("-", "api.example.com", "4968", "-", "2.532", "2.532", "xxx.xxx.xxx.xxx", "Java/1.8.0_131", "GET /v1/xxx HTTP/1.1", "200")
+        .addRow("-", "api.example.com", "412", "-", "3.580", "3.580", "xxx.xxx.xxx.xxx", "Java/1.8.0_201", "GET /v1/yyy HTTP/1.1", "200")
+        .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSelectColumns() throws Exception {
+    String sql = "SELECT ua, reqtime FROM cp.`simple.ltsv`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("ua", MinorType.VARCHAR)
+        .addNullable("reqtime", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("Java/1.8.0_131", "2.532")
+        .addRow("Java/1.8.0_201", "3.580")
+        .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testQueryWithConditions() throws Exception {
+    String sql = "SELECT * FROM cp.`simple.ltsv` WHERE reqtime > 3.0";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("referer", MinorType.VARCHAR)
+        .addNullable("vhost", MinorType.VARCHAR)
+        .addNullable("size", MinorType.VARCHAR)
+        .addNullable("forwardedfor", MinorType.VARCHAR)
+        .addNullable("reqtime", MinorType.VARCHAR)
+        .addNullable("apptime", MinorType.VARCHAR)
+        .addNullable("host", MinorType.VARCHAR)
+        .addNullable("ua", MinorType.VARCHAR)
+        .addNullable("req", MinorType.VARCHAR)
+        .addNullable("status", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("-", "api.example.com", "412", "-", "3.580", "3.580", "xxx.xxx.xxx.xxx", "Java/1.8.0_201", "GET /v1/yyy HTTP/1.1", "200")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
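+    // Round-trip the physical plan through JSON to verify the format config serializes and deserializes correctly.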
+    String sql = "SELECT COUNT(*) as cnt FROM cp.`simple.ltsv` ";
+    String plan = queryBuilder().sql(sql).explainJson();
+    long cnt = queryBuilder().physical(plan).singletonLong();
+    assertEquals("Counts should match", 2L, cnt);
+  }
+
+  @Test
+  public void testSkipEmptyLines() throws Exception {
+    assertEquals(2, queryBuilder().sql("SELECT * FROM cp.`emptylines.ltsv`").run().recordCount());
+  }
+
+  @Test
+  public void testReadException() throws Exception {
+    try {
+      run("SELECT * FROM table(cp.`invalid.ltsv` (type => 'ltsv', parseMode => 'strict'))");
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getMessage().contains("DATA_READ ERROR: Empty key detected at line [0] position [49]"));
+    }
+  }
+
+  @Test
+  public void testProvidedSchema() throws Exception {
+    String sql = "SELECT * FROM table(cp.`simple.ltsv` (type=> 'ltsv', schema => 'inline=(`referer` VARCHAR, `vhost` VARCHAR, `size` INT, `forwardedfor` VARCHAR, " +
+        "`reqtime` DOUBLE, `apptime` DOUBLE, `status` INT)'))";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("referer", MinorType.VARCHAR)
+        .addNullable("vhost", MinorType.VARCHAR)
+        .addNullable("size", MinorType.INT)
+        .addNullable("forwardedfor", MinorType.VARCHAR)
+        .addNullable("reqtime", MinorType.FLOAT8)
+        .addNullable("apptime", MinorType.FLOAT8)
+        .addNullable("status", MinorType.INT)
+        .addNullable("host", MinorType.VARCHAR)
+        .addNullable("ua", MinorType.VARCHAR)
+        .addNullable("req", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("-", "api.example.com", 4968, "-", 2.532, 2.532, 200, "xxx.xxx.xxx.xxx", "Java/1.8.0_131", "GET /v1/xxx HTTP/1.1")
+        .addRow("-", "api.example.com", 412, "-", 3.58, 3.58, 200, "xxx.xxx.xxx.xxx", "Java/1.8.0_201", "GET /v1/yyy HTTP/1.1")
+        .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+}
diff --git a/contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVRecordReader.java b/contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVRecordReader.java
deleted file mode 100644
index 419bb6f80d..0000000000
--- a/contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVRecordReader.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.ltsv;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.test.ClusterFixture;
-import org.apache.drill.test.ClusterTest;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestLTSVRecordReader extends ClusterTest {
-
-  @BeforeClass
-  public static void setup() throws Exception {
-    startCluster(ClusterFixture.builder(dirTestWatcher));
-  }
-
-  @Test
-  public void testWildcard() throws Exception {
-    testBuilder()
-      .sqlQuery("SELECT * FROM cp.`simple.ltsv`")
-      .unOrdered()
-      .baselineColumns("host", "forwardedfor", "req", "status", "size", "referer", "ua", "reqtime", "apptime", "vhost")
-      .baselineValues("xxx.xxx.xxx.xxx", "-", "GET /v1/xxx HTTP/1.1", "200", "4968", "-", "Java/1.8.0_131", "2.532", "2.532", "api.example.com")
-      .baselineValues("xxx.xxx.xxx.xxx", "-", "GET /v1/yyy HTTP/1.1", "200", "412", "-", "Java/1.8.0_201", "3.580", "3.580", "api.example.com")
-      .go();
-  }
-
-  @Test
-  public void testSelectColumns() throws Exception {
-    testBuilder()
-      .sqlQuery("SELECT ua, reqtime FROM cp.`simple.ltsv`")
-      .unOrdered()
-      .baselineColumns("ua", "reqtime")
-      .baselineValues("Java/1.8.0_131", "2.532")
-      .baselineValues("Java/1.8.0_201", "3.580")
-      .go();
-  }
-
-  @Test
-  public void testQueryWithConditions() throws Exception {
-    testBuilder()
-      .sqlQuery("SELECT * FROM cp.`simple.ltsv` WHERE reqtime > 3.0")
-      .unOrdered()
-      .baselineColumns("host", "forwardedfor", "req", "status", "size", "referer", "ua", "reqtime", "apptime", "vhost")
-      .baselineValues("xxx.xxx.xxx.xxx", "-", "GET /v1/yyy HTTP/1.1", "200", "412", "-", "Java/1.8.0_201", "3.580", "3.580", "api.example.com")
-      .go();
-  }
-
-  @Test
-  public void testSkipEmptyLines() throws Exception {
-    assertEquals(2, queryBuilder().sql("SELECT * FROM cp.`emptylines.ltsv`").run().recordCount());
-  }
-
-  @Test
-  public void testReadException() throws Exception {
-    try {
-      run("SELECT * FROM cp.`invalid.ltsv`");
-      fail();
-    } catch (UserException e) {
-      assertEquals(UserBitShared.DrillPBError.ErrorType.DATA_READ, e.getErrorType());
-      assertTrue(e.getMessage().contains("Failure while reading messages from /invalid.ltsv. Record reader was at record: 1"));
-    }
-  }
-}