You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2021/11/24 22:38:23 UTC
[drill] branch master updated: DRILL-8054: Add SAS Format Plugin (#2386)
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new b915ece DRILL-8054: Add SAS Format Plugin (#2386)
b915ece is described below
commit b915ece6e460102302edc75afbcf81be4f90b325
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Wed Nov 24 17:38:16 2021 -0500
DRILL-8054: Add SAS Format Plugin (#2386)
* Initial commit
* Initial commit
* Added SerDe
* Dates working
* Added Implicit Fields and date unit tests
* Updated docs and added unit tests
* Exclude SAS test files from license check
* Added Time Support
* Fixed checkstyle issues
* Addressed Review Comments
* Fixed unused imports
---
contrib/format-sas/README.md | 34 ++
contrib/format-sas/pom.xml | 86 ++++
.../drill/exec/store/sas/SasBatchReader.java | 456 +++++++++++++++++++++
.../drill/exec/store/sas/SasFormatConfig.java | 52 +++
.../drill/exec/store/sas/SasFormatPlugin.java | 91 ++++
.../main/resources/bootstrap-format-plugins.json | 37 ++
.../src/main/resources/drill-module.conf | 23 ++
.../apache/drill/exec/store/sas/TestSasReader.java | 197 +++++++++
.../test/resources/sas/all_rand_normal.sas7bdat | Bin 0 -> 13312 bytes
.../src/test/resources/sas/date_formats.sas7bdat | Bin 0 -> 131072 bytes
.../src/test/resources/sas/mixed_data_one.sas7bdat | Bin 0 -> 17408 bytes
.../src/test/resources/sas/mixed_data_two.sas7bdat | Bin 0 -> 25600 bytes
.../src/test/resources/sas/time_formats.sas7bdat | Bin 0 -> 131072 bytes
contrib/pom.xml | 1 +
distribution/pom.xml | 5 +
distribution/src/assemble/component.xml | 1 +
pom.xml | 3 +
17 files changed, 986 insertions(+)
diff --git a/contrib/format-sas/README.md b/contrib/format-sas/README.md
new file mode 100644
index 0000000..89434a0
--- /dev/null
+++ b/contrib/format-sas/README.md
@@ -0,0 +1,34 @@
+# Format Plugin for SAS Files
+This format plugin enables Drill to read SAS (`sas7bdat`) files.
+
+## Data Types
+The SAS format supports the `VARCHAR`, `BIGINT`, `DOUBLE`, `TIME` and `DATE` types.
+
+## Implicit Fields (Metadata)
+The SAS reader provides the following file metadata fields:
+* `_compression_method`
+* `_encoding`
+* `_file_label`
+* `_file_type`
+* `_os_name`
+* `_os_type`
+* `_sas_release`
+* `_session_encoding`
+* `_date_created`
+* `_date_modified`
+
+## Schema Provisioning
+Drill will infer the schema of your data.
+
+## Configuration Options
+This plugin has no configuration options other than the file extension.
+
+```json
+ "sas": {
+ "type": "sas",
+ "extensions": [
+ "sas7bdat"
+ ]
+}
+```
+This plugin is enabled by default.
diff --git a/contrib/format-sas/pom.xml b/contrib/format-sas/pom.xml
new file mode 100644
index 0000000..7ad0e4a
--- /dev/null
+++ b/contrib/format-sas/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>drill-contrib-parent</artifactId>
+ <groupId>org.apache.drill.contrib</groupId>
+ <version>1.20.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>drill-format-sas</artifactId>
+ <name>Drill : Contrib : Format : SAS</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.epam</groupId>
+ <artifactId>parso</artifactId>
+ <version>2.0.14</version>
+ </dependency>
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill</groupId>
+ <artifactId>drill-common</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-java-sources</id>
+ <phase>process-sources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${basedir}/target/classes/org/apache/drill/exec/store/sas
+ </outputDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/java/org/apache/drill/exec/store/sas</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasBatchReader.java b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasBatchReader.java
new file mode 100644
index 0000000..187b174
--- /dev/null
+++ b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasBatchReader.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.sas;
+
+import com.epam.parso.Column;
+import com.epam.parso.SasFileProperties;
+import com.epam.parso.SasFileReader;
+import com.epam.parso.impl.SasFileReaderImpl;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class SasBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+ private static final Logger logger = LoggerFactory.getLogger(SasBatchReader.class);
+ private final int maxRecords;
+ private final List<SasColumnWriter> writerList;
+ private FileSplit split;
+ private InputStream fsStream;
+ private SasFileReader sasFileReader;
+ private CustomErrorContext errorContext;
+ private RowSetLoader rowWriter;
+ private Object[] firstRow;
+
+
+ private String compressionMethod;
+ private String fileLabel;
+ private String fileType;
+ private String osName;
+ private String osType;
+ private String sasRelease;
+ private String sessionEncoding;
+ private String serverType;
+ private LocalDate dateCreated;
+ private LocalDate dateModified;
+
+  /**
+   * Names of the string-valued file metadata (implicit) columns, in the order in which
+   * they are appended to the reader schema.
+   */
+  private enum IMPLICIT_STRING_COLUMN {
+    COMPRESSION_METHOD("_compression_method"),
+    ENCODING("_encoding"),
+    FILE_LABEL("_file_label"),
+    FILE_TYPE("_file_type"),
+    OS_NAME("_os_name"),
+    OS_TYPE("_os_type"),
+    SAS_RELEASE("_sas_release"),
+    SESSION_ENCODING("_session_encoding");
+
+    /** Column name used in the schema. */
+    private final String fieldName;
+
+    IMPLICIT_STRING_COLUMN(String name) {
+      fieldName = name;
+    }
+
+    /** @return the column name used in the schema */
+    public String getFieldName() {
+      return this.fieldName;
+    }
+  }
+
+  /**
+   * Names of the date-valued file metadata (implicit) columns, in the order in which
+   * they are appended to the reader schema.
+   */
+  private enum IMPLICIT_DATE_COLUMN {
+    CREATED_DATE("_date_created"),
+    MODIFIED_DATE("_date_modified");
+
+    /** Column name used in the schema. */
+    private final String fieldName;
+
+    IMPLICIT_DATE_COLUMN(String name) {
+      fieldName = name;
+    }
+
+    /** @return the column name used in the schema */
+    public String getFieldName() {
+      return this.fieldName;
+    }
+  }
+
+ /**
+ * Simple holder that keeps a reference to the {@link SasFormatPlugin} it was built with.
+ */
+ public static class SasReaderConfig {
+ protected final SasFormatPlugin plugin;
+ public SasReaderConfig(SasFormatPlugin plugin) {
+ this.plugin = plugin;
+ }
+ }
+
+ /**
+ * Creates a reader with an empty column-writer list.
+ *
+ * @param maxRecords row limit later handed to {@code rowWriter.limitReached()} before
+ * each row is read
+ */
+ public SasBatchReader(int maxRecords) {
+ this.maxRecords = maxRecords;
+ writerList = new ArrayList<>();
+ }
+
+  /**
+   * Opens the SAS file, settles on the table schema and creates the column writers.
+   * The data schema is the provided schema when the negotiator has one, otherwise it is
+   * inferred from the first row; the implicit metadata columns are appended in both cases.
+   *
+   * @param negotiator schema negotiator supplying the split, error context and file system
+   * @return always {@code true}
+   */
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    openFile(negotiator);
+
+    // Prefer a user-provided schema; fall back to inference from the first row.
+    TupleMetadata dataSchema = negotiator.hasProvidedSchema()
+      ? negotiator.providedSchema()
+      : buildSchema();
+    TupleMetadata fullSchema = addImplicitColumnsToSchema(dataSchema);
+    negotiator.tableSchema(fullSchema, true);
+
+    rowWriter = negotiator.build().writer();
+    buildWriterList(fullSchema);
+    return true;
+  }
+
+  /**
+   * Opens the (possibly compressed) file behind the current split, wraps it in a Parso
+   * reader and pre-reads the first row, which is later used for schema inference.
+   *
+   * @param negotiator supplies the file system used to open the stream
+   * @throws UserException (data read error) if the file cannot be opened or read
+   */
+  private void openFile(FileScanFramework.FileSchemaNegotiator negotiator) {
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      sasFileReader = new SasFileReaderImpl(fsStream);
+      firstRow = sasFileReader.readNext();
+    } catch (IOException e) {
+      // The stream may already be open when reading the first row fails; release it
+      // here so that a failed open does not leak the file handle.
+      if (fsStream != null) {
+        AutoCloseables.closeSilently(fsStream);
+      }
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open SAS File %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
+
+  /**
+   * Infers the data schema from the SAS column list and the Java classes of the values
+   * in the first row. Every column is added as nullable. A column inferred as BIGINT
+   * whose SAS format is non-empty is treated as TIME.
+   *
+   * @return the inferred schema (without implicit columns)
+   * @throws UserException (data read error) if a column type cannot be inferred, either
+   *         because the first row holds no value for the column or because the value's
+   *         class is not supported
+   */
+  private TupleMetadata buildSchema() {
+    SchemaBuilder builder = new SchemaBuilder();
+    List<Column> columns = sasFileReader.getColumns();
+    int counter = 0;
+    for (Column column : columns) {
+      String fieldName = column.getName();
+
+      // The type is inferred from the first row only. Without a value there is nothing
+      // to inspect, so fail with a clear message instead of a NullPointerException.
+      Object sample = (firstRow == null || counter >= firstRow.length) ? null : firstRow[counter];
+      if (sample == null) {
+        throw UserException.dataReadError()
+          .message("Unable to infer the data type of column " + fieldName
+            + ": the first row does not contain a value for it")
+          .addContext(errorContext)
+          .build(logger);
+      }
+
+      String typeName = sample.getClass().getSimpleName();
+      try {
+        MinorType type = getType(typeName);
+        if (type == MinorType.BIGINT && !column.getFormat().isEmpty()) {
+          logger.debug("Found possible time");
+          type = MinorType.TIME;
+        }
+        builder.addNullable(fieldName, type);
+      } catch (Exception e) {
+        throw UserException.dataReadError()
+          .message("Error with column type: " + typeName)
+          .addContext(errorContext)
+          .build(logger);
+      }
+      counter++;
+    }
+
+    return builder.buildSchema();
+  }
+
+  /**
+   * Creates one column writer per field of {@code schema}, in schema order, and appends
+   * it to {@code writerList}. The writer's column index is the field's position in the
+   * schema.
+   *
+   * @param schema the full reader schema
+   * @throws UserException (data read error) for a field whose type has no writer
+   */
+  private void buildWriterList(TupleMetadata schema) {
+    List<MaterializedField> fields = schema.toFieldList();
+    for (int position = 0; position < fields.size(); position++) {
+      MaterializedField field = fields.get(position);
+      String fieldName = field.getName();
+      MinorType type = field.getType().getMinorType();
+      switch (type) {
+        case FLOAT8:
+          writerList.add(new DoubleSasColumnWriter(position, fieldName, rowWriter));
+          break;
+        case BIGINT:
+          writerList.add(new BigIntSasColumnWriter(position, fieldName, rowWriter));
+          break;
+        case DATE:
+          writerList.add(new DateSasColumnWriter(position, fieldName, rowWriter));
+          break;
+        case TIME:
+          writerList.add(new TimeSasColumnWriter(position, fieldName, rowWriter));
+          break;
+        case VARCHAR:
+          writerList.add(new StringSasColumnWriter(position, fieldName, rowWriter));
+          break;
+        default:
+          throw UserException.dataReadError()
+            .message(fieldName + " is an unparsable data type: " + type.name() + ". The SAS reader does not support this data type.")
+            .addContext(errorContext)
+            .build(logger);
+      }
+    }
+  }
+
+ private MinorType getType(String simpleType) {
+ switch (simpleType) {
+ case "String":
+ return MinorType.VARCHAR;
+ case "Double":
+ return MinorType.FLOAT8;
+ case "Long":
+ return MinorType.BIGINT;
+ case "Date":
+ return MinorType.DATE;
+ default:
+ throw UserException.dataReadError()
+ .message("SAS Reader does not support data type: " + simpleType)
+ .addContext(errorContext)
+ .build(logger);
+ }
+ }
+
+  /**
+   * Returns a copy of {@code schema} with the implicit metadata columns appended: first
+   * the string columns, then the date columns. All implicit columns are optional and
+   * excluded from wildcard expansion. As a side effect the metadata values are cached
+   * from the file properties.
+   *
+   * @param schema the data schema
+   * @return the data schema followed by the implicit columns
+   */
+  private TupleMetadata addImplicitColumnsToSchema(TupleMetadata schema) {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addAll(schema);
+
+    // String metadata columns
+    for (IMPLICIT_STRING_COLUMN stringColumn : IMPLICIT_STRING_COLUMN.values()) {
+      ColumnMetadata column =
+        MetadataUtils.newScalar(stringColumn.getFieldName(), MinorType.VARCHAR, DataMode.OPTIONAL);
+      column.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+      builder.add(column);
+    }
+
+    // Date metadata columns
+    for (IMPLICIT_DATE_COLUMN dateColumn : IMPLICIT_DATE_COLUMN.values()) {
+      ColumnMetadata column =
+        MetadataUtils.newScalar(dateColumn.getFieldName(), MinorType.DATE, DataMode.OPTIONAL);
+      column.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+      builder.add(column);
+    }
+
+    populateMetadata(sasFileReader.getSasFileProperties());
+    return builder.build();
+  }
+
+ @Override
+ public boolean next() {
+ while (!rowWriter.isFull()) {
+ if (!processNextRow()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Closes the underlying file stream via {@code AutoCloseables.closeSilently}.
+ */
+ @Override
+ public void close() {
+ AutoCloseables.closeSilently(fsStream);
+ }
+
+  /**
+   * Writes a single row: the pre-read first row on the first call, afterwards the next
+   * row from the SAS reader. The data columns are written first, followed by the
+   * implicit metadata columns.
+   *
+   * @return {@code true} if a row was written; {@code false} if the record limit was
+   *         reached or the reader returned no further row
+   * @throws UserException (data read error) if reading from the file fails
+   */
+  private boolean processNextRow() {
+    if (rowWriter.limitReached(maxRecords)) {
+      return false;
+    }
+    Object[] row;
+    try {
+      // The first row was already consumed while opening the file.
+      if (firstRow != null) {
+        row = firstRow;
+        firstRow = null;
+      } else {
+        row = sasFileReader.readNext();
+      }
+
+      if (row == null) {
+        return false;
+      }
+
+      rowWriter.start();
+      for (int i = 0; i < row.length; i++) {
+        writerList.get(i).load(row);
+      }
+
+      // The metadata writers follow the data-column writers in writerList.
+      writeMetadata(row.length);
+      rowWriter.save();
+    } catch (IOException e) {
+      // Pass the cause along so the original stack trace is not lost.
+      throw UserException.dataReadError(e)
+        .message("Error reading SAS file: " + e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    return true;
+  }
+
+  /**
+   * Caches the file-level metadata that is later written into the implicit columns.
+   *
+   * @param fileProperties properties reported by the SAS reader for the open file
+   */
+  private void populateMetadata(SasFileProperties fileProperties) {
+    compressionMethod = fileProperties.getCompressionMethod();
+    fileLabel = fileProperties.getFileLabel();
+    fileType = fileProperties.getFileType();
+    osName = fileProperties.getOsName();
+    osType = fileProperties.getOsType();
+    sasRelease = fileProperties.getSasRelease();
+    sessionEncoding = fileProperties.getSessionEncoding();
+    serverType = fileProperties.getServerType();
+    dateCreated = convertDateToLocalDate(fileProperties.getDateCreated());
+    // Use the modification date here; it was previously filled from the creation date.
+    dateModified = convertDateToLocalDate(fileProperties.getDateModified());
+  }
+
+  /**
+   * Writes the implicit metadata columns of the current row. The values are written in
+   * exactly the order in which the implicit columns are declared (and therefore appended
+   * to the schema): {@code _compression_method}, {@code _encoding}, {@code _file_label},
+   * {@code _file_type}, {@code _os_name}, {@code _os_type}, {@code _sas_release},
+   * {@code _session_encoding}, then {@code _date_created} and {@code _date_modified}.
+   *
+   * @param startIndex index in {@code writerList} of the first metadata writer, i.e. the
+   *                   number of data columns
+   */
+  private void writeMetadata(int startIndex) {
+    int index = startIndex;
+    ((StringSasColumnWriter) writerList.get(index++)).load(compressionMethod);
+    // The encoding is read straight from the file properties; it must occupy the second
+    // slot, otherwise every following value lands in the wrong column.
+    ((StringSasColumnWriter) writerList.get(index++)).load(sasFileReader.getSasFileProperties().getEncoding());
+    ((StringSasColumnWriter) writerList.get(index++)).load(fileLabel);
+    ((StringSasColumnWriter) writerList.get(index++)).load(fileType);
+    ((StringSasColumnWriter) writerList.get(index++)).load(osName);
+    ((StringSasColumnWriter) writerList.get(index++)).load(osType);
+    ((StringSasColumnWriter) writerList.get(index++)).load(sasRelease);
+    ((StringSasColumnWriter) writerList.get(index++)).load(sessionEncoding);
+    // There is no implicit column for the server type, so it is not written.
+
+    ((DateSasColumnWriter) writerList.get(index++)).load(dateCreated);
+    ((DateSasColumnWriter) writerList.get(index)).load(dateModified);
+  }
+
+  /**
+   * Converts a {@link Date} to the calendar date it falls on in UTC.
+   *
+   * @param date the instant to convert
+   * @return the UTC calendar date of {@code date}
+   */
+  private static LocalDate convertDateToLocalDate(Date date) {
+    Instant instant = date.toInstant();
+    return instant.atOffset(ZoneOffset.UTC).toLocalDate();
+  }
+
+ /**
+ * Base class of the per-column writers. It stores the column's name, its index and the
+ * {@code ScalarWriter} that receives the column's values.
+ */
+ public abstract static class SasColumnWriter {
+ final String columnName;
+ final ScalarWriter writer;
+ final int columnIndex;
+
+ public SasColumnWriter(int columnIndex, String columnName, ScalarWriter writer) {
+ this.columnIndex = columnIndex;
+ this.columnName = columnName;
+ this.writer = writer;
+ }
+
+ // Implementations read this column's value from row[columnIndex] and pass it to the writer.
+ public abstract void load (Object[] row);
+ }
+
+ public static class StringSasColumnWriter extends SasColumnWriter {
+
+ StringSasColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
+ super(columnIndex, columnName, rowWriter.scalar(columnName));
+ }
+
+ @Override
+ public void load(Object[] row) {
+ writer.setString((String) row[columnIndex]);
+ }
+
+ public void load (String value) {
+ if (!Strings.isNullOrEmpty(value)) {
+ writer.setString(value);
+ }
+ }
+ }
+
+ public static class BigIntSasColumnWriter extends SasColumnWriter {
+
+ BigIntSasColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
+ super(columnIndex, columnName, rowWriter.scalar(columnName));
+ }
+
+ @Override
+ public void load(Object[] row) {
+ writer.setLong((Long) row[columnIndex]);
+ }
+ }
+
+  /**
+   * Writes date values, either taken from a SAS row or supplied directly (metadata).
+   */
+  public static class DateSasColumnWriter extends SasColumnWriter {
+
+    DateSasColumnWriter(int columnIndex, String columnName, RowSetLoader rowWriter) {
+      super(columnIndex, columnName, rowWriter.scalar(columnName));
+    }
+
+    /**
+     * Converts this column's {@link Date} value from {@code row} to a UTC calendar date
+     * and writes it. A {@code null} cell is skipped.
+     */
+    @Override
+    public void load(Object[] row) {
+      Object value = row[columnIndex];
+      if (value != null) {
+        writer.setDate(convertDateToLocalDate((Date) value));
+      }
+    }
+
+    /** Writes {@code date} unless it is {@code null}. */
+    public void load(LocalDate date) {
+      if (date != null) {
+        writer.setDate(date);
+      }
+    }
+  }
+
+  /**
+   * Writes time-of-day values that the SAS reader delivers as a number of seconds since
+   * midnight.
+   */
+  public static class TimeSasColumnWriter extends SasColumnWriter {
+
+    TimeSasColumnWriter(int columnIndex, String columnName, RowSetLoader rowWriter) {
+      super(columnIndex, columnName, rowWriter.scalar(columnName));
+    }
+
+    /**
+     * Converts this column's value from {@code row} (seconds since midnight) to a
+     * {@link LocalTime} and writes it. A {@code null} cell is skipped. Any numeric value
+     * is accepted; a fractional part is dropped.
+     *
+     * @throws java.time.DateTimeException if the value is outside the range of one day
+     */
+    @Override
+    public void load(Object[] row) {
+      Object value = row[columnIndex];
+      if (value == null) {
+        return;
+      }
+      long secondsOfDay = ((Number) value).longValue();
+      // Direct conversion; no need to format and re-parse an HH:mm:ss string.
+      writer.setTime(LocalTime.ofSecondOfDay(secondsOfDay));
+    }
+  }
+
+ public static class DoubleSasColumnWriter extends SasColumnWriter {
+
+ DoubleSasColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
+ super(columnIndex, columnName, rowWriter.scalar(columnName));
+ }
+
+ @Override
+ public void load(Object[] row) {
+ // The SAS reader does something strange with zeros. For whatever reason, even if the
+ // field is a floating point number, the value is returned as a long. This causes class
+ // cast exceptions.
+ if (row[columnIndex].equals(0L)) {
+ writer.setDouble(0.0);
+ } else {
+ writer.setDouble((Double) row[columnIndex]);
+ }
+ }
+ }
+}
diff --git a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatConfig.java b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatConfig.java
new file mode 100644
index 0000000..9c0a055
--- /dev/null
+++ b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.sas;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Jackson-serializable configuration of the SAS format plugin. Its only setting is the
+ * list of file extensions.
+ */
+@EqualsAndHashCode
+@ToString
+@JsonTypeName(SasFormatPlugin.DEFAULT_NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class SasFormatConfig implements FormatPluginConfig {
+  private final List<String> extensions;
+
+  /**
+   * @param extensions file extensions handled by the plugin; {@code null} selects the
+   *                   single default extension {@code sas7bdat}
+   */
+  @JsonCreator
+  public SasFormatConfig(@JsonProperty("extensions") List<String> extensions) {
+    if (extensions == null) {
+      this.extensions = Collections.singletonList("sas7bdat");
+    } else {
+      this.extensions = ImmutableList.copyOf(extensions);
+    }
+  }
+
+  /** @return the configured file extensions */
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public List<String> getExtensions() {
+    return this.extensions;
+  }
+
+}
diff --git a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
new file mode 100644
index 0000000..442e48b
--- /dev/null
+++ b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.sas;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.conf.Configuration;
+
+
+/**
+ * Format plugin that makes SAS files readable through {@link SasBatchReader}. The format
+ * is configured as read-only, not block-splittable and compressible, with project and
+ * limit push-down enabled.
+ */
+public class SasFormatPlugin extends EasyFormatPlugin<SasFormatConfig> {
+
+  protected static final String DEFAULT_NAME = "sas";
+
+  /**
+   * Reader factory that hands out {@link SasBatchReader} instances carrying the record
+   * limit it was created with.
+   */
+  private static class SasReaderFactory extends FileReaderFactory {
+
+    private final int maxRecords;
+
+    public SasReaderFactory(int maxRecords) {
+      this.maxRecords = maxRecords;
+    }
+
+    @Override
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+      return new SasBatchReader(maxRecords);
+    }
+  }
+
+  public SasFormatPlugin(String name, DrillbitContext context,
+                         Configuration fsConf, StoragePluginConfig storageConfig,
+                         SasFormatConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
+  }
+
+  /**
+   * Builds the static format description handed to the easy format framework.
+   *
+   * @param fsConf file system configuration
+   * @param pluginConfig plugin configuration supplying the file extensions
+   * @return the format description
+   */
+  private static EasyFormatConfig easyConfig(Configuration fsConf, SasFormatConfig pluginConfig) {
+    return EasyFormatConfig.builder()
+      .readable(true)
+      .writable(false)
+      .blockSplittable(false)
+      .compressible(true)
+      .supportsProjectPushdown(true)
+      .extensions(pluginConfig.getExtensions())
+      .fsConf(fsConf)
+      .defaultName(DEFAULT_NAME)
+      .useEnhancedScan(true)
+      .supportsLimitPushdown(true)
+      .build();
+  }
+
+  @Override
+  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
+    EasySubScan scan, OptionManager options) {
+    return new SasBatchReader(scan.getMaxRecords());
+  }
+
+  /**
+   * Creates the scan builder: installs the reader factory with the scan's record limit
+   * and sets nullable VARCHAR as the null type.
+   */
+  @Override
+  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+    FileScanBuilder builder = new FileScanBuilder();
+    builder.setReaderFactory(new SasReaderFactory(scan.getMaxRecords()));
+
+    initScanBuilder(builder, scan);
+    builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+    return builder;
+  }
+}
diff --git a/contrib/format-sas/src/main/resources/bootstrap-format-plugins.json b/contrib/format-sas/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 0000000..f9a744d
--- /dev/null
+++ b/contrib/format-sas/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,37 @@
+{
+ "storage": {
+ "dfs": {
+ "type": "file",
+ "formats": {
+ "sas": {
+ "type": "sas",
+ "extensions": [
+ "sas7bdat"
+ ]
+ }
+ }
+ },
+ "cp": {
+ "type": "file",
+ "formats": {
+ "sas": {
+ "type": "sas",
+ "extensions": [
+ "sas7bdat"
+ ]
+ }
+ }
+ },
+ "s3": {
+ "type": "file",
+ "formats": {
+ "sas": {
+ "type": "sas",
+ "extensions": [
+ "sas7bdat"
+ ]
+ }
+ }
+ }
+ }
+}
diff --git a/contrib/format-sas/src/main/resources/drill-module.conf b/contrib/format-sas/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..6feb1f3
--- /dev/null
+++ b/contrib/format-sas/src/main/resources/drill-module.conf
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file tells Drill to consider this module when class path scanning.
+# This file can also include any supplementary configuration information.
+# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.classpath.scanning.packages += "org.apache.drill.exec.store.sas"
diff --git a/contrib/format-sas/src/test/java/org/apache/drill/exec/store/sas/TestSasReader.java b/contrib/format-sas/src/test/java/org/apache/drill/exec/store/sas/TestSasReader.java
new file mode 100644
index 0000000..b45ba68
--- /dev/null
+++ b/contrib/format-sas/src/test/java/org/apache/drill/exec/store/sas/TestSasReader.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.sas;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.drill.test.QueryTestUtil.generateCompressedFile;
+
+
+/**
+ * Integration tests for the SAS (sas7bdat) format plugin. Each test runs SQL
+ * against the sample files in the sas/ test resource directory and verifies
+ * either the returned schema and rows or the generated query plan.
+ */
+@Category(RowSetTests.class)
+public class TestSasReader extends ClusterTest {
+
+ /**
+ * Starts the test cluster once for the whole class and copies the sas/
+ * resources to the test root so they can be read through the dfs plugin.
+ */
+ @BeforeClass
+ public static void setup() throws Exception {
+ ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ // Needed for compressed file unit test
+ dirTestWatcher.copyResourceToRoot(Paths.get("sas/"));
+ }
+
+ /**
+ * Verifies a star query with a filter on x1: all 21 columns (x1 through x21)
+ * come back as nullable BIGINT, FLOAT8 or VARCHAR, and two identical rows match.
+ */
+ @Test
+ public void testStarQuery() throws Exception {
+ String sql = "SELECT * FROM cp.`sas/mixed_data_two.sas7bdat` WHERE x1 = 1";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("x1", MinorType.BIGINT)
+ .addNullable("x2", MinorType.FLOAT8)
+ .addNullable("x3", MinorType.VARCHAR)
+ .addNullable("x4", MinorType.FLOAT8)
+ .addNullable("x5", MinorType.FLOAT8)
+ .addNullable("x6", MinorType.FLOAT8)
+ .addNullable("x7", MinorType.FLOAT8)
+ .addNullable("x8", MinorType.FLOAT8)
+ .addNullable("x9", MinorType.FLOAT8)
+ .addNullable("x10", MinorType.FLOAT8)
+ .addNullable("x11", MinorType.FLOAT8)
+ .addNullable("x12", MinorType.FLOAT8)
+ .addNullable("x13", MinorType.FLOAT8)
+ .addNullable("x14", MinorType.FLOAT8)
+ .addNullable("x15", MinorType.BIGINT)
+ .addNullable("x16", MinorType.BIGINT)
+ .addNullable("x17", MinorType.BIGINT)
+ .addNullable("x18", MinorType.BIGINT)
+ .addNullable("x19", MinorType.BIGINT)
+ .addNullable("x20", MinorType.BIGINT)
+ .addNullable("x21", MinorType.BIGINT)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1L, 1.1, "AAAAAAAA", 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 31626061L, 31625961L, 31627061L, 31616061L, 31636061L, 31526061L, 31726061L)
+ .addRow(1L, 1.1, "AAAAAAAA", 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 31626061L, 31625961L, 31627061L, 31616061L, 31636061L, 31526061L, 31726061L)
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ /**
+ * Verifies the underscore-prefixed file metadata columns when they are
+ * selected explicitly: eight nullable VARCHAR columns followed by two
+ * nullable DATE columns, with a single row expected.
+ */
+ @Test
+ public void testMetadataColumns() throws Exception {
+ String sql = "SELECT _compression_method, _file_label, _file_type, " +
+ "_os_name, _os_type, _sas_release, _session_encoding, _server_type, " +
+ "_date_created, _date_modified FROM cp.`sas/date_formats.sas7bdat`";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("_compression_method", MinorType.VARCHAR)
+ .addNullable("_file_label", MinorType.VARCHAR)
+ .addNullable("_file_type", MinorType.VARCHAR)
+ .addNullable("_os_name", MinorType.VARCHAR)
+ .addNullable("_os_type", MinorType.VARCHAR)
+ .addNullable("_sas_release", MinorType.VARCHAR)
+ .addNullable("_session_encoding", MinorType.VARCHAR)
+ .addNullable("_server_type", MinorType.VARCHAR)
+ .addNullable("_date_created", MinorType.DATE)
+ .addNullable("_date_modified", MinorType.DATE)
+ .buildSchema();
+
+ // NOTE(review): "DATA", "9.0401M4" and "X64_7PRO" each sit one column before
+ // the column their content suggests (_file_type, _sas_release, _server_type).
+ // Presumably this mirrors what the reader currently emits; confirm against
+ // SasBatchReader before relying on these column names.
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(null, "DATA", null, null, "9.0401M4", null, "X64_7PRO", null,
+ LocalDate.parse("2017-03-14"), LocalDate.parse("2017-03-14"))
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ /**
+ * Verifies that a zip-compressed copy of a SAS file can be queried. The
+ * compressed copy is generated first, then read through dfs rather than cp.
+ */
+ @Test
+ public void testCompressedFile() throws Exception {
+ generateCompressedFile("sas/mixed_data_two.sas7bdat", "zip", "sas/mixed_data_two.sas7bdat.zip" );
+
+ String sql = "SELECT x1, x2, x3 FROM dfs.`sas/mixed_data_two.sas7bdat.zip` WHERE x1 = 1";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("x1", MinorType.BIGINT)
+ .addNullable("x2", MinorType.FLOAT8)
+ .addNullable("x3", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1L, 1.1, "AAAAAAAA")
+ .addRow(1L, 1.1, "AAAAAAAA")
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ /**
+ * Verifies that the three selected date columns are returned as nullable
+ * DATE and that each holds 2017-03-14 in the single expected row.
+ */
+ @Test
+ public void testDates() throws Exception {
+ String sql = "SELECT b8601da, e8601da, `date` FROM cp.`sas/date_formats.sas7bdat`";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("b8601da", MinorType.DATE)
+ .addNullable("e8601da", MinorType.DATE)
+ .addNullable("date", MinorType.DATE)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(LocalDate.parse("2017-03-14"), LocalDate.parse("2017-03-14"), LocalDate.parse("2017-03-14"))
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ /**
+ * Verifies that a star query over the time-format sample returns seven
+ * nullable TIME columns, each holding 10:10:10 in the single expected row.
+ */
+ @Test
+ public void testTimes() throws Exception {
+ String sql = "SELECT * FROM cp.`sas/time_formats.sas7bdat`";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("E8601LZ", MinorType.TIME)
+ .addNullable("E8601TM", MinorType.TIME)
+ .addNullable("HHMM", MinorType.TIME)
+ .addNullable("HOUR", MinorType.TIME)
+ .addNullable("MMSS", MinorType.TIME)
+ .addNullable("TIME", MinorType.TIME)
+ .addNullable("TIMEAMPM", MinorType.TIME)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(LocalTime.parse("10:10:10"), LocalTime.parse("10:10:10"), LocalTime.parse("10:10:10"),
+ LocalTime.parse("10:10:10"), LocalTime.parse("10:10:10"), LocalTime.parse("10:10:10"), LocalTime.parse("10:10:10"))
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ /**
+ * Round-trips the plan: obtains the JSON plan for a COUNT(*) query, submits
+ * that JSON back as a physical plan, and asserts the count is 50.
+ */
+ @Test
+ public void testSerDe() throws Exception {
+ String sql = "SELECT COUNT(*) as cnt FROM cp.`sas/mixed_data_two.sas7bdat` ";
+ String plan = queryBuilder().sql(sql).explainJson();
+ long cnt = queryBuilder().physical(plan).singletonLong();
+ assertEquals("Counts should match", 50L, cnt);
+ }
+
+ /**
+ * Checks the plan for a LIMIT 5 query rather than its result rows: the plan
+ * must contain both "Limit" and "maxRecords=5".
+ */
+ @Test
+ public void testLimitPushdown() throws Exception {
+ String sql = "SELECT * FROM cp.`sas/mixed_data_one.sas7bdat` LIMIT 5";
+
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("Limit", "maxRecords=5")
+ .match();
+ }
+}
diff --git a/contrib/format-sas/src/test/resources/sas/all_rand_normal.sas7bdat b/contrib/format-sas/src/test/resources/sas/all_rand_normal.sas7bdat
new file mode 100644
index 0000000..be589f8
Binary files /dev/null and b/contrib/format-sas/src/test/resources/sas/all_rand_normal.sas7bdat differ
diff --git a/contrib/format-sas/src/test/resources/sas/date_formats.sas7bdat b/contrib/format-sas/src/test/resources/sas/date_formats.sas7bdat
new file mode 100644
index 0000000..fdd184f
Binary files /dev/null and b/contrib/format-sas/src/test/resources/sas/date_formats.sas7bdat differ
diff --git a/contrib/format-sas/src/test/resources/sas/mixed_data_one.sas7bdat b/contrib/format-sas/src/test/resources/sas/mixed_data_one.sas7bdat
new file mode 100644
index 0000000..af1421b
Binary files /dev/null and b/contrib/format-sas/src/test/resources/sas/mixed_data_one.sas7bdat differ
diff --git a/contrib/format-sas/src/test/resources/sas/mixed_data_two.sas7bdat b/contrib/format-sas/src/test/resources/sas/mixed_data_two.sas7bdat
new file mode 100644
index 0000000..1a9560e
Binary files /dev/null and b/contrib/format-sas/src/test/resources/sas/mixed_data_two.sas7bdat differ
diff --git a/contrib/format-sas/src/test/resources/sas/time_formats.sas7bdat b/contrib/format-sas/src/test/resources/sas/time_formats.sas7bdat
new file mode 100644
index 0000000..66668a8
Binary files /dev/null and b/contrib/format-sas/src/test/resources/sas/time_formats.sas7bdat differ
diff --git a/contrib/pom.xml b/contrib/pom.xml
index db4e161..7dde85e 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -49,6 +49,7 @@
<module>format-httpd</module>
<module>format-esri</module>
<module>format-hdf5</module>
+ <module>format-sas</module>
<module>format-spss</module>
<module>format-xml</module>
<module>format-image</module>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 4bcf9cf..53f3786 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -399,6 +399,11 @@
</dependency>
<dependency>
<groupId>org.apache.drill.contrib</groupId>
+ <artifactId>drill-format-sas</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill.contrib</groupId>
<artifactId>drill-format-ltsv</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index ebe8b71..9734396 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -52,6 +52,7 @@
<include>org.apache.drill.contrib:drill-format-httpd:jar</include>
<include>org.apache.drill.contrib:drill-format-excel:jar</include>
<include>org.apache.drill.contrib:drill-format-spss:jar</include>
+ <include>org.apache.drill.contrib:drill-format-sas:jar</include>
<include>org.apache.drill.contrib:drill-jdbc-storage:jar</include>
<include>org.apache.drill.contrib:drill-kudu-storage:jar</include>
<include>org.apache.drill.contrib:drill-storage-splunk:jar</include>
diff --git a/pom.xml b/pom.xml
index d520102..a170b99 100644
--- a/pom.xml
+++ b/pom.xml
@@ -374,6 +374,7 @@
<configuration>
<excludes>
<!-- Types log1, log2, sqllog and sqllog2, ssdlog are used to test the logRegex format plugin. -->
+ <!-- Files of type sas7bdat are SAS data files and are used to test the SAS format plugin. -->
<exclude>**/clientlib/y2038/*.c</exclude> <!-- All the files here should have MIT License -->
<exclude>**/clientlib/y2038/*.h</exclude> <!-- All the files here should have MIT License -->
<exclude>**/resources/parquet/**/*</exclude>
@@ -381,6 +382,7 @@
<exclude>**/*.woff2</exclude>
<exclude>**/*.ks</exclude>
<exclude>**/*.pcap</exclude>
+ <exclude>**/*.sas7bdat</exclude>
<exclude>**/*.log1</exclude>
<exclude>**/*.log2</exclude>
<exclude>**/*.sav</exclude>
@@ -695,6 +697,7 @@
<exclude>**/*.woff2</exclude>
<exclude>**/*.ks</exclude>
<exclude>**/*.pcap</exclude>
+ <exclude>**/*.sas7bdat</exclude>
<exclude>**/*.sav</exclude>
<exclude>**/*.log1</exclude>
<exclude>**/*.log2</exclude>