Posted to commits@drill.apache.org by cg...@apache.org on 2021/11/24 22:38:23 UTC

[drill] branch master updated: DRILL-8054: Add SAS Format Plugin (#2386)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new b915ece  DRILL-8054: Add SAS Format Plugin (#2386)
b915ece is described below

commit b915ece6e460102302edc75afbcf81be4f90b325
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Wed Nov 24 17:38:16 2021 -0500

    DRILL-8054: Add SAS Format Plugin (#2386)
    
    * Initial commit
    
    * Initial commit
    
    * Added SerDe
    
    * Dates working
    
    * Added Implicit Fields and date unit tests
    
    * Updated docs and added unit tests
    
    * Exclude SAS test files from license check
    
    * Added Time Support
    
    * Fixed checkstyle issues
    
    * Addressed Review Comments
    
    * Fixed unused imports
---
 contrib/format-sas/README.md                       |  34 ++
 contrib/format-sas/pom.xml                         |  86 ++++
 .../drill/exec/store/sas/SasBatchReader.java       | 456 +++++++++++++++++++++
 .../drill/exec/store/sas/SasFormatConfig.java      |  52 +++
 .../drill/exec/store/sas/SasFormatPlugin.java      |  91 ++++
 .../main/resources/bootstrap-format-plugins.json   |  37 ++
 .../src/main/resources/drill-module.conf           |  23 ++
 .../apache/drill/exec/store/sas/TestSasReader.java | 197 +++++++++
 .../test/resources/sas/all_rand_normal.sas7bdat    | Bin 0 -> 13312 bytes
 .../src/test/resources/sas/date_formats.sas7bdat   | Bin 0 -> 131072 bytes
 .../src/test/resources/sas/mixed_data_one.sas7bdat | Bin 0 -> 17408 bytes
 .../src/test/resources/sas/mixed_data_two.sas7bdat | Bin 0 -> 25600 bytes
 .../src/test/resources/sas/time_formats.sas7bdat   | Bin 0 -> 131072 bytes
 contrib/pom.xml                                    |   1 +
 distribution/pom.xml                               |   5 +
 distribution/src/assemble/component.xml            |   1 +
 pom.xml                                            |   3 +
 17 files changed, 986 insertions(+)

diff --git a/contrib/format-sas/README.md b/contrib/format-sas/README.md
new file mode 100644
index 0000000..89434a0
--- /dev/null
+++ b/contrib/format-sas/README.md
@@ -0,0 +1,34 @@
+# Format Plugin for SAS Files
+This format plugin enables Drill to read SAS files (`sas7bdat`).
+
+## Data Types
+The SAS format supports the `VARCHAR`, `BIGINT`, `DOUBLE`, `TIME`, and `DATE` data types.
+
+## Implicit Fields (Metadata)
+The SAS reader provides the following file metadata fields:
+* `_compression_method`
+* `_file_label`
+* `_file_type`
+* `_os_name`
+* `_os_type`
+* `_sas_release`
+* `_session_encoding`
+* `_server_type`
+* `_date_created`
+* `_date_modified`
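+
+These metadata columns are excluded from wildcard (`SELECT *`) queries, so select them explicitly. An illustrative query:
+
+```sql
+SELECT _sas_release, _server_type, _date_created
+FROM dfs.`sas/mixed_data_one.sas7bdat`;
+```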
+
+## Schema Provisioning
+Drill will infer the schema of your data. A user-provided schema is also supported.
+
+## Configuration Options 
+This plugin has no configuration options other than the file extension.
+
+```json
+  "sas": {
+  "type": "sas",
+  "extensions": [
+    "sas7bdat"
+  ]
+}
+```
+This plugin is enabled by default. 
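+
+## Example Query
+A minimal example; the file path is illustrative and assumes a `sas7bdat` file in a configured `dfs` workspace:
+
+```sql
+SELECT * FROM dfs.`sas/mixed_data_one.sas7bdat` LIMIT 10;
+```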
diff --git a/contrib/format-sas/pom.xml b/contrib/format-sas/pom.xml
new file mode 100644
index 0000000..7ad0e4a
--- /dev/null
+++ b/contrib/format-sas/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.20.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-format-sas</artifactId>
+  <name>Drill : Contrib : Format : SAS</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.epam</groupId>
+      <artifactId>parso</artifactId>
+      <version>2.0.14</version>
+    </dependency>
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-java-sources</id>
+            <phase>process-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes/org/apache/drill/exec/store/sas</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>src/main/java/org/apache/drill/exec/store/sas</directory>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasBatchReader.java b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasBatchReader.java
new file mode 100644
index 0000000..187b174
--- /dev/null
+++ b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasBatchReader.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.sas;
+
+import com.epam.parso.Column;
+import com.epam.parso.SasFileProperties;
+import com.epam.parso.SasFileReader;
+import com.epam.parso.impl.SasFileReaderImpl;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class SasBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+  private static final Logger logger = LoggerFactory.getLogger(SasBatchReader.class);
+  private final int maxRecords;
+  private final List<SasColumnWriter> writerList;
+  private FileSplit split;
+  private InputStream fsStream;
+  private SasFileReader sasFileReader;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private Object[] firstRow;
+
+  private String compressionMethod;
+  private String fileLabel;
+  private String fileType;
+  private String osName;
+  private String osType;
+  private String sasRelease;
+  private String sessionEncoding;
+  private String serverType;
+  private LocalDate dateCreated;
+  private LocalDate dateModified;
+
+  private enum IMPLICIT_STRING_COLUMN {
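+    // Declaration order here must match the write order in writeMetadata().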
+    COMPRESSION_METHOD("_compression_method"),
+    ENCODING("_encoding"),
+    FILE_LABEL("_file_label"),
+    FILE_TYPE("_file_type"),
+    OS_NAME("_os_name"),
+    OS_TYPE("_os_type"),
+    SAS_RELEASE("_sas_release"),
+    SESSION_ENCODING("_session_encoding");
+
+    private final String fieldName;
+
+    IMPLICIT_STRING_COLUMN(String fieldName) {
+      this.fieldName = fieldName;
+    }
+
+    public String getFieldName() {
+      return fieldName;
+    }
+  }
+
+  private enum IMPLICIT_DATE_COLUMN {
+    CREATED_DATE("_date_created"),
+    MODIFIED_DATE("_date_modified");
+
+    private final String fieldName;
+
+    IMPLICIT_DATE_COLUMN(String fieldName) {
+      this.fieldName = fieldName;
+    }
+
+    public String getFieldName() {
+      return fieldName;
+    }
+  }
+
+  public static class SasReaderConfig {
+    protected final SasFormatPlugin plugin;
+    public SasReaderConfig(SasFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public SasBatchReader(int maxRecords) {
+    this.maxRecords = maxRecords;
+    writerList = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    openFile(negotiator);
+
+    TupleMetadata schema;
+    if (negotiator.hasProvidedSchema()) {
+      schema = negotiator.providedSchema();
+    } else {
+      schema = buildSchema();
+    }
+    schema = addImplicitColumnsToSchema(schema);
+    negotiator.tableSchema(schema, true);
+
+    ResultSetLoader loader = negotiator.build();
+    rowWriter = loader.writer();
+    buildWriterList(schema);
+
+    return true;
+  }
+
+  private void openFile(FileScanFramework.FileSchemaNegotiator negotiator) {
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      sasFileReader = new SasFileReaderImpl(fsStream);
+      firstRow = sasFileReader.readNext();
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open SAS File %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
+
+  private TupleMetadata buildSchema() {
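+    // Column types are inferred from the first row of data, which openFile()
+    // reads eagerly and processNextRow() later replays.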
+    SchemaBuilder builder = new SchemaBuilder();
+    List<Column> columns = sasFileReader.getColumns();
+    int counter = 0;
+    for (Column column : columns) {
+      String fieldName = column.getName();
+      try {
+        MinorType type = getType(firstRow[counter].getClass().getSimpleName());
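+        // Parso returns SAS time values as Long. A BIGINT column that carries a
+        // column format string (e.g. TIME8.) is assumed here to hold TIME values.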
+        if (type == MinorType.BIGINT && !column.getFormat().isEmpty()) {
+          logger.debug("Found possible time");
+          type = MinorType.TIME;
+        }
+        builder.addNullable(fieldName, type);
+      } catch (Exception e) {
+        throw UserException.dataReadError(e)
+          .message("Error with column type: " + (firstRow[counter] == null ? "null" : firstRow[counter].getClass().getSimpleName()))
+          .addContext(errorContext)
+          .build(logger);
+      }
+      counter++;
+    }
+
+    return builder.buildSchema();
+  }
+
+  private void buildWriterList(TupleMetadata schema) {
+    int colIndex = 0;
+    for (MaterializedField field : schema.toFieldList()) {
+      String fieldName = field.getName();
+      MinorType type = field.getType().getMinorType();
+      if (type == MinorType.FLOAT8) {
+        writerList.add(new DoubleSasColumnWriter(colIndex, fieldName, rowWriter));
+      } else if (type == MinorType.BIGINT) {
+        writerList.add(new BigIntSasColumnWriter(colIndex, fieldName, rowWriter));
+      } else if (type == MinorType.DATE) {
+        writerList.add(new DateSasColumnWriter(colIndex, fieldName, rowWriter));
+      } else if (type == MinorType.TIME) {
+        writerList.add(new TimeSasColumnWriter(colIndex, fieldName, rowWriter));
+      } else if (type == MinorType.VARCHAR){
+        writerList.add(new StringSasColumnWriter(colIndex, fieldName, rowWriter));
+      } else {
+        throw UserException.dataReadError()
+          .message(fieldName + " is an unparsable data type: " + type.name() + ".  The SAS reader does not support this data type.")
+          .addContext(errorContext)
+          .build(logger);
+      }
+      colIndex++;
+    }
+  }
+
+  private MinorType getType(String simpleType) {
+    switch (simpleType) {
+      case "String":
+        return MinorType.VARCHAR;
+      case "Double":
+        return MinorType.FLOAT8;
+      case "Long":
+        return MinorType.BIGINT;
+      case "Date":
+        return MinorType.DATE;
+      default:
+        throw UserException.dataReadError()
+          .message("SAS Reader does not support data type: " + simpleType)
+          .addContext(errorContext)
+          .build(logger);
+    }
+  }
+
+  private TupleMetadata addImplicitColumnsToSchema(TupleMetadata schema) {
+    SchemaBuilder builder = new SchemaBuilder();
+    ColumnMetadata colSchema;
+    builder.addAll(schema);
+    SasFileProperties fileProperties = sasFileReader.getSasFileProperties();
+
+    // Add String Metadata columns
+    for (IMPLICIT_STRING_COLUMN name : IMPLICIT_STRING_COLUMN.values()) {
+      colSchema = MetadataUtils.newScalar(name.getFieldName(), MinorType.VARCHAR, DataMode.OPTIONAL);
+      colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+      builder.add(colSchema);
+    }
+
+    // Add Date Column Names
+    for (IMPLICIT_DATE_COLUMN name : IMPLICIT_DATE_COLUMN.values()) {
+      colSchema = MetadataUtils.newScalar(name.getFieldName(), MinorType.DATE, DataMode.OPTIONAL);
+      colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+      builder.add(colSchema);
+    }
+
+    populateMetadata(fileProperties);
+    return builder.build();
+  }
+
+  @Override
+  public boolean next() {
+    while (!rowWriter.isFull()) {
+      if (!processNextRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(fsStream);
+  }
+
+  private boolean processNextRow() {
+    if (rowWriter.limitReached(maxRecords)) {
+      return false;
+    }
+    Object[] row;
+    try {
+      // Process first row
+      if (firstRow != null) {
+        row = firstRow;
+        firstRow = null;
+      } else {
+        row = sasFileReader.readNext();
+      }
+
+      if (row == null) {
+        return false;
+      }
+
+      rowWriter.start();
+      for (int i = 0; i < row.length; i++) {
+        writerList.get(i).load(row);
+      }
+
+      // Write Metadata
+      writeMetadata(row.length);
+      rowWriter.save();
+    } catch (IOException e) {
+      throw UserException.dataReadError()
+        .message("Error reading SAS file: " + e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    return true;
+  }
+
+  private void populateMetadata(SasFileProperties fileProperties) {
+    compressionMethod = fileProperties.getCompressionMethod();
+    fileLabel = fileProperties.getFileLabel();
+    fileType = fileProperties.getFileType();
+    osName = fileProperties.getOsName();
+    osType = fileProperties.getOsType();
+    sasRelease = fileProperties.getSasRelease();
+    sessionEncoding = fileProperties.getSessionEncoding();
+    serverType = fileProperties.getServerType();
+    dateCreated = convertDateToLocalDate(fileProperties.getDateCreated());
+    dateModified = convertDateToLocalDate(fileProperties.getDateModified());
+  }
+
+  private void writeMetadata(int startIndex) {
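+    // The write order below must match the declaration order of
+    // IMPLICIT_STRING_COLUMN and IMPLICIT_DATE_COLUMN.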
+    ((StringSasColumnWriter)writerList.get(startIndex)).load(compressionMethod);
+    ((StringSasColumnWriter)writerList.get(startIndex+1)).load(fileLabel);
+    ((StringSasColumnWriter)writerList.get(startIndex+2)).load(fileType);
+    ((StringSasColumnWriter)writerList.get(startIndex+3)).load(osName);
+    ((StringSasColumnWriter)writerList.get(startIndex+4)).load(osType);
+    ((StringSasColumnWriter)writerList.get(startIndex+5)).load(sasRelease);
+    ((StringSasColumnWriter)writerList.get(startIndex+6)).load(sessionEncoding);
+    ((StringSasColumnWriter)writerList.get(startIndex+7)).load(serverType);
+
+    ((DateSasColumnWriter)writerList.get(startIndex+8)).load(dateCreated);
+    ((DateSasColumnWriter)writerList.get(startIndex+9)).load(dateModified);
+  }
+
+  private static LocalDate convertDateToLocalDate(Date date) {
+    return date.toInstant()
+      .atZone(ZoneOffset.UTC)
+      .toLocalDate();
+  }
+
+  public abstract static class SasColumnWriter {
+    final String columnName;
+    final ScalarWriter writer;
+    final int columnIndex;
+
+    public SasColumnWriter(int columnIndex, String columnName, ScalarWriter writer) {
+      this.columnIndex = columnIndex;
+      this.columnName = columnName;
+      this.writer = writer;
+    }
+
+    public abstract void load (Object[] row);
+  }
+
+  public static class StringSasColumnWriter extends SasColumnWriter {
+
+    StringSasColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
+      super(columnIndex, columnName, rowWriter.scalar(columnName));
+    }
+
+    @Override
+    public void load(Object[] row) {
+      writer.setString((String) row[columnIndex]);
+    }
+
+    public void load (String value) {
+      if (!Strings.isNullOrEmpty(value)) {
+        writer.setString(value);
+      }
+    }
+  }
+
+  public static class BigIntSasColumnWriter extends SasColumnWriter {
+
+    BigIntSasColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
+      super(columnIndex, columnName, rowWriter.scalar(columnName));
+    }
+
+    @Override
+    public void load(Object[] row) {
+      writer.setLong((Long) row[columnIndex]);
+    }
+  }
+
+  public static class DateSasColumnWriter extends SasColumnWriter {
+
+    DateSasColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
+      super(columnIndex, columnName, rowWriter.scalar(columnName));
+    }
+
+    @Override
+    public void load(Object[] row) {
+      LocalDate value = convertDateToLocalDate((Date)row[columnIndex]);
+      writer.setDate(value);
+    }
+
+    public void load(LocalDate date) {
+      writer.setDate(date);
+    }
+  }
+
+  public static class TimeSasColumnWriter extends SasColumnWriter {
+
+    TimeSasColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
+      super(columnIndex, columnName, rowWriter.scalar(columnName));
+    }
+
+    @Override
+    public void load(Object[] row) {
+      int seconds = ((Long)row[columnIndex]).intValue();
+      LocalTime value = LocalTime.parse(formatSeconds(seconds));
+      writer.setTime(value);
+    }
+
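+    // Renders a count of seconds since midnight as HH:mm:ss so that
+    // LocalTime.parse() can consume it.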
+    private String formatSeconds(int timeInSeconds) {
+      int hours = timeInSeconds / 3600;
+      int secondsLeft = timeInSeconds - hours * 3600;
+      int minutes = secondsLeft / 60;
+      int seconds = secondsLeft - minutes * 60;
+
+      StringBuilder formattedTime = new StringBuilder();
+      if (hours < 10) {
+        formattedTime.append("0");
+      }
+      formattedTime.append(hours).append(":");
+
+      if (minutes < 10) {
+        formattedTime.append("0");
+      }
+      formattedTime.append(minutes).append(":");
+
+      if (seconds < 10) {
+        formattedTime.append("0");
+      }
+      formattedTime.append(seconds);
+      return formattedTime.toString();
+    }
+  }
+
+  public static class DoubleSasColumnWriter extends SasColumnWriter {
+
+    DoubleSasColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
+      super(columnIndex, columnName, rowWriter.scalar(columnName));
+    }
+
+    @Override
+    public void load(Object[] row) {
+      // The SAS reader does something strange with zeros. For whatever reason, even if the
+      // field is a floating point number, the value is returned as a long.  This causes class
+      // cast exceptions.
+      if (row[columnIndex].equals(0L)) {
+        writer.setDouble(0.0);
+      } else {
+        writer.setDouble((Double) row[columnIndex]);
+      }
+    }
+  }
+}
diff --git a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatConfig.java b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatConfig.java
new file mode 100644
index 0000000..9c0a055
--- /dev/null
+++ b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.sas;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.util.Collections;
+import java.util.List;
+
+@EqualsAndHashCode
+@ToString
+@JsonTypeName(SasFormatPlugin.DEFAULT_NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class SasFormatConfig implements FormatPluginConfig {
+  private final List<String> extensions;
+
+  // Omitted properties take reasonable defaults
+  @JsonCreator
+  public SasFormatConfig(@JsonProperty("extensions") List<String> extensions) {
+    this.extensions = extensions == null ? Collections.singletonList("sas7bdat") : ImmutableList.copyOf(extensions);
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public List<String> getExtensions() {
+    return extensions;
+  }
+
+}
diff --git a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
new file mode 100644
index 0000000..442e48b
--- /dev/null
+++ b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.sas;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.conf.Configuration;
+
+public class SasFormatPlugin extends EasyFormatPlugin<SasFormatConfig> {
+
+  protected static final String DEFAULT_NAME = "sas";
+
+  private static class SasReaderFactory extends FileReaderFactory {
+
+    private final int maxRecords;
+
+    public SasReaderFactory(int maxRecords) {
+      this.maxRecords = maxRecords;
+    }
+
+    @Override
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+      return new SasBatchReader(maxRecords);
+    }
+  }
+
+  public SasFormatPlugin(String name, DrillbitContext context,
+                          Configuration fsConf, StoragePluginConfig storageConfig,
+                          SasFormatConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
+  }
+
+  private static EasyFormatConfig easyConfig(Configuration fsConf, SasFormatConfig pluginConfig) {
+    return EasyFormatConfig.builder()
+      .readable(true)
+      .writable(false)
+      .blockSplittable(false)
+      .compressible(true)
+      .supportsProjectPushdown(true)
+      .extensions(pluginConfig.getExtensions())
+      .fsConf(fsConf)
+      .defaultName(DEFAULT_NAME)
+      .useEnhancedScan(true)
+      .supportsLimitPushdown(true)
+      .build();
+  }
+
+  @Override
+  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
+    EasySubScan scan, OptionManager options)  {
+    return new SasBatchReader(scan.getMaxRecords());
+  }
+
+  @Override
+  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+    FileScanBuilder builder = new FileScanBuilder();
+    builder.setReaderFactory(new SasReaderFactory(scan.getMaxRecords()));
+
+    initScanBuilder(builder, scan);
+    builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+    return builder;
+  }
+}
diff --git a/contrib/format-sas/src/main/resources/bootstrap-format-plugins.json b/contrib/format-sas/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 0000000..f9a744d
--- /dev/null
+++ b/contrib/format-sas/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,37 @@
+{
+  "storage": {
+    "dfs": {
+      "type": "file",
+      "formats": {
+        "sas": {
+          "type": "sas",
+          "extensions": [
+            "sas7bdat"
+          ]
+        }
+      }
+    },
+    "cp": {
+      "type": "file",
+      "formats": {
+        "sas": {
+          "type": "sas",
+          "extensions": [
+            "sas7bdat"
+          ]
+        }
+      }
+    },
+    "s3": {
+      "type": "file",
+      "formats": {
+        "sas": {
+          "type": "sas",
+          "extensions": [
+            "sas7bdat"
+          ]
+        }
+      }
+    }
+  }
+}
diff --git a/contrib/format-sas/src/main/resources/drill-module.conf b/contrib/format-sas/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..6feb1f3
--- /dev/null
+++ b/contrib/format-sas/src/main/resources/drill-module.conf
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#  This file tells Drill to consider this module when class path scanning.
+#  This file can also include any supplementary configuration information.
+#  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.classpath.scanning.packages += "org.apache.drill.exec.store.sas"
diff --git a/contrib/format-sas/src/test/java/org/apache/drill/exec/store/sas/TestSasReader.java b/contrib/format-sas/src/test/java/org/apache/drill/exec/store/sas/TestSasReader.java
new file mode 100644
index 0000000..b45ba68
--- /dev/null
+++ b/contrib/format-sas/src/test/java/org/apache/drill/exec/store/sas/TestSasReader.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.sas;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.drill.test.QueryTestUtil.generateCompressedFile;
+
+@Category(RowSetTests.class)
+public class TestSasReader extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    // Needed for compressed file unit test
+    dirTestWatcher.copyResourceToRoot(Paths.get("sas/"));
+  }
+
+  @Test
+  public void testStarQuery() throws Exception {
+    String sql = "SELECT * FROM cp.`sas/mixed_data_two.sas7bdat` WHERE x1 = 1";
+    RowSet results  = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("x1", MinorType.BIGINT)
+      .addNullable("x2", MinorType.FLOAT8)
+      .addNullable("x3", MinorType.VARCHAR)
+      .addNullable("x4", MinorType.FLOAT8)
+      .addNullable("x5", MinorType.FLOAT8)
+      .addNullable("x6", MinorType.FLOAT8)
+      .addNullable("x7", MinorType.FLOAT8)
+      .addNullable("x8", MinorType.FLOAT8)
+      .addNullable("x9", MinorType.FLOAT8)
+      .addNullable("x10", MinorType.FLOAT8)
+      .addNullable("x11", MinorType.FLOAT8)
+      .addNullable("x12", MinorType.FLOAT8)
+      .addNullable("x13", MinorType.FLOAT8)
+      .addNullable("x14", MinorType.FLOAT8)
+      .addNullable("x15", MinorType.BIGINT)
+      .addNullable("x16", MinorType.BIGINT)
+      .addNullable("x17", MinorType.BIGINT)
+      .addNullable("x18", MinorType.BIGINT)
+      .addNullable("x19", MinorType.BIGINT)
+      .addNullable("x20", MinorType.BIGINT)
+      .addNullable("x21", MinorType.BIGINT)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1L, 1.1, "AAAAAAAA", 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 31626061L, 31625961L, 31627061L, 31616061L, 31636061L, 31526061L, 31726061L)
+      .addRow(1L, 1.1, "AAAAAAAA", 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 31626061L, 31625961L, 31627061L, 31616061L, 31636061L, 31526061L, 31726061L)
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testMetadataColumns() throws Exception {
+    String sql = "SELECT _compression_method, _file_label, _file_type, " +
+      "_os_name, _os_type, _sas_release, _session_encoding, _server_type, " +
+      "_date_created, _date_modified FROM cp.`sas/date_formats.sas7bdat`";
+    RowSet results  = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("_compression_method", MinorType.VARCHAR)
+      .addNullable("_file_label", MinorType.VARCHAR)
+      .addNullable("_file_type", MinorType.VARCHAR)
+      .addNullable("_os_name", MinorType.VARCHAR)
+      .addNullable("_os_type", MinorType.VARCHAR)
+      .addNullable("_sas_release", MinorType.VARCHAR)
+      .addNullable("_session_encoding", MinorType.VARCHAR)
+      .addNullable("_server_type", MinorType.VARCHAR)
+      .addNullable("_date_created", MinorType.DATE)
+      .addNullable("_date_modified", MinorType.DATE)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(null, "DATA", null, null, "9.0401M4", null, "X64_7PRO", null,
+        LocalDate.parse("2017-03-14"), LocalDate.parse("2017-03-14"))
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testCompressedFile() throws Exception {
+    generateCompressedFile("sas/mixed_data_two.sas7bdat", "zip", "sas/mixed_data_two.sas7bdat.zip");
+
+    String sql = "SELECT x1, x2, x3 FROM dfs.`sas/mixed_data_two.sas7bdat.zip` WHERE x1 = 1";
+    RowSet results  = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("x1", MinorType.BIGINT)
+      .addNullable("x2", MinorType.FLOAT8)
+      .addNullable("x3", MinorType.VARCHAR)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1L, 1.1, "AAAAAAAA")
+      .addRow(1L, 1.1, "AAAAAAAA")
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testDates() throws Exception {
+    String sql = "SELECT b8601da, e8601da, `date` FROM cp.`sas/date_formats.sas7bdat`";
+    RowSet results  = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("b8601da", MinorType.DATE)
+      .addNullable("e8601da", MinorType.DATE)
+      .addNullable("date", MinorType.DATE)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(LocalDate.parse("2017-03-14"), LocalDate.parse("2017-03-14"), LocalDate.parse("2017-03-14"))
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testTimes() throws Exception {
+    String sql = "SELECT * FROM cp.`sas/time_formats.sas7bdat`";
+    RowSet results  = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("E8601LZ", MinorType.TIME)
+      .addNullable("E8601TM", MinorType.TIME)
+      .addNullable("HHMM", MinorType.TIME)
+      .addNullable("HOUR", MinorType.TIME)
+      .addNullable("MMSS", MinorType.TIME)
+      .addNullable("TIME", MinorType.TIME)
+      .addNullable("TIMEAMPM", MinorType.TIME)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(LocalTime.parse("10:10:10"), LocalTime.parse("10:10:10"), LocalTime.parse("10:10:10"),
+        LocalTime.parse("10:10:10"), LocalTime.parse("10:10:10"), LocalTime.parse("10:10:10"), LocalTime.parse("10:10:10"))
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    String sql = "SELECT COUNT(*) as cnt FROM cp.`sas/mixed_data_two.sas7bdat` ";
+    String plan = queryBuilder().sql(sql).explainJson();
+    long cnt = queryBuilder().physical(plan).singletonLong();
+    assertEquals("Counts should match", 50L, cnt);
+  }
+
+  @Test
+  public void testLimitPushdown() throws Exception {
+    String sql = "SELECT * FROM cp.`sas/mixed_data_one.sas7bdat` LIMIT 5";
+
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("Limit", "maxRecords=5")
+      .match();
+  }
+}
diff --git a/contrib/format-sas/src/test/resources/sas/all_rand_normal.sas7bdat b/contrib/format-sas/src/test/resources/sas/all_rand_normal.sas7bdat
new file mode 100644
index 0000000..be589f8
Binary files /dev/null and b/contrib/format-sas/src/test/resources/sas/all_rand_normal.sas7bdat differ
diff --git a/contrib/format-sas/src/test/resources/sas/date_formats.sas7bdat b/contrib/format-sas/src/test/resources/sas/date_formats.sas7bdat
new file mode 100644
index 0000000..fdd184f
Binary files /dev/null and b/contrib/format-sas/src/test/resources/sas/date_formats.sas7bdat differ
diff --git a/contrib/format-sas/src/test/resources/sas/mixed_data_one.sas7bdat b/contrib/format-sas/src/test/resources/sas/mixed_data_one.sas7bdat
new file mode 100644
index 0000000..af1421b
Binary files /dev/null and b/contrib/format-sas/src/test/resources/sas/mixed_data_one.sas7bdat differ
diff --git a/contrib/format-sas/src/test/resources/sas/mixed_data_two.sas7bdat b/contrib/format-sas/src/test/resources/sas/mixed_data_two.sas7bdat
new file mode 100644
index 0000000..1a9560e
Binary files /dev/null and b/contrib/format-sas/src/test/resources/sas/mixed_data_two.sas7bdat differ
diff --git a/contrib/format-sas/src/test/resources/sas/time_formats.sas7bdat b/contrib/format-sas/src/test/resources/sas/time_formats.sas7bdat
new file mode 100644
index 0000000..66668a8
Binary files /dev/null and b/contrib/format-sas/src/test/resources/sas/time_formats.sas7bdat differ
diff --git a/contrib/pom.xml b/contrib/pom.xml
index db4e161..7dde85e 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -49,6 +49,7 @@
     <module>format-httpd</module>
     <module>format-esri</module>
     <module>format-hdf5</module>
+    <module>format-sas</module>
     <module>format-spss</module>
     <module>format-xml</module>
     <module>format-image</module>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 4bcf9cf..53f3786 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -399,6 +399,11 @@
         </dependency>
         <dependency>
           <groupId>org.apache.drill.contrib</groupId>
+          <artifactId>drill-format-sas</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.drill.contrib</groupId>
           <artifactId>drill-format-ltsv</artifactId>
           <version>${project.version}</version>
         </dependency>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index ebe8b71..9734396 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -52,6 +52,7 @@
         <include>org.apache.drill.contrib:drill-format-httpd:jar</include>
         <include>org.apache.drill.contrib:drill-format-excel:jar</include>
         <include>org.apache.drill.contrib:drill-format-spss:jar</include>
+        <include>org.apache.drill.contrib:drill-format-sas:jar</include>
         <include>org.apache.drill.contrib:drill-jdbc-storage:jar</include>
         <include>org.apache.drill.contrib:drill-kudu-storage:jar</include>
         <include>org.apache.drill.contrib:drill-storage-splunk:jar</include>
diff --git a/pom.xml b/pom.xml
index d520102..a170b99 100644
--- a/pom.xml
+++ b/pom.xml
@@ -374,6 +374,7 @@
         <configuration>
           <excludes>
             <!-- Types log1, log2, sqllog and sqllog2, ssdlog are used to test he logRegex format plugin. -->
+            <!-- Files with the sas7bdat extension are SAS files and are used to test the SAS format plugin -->
             <exclude>**/clientlib/y2038/*.c</exclude> <!-- All the files here should have MIT License -->
             <exclude>**/clientlib/y2038/*.h</exclude> <!-- All the files here should have MIT License -->
             <exclude>**/resources/parquet/**/*</exclude>
@@ -381,6 +382,7 @@
             <exclude>**/*.woff2</exclude>
             <exclude>**/*.ks</exclude>
             <exclude>**/*.pcap</exclude>
+            <exclude>**/*.sas7bdat</exclude>
             <exclude>**/*.log1</exclude>
             <exclude>**/*.log2</exclude>
             <exclude>**/*.sav</exclude>
@@ -695,6 +697,7 @@
               <exclude>**/*.woff2</exclude>
               <exclude>**/*.ks</exclude>
               <exclude>**/*.pcap</exclude>
+              <exclude>**/*.sas7bdat</exclude>
               <exclude>**/*.sav</exclude>
               <exclude>**/*.log1</exclude>
               <exclude>**/*.log2</exclude>