You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/11/25 22:14:22 UTC

[GitHub] [drill] paul-rogers commented on a change in pull request #2282: DRILL-7978: Fixed Width Format Plugin

paul-rogers commented on a change in pull request #2282:
URL: https://github.com/apache/drill/pull/2282#discussion_r757130589



##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(FixedwidthBatchReader.class);
+  private FileSplit split;
+  private final int maxRecords;
+  private final FixedwidthFormatConfig config;
+  private CustomErrorContext errorContext;
+  private InputStream fsStream;
+  private ResultSetLoader loader;
+  private RowSetLoader writer;
+  private BufferedReader reader;
+  private int lineNum;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    lineNum = 0;
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(), true);
+      loader = negotiator.build();
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: {}", split.getPath().toString())
+        .addContext(errorContext)
+        .addContext(e.getMessage())
+        .build(logger);
+    }
+    reader = new BufferedReader(new InputStreamReader(fsStream, Charsets.UTF_8));
+    return true;
+  }
+
+  @Override
+  public boolean next() { // Use loader to read data from file to turn into Drill rows
+    String line;
+    RowSetLoader writer = loader.writer();
+
+    try {
+      line = reader.readLine();
+      while (!writer.isFull() && line != null) {
+        writer.start();
+        parseLine(line, writer);
+        writer.save();
+        line = reader.readLine();

Review comment:
       You can lose a line here: the loop reads the next line at the bottom of the body, but if the writer is then full, the loop exits and that line is discarded. As @cgivre suggests:
   
   ```java
   while (!writer.isFull()) {
     String line = reader.readLine();
     if (line == null) {
       break;
     }
     writer.start();
     parseLine(line, writer);
     writer.save();
   }
   ```    

##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFormatConfig.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+@JsonTypeName(FixedwidthFormatPlugin.DEFAULT_NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class FixedwidthFormatConfig implements FormatPluginConfig {
+  private static final Logger logger = LoggerFactory.getLogger(FixedwidthFormatConfig.class);
+  private final List<String> extensions;
+  private final List<FixedwidthFieldConfig> fields;
+  private final List<TypeProtos.MinorType> validDataTypes = Arrays.asList(TypeProtos.MinorType.INT, TypeProtos.MinorType.VARCHAR,
+    TypeProtos.MinorType.DATE, TypeProtos.MinorType.TIME, TypeProtos.MinorType.TIMESTAMP, TypeProtos.MinorType.FLOAT4,
+    TypeProtos.MinorType.FLOAT8, TypeProtos.MinorType.BIGINT, TypeProtos.MinorType.VARDECIMAL);
+
+  @JsonCreator
+  public FixedwidthFormatConfig(@JsonProperty("extensions") List<String> extensions,
+                                @JsonProperty("fields") List<FixedwidthFieldConfig> fields) {
+    this.extensions = extensions == null ? Collections.singletonList("fwf") : ImmutableList.copyOf(extensions);
+    Collections.sort(fields);
+    this.fields = fields;
+
+    validateFieldInput();

Review comment:
       This is just a bit dangerous. We validate on deserialize. This seems like a great idea: we do the deserialize in the UI when the user saves the config. But, we also do it on system start. If the deserialize fails there, Drill won't start and it takes a long time to figure out why.
   
   We don't have a good answer for config validation. I'd suggest adding a `validate()` method that we call:
   
   * When saving a config from the UI
   * When planning a query using a config
   
   But, allow an invalid config to be stored. Otherwise, unless everything is perfect, nothing can be saved. And, JSON, in a text editor, is a horrible way to write complex config such as this one.

##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(FixedwidthBatchReader.class);
+  private FileSplit split;
+  private final int maxRecords;
+  private final FixedwidthFormatConfig config;
+  private CustomErrorContext errorContext;
+  private InputStream fsStream;
+  private ResultSetLoader loader;
+  private RowSetLoader writer;
+  private BufferedReader reader;
+  private int lineNum;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    lineNum = 0;
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(), true);
+      loader = negotiator.build();
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: {}", split.getPath().toString())
+        .addContext(errorContext)
+        .addContext(e.getMessage())
+        .build(logger);
+    }
+    reader = new BufferedReader(new InputStreamReader(fsStream, Charsets.UTF_8));
+    return true;
+  }
+
+  @Override
+  public boolean next() { // Use loader to read data from file to turn into Drill rows
+    String line;
+    RowSetLoader writer = loader.writer();
+
+    try {
+      line = reader.readLine();
+      while (!writer.isFull() && line != null) {
+        writer.start();
+        parseLine(line, writer);
+        writer.save();
+        line = reader.readLine();
+        lineNum++;
+      }
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to read input file: {}", split.getPath().toString())
+        .addContext(errorContext)
+        .addContext(e.getMessage())
+        .addContext("Line Number", lineNum)
+        .build(logger);
+    }
+    return writer.limitReached(maxRecords);  // returns false when maxRecords limit has been reached
+  }
+
+  @Override
+  public void close() {
+    if (fsStream != null){
+      AutoCloseables.closeSilently(fsStream);

Review comment:
       The reason it can be out of the `if` statement is that `closeSilently()` itself handles null values. Otherwise, the code would be fine as it is if it called `close()` directly.

##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(FixedwidthBatchReader.class);
+  private FileSplit split;
+  private final int maxRecords;
+  private final FixedwidthFormatConfig config;
+  private CustomErrorContext errorContext;
+  private InputStream fsStream;
+  private ResultSetLoader loader;
+  private RowSetLoader writer;
+  private BufferedReader reader;
+  private int lineNum;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    lineNum = 0;
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());

Review comment:
       Fixed-width files seem perfect for HDFS splits.
   
   So there is a trick here for old-school HDFS systems (if anyone still runs them.) A large file will be split across HDFS nodes, often at 256MB boundaries. A reader that supports "splits" has to handle the fact that node 1 will read the first 256 MB, node 2 the next 256 MB, and so on. Standard HDFS stuff.
   
   This means the reader has to accept the offset and scan for the first record separator after that start point. If the fixed-width records are newline-terminated, that means finding the next newline.
   
   Also, the reader has to stop after it finds the record terminator after its assigned split. (On HDFS, that means part of the last record will be read from a remote HDFS node.)
   
   Further, it means that line numbers, as counted here, are relative: they are from the start of the current split (since node 2, reading the second split, doesn't know the number of records in the first split.)
   
   The CSV reader handles this busy-work, but in a way that is a bit hard to follow as an example, sadly.

##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFieldConfig.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.types.TypeProtos;
+
+import java.util.Objects;
+
+
+@JsonTypeName("fixedwidthReaderFieldDescription")
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class FixedwidthFieldConfig implements Comparable<FixedwidthFieldConfig> {
+
+  private final String name;
+  private final int index;
+  private int width;
+  private TypeProtos.MinorType type;
+  private final String dateTimeFormat;
+
+  public FixedwidthFieldConfig(@JsonProperty("name") String name,
+                               @JsonProperty("index") int index,
+                               @JsonProperty("width") int width,
+                               @JsonProperty("type") TypeProtos.MinorType type) {
+    this(name, index, width, type, null);
+  }
+
+  @JsonCreator
+  public FixedwidthFieldConfig(@JsonProperty("name") String name,
+                               @JsonProperty("index") int index,
+                               @JsonProperty("width") int width,

Review comment:
       This makes the user do the math. Given indexes, we can compute the width. Given widths, we can compute the index. The one missing piece might be the width of the field separator (if any).

##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFieldConfig.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.types.TypeProtos;
+
+import java.util.Objects;
+
+
+@JsonTypeName("fixedwidthReaderFieldDescription")
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class FixedwidthFieldConfig implements Comparable<FixedwidthFieldConfig> {

Review comment:
       Just a high-level design note: we now have multiple plugin configs that ask the user to use ad-hoc formats for specifying a per-file schema. This is not how storage plugins were meant to be used, but it is all we have.
   
   Over time, we need a better solution. The provided-schema is a start: it provides a single, well-defined syntax for schema. But, it is a bit limited in handling per-format specifics, such as width. (There are extension properties for this kind of information, but that's a bit fiddly.)
   
   We need a way to provide the schema of a file simply as a file the user creates that sits alongside the data file. Something like a `schema.json` file if all files in a directory have the same schema, or a `foo.json` if `foo.txt` has a distinct schema.
   
   Support for such a file should be added to the V2 mechanism. (Just use the new format in place of the provided schema.)

##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFormatPlugin.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.conf.Configuration;
+
+
+public class FixedwidthFormatPlugin extends EasyFormatPlugin<FixedwidthFormatConfig> {
+
+  protected static final String DEFAULT_NAME = "fixedwidth";
+
+  private static class FixedwidthReaderFactory extends FileReaderFactory {
+
+    private final FixedwidthFormatConfig config;
+    private final int maxRecords;
+
+    public FixedwidthReaderFactory(FixedwidthFormatConfig config, int maxRecords) {
+      this.config = config;
+      this.maxRecords = maxRecords;
+    }
+
+    @Override
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+      return new FixedwidthBatchReader(config, maxRecords);
+    }
+  }
+
+  public FixedwidthFormatPlugin(String name,
+                                DrillbitContext context,
+                                Configuration fsConf,
+                                StoragePluginConfig storageConfig,
+                                FixedwidthFormatConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
+  }
+
+  private static EasyFormatConfig easyConfig(Configuration fsConf, FixedwidthFormatConfig pluginConfig) {
+    return EasyFormatConfig.builder()
+      .readable(true)
+      .writable(false)
+      .blockSplittable(false) // Change to true
+      .compressible(true)
+      .supportsProjectPushdown(true)
+      .extensions(pluginConfig.getExtensions())
+      .fsConf(fsConf)
+      .defaultName(DEFAULT_NAME)
+      .useEnhancedScan(true)
+      .supportsLimitPushdown(true)
+      .build();
+  }
+
+  @Override
+  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
+    EasySubScan scan, OptionManager options) {
+    return new FixedwidthBatchReader(getConfig(), scan.getMaxRecords());
+  }
+
+  @Override
+  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+    FileScanBuilder builder = new FileScanBuilder();
+    builder.setReaderFactory(new FixedwidthReaderFactory(getConfig(), scan.getMaxRecords()));
+    initScanBuilder(builder, scan);
+    builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+    return builder;
+  }
+}

Review comment:
       Nit: missing newline.

##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFormatConfig.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+@JsonTypeName(FixedwidthFormatPlugin.DEFAULT_NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class FixedwidthFormatConfig implements FormatPluginConfig {
+  private static final Logger logger = LoggerFactory.getLogger(FixedwidthFormatConfig.class);
+  private final List<String> extensions;
+  private final List<FixedwidthFieldConfig> fields;
+  private final List<TypeProtos.MinorType> validDataTypes = Arrays.asList(TypeProtos.MinorType.INT, TypeProtos.MinorType.VARCHAR,
+    TypeProtos.MinorType.DATE, TypeProtos.MinorType.TIME, TypeProtos.MinorType.TIMESTAMP, TypeProtos.MinorType.FLOAT4,
+    TypeProtos.MinorType.FLOAT8, TypeProtos.MinorType.BIGINT, TypeProtos.MinorType.VARDECIMAL);
+
+  @JsonCreator
+  public FixedwidthFormatConfig(@JsonProperty("extensions") List<String> extensions,
+                                @JsonProperty("fields") List<FixedwidthFieldConfig> fields) {
+    this.extensions = extensions == null ? Collections.singletonList("fwf") : ImmutableList.copyOf(extensions);
+    Collections.sort(fields);
+    this.fields = fields;
+
+    validateFieldInput();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public List<String> getExtensions() {
+    return extensions;
+  }
+
+  public List<FixedwidthFieldConfig> getFields() {
+    return fields;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(extensions, fields);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    FixedwidthFormatConfig other = (FixedwidthFormatConfig) obj;
+    return Objects.equals(extensions, other.extensions)
+            && Objects.equals(fields, other.fields);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+            .field("extensions", extensions)
+            .field("fields", fields)
+            .toString();
+  }
+
+
+  @JsonIgnore
+  public boolean hasFields() {
+    return fields != null && ! fields.isEmpty();
+  }
+
+  @JsonIgnore
+  public List<String> getFieldNames() {
+    List<String> result = new ArrayList<>();
+    if (! hasFields()) {
+      return result;
+    }
+
+    for (FixedwidthFieldConfig field : fields) {
+      result.add(field.getName());
+    }
+    return result;
+  }
+
+  @JsonIgnore
+  public List<Integer> getFieldIndices() {
+    List<Integer> result = new ArrayList<>();
+    if (! hasFields()) {
+      return result;
+    }
+
+    for (FixedwidthFieldConfig field : fields) {
+      result.add(field.getIndex());
+    }
+    return result;
+  }
+
+  @JsonIgnore
+  public List<Integer> getFieldWidths() {
+    List<Integer> result = new ArrayList<>();
+    if (! hasFields()) {
+      return result;
+    }
+
+    for (FixedwidthFieldConfig field : fields) {
+      result.add(field.getWidth());
+    }
+    return result;
+  }
+
+  @JsonIgnore
+  public List<TypeProtos.MinorType> getFieldTypes() {
+    List<TypeProtos.MinorType> result = new ArrayList<>();
+    if (! hasFields()) {
+      return result;
+    }
+
+    for (FixedwidthFieldConfig field : fields) {
+      result.add(field.getType());
+    }
+    return result;
+  }
+
+  @JsonIgnore
+  public void setFieldTypes(int i) {
+    for (FixedwidthFieldConfig field : fields) {
+      if (field.getIndex() == i) {
+        field.setType();
+      }
+    }
+  }
+
+  @JsonIgnore

Review comment:
       No need for this tag: Jackson doesn't know what to do with this method anyway.

##########
File path: contrib/format-fixedwidth/src/test/java/org/apache/drill/exec/store/fixedwidth/TestFixedwidthRecordReader.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.apache.drill.test.QueryTestUtil.generateCompressedFile;
+import static org.junit.Assert.assertEquals;
+
+@Category(RowSetTests.class)
+public class TestFixedwidthRecordReader extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    FixedwidthFormatConfig formatConfig = new FixedwidthFormatConfig(Lists.newArrayList("fwf"),
+      Lists.newArrayList(
+        new FixedwidthFieldConfig("Number", 1, 5, TypeProtos.MinorType.VARDECIMAL),
+        new FixedwidthFieldConfig("Address",12, 3,TypeProtos.MinorType.INT,  ""),

Review comment:
       Nit: spacing after commas. Remove the stray formatting so this line matches the previous line.

##########
File path: contrib/format-fixedwidth/src/test/resources/fwf/test_blankrow.fwf
##########
@@ -0,0 +1,26 @@
+12.34 test 567 02-10-2021 10:30:27 02-10-2021T10:30:27.00Z
+56.78 TEST 890 07-27-2021 12:40:15 07-27-2021T12:40:15.00Z
+11.11 abcd 111 11-11-1111 11:11:11 11-11-1111T11:11:11.11Z
+22.22 efgh 222 01-22-2222 22:22:22 01-22-2222T22:22:22.22Z
+33.33 ijkl 333 02-01-3333 01:33:33 02-01-3333T01:33:33.33Z
+44.44 mnop 444 03-02-4444 02:44:44 03-02-4444T02:44:44.44Z
+55.55 qrst 555 04-03-5555 03:55:55 04-03-5555T03:55:55.55Z
+66.66 uvwx 666 05-04-6666 04:01:01 05-04-6666T04:01:01.01Z
+77.77 yzzz 777 06-05-7777 05:11:11 06-05-7777T05:11:11.11Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+

Review comment:
       Is the blank line intentional? It is kind of hard to document data files. You could allow comments (say, lines that start with `#`), but there is always someone who has valid data that starts with `#`. Another idea would be to add an "explanation.txt" file that explains the purpose of each of the data files.

##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(FixedwidthBatchReader.class);
+  private FileSplit split;
+  private final int maxRecords;
+  private final FixedwidthFormatConfig config;
+  private CustomErrorContext errorContext;
+  private InputStream fsStream;
+  private ResultSetLoader loader;
+  private RowSetLoader writer;
+  private BufferedReader reader;
+  private int lineNum;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    lineNum = 0;
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(), true);
+      loader = negotiator.build();
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: {}", split.getPath().toString())
+        .addContext(errorContext)
+        .addContext(e.getMessage())
+        .build(logger);
+    }
+    reader = new BufferedReader(new InputStreamReader(fsStream, Charsets.UTF_8));
+    return true;
+  }
+
+  @Override
+  public boolean next() { // Use loader to read data from file to turn into Drill rows
+    String line;
+    RowSetLoader writer = loader.writer();
+
+    try {
+      line = reader.readLine();
+      while (!writer.isFull() && line != null) {
+        writer.start();
+        parseLine(line, writer);
+        writer.save();
+        line = reader.readLine();
+        lineNum++;
+      }
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to read input file: {}", split.getPath().toString())
+        .addContext(errorContext)
+        .addContext(e.getMessage())
+        .addContext("Line Number", lineNum)
+        .build(logger);
+    }
+    return writer.limitReached(maxRecords);  // returns false when maxRecords limit has been reached
+  }
+
+  @Override
+  public void close() {
+    try {
+      fsStream.close();
+      loader.close();
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to close input file: {}", split.getPath().toString())
+        .addContext(errorContext)
+        .addContext(e.getMessage())
+        .build(logger);
+    }
+  }
+
+  private TupleMetadata buildSchema() {
+    SchemaBuilder builder = new SchemaBuilder();
+    for (FixedwidthFieldConfig field : config.getFields()) {
+      builder.addNullable(field.getFieldName(), field.getDataType());
+    }
+    return builder.buildSchema();
+  }
+
+
+  private boolean parseLine(String line, RowSetLoader writer) throws IOException {
+    int i = 0;
+    TypeProtos.MinorType dataType;
+    String dateTimeFormat;
+    String value;
+    for (FixedwidthFieldConfig field : config.getFields()) {
+      value = line.substring(field.getStartIndex() - 1, field.getStartIndex() + field.getFieldWidth() - 1);
+      dataType = field.getDataType();
+      dateTimeFormat = field.getDateTimeFormat();
+      DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateTimeFormat, Locale.ENGLISH);
+      try {
+        switch (dataType) {

Review comment:
       To be clearer: what we want is to minimize the per-field work. Ideally, we'd set up an array of column converters so the loop looks like:
   
   ```java
   for (int i = 0; i < config.getFields().size(); i++) {
     FixedwidthFieldConfig field = config.getFields().get(i);
     value = line.substring(field.getIndex() - 1, field.getIndex() + field.getWidth() - 1);
     writer.scalar(i).setString(value);
   }
   ```
   
   However, the above requires the user to specify the config. For every file. On every Drill cluster. Better, if the schema is not given, infer it from the first (few) row(s). Then, set up runtime field objects:
   
   ```java
   for (FieldReader fieldReader : fieldReaders) {
     fieldReader.load(line);
   }
   ```
   
   The field reader:
   
   * Remembers the field's start offset and width
   * Pulls out the string, handling a truncated line
   * Calls a cached column writer
   
   @cgivre mentioned the idea of a column converter. There is a defined set for the common cases. The underlying mechanism sets them up for you. (V2 makes it simpler.) That way, a call to `setString()` directly invokes the thing that converts from string and writes the resulting value: no per-column switch needed.

##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFormatConfig.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+@JsonTypeName(FixedwidthFormatPlugin.DEFAULT_NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class FixedwidthFormatConfig implements FormatPluginConfig {
+  private static final Logger logger = LoggerFactory.getLogger(FixedwidthFormatConfig.class);
+  private final List<String> extensions;
+  private final List<FixedwidthFieldConfig> fields;
+  private final List<TypeProtos.MinorType> validDataTypes = Arrays.asList(TypeProtos.MinorType.INT, TypeProtos.MinorType.VARCHAR,
+    TypeProtos.MinorType.DATE, TypeProtos.MinorType.TIME, TypeProtos.MinorType.TIMESTAMP, TypeProtos.MinorType.FLOAT4,
+    TypeProtos.MinorType.FLOAT8, TypeProtos.MinorType.BIGINT, TypeProtos.MinorType.VARDECIMAL);
+
+  @JsonCreator
+  public FixedwidthFormatConfig(@JsonProperty("extensions") List<String> extensions,
+                                @JsonProperty("fields") List<FixedwidthFieldConfig> fields) {
+    this.extensions = extensions == null ? Collections.singletonList("fwf") : ImmutableList.copyOf(extensions);
+    Collections.sort(fields);
+    this.fields = fields;
+
+    validateFieldInput();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public List<String> getExtensions() {
+    return extensions;
+  }
+
+  public List<FixedwidthFieldConfig> getFields() {
+    return fields;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(extensions, fields);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    FixedwidthFormatConfig other = (FixedwidthFormatConfig) obj;
+    return Objects.equals(extensions, other.extensions)
+            && Objects.equals(fields, other.fields);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+            .field("extensions", extensions)
+            .field("fields", fields)
+            .toString();
+  }
+
+
+  @JsonIgnore
+  public boolean hasFields() {
+    return fields != null && ! fields.isEmpty();
+  }
+
+  @JsonIgnore
+  public List<String> getFieldNames() {
+    List<String> result = new ArrayList<>();
+    if (! hasFields()) {
+      return result;
+    }
+
+    for (FixedwidthFieldConfig field : fields) {
+      result.add(field.getName());
+    }
+    return result;
+  }
+
+  @JsonIgnore
+  public List<Integer> getFieldIndices() {
+    List<Integer> result = new ArrayList<>();
+    if (! hasFields()) {
+      return result;
+    }
+
+    for (FixedwidthFieldConfig field : fields) {
+      result.add(field.getIndex());
+    }
+    return result;
+  }
+
+  @JsonIgnore
+  public List<Integer> getFieldWidths() {
+    List<Integer> result = new ArrayList<>();
+    if (! hasFields()) {
+      return result;
+    }
+
+    for (FixedwidthFieldConfig field : fields) {
+      result.add(field.getWidth());
+    }
+    return result;
+  }
+
+  @JsonIgnore
+  public List<TypeProtos.MinorType> getFieldTypes() {
+    List<TypeProtos.MinorType> result = new ArrayList<>();
+    if (! hasFields()) {
+      return result;
+    }
+
+    for (FixedwidthFieldConfig field : fields) {
+      result.add(field.getType());
+    }
+    return result;
+  }
+
+  @JsonIgnore
+  public void setFieldTypes(int i) {
+    for (FixedwidthFieldConfig field : fields) {
+      if (field.getIndex() == i) {
+        field.setType();
+      }
+    }
+  }
+
+  @JsonIgnore
+  public void validateFieldInput(){
+    Set<String> uniqueNames = new HashSet<>();
+    List<Integer> fieldIndices = this.getFieldIndices();
+    List<Integer> fieldWidths = this.getFieldWidths();
+    List<String> fieldNames = this.getFieldNames();
+    List<TypeProtos.MinorType> fieldTypes = this.getFieldTypes();
+    int prevIndexAndWidth = -1;
+
+    /* Validate Field Name - Ensure field is not empty, does not exceed maximum length,
+    is valid SQL syntax, and no two fields have the same name
+     */
+    for (String name : this.getFieldNames()){
+      if (name.length() == 0){
+        throw UserException
+          .validationError()
+          .message("Blank field name detected.")
+          .addContext("Plugin", FixedwidthFormatPlugin.DEFAULT_NAME)
+          .build(logger);
+      }
+      if (name.length() > 1024) {
+        throw UserException
+          .validationError()
+          .message("Exceeds maximum length of 1024 characters: " + name.substring(0, 1024))
+          .addContext("Plugin", FixedwidthFormatPlugin.DEFAULT_NAME)
+          .build(logger);
+      }
+      if (!Pattern.matches("[a-zA-Z]\\w*", name)) {
+        throw UserException
+          .validationError()
+          .message("Invalid input: " + name)

Review comment:
       This message will (I hope) be shown to the poor user trying to get a config right in the Drill web UI. Can we be more specific? Such as, "Name is not valid. Only letters allowed."
   
   But, then, why don't we allow numbers or underscores? "Field_3"? If we allowed that, you could use the [Java method](https://docs.oracle.com/javase/8/docs/api/javax/lang/model/SourceVersion.html#isName-java.lang.CharSequence-). Or, extend the pattern.
   
   Also, it is cleaner to use `name = name.strip()` to remove both leading and trailing whitespace so we don't leave whitespace in the name. Otherwise, all users have to know to do their own `strip()` call.

##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(FixedwidthBatchReader.class);
+  private FileSplit split;
+  private final int maxRecords;
+  private final FixedwidthFormatConfig config;
+  private CustomErrorContext errorContext;
+  private InputStream fsStream;
+  private ResultSetLoader loader;
+  private RowSetLoader writer;
+  private BufferedReader reader;
+  private int lineNum;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    lineNum = 0;
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(), true);
+      loader = negotiator.build();
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: {}", split.getPath().toString())
+        .addContext(errorContext)
+        .addContext(e.getMessage())

Review comment:
       I like that second line (if you mean the `message` call). Another solution is to change `message()` to `addContext()`. This way, we preserve the message from the actual error, and add context to explain the source of the error. Then, as Charles suggested, we don't need the `addContext(e.getMessage())` bit.

##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(FixedwidthBatchReader.class);
+  private FileSplit split;
+  private final int maxRecords;
+  private final FixedwidthFormatConfig config;
+  private CustomErrorContext errorContext;
+  private InputStream fsStream;
+  private ResultSetLoader loader;
+  private RowSetLoader writer;
+  private BufferedReader reader;
+  private int lineNum;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    lineNum = 0;
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(), true);

Review comment:
       This is done internally in V2. V2 does it that way because it became clear that there is no reason for every reader to have to know to do this same pattern.

##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(FixedwidthBatchReader.class);
+  private FileSplit split;
+  private final int maxRecords;
+  private final FixedwidthFormatConfig config;
+  private CustomErrorContext errorContext;
+  private InputStream fsStream;
+  private ResultSetLoader loader;
+  private RowSetLoader writer;
+  private BufferedReader reader;
+  private int lineNum;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    lineNum = 0;
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(), true);
+      loader = negotiator.build();
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: {}", split.getPath().toString())
+        .addContext(errorContext)
+        .addContext(e.getMessage())
+        .build(logger);
+    }
+    reader = new BufferedReader(new InputStreamReader(fsStream, Charsets.UTF_8));
+    return true;
+  }
+
+  @Override
+  public boolean next() { // Use loader to read data from file to turn into Drill rows
+    String line;
+    RowSetLoader writer = loader.writer();
+
+    try {
+      line = reader.readLine();
+      while (!writer.isFull() && line != null) {
+        writer.start();
+        parseLine(line, writer);
+        writer.save();
+        line = reader.readLine();
+        lineNum++;
+      }
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to read input file: {}", split.getPath().toString())
+        .addContext(errorContext)
+        .addContext(e.getMessage())
+        .addContext("Line Number", lineNum)
+        .build(logger);
+    }
+    return writer.limitReached(maxRecords);  // returns false when maxRecords limit has been reached
+  }
+
+  @Override
+  public void close() {
+    if (fsStream != null){
+      AutoCloseables.closeSilently(fsStream);
+      fsStream = null;
+    }
+  }
+
+  private TupleMetadata buildSchema() {
+    SchemaBuilder builder = new SchemaBuilder();
+    for (FixedwidthFieldConfig field : config.getFields()) {
+      if (field.getType() == TypeProtos.MinorType.VARDECIMAL){
+        builder.addNullable(field.getName(), TypeProtos.MinorType.VARDECIMAL,38,4);
+        //revisit this
+      } else {
+        builder.addNullable(field.getName(), field.getType());
+      }
+    }
+    return builder.buildSchema();
+  }
+
+
+  private boolean parseLine(String line, RowSetLoader writer) throws IOException {
+    int i = 0;
+    TypeProtos.MinorType dataType;
+    String dateTimeFormat;
+    String value;
+    for (FixedwidthFieldConfig field : config.getFields()) {

Review comment:
       What you want to do is to resolve the schema at open time, not when parsing. At open time, you can:
   
   * Get the schema from the plugin, if provided, and use that as the schema.
   * Else, sample the first line to infer the schema.
   
   Since this is a fixed format, we don't want to rediscover the schema on every line: that costs too much. (Think of the case of reading 100M or 1B rows: optimizing the inner loop is critical.)

##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(FixedwidthBatchReader.class);
+  private FileSplit split;
+  private final int maxRecords;
+  private final FixedwidthFormatConfig config;
+  private CustomErrorContext errorContext;
+  private InputStream fsStream;
+  private ResultSetLoader loader;
+  private RowSetLoader writer;
+  private BufferedReader reader;
+  private int lineNum;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    lineNum = 0;
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(), true);
+      loader = negotiator.build();
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: {}", split.getPath().toString())
+        .addContext(errorContext)
+        .addContext(e.getMessage())
+        .build(logger);
+    }
+    reader = new BufferedReader(new InputStreamReader(fsStream, Charsets.UTF_8));
+    return true;
+  }
+
+  @Override
+  public boolean next() { // Use loader to read data from file to turn into Drill rows
+    String line;
+    RowSetLoader writer = loader.writer();
+
+    try {
+      line = reader.readLine();
+      while (!writer.isFull() && line != null) {
+        writer.start();
+        parseLine(line, writer);
+        writer.save();
+        line = reader.readLine();
+        lineNum++;
+      }
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to read input file: {}", split.getPath().toString())
+        .addContext(errorContext)
+        .addContext(e.getMessage())

Review comment:
       See explanation above. Ideally:
   
   ```java
         throw UserException
           .dataReadError(e)
           .addContext("Failed to read input file: {}", split.getPath().toString())
           .addContext(errorContext)
           .addContext("Line Number", lineNum)
           .build(logger);
   ```
   
   Thanks for adding the line number: nice touch.

##########
File path: contrib/format-fixedwidth/src/test/resources/fwf/test_blankrow.fwf
##########
@@ -0,0 +1,26 @@
+12.34 test 567 02-10-2021 10:30:27 02-10-2021T10:30:27.00Z
+56.78 TEST 890 07-27-2021 12:40:15 07-27-2021T12:40:15.00Z
+11.11 abcd 111 11-11-1111 11:11:11 11-11-1111T11:11:11.11Z
+22.22 efgh 222 01-22-2222 22:22:22 01-22-2222T22:22:22.22Z
+33.33 ijkl 333 02-01-3333 01:33:33 02-01-3333T01:33:33.33Z
+44.44 mnop 444 03-02-4444 02:44:44 03-02-4444T02:44:44.44Z
+55.55 qrst 555 04-03-5555 03:55:55 04-03-5555T03:55:55.55Z
+66.66 uvwx 666 05-04-6666 04:01:01 05-04-6666T04:01:01.01Z
+77.77 yzzz 777 06-05-7777 05:11:11 06-05-7777T05:11:11.11Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+

Review comment:
       Also, I'd hoped to see all manner of invalid files:
   
   * Truncated line (not just an entirely blank line)
   * String where a number should be.
   * Badly formatted date or time.
   
   Such files will ensure that the error handling works and will raise that perpetual question: if we're a hundred million lines into a fixed-width file, and we hit an error, should we ignore that line and move to the next, or should we fail the query?

##########
File path: contrib/format-fixedwidth/src/test/java/org/apache/drill/exec/store/fixedwidth/TestFixedwidthRecordReader.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.apache.drill.test.QueryTestUtil.generateCompressedFile;
+import static org.junit.Assert.assertEquals;
+
+@Category(RowSetTests.class)
+public class TestFixedwidthRecordReader extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    FixedwidthFormatConfig formatConfig = new FixedwidthFormatConfig(Lists.newArrayList("fwf"),
+      Lists.newArrayList(
+        new FixedwidthFieldConfig("Number", 1, 5, TypeProtos.MinorType.VARDECIMAL),
+        new FixedwidthFieldConfig("Address",12, 3,TypeProtos.MinorType.INT,  ""),
+        new FixedwidthFieldConfig("Letter", 7,4, TypeProtos.MinorType.VARCHAR, ""),
+        new FixedwidthFieldConfig("Date",16, 10,TypeProtos.MinorType.DATE,  "MM-dd-yyyy"),
+        new FixedwidthFieldConfig( "Time", 27, 8,TypeProtos.MinorType.TIME,"HH:mm:ss" ),
+        new FixedwidthFieldConfig("DateTime", 36, 23,TypeProtos.MinorType.TIMESTAMP, "MM-dd-yyyy'T'HH:mm:ss.SSX" )
+      ));
+    cluster.defineFormat("dfs", "fwf", formatConfig);
+    cluster.defineFormat("cp", "fwf", formatConfig);
+
+    // Needed for compressed file unit test
+    dirTestWatcher.copyResourceToRoot(Paths.get("fwf/"));
+  }
+
+  @Test
+  public void testStarQuery() throws Exception {
+    String sql = "SELECT * FROM cp.`fwf/test.fwf`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    RowSet expected = setupTestData();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testExplicitAllQuery() throws Exception {
+    String sql = "SELECT Number, Letter, Address, `Date`, `Time`, DateTime FROM cp.`fwf/test.fwf`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    RowSet expected = setupTestData();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testExplicitQuery() throws Exception {
+    String sql = "SELECT Number, Letter, Address FROM cp.`fwf/test.fwf` WHERE Letter='yzzz'";
+    QueryBuilder q = client.queryBuilder().sql(sql);
+    RowSet results = q.rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("Number", TypeProtos.MinorType.VARDECIMAL,38,4)
+      .addNullable("Letter", TypeProtos.MinorType.VARCHAR)
+      .addNullable("Address", TypeProtos.MinorType.INT)
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(77.77, "yzzz", 777)
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  //Test Serialization/Deserialization
+  @Test
+  public void testSerDe() throws Exception {
+    String sql = "SELECT COUNT(*) FROM cp.`fwf/test.fwf`";
+    String plan = queryBuilder().sql(sql).explainJson();
+    long cnt = queryBuilder().physical(plan).singletonLong();
+    assertEquals(25L, cnt);
+  }
+
+  @Test
+  public void testStarQueryWithCompressedFile() throws Exception {
+    generateCompressedFile("fwf/test.fwf", "zip", "fwf/test.fwf.zip" );
+
+    String sql = "SELECT * FROM dfs.`fwf/test.fwf.zip`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    RowSet expected = setupTestData();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  // Test Entering invalid schemata - incorrect limits
+    // Undefined field, what happens
+    // Parse invalid file, make sure correct error
+
+
+  @Test
+  public void testOutOfOrder() throws Exception{
+    String sql = "SELECT Address, DateTime, `Date`, Letter FROM cp.`fwf/test.fwf`";
+    QueryBuilder q = client.queryBuilder().sql(sql);
+    RowSet results = q.rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("Address", TypeProtos.MinorType.INT)
+      .addNullable("DateTime", TypeProtos.MinorType.TIMESTAMP)
+      .addNullable("Date", TypeProtos.MinorType.DATE)
+      .addNullable("Letter", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(567, Instant.parse("2021-02-10T15:30:27.00Z"), LocalDate.parse("2021-02-10"), "test")

Review comment:
       Generally, for something like this, you can use a `LIMIT 2` and only check the first row or two. The code is not going to change column order after the 10th row!

##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements ManagedReader<FileSchemaNegotiator> {

Review comment:
       @cgivre, it's been a while since I looked at that stuff. As I recall, the CSV reader was converted, but it is a bit obscure. The unit tests also show what can be done, IIRC.
   
   Basically, V2 gets schema from multiple sources:
   
   * Schema info in the plan object (if available in the future)
   * Provided schema (the current interim solution to system-provided schema)
   * Schema implied by the selected columns (for example, `a[1]` means `a` has to be an array)
   * Schema defined by the data source itself (as in Parquet, or in CSV where you can choose the `columns` array)
   * Schema discovered by the traditional "schema on read"
   
   The V2 `SchemaNegotiator` provides the needed methods. V2 will then come up with the final schema. V2 lets you read the entire row (columns a, b, c, say) even if the query wants only column a: V2 silently discards the unwanted columns.

##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFormatPlugin.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.conf.Configuration;
+
+
+public class FixedwidthFormatPlugin extends EasyFormatPlugin<FixedwidthFormatConfig> {
+
+  protected static final String DEFAULT_NAME = "fixedwidth";
+
+  private static class FixedwidthReaderFactory extends FileReaderFactory {
+
+    private final FixedwidthFormatConfig config;
+    private final int maxRecords;
+
+    public FixedwidthReaderFactory(FixedwidthFormatConfig config, int maxRecords) {
+      this.config = config;
+      this.maxRecords = maxRecords;
+    }
+
+    @Override
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+      return new FixedwidthBatchReader(config, maxRecords);
+    }
+  }
+
+  public FixedwidthFormatPlugin(String name,
+                                DrillbitContext context,
+                                Configuration fsConf,
+                                StoragePluginConfig storageConfig,
+                                FixedwidthFormatConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
+  }
+
+  private static EasyFormatConfig easyConfig(Configuration fsConf, FixedwidthFormatConfig pluginConfig) {
+    return EasyFormatConfig.builder()
+      .readable(true)
+      .writable(false)
+      .blockSplittable(false) // Change to true

Review comment:
       But only if the additional work described above is done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org