Posted to commits@drill.apache.org by cg...@apache.org on 2021/02/09 00:19:52 UTC

[drill] branch master updated: Drill-7751: Add Storage Plugin for Splunk

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cf8bfc  Drill-7751: Add Storage Plugin for Splunk
2cf8bfc is described below

commit 2cf8bfc120d2d478ac20525c24c79f280b7ca722
Author: Charles Givre <cg...@apache.org>
AuthorDate: Mon Feb 8 17:54:53 2021 -0500

    Drill-7751: Add Storage Plugin for Splunk
---
 .../apache/drill/categories/SplunkStorageTest.java |  25 ++
 contrib/pom.xml                                    |   1 +
 .../drill/exec/store/kafka/TestKafkaSuit.java      |   2 +-
 contrib/storage-splunk/README.md                   | 152 ++++++++
 contrib/storage-splunk/pom.xml                     |  99 ++++++
 .../drill/exec/store/splunk/SplunkBatchReader.java | 392 +++++++++++++++++++++
 .../drill/exec/store/splunk/SplunkConnection.java  |  99 ++++++
 .../drill/exec/store/splunk/SplunkGroupScan.java   | 320 +++++++++++++++++
 .../exec/store/splunk/SplunkPluginConfig.java      | 120 +++++++
 .../exec/store/splunk/SplunkPushDownListener.java  | 147 ++++++++
 .../exec/store/splunk/SplunkQueryBuilder.java      | 256 ++++++++++++++
 .../exec/store/splunk/SplunkScanBatchCreator.java  |  94 +++++
 .../drill/exec/store/splunk/SplunkScanSpec.java    |  58 +++
 .../exec/store/splunk/SplunkSchemaFactory.java     | 121 +++++++
 .../exec/store/splunk/SplunkStoragePlugin.java     |  83 +++++
 .../drill/exec/store/splunk/SplunkSubScan.java     | 141 ++++++++
 .../drill/exec/store/splunk/SplunkUtils.java       |  64 ++++
 .../main/resources/bootstrap-storage-plugins.json  |  14 +
 .../src/main/resources/drill-module.conf           |  27 ++
 .../drill/exec/store/splunk/SplunkBaseTest.java    |  41 +++
 .../exec/store/splunk/SplunkConnectionTest.java    |  86 +++++
 .../drill/exec/store/splunk/SplunkIndexesTest.java |  58 +++
 .../exec/store/splunk/SplunkLimitPushDownTest.java |  69 ++++
 .../drill/exec/store/splunk/SplunkPluginTest.java  | 291 +++++++++++++++
 .../exec/store/splunk/SplunkQueryBuilderTest.java  | 211 +++++++++++
 .../exec/store/splunk/SplunkTestSplunkUtils.java   |  47 +++
 .../drill/exec/store/splunk/SplunkTestSuite.java   |  97 +++++
 .../src/test/resources/logback-test.xml.bak        |  59 ++++
 .../src/test/resources/test_data.csv               |  11 +
 contrib/storage-splunk/src/test/splunk.md          |  22 ++
 distribution/pom.xml                               |   5 +
 distribution/src/assemble/component.xml            |   1 +
 pom.xml                                            |   5 +
 33 files changed, 3217 insertions(+), 1 deletion(-)

diff --git a/common/src/test/java/org/apache/drill/categories/SplunkStorageTest.java b/common/src/test/java/org/apache/drill/categories/SplunkStorageTest.java
new file mode 100644
index 0000000..3f552b6
--- /dev/null
+++ b/common/src/test/java/org/apache/drill/categories/SplunkStorageTest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.categories;
+
+/**
+ * This is a category used to mark unit tests that test the Splunk storage plugin.
+ */
+public class SplunkStorageTest {
+}
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 47bdb51..3876e20 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -59,6 +59,7 @@
     <module>storage-kafka</module>
     <module>storage-kudu</module>
     <module>storage-opentsdb</module>
+    <module>storage-splunk</module>
     <module>storage-http</module>
     <module>storage-druid</module>
     <module>storage-elasticsearch</module>
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
index 0d61fd7..7556779 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
@@ -61,7 +61,7 @@ public class TestKafkaSuit extends BaseTest {
 
   private static ZkClient zkClient;
 
-  private static volatile AtomicInteger initCount = new AtomicInteger(0);
+  private static AtomicInteger initCount = new AtomicInteger(0);
 
   static final int NUM_JSON_MSG = 10;
 
diff --git a/contrib/storage-splunk/README.md b/contrib/storage-splunk/README.md
new file mode 100644
index 0000000..a8fd0e7
--- /dev/null
+++ b/contrib/storage-splunk/README.md
@@ -0,0 +1,152 @@
+# Drill Connector for Splunk
+This plugin enables Drill to query Splunk. 
+
+## Configuration
+To connect Drill to Splunk, create a new storage plugin with the following configuration:
+
+Splunk uses port `8089` for its management (REST) interface, which Drill uses to connect.  This port must be open for Drill to query Splunk.
+
+```json
+{
+   "type":"splunk",
+   "username": "admin",
+   "password": "changeme",
+   "hostname": "localhost",
+   "port": 8089,
+   "earliestTime": "-14d",
+   "latestTime": "now",
+   "enabled": false
+}
+```
+
+## Understanding Splunk's Data Model
+Splunk's primary use case is analyzing event logs with a timestamp. As such, data is indexed by the timestamp, with the most recent data being indexed first.  By default, Splunk
+ will sort the data in reverse chronological order.  Large Splunk installations will put older data into buckets of hot, warm and cold storage with the "cold" storage on the
+  slowest and cheapest disks.
+  
+With this understood, it is **very** important to put time boundaries on your Splunk queries. The Drill plugin allows you to set default time boundaries in the configuration that apply to every query you run.  Alternatively, you can set the time boundaries at query time.  In either case, you will achieve the best performance when you are asking Splunk for the smallest amount of data possible.
+  
+## Understanding Drill's Data Model with Splunk
+Drill treats Splunk indexes as tables. Splunk's access model does not restrict access to the catalog, but does restrict access to the actual data. It is therefore possible that you can see the names of indexes to which you do not have access.  You can view the list of available indexes with a `SHOW TABLES IN splunk` query.
+  
+```
+apache drill> SHOW TABLES IN splunk;
++--------------+----------------+
+| TABLE_SCHEMA |   TABLE_NAME   |
++--------------+----------------+
+| splunk       | summary        |
+| splunk       | splunklogger   |
+| splunk       | _thefishbucket |
+| splunk       | _audit         |
+| splunk       | _internal      |
+| splunk       | _introspection |
+| splunk       | main           |
+| splunk       | history        |
+| splunk       | _telemetry     |
++--------------+----------------+
+9 rows selected (0.304 seconds)
+```
+To query Splunk from Drill, use the following format: 
+```sql
+SELECT <fields>
+FROM splunk.<index>
+```
+  
+## Bounding Your Queries
+When you learn to query Splunk via its interface, the first thing you learn is to bound your queries so that they cover the shortest time span possible. When using Drill to query Splunk, it is advisable to do the same thing, and Drill offers two ways to accomplish this: via the configuration and at query time.
+   
+### Bounding Your Queries at Query Time
+The easiest way to bound your query is to do so at query time via special filters in the `WHERE` clause. There are two special fields, `earliestTime` and `latestTime`, which can be set to bound the query. If they are not set, the query will be bounded by the defaults set in the configuration.
+
+You can use any of the time formats specified in the Splunk documentation here:
+https://docs.splunk.com/Documentation/Splunk/8.0.3/SearchReference/SearchTimeModifiers
+
+So if you wanted to see your data for the last 15 minutes, you could execute the following query:
+
+```sql
+SELECT <fields>
+FROM splunk.<index>
+WHERE earliestTime='-15m' AND latestTime='now'
+```
+The variables set in a query override the defaults from the configuration. 
+  
+## Data Types
+Splunk does not have sophisticated data types and unfortunately does not provide metadata with its query results.  With the exception of the fields below, Drill will interpret all fields as `VARCHAR`, so you will have to convert them to the appropriate data type at query time.  A conversion example follows the field lists below.
+  
+  #### Timestamp Fields
+  * `_indextime`
+  * `_time` 
+  
+  #### Numeric Fields
+  * `date_hour` 
+  * `date_mday`
+  * `date_minute`
+  * `date_second` 
+  * `date_year`
+  * `linecount`
+  
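+As a sketch of such a conversion, assuming a hypothetical numeric field called `bytes` that Splunk returns as text, you could cast it at query time:
+
+```sql
+SELECT CAST(bytes AS INT) AS bytes
+FROM splunk.main
+WHERE earliestTime='-1h' AND latestTime='now'
+```
+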
+### Nested Data
+Splunk has two different types of nested data which roughly map to Drill's `LIST` and `MAP` data types. Unfortunately, there is no easy way to identify whether a field is a nested field at query time, as Splunk does not provide any metadata, and therefore all fields are treated as `VARCHAR`.
+  
+However, Drill does have built-in functions to easily convert Splunk multifields into Drill `LIST` and `MAP` data types. For a `LIST`, simply use the `SPLIT(<field>, ' ')` function to split the field into a `LIST`, as in the sketch below.
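+
+A minimal sketch, assuming a hypothetical multifield named `tags` whose values are space-delimited:
+
+```sql
+SELECT SPLIT(tags, ' ') AS tag_list
+FROM splunk.main
+WHERE earliestTime='-1d' AND latestTime='now'
+```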
+  
+`MAP` data types are rendered as JSON in Splunk. Fortunately, JSON can easily be parsed into a Drill `MAP` by using the `convert_fromJSON()` function.  The query below demonstrates how to convert a JSON column into a Drill `MAP`.
+  
+```sql
+SELECT convert_fromJSON(_raw) 
+FROM splunk.spl
+WHERE spl = '| makeresults
+| eval _raw="{\"pc\":{\"label\":\"PC\",\"count\":24,\"peak24\":12},\"ps3\":
+{\"label\":\"PS3\",\"count\":51,\"peak24\":10},\"xbox\":
+{\"label\":\"XBOX360\",\"count\":40,\"peak24\":11},\"xone\":
+{\"label\":\"XBOXONE\",\"count\":105,\"peak24\":99},\"ps4\":
+{\"label\":\"PS4\",\"count\":200,\"peak24\":80}}"'
+```
+
+### Selecting Fields
+When you execute a query in Drill for Splunk, the fields you select are pushed down to Splunk. Therefore, it will always be more efficient to explicitly specify fields to push
+ down to Splunk rather than using `SELECT *` queries.
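+
+For example, a query like the following (the field names are illustrative) pushes only the listed fields down to Splunk rather than retrieving every field:
+
+```sql
+SELECT clientip, status, bytes
+FROM splunk.main
+WHERE earliestTime='-1h' AND latestTime='now'
+```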
+ 
+### Special Fields
+There are several special fields which can be included in a Drill query:
+
+* `spl`: Sends an arbitrary SPL query to Splunk (see "Sending Arbitrary SPL to Splunk" below).
+* `earliestTime`: Overrides the `earliestTime` setting in the configuration.
+* `latestTime`: Overrides the `latestTime` setting in the configuration.
+  
+### Sorting Results
+Due to the nature of Splunk indexes, data will always be returned in reverse chronological order. Thus, sorting is not necessary if that is the desired order.
+
+## Sending Arbitrary SPL to Splunk
+There is a special table called `spl` which you can use to send arbitrary queries to Splunk. If you use this table, you must include a query in the `spl` filter as shown below:
+```sql
+SELECT *
+FROM splunk.spl
+WHERE spl='<your SPL query>'
+```
+
+# Testing the Plugin
+This plugin includes a series of unit tests in the `src/test/` directory; however, a few of these tests require an active Splunk installation to run.
+Simply follow the instructions below to test Splunk with Drill.
+ 
+ ###  Step 1: Get Splunk
+ From Splunk's website, simply download and install the free version here: https://www.splunk.com/en_us/download/splunk-enterprise.html
+ 
+ Once you've downloaded Splunk, start it up, and make sure everything is working properly. 
+ 
+ ### Step 2:  Add Data
+ Next, go here: https://docs.splunk.com/Documentation/Splunk/7.0.3/SearchTutorial/Systemrequirements and download the dummy datasets that Splunk provides. Once you've downloaded
+  the data, have Splunk index it, and you're ready to go on the Splunk end.
+  
+## Known Limitations
+* At present, Drill will not interpret Splunk multifields as anything other than a String. If there is interest, this feature can be implemented.
+ 
\ No newline at end of file
diff --git a/contrib/storage-splunk/pom.xml b/contrib/storage-splunk/pom.xml
new file mode 100644
index 0000000..db1d46c
--- /dev/null
+++ b/contrib/storage-splunk/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.19.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-storage-splunk</artifactId>
+  <name>Drill : Contrib : Storage : Splunk</name>
+  <properties>
+    <okhttp.version>4.5.0</okhttp.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.splunk</groupId>
+      <artifactId>splunk</artifactId>
+      <version>1.6.5.0</version>
+    </dependency>
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+      <version>1.15.1</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-java-sources</id>
+            <phase>process-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes/org/apache/drill/exec/store/splunk
+              </outputDirectory>
+              <resources>
+                <resource>
+                  <directory>src/main/java/org/apache/drill/exec/store/splunk</directory>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
new file mode 100644
index 0000000..50c1575
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.splunk.JobExportArgs;
+import com.splunk.Service;
+import com.univocity.parsers.common.processor.RowListProcessor;
+import com.univocity.parsers.csv.CsvParser;
+import com.univocity.parsers.csv.CsvParserSettings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SplunkBatchReader.class);
+  private static final List<String> INT_COLS = new ArrayList<>(Arrays.asList("date_hour", "date_mday", "date_minute", "date_second", "date_year", "linecount"));
+  private static final List<String> TS_COLS = new ArrayList<>(Arrays.asList("_indextime", "_time"));
+  private static final String EARLIEST_TIME_COLUMN = "earliestTime";
+  private static final String LATEST_TIME_COLUMN = "latestTime";
+
+  private final SplunkPluginConfig config;
+  private final SplunkSubScan subScan;
+  private final List<SchemaPath> projectedColumns;
+  private final Service splunkService;
+  private final SplunkScanSpec subScanSpec;
+  private final CsvParserSettings csvSettings;
+  private JobExportArgs exportArgs;
+  private InputStream searchResults;
+  private CsvParser csvReader;
+  private String[] firstRow;
+  private CustomErrorContext errorContext;
+
+  private List<SplunkColumnWriter> columnWriters;
+  private SchemaBuilder builder;
+  private RowSetLoader rowWriter;
+  private Stopwatch timer;
+
+
+  public SplunkBatchReader(SplunkPluginConfig config, SplunkSubScan subScan) {
+    this.config = config;
+    this.subScan = subScan;
+    this.projectedColumns = subScan.getColumns();
+    this.subScanSpec = subScan.getScanSpec();
+    SplunkConnection connection = new SplunkConnection(config);
+    this.splunkService = connection.connect();
+
+    this.csvSettings = new CsvParserSettings();
+    csvSettings.setLineSeparatorDetectionEnabled(true);
+    RowListProcessor rowProcessor = new RowListProcessor();
+    csvSettings.setProcessor(rowProcessor);
+    csvSettings.setMaxCharsPerColumn(ValueVector.MAX_BUFFER_SIZE);
+  }
+
+  @Override
+  public boolean open(SchemaNegotiator negotiator) {
+    timer = Stopwatch.createUnstarted();
+    timer.start();
+
+    this.errorContext = negotiator.parentErrorContext();
+
+    String queryString = buildQueryString();
+
+    logger.debug("Query Sent to Splunk: {}", queryString);
+    // Execute the query
+    searchResults = splunkService.export(queryString, exportArgs);
+    logger.debug("Time to execute query: {} milliseconds", timer.elapsed().getNano() / 100000);
+
+    /*
+    Splunk produces poor output from the API.  Of the available choices, CSV was the easiest to deal with.  Unfortunately,
+    the data is not consistent, as some fields are quoted, some are not.
+    */
+    this.csvReader = new CsvParser(csvSettings);
+    logger.debug("Time to open CSV Parser: {} milliseconds", timer.elapsed().getNano() / 100000);
+    csvReader.beginParsing(searchResults, "utf-8");
+    logger.debug("Time to open input stream: {} milliseconds", timer.elapsed().getNano() / 100000);
+
+    // Build the Schema
+    builder = new SchemaBuilder();
+    TupleMetadata drillSchema = buildSchema();
+    negotiator.tableSchema(drillSchema, false);
+    ResultSetLoader resultLoader = negotiator.build();
+
+    // Create ScalarWriters
+    rowWriter = resultLoader.writer();
+    populateWriterArray();
+    logger.debug("Completed open function in {} milliseconds", timer.elapsed().getNano() / 100000);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    while (!rowWriter.isFull()) {
+      if (!processRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    timer.stop();
+    if (searchResults != null) {
+      AutoCloseables.closeSilently(searchResults);
+      searchResults = null;
+    }
+  }
+
+  /**
+   * Splunk returns the data in CSV format with some fields escaped and some not.  Splunk does
+   * not have the concept of datatypes, or at least does not make the metadata available in the API, so
+   * the best solution is to provide a list of columns that are known to be a specific data type, such as _time,
+   * _indextime and the various date components, map those to the appropriate types, and then map everything else as a string.
+   */
+  private TupleMetadata buildSchema() {
+
+    this.firstRow = csvReader.parseNext();
+
+    // Case for empty dataset
+    if (firstRow == null) {
+      return builder.buildSchema();
+    }
+
+    // Parse the first row
+    for (String value : firstRow) {
+      if (INT_COLS.contains(value)) {
+        builder.addNullable(value, MinorType.INT);
+      } else if (TS_COLS.contains(value)) {
+        builder.addNullable(value, MinorType.TIMESTAMP);
+      } else {
+        try {
+          builder.addNullable(value, MinorType.VARCHAR);
+        } catch (Exception e) {
+          logger.warn("Splunk attempted to add duplicate column {}", value);
+        }
+      }
+    }
+    logger.debug("Time to build schmea: {} milliseconds", timer.elapsed().getNano() / 100000);
+    return builder.buildSchema();
+  }
+
+  private void populateWriterArray() {
+    // Case for empty result set
+    if (firstRow == null || firstRow.length == 0) {
+      return;
+    }
+    columnWriters = new ArrayList<>();
+
+    int colPosition = 0;
+    for (String value : firstRow) {
+      if (INT_COLS.contains(value)) {
+        columnWriters.add(new IntColumnWriter(value, rowWriter, colPosition));
+      } else if (TS_COLS.contains(value)) {
+        columnWriters.add(new TimestampColumnWriter(value, rowWriter, colPosition));
+      } else {
+        columnWriters.add(new StringColumnWriter(value, rowWriter, colPosition));
+      }
+      colPosition++;
+    }
+    logger.debug("Time to populate writer array: {} milliseconds", timer.elapsed().getNano() / 100000);
+  }
+
+  private boolean processRow() {
+    String[] nextRow = csvReader.parseNext();
+    if (nextRow == null) {
+      return false;
+    }
+    rowWriter.start();
+    for (SplunkColumnWriter columnWriter : columnWriters) {
+      columnWriter.load(nextRow);
+    }
+    rowWriter.save();
+    return true;
+  }
+
+  /**
+   * Checks to see whether the query is a star query. For our purposes, a star query is
+   * anything that contains only the ** wildcard and the SPECIAL_FIELDS, which are not projected.
+   * @return true if it is a star query, false if not.
+   */
+  private boolean isStarQuery() {
+    if (Utilities.isStarQuery(projectedColumns)) {
+      return true;
+    }
+
+    List<SplunkUtils.SPECIAL_FIELDS> specialFields = Arrays.asList(SplunkUtils.SPECIAL_FIELDS.values());
+
+    for (SchemaPath path: projectedColumns) {
+      if (path.nameEquals("**")) {
+        return true;
+      } else {
+        return specialFields.contains(path.getAsNamePart());
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Determines whether a field is a Splunk multifield.
+   * @param fieldValue The field to be tested
+   * @return True if a multifield, false if not.
+   */
+  protected static boolean isMultiField(String fieldValue) {
+    return (fieldValue.startsWith("{") && fieldValue.endsWith("}"));
+  }
+
+
+  private String buildQueryString () {
+    String earliestTime = null;
+    String latestTime = null;
+    Map<String, ExprNode.ColRelOpConstNode> filters = subScan.getFilters();
+
+    exportArgs = new JobExportArgs();
+
+    // Set to normal search mode
+    exportArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL);
+
+    // Set all time stamps to epoch seconds
+    exportArgs.setTimeFormat("%s");
+
+    // Set output mode to CSV
+    exportArgs.setOutputMode(JobExportArgs.OutputMode.CSV);
+    exportArgs.setEnableLookups(true);
+
+    // Splunk searches perform best when they are time bound.  This allows the user to set
+    // default time boundaries in the config.  These will be overwritten in filter pushdowns
+    if (filters != null && filters.containsKey(EARLIEST_TIME_COLUMN)) {
+      earliestTime = filters.get(EARLIEST_TIME_COLUMN).value.value.toString();
+
+      // Remove from map
+      filters.remove(EARLIEST_TIME_COLUMN);
+    }
+
+    if (filters != null && filters.containsKey(LATEST_TIME_COLUMN)) {
+      latestTime = filters.get(LATEST_TIME_COLUMN).value.value.toString();
+
+      // Remove from map so they are not pushed down into the query
+      filters.remove(LATEST_TIME_COLUMN);
+    }
+
+    if (earliestTime == null) {
+      earliestTime = config.getEarliestTime();
+    }
+
+    if (latestTime == null) {
+      latestTime = config.getLatestTime();
+    }
+
+    logger.debug("Query time bounds: {} and {}", earliestTime, latestTime);
+    exportArgs.setEarliestTime(earliestTime);
+    exportArgs.setLatestTime(latestTime);
+
+    // Special case: If the user wishes to send arbitrary SPL to Splunk, the user can use the "SPL"
+    // Index and spl filter
+    if (subScanSpec.getIndexName().equalsIgnoreCase("spl")) {
+      if (filters == null || filters.get("spl") == null) {
+        throw UserException
+          .validationError()
+          .message("SPL cannot be empty when querying spl table.")
+          .addContext(errorContext)
+          .build(logger);
+      }
+      return filters.get("spl").value.value.toString();
+    }
+
+    SplunkQueryBuilder builder = new SplunkQueryBuilder(subScanSpec.getIndexName());
+
+    // Set the sourcetype
+    if (filters != null && filters.containsKey("sourcetype")) {
+      String sourcetype = filters.get("sourcetype").value.value.toString();
+      builder.addSourceType(sourcetype);
+      filters.remove("sourcetype");
+    }
+
+    // Pushdown the selected fields for non star queries.
+    if (! isStarQuery()) {
+      builder.addField(projectedColumns);
+    }
+
+    // Apply filters
+    builder.addFilters(filters);
+
+    // Apply limits
+    if (subScan.getMaxRecords() > 0) {
+      builder.addLimit(subScan.getMaxRecords());
+    }
+    return builder.build();
+  }
+
+  public abstract static class SplunkColumnWriter {
+
+    final String colName;
+    ScalarWriter columnWriter;
+    RowSetLoader rowWriter;
+    int columnIndex;
+
+    public SplunkColumnWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+      this.colName = colName;
+      this.rowWriter = rowWriter;
+      this.columnWriter = rowWriter.scalar(colName);
+      this.columnIndex = columnIndex;
+    }
+
+    public void load(String[] record) {}
+  }
+
+  public static class StringColumnWriter extends SplunkColumnWriter {
+
+    StringColumnWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+      super(colName, rowWriter, columnIndex);
+    }
+
+    @Override
+    public void load(String[] record) {
+      String value = record[columnIndex];
+      if (Strings.isNullOrEmpty(value)) {
+        columnWriter.setNull();
+      }  else {
+        columnWriter.setString(value);
+      }
+    }
+  }
+
+  public static class IntColumnWriter extends SplunkColumnWriter {
+
+    IntColumnWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+      super(colName, rowWriter, columnIndex);
+    }
+
+    @Override
+    public void load(String[] record) {
+      int value = Integer.parseInt(record[columnIndex]);
+      columnWriter.setInt(value);
+    }
+  }
+
+  /**
+   * There are two known timestamp columns in Splunk: _time and _indextime.
+   */
+  public static class TimestampColumnWriter extends SplunkColumnWriter {
+
+    TimestampColumnWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+      super(colName, rowWriter, columnIndex);
+    }
+
+    @Override
+    public void load(String[] record) {
+      long value = Long.parseLong(record[columnIndex]) * 1000;
+      columnWriter.setTimestamp(Instant.ofEpochMilli(value));
+    }
+  }
+}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
new file mode 100644
index 0000000..d2b82d8
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.splunk.ConfCollection;
+import com.splunk.EntityCollection;
+import com.splunk.HttpService;
+import com.splunk.Index;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class wraps the functionality of the Splunk connection for Drill.
+ */
+public class SplunkConnection {
+
+  private static final Logger logger = LoggerFactory.getLogger(SplunkConnection.class);
+
+  private final String username;
+  private final String password;
+  private final String hostname;
+  private final int port;
+  private Service service;
+
+  public SplunkConnection(SplunkPluginConfig config) {
+    this.username = config.getUsername();
+    this.password = config.getPassword();
+    this.hostname = config.getHostname();
+    this.port = config.getPort();
+    service = connect();
+    ConfCollection confs = service.getConfs();
+  }
+
+  /**
+   * This constructor is used for testing only.
+   * @param config the Splunk plugin configuration
+   * @param service an existing Splunk {@link Service} to use instead of opening a new connection
+   */
+  public SplunkConnection(SplunkPluginConfig config, Service service) {
+    this.username = config.getUsername();
+    this.password = config.getPassword();
+    this.hostname = config.getHostname();
+    this.port = config.getPort();
+    this.service = service;
+  }
+
+  /**
+   * Connects to Splunk instance
+   * @return an active Splunk connection.
+   */
+  public Service connect() {
+    HttpService.setSslSecurityProtocol(SSLSecurityProtocol.TLSv1_2);
+    ServiceArgs loginArgs = new ServiceArgs();
+    loginArgs.setHost(hostname);
+    loginArgs.setPort(port);
+    loginArgs.setPassword(password);
+    loginArgs.setUsername(username);
+
+    try {
+      service = Service.connect(loginArgs);
+    } catch (Exception e) {
+      throw UserException
+        .connectionError()
+        .message("Unable to connect to Splunk at %s:%s", hostname, port)
+        .addContext(e.getMessage())
+        .build(logger);
+    }
+    logger.debug("Successfully connected to {} on port {}", hostname, port);
+    return service;
+  }
+
+  /**
+   * Gets the available indexes from Splunk. Drill treats these as tables.
+   * @return A collection of Splunk indexes
+   */
+  public EntityCollection<Index> getIndexes() {
+    return service.getIndexes();
+  }
+}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
new file mode 100644
index 0000000..69ed312
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class SplunkGroupScan extends AbstractGroupScan {
+
+  private final SplunkPluginConfig config;
+  private final List<SchemaPath> columns;
+  private final SplunkScanSpec splunkScanSpec;
+  private final Map<String, ExprNode.ColRelOpConstNode> filters;
+  private final ScanStats scanStats;
+  private final double filterSelectivity;
+  private final int maxRecords;
+
+  private int hashCode;
+
+  /**
+   * Creates a new group scan from the storage plugin.
+   */
+  public SplunkGroupScan (SplunkScanSpec scanSpec) {
+    super("no-user");
+    this.splunkScanSpec = scanSpec;
+    this.config = scanSpec.getConfig();
+    this.columns = ALL_COLUMNS;
+    this.filters = null;
+    this.filterSelectivity = 0.0;
+    this.maxRecords = -1;
+    this.scanStats = computeScanStats();
+
+  }
+
+  /**
+   * Copies the group scan during many stages of Calcite operation.
+   */
+  public SplunkGroupScan(SplunkGroupScan that) {
+    super(that);
+    this.config = that.config;
+    this.splunkScanSpec = that.splunkScanSpec;
+    this.columns = that.columns;
+    this.filters = that.filters;
+    this.filterSelectivity = that.filterSelectivity;
+    this.maxRecords = that.maxRecords;
+
+    // Calcite makes many copies in the later stage of planning
+    // without changing anything. Retain the previous stats.
+    this.scanStats = that.scanStats;
+  }
+
+  /**
+   * Applies columns. Oddly called multiple times, even when
+   * the scan already has columns.
+   */
+  public SplunkGroupScan(SplunkGroupScan that, List<SchemaPath> columns) {
+    super(that);
+    this.columns = columns;
+    this.splunkScanSpec = that.splunkScanSpec;
+    this.config = that.config;
+
+    // Oddly called later in planning, after earlier assigning columns,
+    // to again assign columns. Retain filters, but compute new stats.
+    this.filters = that.filters;
+    this.filterSelectivity = that.filterSelectivity;
+    this.maxRecords = that.maxRecords;
+    this.scanStats = computeScanStats();
+
+  }
+
+  /**
+   * Adds a filter to the scan.
+   */
+  public SplunkGroupScan(SplunkGroupScan that, Map<String, ExprNode.ColRelOpConstNode> filters,
+                       double filterSelectivity) {
+    super(that);
+    this.columns = that.columns;
+    this.splunkScanSpec = that.splunkScanSpec;
+    this.config = that.config;
+
+    // Applies a filter.
+    this.filters = filters;
+    this.filterSelectivity = filterSelectivity;
+    this.maxRecords = that.maxRecords;
+    this.scanStats = computeScanStats();
+  }
+
+  /**
+   * Deserialize a group scan. Not called in normal operation. Probably used
+   * only if Drill executes a logical plan.
+   */
+  @JsonCreator
+  public SplunkGroupScan(
+    @JsonProperty("config") SplunkPluginConfig config,
+    @JsonProperty("columns") List<SchemaPath> columns,
+    @JsonProperty("splunkScanSpec") SplunkScanSpec splunkScanSpec,
+    @JsonProperty("filters") Map<String, ExprNode.ColRelOpConstNode> filters,
+    @JsonProperty("filterSelectivity") double selectivity,
+    @JsonProperty("maxRecords") int maxRecords
+  ) {
+    super("no-user");
+    this.config = config;
+    this.columns = columns;
+    this.splunkScanSpec = splunkScanSpec;
+    this.filters = filters;
+    this.filterSelectivity = selectivity;
+    this.maxRecords = maxRecords;
+    this.scanStats = computeScanStats();
+  }
+
+  /**
+   * Adds a limit to the group scan
+   * @param that Previous SplunkGroupScan
+   * @param maxRecords the limit pushdown
+   */
+  public SplunkGroupScan(SplunkGroupScan that, int maxRecords) {
+    super(that);
+    this.columns = that.columns;
+    // Apply the limit
+    this.maxRecords = maxRecords;
+    this.splunkScanSpec = that.splunkScanSpec;
+    this.config = that.config;
+    this.filters = that.filters;
+    this.filterSelectivity = that.filterSelectivity;
+    this.scanStats = computeScanStats();
+  }
+
+  @JsonProperty("config")
+  public SplunkPluginConfig config() { return config; }
+
+  @JsonProperty("columns")
+  public List<SchemaPath> columns() { return columns; }
+
+  @JsonProperty("splunkScanSpec")
+  public SplunkScanSpec splunkScanSpec() { return splunkScanSpec; }
+
+  @JsonProperty("filters")
+  public Map<String, ExprNode.ColRelOpConstNode> filters() { return filters; }
+
+  @JsonProperty("maxRecords")
+  public int maxRecords() { return maxRecords; }
+
+  @JsonProperty("filterSelectivity")
+  public double selectivity() { return filterSelectivity; }
+
+
+  @Override
+  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) { }
+
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) {
+    return new SplunkSubScan(config, splunkScanSpec, columns, filters, maxRecords);
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return 1;
+  }
+
+  @Override
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  @Override
+  public boolean supportsLimitPushdown() {
+    return true;
+  }
+
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    if (maxRecords == this.maxRecords) {
+      return null;
+    }
+    return new SplunkGroupScan(this, maxRecords);
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new SplunkGroupScan(this);
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+
+    // Since this class is immutable, compute stats once and cache
+    // them. If the scan changes (adding columns, adding filters), we
+    // get a new scan without cached stats.
+    return scanStats;
+  }
+
+  private ScanStats computeScanStats() {
+
+    // If this config allows filters, then make the default
+    // cost very high to force the planner to choose the version
+    // with filters.
+    if (allowsFilters() && !hasFilters() && !hasLimit()) {
+      return new ScanStats(ScanStats.GroupScanProperty.ESTIMATED_TOTAL_COST,
+        1E9, 1E112, 1E12);
+    }
+
+    // No good estimates at all, just make up something.
+    double estRowCount = 100_000;
+
+    // NOTE this was important! if the predicates don't make the query more
+    // efficient they won't get pushed down
+    if (hasFilters()) {
+      estRowCount *= filterSelectivity;
+    }
+
+    if (maxRecords > 0) {
+      estRowCount = estRowCount / 2;
+    }
+
+    double estColCount = Utilities.isStarQuery(columns) ? DrillScanRel.STAR_COLUMN_COST : columns.size();
+    double valueCount = estRowCount * estColCount;
+    double cpuCost = valueCount;
+    double ioCost = valueCount;
+
+    // Force the caller to use our costs rather than the
+    // defaults (which sets IO cost to zero).
+    return new ScanStats(ScanStats.GroupScanProperty.ESTIMATED_TOTAL_COST,
+      estRowCount, cpuCost, ioCost);
+  }
+
+  @JsonIgnore
+  public boolean hasFilters() {
+    return filters != null;
+  }
+
+  @JsonIgnore
+  public boolean hasLimit() { return maxRecords != -1; }
+
+  @JsonIgnore
+  public boolean allowsFilters() {
+    return true;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    return new SplunkGroupScan(this, columns);
+  }
+
+  @Override
+  public int hashCode() {
+
+    // Hash code is cached since Calcite calls this method many times.
+    if (hashCode == 0) {
+      // Don't include cost; it is derived.
+      hashCode = Objects.hash(splunkScanSpec, config, columns, maxRecords);
+    }
+    return hashCode;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+
+    // Don't include cost; it is derived.
+    SplunkGroupScan other = (SplunkGroupScan) obj;
+    return Objects.equals(splunkScanSpec, other.splunkScanSpec())
+      && Objects.equals(config, other.config())
+      && Objects.equals(columns, other.columns())
+      && Objects.equals(maxRecords, other.maxRecords());
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("config", config)
+      .field("scan spec", splunkScanSpec)
+      .field("columns", columns)
+      .field("maxRecords", maxRecords)
+      .toString();
+  }
+}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
new file mode 100644
index 0000000..d889c01
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+import java.util.Objects;
+
+@JsonTypeName(SplunkPluginConfig.NAME)
+public class SplunkPluginConfig extends StoragePluginConfigBase {
+
+  public static final String NAME = "splunk";
+
+  private final String username;
+  private final String password;
+  private final String hostname;
+  private final String earliestTime;
+  private final String latestTime;
+
+  private final int port;
+
+  @JsonCreator
+  public SplunkPluginConfig(@JsonProperty("username") String username,
+                            @JsonProperty("password") String password,
+                            @JsonProperty("hostname") String hostname,
+                            @JsonProperty("port") int port,
+                            @JsonProperty("earliestTime") String earliestTime,
+                            @JsonProperty("latestTime") String latestTime) {
+    this.username = username;
+    this.password = password;
+    this.hostname = hostname;
+    this.port = port;
+    this.earliestTime = earliestTime;
+    this.latestTime = latestTime == null ? "now" : latestTime;
+  }
+
+  @JsonProperty("username")
+  public String getUsername() {
+    return username;
+  }
+
+  @JsonProperty("password")
+  public String getPassword() {
+    return password;
+  }
+
+  @JsonProperty("hostname")
+  public String getHostname() {
+    return hostname;
+  }
+
+  @JsonProperty("port")
+  public int getPort() {
+    return port;
+  }
+
+  @JsonProperty("earliestTime")
+  public String getEarliestTime() {
+    return earliestTime;
+  }
+
+  @JsonProperty("latestTime")
+  public String getLatestTime() {
+    return latestTime;
+  }
+
+
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    } else if (that == null || getClass() != that.getClass()) {
+      return false;
+    }
+    SplunkPluginConfig thatConfig = (SplunkPluginConfig) that;
+    return Objects.equals(username, thatConfig.username) &&
+      Objects.equals(password, thatConfig.password) &&
+      Objects.equals(hostname, thatConfig.hostname) &&
+      Objects.equals(port, thatConfig.port) &&
+      Objects.equals(earliestTime, thatConfig.earliestTime) &&
+      Objects.equals(latestTime, thatConfig.latestTime);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(username, password, hostname, port, earliestTime, latestTime);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("username", username)
+      .maskedField("password", password)
+      .field("hostname", hostname)
+      .field("port", port)
+      .field("earliestTime", earliestTime)
+      .field("latestTime", latestTime)
+      .toString();
+  }
+}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPushDownListener.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPushDownListener.java
new file mode 100644
index 0000000..ea04f0e
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPushDownListener.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.store.base.filter.ExprNode.AndNode;
+import org.apache.drill.exec.store.base.filter.ExprNode.ColRelOpConstNode;
+import org.apache.drill.exec.store.base.filter.ExprNode.OrNode;
+import org.apache.drill.exec.store.base.filter.FilterPushDownListener;
+import org.apache.drill.exec.store.base.filter.FilterPushDownStrategy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The Splunk storage plugin accepts filters which are:
+ * <ul>
+ * <li>A single column = value expression </li>
+ * <li>An AND'ed set of such expressions,</li>
+ * <li>If the value is one with an unambiguous conversion to
+ * a string. (That is, not dates, binary, maps, etc.)</li>
+ * </ul>
+ */
+public class SplunkPushDownListener implements FilterPushDownListener {
+
+  public static Set<StoragePluginOptimizerRule> rulesFor(OptimizerRulesContext optimizerRulesContext) {
+    return FilterPushDownStrategy.rulesFor(new SplunkPushDownListener());
+  }
+
+  @Override
+  public String prefix() {
+    return "Splunk";
+  }
+
+  @Override
+  public boolean isTargetScan(GroupScan groupScan) {
+    return groupScan instanceof SplunkGroupScan;
+  }
+
+  @Override
+  public ScanPushDownListener builderFor(GroupScan groupScan) {
+    SplunkGroupScan splunkScan = (SplunkGroupScan) groupScan;
+    if (splunkScan.hasFilters() || !splunkScan.allowsFilters()) {
+      return null;
+    } else {
+      return new SplunkScanPushDownListener(splunkScan);
+    }
+  }
+
+  private static class SplunkScanPushDownListener implements ScanPushDownListener {
+
+    private final SplunkGroupScan groupScan;
+    private final Map<String, String> filterParams = CaseInsensitiveMap.newHashMap();
+
+    SplunkScanPushDownListener(SplunkGroupScan groupScan) {
+      this.groupScan = groupScan;
+      for (SchemaPath field : groupScan.columns()) {
+        filterParams.put(field.getAsUnescapedPath(), field.getAsUnescapedPath());
+      }
+    }
+
+    @Override
+    public ExprNode accept(ExprNode node) {
+      if (node instanceof OrNode) {
+        return null;
+      } else if (node instanceof ColRelOpConstNode) {
+        return acceptRelOp((ColRelOpConstNode) node);
+      } else {
+        return null;
+      }
+    }
+
+    private ColRelOpConstNode acceptRelOp(ColRelOpConstNode relOp) {
+      return acceptColumn(relOp.colName) && acceptType(relOp.value.type) ? relOp : null;
+    }
+
+    /**
+     * Only accept columns in the filter params list.
+     */
+    private boolean acceptColumn(String colName) {
+      return filterParams.containsKey(colName);
+    }
+
+    /**
+     * Only accept types which have an unambiguous mapping to
+     * String.
+     */
+    private boolean acceptType(MinorType type) {
+      switch (type) {
+        case BIGINT:
+        case BIT:
+        case FLOAT4:
+        case FLOAT8:
+        case INT:
+        case SMALLINT:
+        case VARCHAR:
+        case VARDECIMAL:
+          return true;
+        default:
+          return false;
+      }
+    }
+
+    /**
+     * Convert the nodes to a map of param/string pairs using
+     * the case specified in the storage plugin config.
+     */
+    @Override
+    public Pair<GroupScan, List<RexNode>> transform(AndNode andNode) {
+      Map<String, ExprNode.ColRelOpConstNode> filters = new HashMap<>();
+      double selectivity = 1;
+      for (ExprNode expr : andNode.children) {
+        ColRelOpConstNode relOp = (ColRelOpConstNode) expr;
+        filters.put(filterParams.get(relOp.colName), relOp);
+        selectivity *= relOp.op.selectivity();
+      }
+      SplunkGroupScan newScan = new SplunkGroupScan(groupScan, filters, selectivity);
+      return Pair.of(newScan, Collections.emptyList());
+    }
+  }
+}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkQueryBuilder.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkQueryBuilder.java
new file mode 100644
index 0000000..c13c08e
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkQueryBuilder.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.store.base.filter.RelOp;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+
+import java.util.List;
+import java.util.Map;
+
+public class SplunkQueryBuilder {
+  public final static String EQUAL_OPERATOR = "=";
+  public final static String NOT_EQUAL_OPERATOR = "!=";
+  public final static String GREATER_THAN = ">";
+  public final static String GREATER_THAN_EQ = ">=";
+  public final static String LESS_THAN = "<";
+  public final static String LESS_THAN_EQ = "<=";
+
+  private String query;
+  private String sourceTypes;
+  private String fieldList;
+  private String filters;
+  private int sourcetypeCount;
+  private int limit;
+
+  public SplunkQueryBuilder (String index) {
+    this.filters = "";
+    sourcetypeCount = 0;
+    query = "search index=" + index;
+  }
+
+  /**
+   * Adds a sourcetype to the Splunk query.  Splunk organizes its data by index and, within each index, by
+   * sourcetype, which typically identifies the underlying source system.  For instance, a sourcetype
+   * might be CSV files, log files, or Azure storage.  Since this is a special metadata case,
+   * it is better to apply it separately from a regular filter.  Sourcetypes accept wildcards, but no
+   * operators other than = or !=.
+   * @param sourceType The Splunk sourcetype to be added to the Splunk query.
+   */
+  public void addSourceType(String sourceType) {
+    if (this.sourceTypes == null) {
+      this.sourceTypes = " sourcetype=\"" + sourceType + "\"";
+    } else if (! sourceTypes.contains("sourcetype=\"" + sourceType + "\"")) {
+      this.sourceTypes += " OR sourcetype=\"" + sourceType + "\"";
+    }
+    sourcetypeCount++;
+  }
+
+  /**
+   * Adds a field name to a Splunk query.  To push the projection down into Splunk,
+   * fields are passed in the form | fields foo, bar, car.  This function adds a field to that list.
+   * To prevent errors, this function ignores the ** wildcard which Drill uses for star queries.
+   * @param field The field to be added to the query
+   */
+  public void addField (String field) {
+    // Double Star fields cause errors and we will not add to the field list
+    if (field.equalsIgnoreCase("**") || Strings.isNullOrEmpty(field)) {
+      return;
+    }
+
+    // Case for first field
+    if (fieldList == null) {
+      this.fieldList = field;
+    } else {
+      this.fieldList += "," + field;
+    }
+  }
+
+  /**
+   * Adds a list of columns to the field list.
+   * To prevent errors, this function ignores the ** wildcard which Drill uses for star queries.
+   * @param columnList List of SchemaPath columns to be added to the field list
+   */
+  public void addField (List<SchemaPath> columnList) {
+    for (SchemaPath column : columnList) {
+      String columnName = column.getAsUnescapedPath();
+      if (columnName.equalsIgnoreCase("**") || Strings.isNullOrEmpty(columnName)) {
+        continue;
+      } else {
+        addField(columnName);
+      }
+    }
+  }
+
+  /**
+   * Adds a row limit to the query. Ignores anything <= zero.
+   * This method should only be called once, but if it is called more than once,
+   * it will set the limit to the most recent value.
+   * @param limit Positive, non-zero integer of number of desired rows.
+   */
+  public void addLimit(int limit) {
+    if (limit > 0) {
+      this.limit = limit;
+    }
+  }
+
+  /**
+   * Adds a filter to the Splunk query.  Splunk implicitly treats all
+   * filters as AND filters.
+   * @param left The field to be filtered
+   * @param right The value of that field
+   * @param operator The actual operator to go in the SPL query
+   */
+  public void addFilter(String left, String right, String operator) {
+    filters = filters + " " + left + operator + quoteString(right);
+  }
+
+  /**
+   * Adds an isnotnull() filter to the Splunk query
+   * @param fieldName The field name which should not be null
+   */
+  public void addNotNullFilter(String fieldName) {
+    filters = filters + " isnotnull(" + fieldName + ") ";
+  }
+
+  /**
+   * Adds an isnull() filter to the Splunk query
+   * @param fieldName The field name which should be null
+   */
+  public void addNullFilter(String fieldName) {
+    filters = filters + " isnull(" + fieldName + ") ";
+  }
+
+  /**
+   * Processes the filters for a Splunk query
+   * @param filters A map of field names to filter expressions
+   */
+  public void addFilters(Map<String, ExprNode.ColRelOpConstNode> filters) {
+    if (filters == null) {
+      return;
+    }
+    for (Map.Entry<String, ExprNode.ColRelOpConstNode> filter : filters.entrySet()) {
+      String fieldName = filter.getValue().colName;
+      RelOp operation = filter.getValue().op;
+      String value = filter.getValue().value.value.toString();
+
+      // Ignore special cases
+      if (SplunkUtils.isSpecialField(fieldName)) {
+        // Sourcetypes are a special case and can be added via filter pushdown
+        if (fieldName.equalsIgnoreCase("sourcetype")) {
+          addSourceType(value);
+        }
+        continue;
+      }
+
+      switch (operation) {
+        case EQ:
+          // Sourcetypes are a special case and can be added via filter pushdown
+          if (fieldName.equalsIgnoreCase("sourcetype")) {
+            addSourceType(value);
+          } else {
+            addFilter(fieldName, value, EQUAL_OPERATOR);
+          }
+          break;
+        case NE:
+          addFilter(fieldName, value, NOT_EQUAL_OPERATOR);
+          break;
+        case GE:
+          addFilter(fieldName, value, GREATER_THAN_EQ);
+          break;
+        case GT:
+          addFilter(fieldName, value, GREATER_THAN);
+          break;
+        case LE:
+          addFilter(fieldName, value, LESS_THAN_EQ);
+          break;
+        case LT:
+          addFilter(fieldName, value, LESS_THAN);
+          break;
+        case IS_NULL:
+          addNullFilter(fieldName);
+          break;
+        case IS_NOT_NULL:
+          addNotNullFilter(fieldName);
+          break;
+      }
+    }
+  }
+
+  /**
+   * Adds quotes around text for use in SPL queries. Numbers are left unquoted.
+   * @param word The input word to be quoted.
+   * @return The text with quotes
+   */
+  public String quoteString(String word) {
+    if (! isNumeric(word)) {
+      return "\"" + word + "\"";
+    } else {
+      return word;
+    }
+  }
+
+  public String build() {
+    // Add the sourcetype
+    if (! Strings.isNullOrEmpty(sourceTypes)) {
+
+      // Add parens
+      if (sourcetypeCount > 1) {
+        sourceTypes = " (" + sourceTypes.trim() + ")";
+      }
+      query += sourceTypes;
+    }
+
+    // Add filters
+    if (! Strings.isNullOrEmpty(filters)) {
+      query += filters;
+    }
+
+    // Add fields
+    if (! Strings.isNullOrEmpty(fieldList)) {
+      query += " | fields " + fieldList;
+    }
+
+    // Add limit
+    if (this.limit > 0) {
+      query += " | head " + this.limit;
+    }
+
+    // Add table logic. This tells Splunk to return the data in tabular form rather than its default raw event output
+    if ( Strings.isNullOrEmpty(fieldList)) {
+      fieldList = "*";
+    }
+    query += " | table " + fieldList;
+
+    return query;
+  }
+
+  /**
+   * Returns true if the given string is numeric, false if not
+   * @param str The string to test for numeric
+   * @return True if the string is numeric, false if not.
+   */
+  public static boolean isNumeric(String str) {
+    return str.matches("-?\\d+(\\.\\d+)?");
+  }
+}
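
For reference, a minimal sketch of how SplunkQueryBuilder composes an SPL string from a projection, a
filter, and a limit. The index, field, and filter names below are illustrative only; the expected output
follows from the build() logic above.

    import org.apache.drill.exec.store.splunk.SplunkQueryBuilder;

    public class SplunkQueryBuilderSketch {
      public static void main(String[] args) {
        SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
        builder.addField("clientip");
        builder.addField("status");
        builder.addFilter("action", "purchase", SplunkQueryBuilder.EQUAL_OPERATOR);
        builder.addLimit(10);
        // Prints: search index=main action="purchase" | fields clientip,status | head 10 | table clientip,status
        System.out.println(builder.build());
      }
    }
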
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanBatchCreator.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanBatchCreator.java
new file mode 100644
index 0000000..0b50124
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanBatchCreator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.List;
+
+public class SplunkScanBatchCreator implements BatchCreator<SplunkSubScan> {
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
+                                       SplunkSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+
+    try {
+      ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
+      return builder.buildScanOperator(context, subScan);
+    } catch (UserException e) {
+      // Rethrow user exceptions directly
+      throw e;
+    } catch (Throwable e) {
+      // Wrap all others
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  private ScanFrameworkBuilder createBuilder(OptionManager options, SplunkSubScan subScan) {
+    SplunkPluginConfig config = subScan.getConfig();
+    ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
+    builder.projection(subScan.getColumns());
+    builder.setUserName(subScan.getUserName());
+
+    // Reader
+    ReaderFactory readerFactory = new SplunkReaderFactory(config, subScan);
+    builder.setReaderFactory(readerFactory);
+    builder.nullType(Types.optional(MinorType.VARCHAR));
+    return builder;
+  }
+
+  private static class SplunkReaderFactory implements ReaderFactory {
+
+    private final SplunkPluginConfig config;
+    private final SplunkSubScan subScan;
+    private int count;
+
+    public SplunkReaderFactory(SplunkPluginConfig config, SplunkSubScan subScan) {
+      this.config = config;
+      this.subScan = subScan;
+    }
+
+    @Override
+    public void bind(ManagedScanFramework framework) {
+    }
+
+    @Override
+    public ManagedReader<SchemaNegotiator> next() {
+      // Only a single scan (in a single thread)
+      if (count++ == 0) {
+        return new SplunkBatchReader(config, subScan);
+      }
+      return null;
+    }
+  }
+}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanSpec.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanSpec.java
new file mode 100644
index 0000000..513db63
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanSpec.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+
+@JsonTypeName("splunk-scan-spec")
+public class SplunkScanSpec {
+  private final String pluginName;
+  private final String indexName;
+  private final SplunkPluginConfig config;
+
+  @JsonCreator
+  public SplunkScanSpec(@JsonProperty("pluginName") String pluginName,
+                        @JsonProperty("indexName") String indexName,
+                        @JsonProperty("config") SplunkPluginConfig config) {
+    this.pluginName = pluginName;
+    this.indexName = indexName;
+    this.config = config;
+  }
+
+  @JsonProperty("pluginName")
+  public String getPluginName() { return pluginName; }
+
+  @JsonProperty("indexName")
+  public String getIndexName() { return indexName; }
+
+  @JsonProperty("config")
+  public SplunkPluginConfig getConfig() { return config; }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("config", config)
+      .field("schemaName", pluginName)
+      .field("indexName", indexName)
+      .toString();
+  }
+}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
new file mode 100644
index 0000000..fa46ea0
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.splunk.EntityCollection;
+import com.splunk.Index;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class SplunkSchemaFactory extends AbstractSchemaFactory {
+
+  private static final Logger logger = LoggerFactory.getLogger(SplunkSchemaFactory.class);
+  private static final String SPL_TABLE_NAME = "spl";
+  private final SplunkStoragePlugin plugin;
+  private final EntityCollection<Index> indexes;
+
+  public SplunkSchemaFactory(SplunkStoragePlugin plugin) {
+    super(plugin.getName());
+    this.plugin = plugin;
+    SplunkPluginConfig config = plugin.getConfig();
+    SplunkConnection connection = new SplunkConnection(config);
+
+    // Get Splunk Indexes
+    connection.connect();
+    indexes = connection.getIndexes();
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    SplunkSchema schema = new SplunkSchema(plugin);
+    parent.add(schema.getName(), schema);
+  }
+
+  class SplunkSchema extends AbstractSchema {
+
+    private final Map<String, DynamicDrillTable> activeTables = new HashMap<>();
+    private final SplunkStoragePlugin plugin;
+
+    public SplunkSchema(SplunkStoragePlugin plugin) {
+      super(Collections.emptyList(), plugin.getName());
+      this.plugin = plugin;
+      registerIndexes();
+    }
+
+    @Override
+    public Table getTable(String name) {
+      DynamicDrillTable table = activeTables.get(name);
+      if (table != null) {
+        // If the table was found, return it.
+        return table;
+      } else {
+        // Register the table
+        return registerTable(name, new DynamicDrillTable(plugin, plugin.getName(),
+          new SplunkScanSpec(plugin.getName(), name, plugin.getConfig())));
+      }
+    }
+
+    @Override
+    public boolean showInInformationSchema() {
+      return true;
+    }
+
+    @Override
+    public Set<String> getTableNames() {
+      return Sets.newHashSet(activeTables.keySet());
+    }
+
+    private DynamicDrillTable registerTable(String name, DynamicDrillTable table) {
+      activeTables.put(name, table);
+      return table;
+    }
+
+    @Override
+    public String getTypeName() {
+      return SplunkPluginConfig.NAME;
+    }
+
+    private void registerIndexes() {
+      // Add default "spl" table to index list.
+      registerTable(SPL_TABLE_NAME, new DynamicDrillTable(plugin, plugin.getName(),
+        new SplunkScanSpec(plugin.getName(), SPL_TABLE_NAME, plugin.getConfig())));
+
+      // Add all other indexes
+      for (String indexName : indexes.keySet()) {
+        logger.debug("Registering {}", indexName);
+        registerTable(indexName, new DynamicDrillTable(plugin, plugin.getName(),
+          new SplunkScanSpec(plugin.getName(), indexName, plugin.getConfig())));
+      }
+    }
+  }
+}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java
new file mode 100644
index 0000000..dc011a6
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.base.filter.FilterPushDownUtils;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class SplunkStoragePlugin extends AbstractStoragePlugin {
+
+  private final SplunkPluginConfig config;
+  private final SplunkSchemaFactory schemaFactory;
+
+  public SplunkStoragePlugin(SplunkPluginConfig configuration, DrillbitContext context, String name) {
+    super(context, name);
+    this.config = configuration;
+    this.schemaFactory = new SplunkSchemaFactory(this);
+  }
+
+  @Override
+  public SplunkPluginConfig getConfig() {
+    return config;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    schemaFactory.registerSchemas(schemaConfig, parent);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+    SplunkScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<SplunkScanSpec>() {});
+    return new SplunkGroupScan(scanSpec);
+  }
+
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+
+    // Push-down planning is done at the logical phase so it can
+    // influence parallelization in the physical phase. Note that many
+    // existing plugins perform filter push-down at the physical
+    // phase, which also works fine if push-down is independent of
+    // parallelization.
+    if (FilterPushDownUtils.isFilterPushDownPhase(phase)) {
+      return SplunkPushDownListener.rulesFor(optimizerContext);
+    } else {
+      return ImmutableSet.of();
+    }
+  }
+}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java
new file mode 100644
index 0000000..21b1237
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+@JsonTypeName("splunk-sub-scan")
+public class SplunkSubScan extends AbstractBase implements SubScan {
+
+  private final SplunkPluginConfig config;
+  private final SplunkScanSpec splunkScanSpec;
+  private final List<SchemaPath> columns;
+  private final Map<String, ExprNode.ColRelOpConstNode> filters;
+  private final int maxRecords;
+
+  @JsonCreator
+  public SplunkSubScan(
+    @JsonProperty("config") SplunkPluginConfig config,
+    @JsonProperty("tableSpec") SplunkScanSpec splunkScanSpec,
+    @JsonProperty("columns") List<SchemaPath> columns,
+    @JsonProperty("filters") Map<String, ExprNode.ColRelOpConstNode> filters,
+    @JsonProperty("maxRecords") int maxRecords) {
+      super("user");
+      this.config = config;
+      this.splunkScanSpec = splunkScanSpec;
+      this.columns = columns;
+      this.filters = filters;
+      this.maxRecords = maxRecords;
+  }
+
+  @JsonProperty("config")
+  public SplunkPluginConfig getConfig() {
+    return config;
+  }
+
+  @JsonProperty("tableSpec")
+  public SplunkScanSpec getScanSpec() {
+    return splunkScanSpec;
+  }
+
+  @JsonProperty("columns")
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @JsonProperty("filters")
+  public Map<String, ExprNode.ColRelOpConstNode> getFilters() {
+    return filters;
+  }
+
+  @JsonProperty("maxRecords")
+  public int getMaxRecords() {
+    return maxRecords;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(
+    PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    return new SplunkSubScan(config, splunkScanSpec, columns, filters, maxRecords);
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return ImmutableSet.<PhysicalOperator>of().iterator();
+  }
+
+  @Override
+  @JsonIgnore
+  public String getOperatorType() {
+    return "SPLUNK";
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("config", config)
+      .field("tableSpec", splunkScanSpec)
+      .field("columns", columns)
+      .field("filters", filters)
+      .field("maxRecords", maxRecords)
+      .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(config, splunkScanSpec, columns, filters, maxRecords);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    SplunkSubScan other = (SplunkSubScan) obj;
+    return Objects.equals(splunkScanSpec, other.splunkScanSpec)
+      && Objects.equals(config, other.config)
+      && Objects.equals(columns, other.columns)
+      && Objects.equals(filters, other.filters)
+      && Objects.equals(maxRecords, other.maxRecords);
+  }
+}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java
new file mode 100644
index 0000000..f34f64d
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+public class SplunkUtils {
+  /**
+   * These are special fields that alter the queries sent to Splunk.
+   */
+  public enum SPECIAL_FIELDS {
+    /**
+     * The sourcetype of a query. Specifying the sourcetype can improve query performance.
+     */
+    SOURCETYPE("sourcetype"),
+    /**
+     * Used to send raw SPL to Splunk
+     */
+    SPL("spl"),
+    /**
+     * The earliest time boundary of a query; overrides the config variable
+     */
+    EARLIEST_TIME("earliestTime"),
+    /**
+     * The latest time boundary of a query; overrides the config variable
+     */
+    LATEST_TIME("latestTime");
+
+    public final String field;
+
+    SPECIAL_FIELDS(String field) {
+      this.field = field;
+    }
+
+  }
+
+  /**
+   * Indicates whether the field in question is a special field and should be pushed down to the query or not.
+   * @param unknownField The field to be pushed down
+   * @return true if the field is a special field, false if not.
+   */
+  public static boolean isSpecialField (String unknownField) {
+    for (SPECIAL_FIELDS specialFieldName : SPECIAL_FIELDS.values()) {
+      if (specialFieldName.field.equalsIgnoreCase(unknownField)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
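
For reference, a minimal sketch of the special-field check; the class name is illustrative, and the
expected results follow from the SPECIAL_FIELDS values above.

    import org.apache.drill.exec.store.splunk.SplunkUtils;

    public class SplunkSpecialFieldsSketch {
      public static void main(String[] args) {
        // Special fields are intercepted by the query builder rather than being emitted as ordinary SPL filters.
        System.out.println(SplunkUtils.isSpecialField("earliestTime")); // true
        System.out.println(SplunkUtils.isSpecialField("sourcetype"));   // true
        System.out.println(SplunkUtils.isSpecialField("clientip"));     // false
      }
    }
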
diff --git a/contrib/storage-splunk/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-splunk/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..8a55547
--- /dev/null
+++ b/contrib/storage-splunk/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,14 @@
+{
+  "storage":{
+    "splunk" : {
+      "type":"splunk",
+      "username": "admin",
+      "password": "changeme",
+      "hostname": "localhost",
+      "port": 8089,
+      "earliestTime": "-14d",
+      "latestTime": "now",
+      "enabled": false
+    }
+  }
+}
diff --git a/contrib/storage-splunk/src/main/resources/drill-module.conf b/contrib/storage-splunk/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..436154a
--- /dev/null
+++ b/contrib/storage-splunk/src/main/resources/drill-module.conf
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#  This file tells Drill to consider this module when class path scanning.
+#  This file can also include any supplementary configuration information.
+#  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill: {
+  classpath.scanning: {
+    packages += "org.apache.drill.exec.store.splunk"
+  }
+}
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkBaseTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkBaseTest.java
new file mode 100644
index 0000000..e9fdad2
--- /dev/null
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkBaseTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.test.ClusterTest;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+
+public class SplunkBaseTest extends ClusterTest {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Make sure this test is only running as part of the suite
+    Assume.assumeTrue(SplunkTestSuite.isRunningSuite());
+    SplunkTestSuite.initSplunk();
+  }
+
+  @AfterClass
+  public static void shutdown() {
+    if (SplunkTestSuite.isRunningSuite()) {
+      SplunkTestSuite.tearDownCluster();
+    }
+  }
+}
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
new file mode 100644
index 0000000..a538809
--- /dev/null
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.splunk.EntityCollection;
+import com.splunk.Index;
+import org.apache.drill.common.exceptions.UserException;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.drill.exec.store.splunk.SplunkTestSuite.SPLUNK_STORAGE_PLUGIN_CONFIG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SplunkConnectionTest extends SplunkBaseTest {
+
+  @Test
+  public void testConnection() {
+    SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG);
+    sc.connect();
+  }
+
+  @Test
+  public void testConnectionFail() {
+    try {
+      SplunkPluginConfig invalidSplunkConfig = new SplunkPluginConfig(
+              "hacker",
+              "hacker",
+              SPLUNK_STORAGE_PLUGIN_CONFIG.getHostname(),
+              SPLUNK_STORAGE_PLUGIN_CONFIG.getPort(),
+              SPLUNK_STORAGE_PLUGIN_CONFIG.getEarliestTime(),
+              SPLUNK_STORAGE_PLUGIN_CONFIG.getLatestTime()
+      );
+      SplunkConnection sc = new SplunkConnection(invalidSplunkConfig);
+      sc.connect();
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getMessage().contains("CONNECTION ERROR: Unable to connect to Splunk"));
+    }
+  }
+
+  @Test
+  public void testGetIndexes() {
+    SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG);
+    EntityCollection<Index> indexes = sc.getIndexes();
+    assertEquals(9, indexes.size());
+
+    List<String> expectedIndexNames = new ArrayList<>();
+    expectedIndexNames.add("_audit");
+    expectedIndexNames.add("_internal");
+    expectedIndexNames.add("_introspection");
+    expectedIndexNames.add("_telemetry");
+    expectedIndexNames.add("_thefishbucket");
+    expectedIndexNames.add("history");
+    expectedIndexNames.add("main");
+    expectedIndexNames.add("splunklogger");
+    expectedIndexNames.add("summary");
+
+    List<String> indexNames = new ArrayList<>();
+    for (Index index : indexes.values()) {
+      indexNames.add(index.getName());
+    }
+
+    assertEquals(expectedIndexNames, indexNames);
+  }
+}
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java
new file mode 100644
index 0000000..9904ffd
--- /dev/null
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({SlowTest.class})
+public class SplunkIndexesTest extends SplunkBaseTest {
+
+  @Test
+  public void testGetSplunkIndexes() throws Exception {
+    String sql = "SHOW TABLES IN `splunk`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("TABLE_SCHEMA", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("TABLE_NAME", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("splunk", "summary")
+      .addRow("splunk", "splunklogger")
+      .addRow("splunk", "_thefishbucket")
+      .addRow("splunk", "_audit")
+      .addRow("splunk", "_internal")
+      .addRow("splunk", "_introspection")
+      .addRow("splunk", "main")
+      .addRow("splunk", "history")
+      .addRow("splunk", "spl")
+      .addRow("splunk", "_telemetry")
+      .build();
+
+    RowSetUtilities.verify(expected, results);
+  }
+}
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java
new file mode 100644
index 0000000..813c665
--- /dev/null
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.categories.SlowTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({SlowTest.class})
+public class SplunkLimitPushDownTest extends SplunkBaseTest {
+
+  @Test
+  public void testLimit() throws Exception {
+    String sql = "SELECT * FROM splunk._audit LIMIT 5";
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("Limit", "maxRecords=5")
+      .match();
+  }
+
+  @Test
+  public void testLimitWithOrderBy() throws Exception {
+    // Limit should not be pushed down for this example due to the sort
+    String sql = "SELECT * FROM splunk._audit ORDER BY ip LIMIT 4";
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("Limit", "maxRecords=-1")
+      .match();
+  }
+
+  @Test
+  public void testLimitWithOffset() throws Exception {
+    // Limit should be pushed down and include the offset
+    String sql = "SELECT * FROM splunk._audit LIMIT 4 OFFSET 5";
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("Limit", "maxRecords=9")
+      .match();
+  }
+
+  @Test
+  public void testLimitWithFilter() throws Exception {
+    String sql = "SELECT * FROM splunk._audit WHERE rating = 52.17 LIMIT 4";
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("Limit", "maxRecords=4")
+      .match();
+  }
+}
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
new file mode 100644
index 0000000..4e2ee80
--- /dev/null
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({SlowTest.class})
+public class SplunkPluginTest extends SplunkBaseTest {
+
+  @Test
+  public void verifyPluginConfig() throws Exception {
+    String sql = "SELECT SCHEMA_NAME, TYPE FROM INFORMATION_SCHEMA.`SCHEMATA` WHERE TYPE='splunk'\n" +
+      "ORDER BY SCHEMA_NAME";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("SCHEMA_NAME", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("TYPE", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("splunk", "splunk")
+      .build();
+
+    RowSetUtilities.verify(expected, results);
+  }
+
+  @Test
+  public void verifyIndexes() throws Exception {
+    String sql = "SHOW TABLES IN `splunk`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("TABLE_SCHEMA", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("TABLE_NAME", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("splunk", "summary")
+      .addRow("splunk", "splunklogger")
+      .addRow("splunk", "_thefishbucket")
+      .addRow("splunk", "_audit")
+      .addRow("splunk", "_internal")
+      .addRow("splunk", "_introspection")
+      .addRow("splunk", "main")
+      .addRow("splunk", "history")
+      .addRow("splunk", "spl")
+      .addRow("splunk", "_telemetry")
+      .build();
+
+    RowSetUtilities.verify(expected, results);
+  }
+
+  @Test
+  @Ignore("timestamp parsing error in antlr generated code")
+  public void testStarQuery() throws Exception {
+    String sql = "SELECT * FROM splunk._telemetry LIMIT 1";
+    client.testBuilder()
+      .sqlQuery(sql)
+      .baselineColumns("acceleration_id", "action", "add_offset", "add_timestamp", "apiEndTime", "apiStartTime",
+              "api_et", "api_lt", "app", "autojoin", "available_count", "buckets", "cache_size", "clientip",
+              "considered_events", "data_format", "decompressed_slices", "drop_count", "duration_command_search_index",
+              "duration_command_search_index_bucketcache_hit", "duration_command_search_index_bucketcache_miss",
+              "duration_command_search_rawdata", "duration_command_search_rawdata_bucketcache_hit",
+              "duration_command_search_rawdata_bucketcache_miss", "earliest", "elapsed_ms", "eliminated_buckets",
+              "enable_lookups", "event_count", "eventtype", "exec_time", "extra_fields", "field1", "format",
+              "fully_completed_search", "has_error_msg", "host", "index", "info",
+              "invocations_command_search_index_bucketcache_error", "invocations_command_search_index_bucketcache_hit",
+              "invocations_command_search_index_bucketcache_miss", "invocations_command_search_rawdata_bucketcache_error",
+              "invocations_command_search_rawdata_bucketcache_hit", "invocations_command_search_rawdata_bucketcache_miss",
+              "is_realtime", "latest", "linecount", "max_count", "maxtime", "mode", "multiValueField", "object",
+              "operation", "provenance", "reaso", "result_count", "roles", "savedsearch_name", "scan_count", "search",
+              "search_et", "search_id", "search_lt", "search_startup_time", "searched_buckets", "segmentation", "session",
+               "source", "sourcetype", "sourcetype_count__audittrail", "sourcetype_count__first_install_too_small",
+              "sourcetype_count__http_event_collector_metrics", "sourcetype_count__kvstore", "sourcetype_count__mongod",
+              "sourcetype_count__scheduler", "sourcetype_count__search_telemetry", "sourcetype_count__splunk_resource_usage",
+              "sourcetype_count__splunk_version", "sourcetype_count__splunk_web_access", "sourcetype_count__splunk_web_service",
+              "sourcetype_count__splunkd", "sourcetype_count__splunkd_access", "sourcetype_count__splunkd_conf",
+              "sourcetype_count__splunkd_stderr", "sourcetype_count__splunkd_ui_access", "splunk_server",
+              "splunk_server_group", "subsecond", "timestamp", "total_run_time", "total_slices", "ttl", "user", "useragent",
+              "_bkt", "_cd", "_eventtype_color", "_indextime", "_kv", "_raw", "_serial", "_si", "_sourcetype", "_subsecond",
+              "_time")
+      .expectsNumRecords(1)
+      .go();
+  }
+
+  @Test
+  @Ignore("the result is not consistent on system tables")
+  public void testRawSPLQuery() throws Exception {
+    String sql = "SELECT * FROM splunk.spl WHERE spl = 'search index=_internal earliest=1 latest=now | fieldsummary'";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("field", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("count", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("distinct_count", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("is_exact", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("max", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("mean", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("min", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("numeric_count", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("stdev", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("values", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("index", "0", "0", "1", null, null, null, "0", null, "[]")
+      .build();
+
+    RowSetUtilities.verify(expected, results);
+  }
+
+  @Test
+  public void testExplicitFieldsQuery() throws Exception {
+    String sql = "SELECT acceleration_id, action, add_offset, add_timestamp FROM splunk._audit LIMIT 2";
+
+    client.testBuilder()
+      .sqlQuery(sql)
+      .unOrdered()
+      .baselineColumns("acceleration_id", "action", "add_offset", "add_timestamp")
+      .expectsNumRecords(2)
+      .go();
+  }
+
+  @Test
+  public void testExplicitFieldsWithLimitQuery() throws Exception {
+    String sql = "SELECT action, _sourcetype, _subsecond, _time FROM splunk._audit LIMIT 3";
+    client.testBuilder()
+      .sqlQuery(sql)
+      .unOrdered()
+      .baselineColumns( "acceleration_id", "action", "add_offset", "add_timestamp")
+      .expectsNumRecords(3)
+      .go();
+  }
+
+  @Test
+  public void testExplicitFieldsWithSourceType() throws Exception {
+    String sql = "SELECT action, _sourcetype, _subsecond, _time FROM splunk._audit WHERE sourcetype='audittrail' LIMIT 5";
+    client.testBuilder()
+      .sqlQuery(sql)
+      .unOrdered()
+      .baselineColumns( "acceleration_id", "action", "add_offset", "add_timestamp")
+      .expectsNumRecords(5)
+      .go();
+  }
+
+  @Test
+  public void testExplicitFieldsWithOneFieldLimitQuery() throws Exception {
+    String sql = "SELECT `component` FROM splunk.`_introspection` ORDER BY `component` LIMIT 2";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    results.print();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("component", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("Dispatch")
+      .addRow("Fishbucket")
+      .build();
+
+    RowSetUtilities.verify(expected, results);
+  }
+
+  @Test
+  @Ignore("the result is not consistent on system tables. The table may be empty before test running")
+  public void testSingleEqualityFilterQuery() throws Exception {
+    String sql = "SELECT action, _sourcetype FROM splunk._audit where action='edit'";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("action", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("_sourcetype", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("edit", "audittrail")
+      .build();
+
+    RowSetUtilities.verify(expected, results);
+  }
+
+  @Test
+  @Ignore("the result is not consistent on system tables")
+  public void testMultipleEqualityFilterQuery() throws Exception {
+    String sql = "SELECT _time, clientip, file, host FROM splunk.main WHERE file='cart.do' AND clientip='217.15.20.146'";
+    client.testBuilder()
+      .sqlQuery(sql)
+      .ordered()
+      .expectsNumRecords(164)
+      .go();
+  }
+
+  @Test
+  public void testFilterOnUnProjectedColumnQuery() throws Exception {
+    String sql = "SELECT action, _sourcetype, _subsecond, _time FROM splunk._audit WHERE sourcetype='audittrail' LIMIT 5";
+    client.testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns( "acceleration_id", "action", "add_offset", "add_timestamp")
+        .expectsNumRecords(5)
+        .go();
+  }
+
+  @Test
+  @Ignore("the result is not consistent on system tables")
+  public void testGreaterThanFilterQuery() throws Exception {
+    String sql = "SELECT clientip, file, bytes FROM splunk.main WHERE bytes > 40000";
+    client.testBuilder()
+      .sqlQuery(sql)
+      .ordered()
+      .expectsNumRecords(235)
+      .go();
+  }
+
+  @Test
+  public void testArbitrarySPL() throws Exception {
+    String sql = "SELECT field1, _mkv_child, multiValueField FROM splunk.spl WHERE spl='|noop| makeresults | eval field1 = \"abc def ghi jkl mno pqr stu vwx yz\" | makemv field1 | mvexpand " +
+      "field1 | eval " +
+      "multiValueField = \"cat dog bird\" | makemv multiValueField' LIMIT 10\n";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("field1", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("_mkv_child", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("multiValueField", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("abc", "0", "cat dog bird")
+      .addRow("def", "1", "cat dog bird")
+      .addRow("ghi", "2", "cat dog bird")
+      .addRow("jkl", "3", "cat dog bird")
+      .addRow("mno", "4", "cat dog bird")
+      .addRow("pqr", "5", "cat dog bird")
+      .addRow("stu", "6", "cat dog bird")
+      .addRow("vwx", "7", "cat dog bird")
+      .addRow("yz", "8", "cat dog bird")
+      .build();
+
+    RowSetUtilities.verify(expected, results);
+  }
+
+  @Test
+  public void testSPLQueryWithMissingSPL() throws Exception {
+    String sql = "SELECT * FROM splunk.spl";
+    try {
+      client.queryBuilder().sql(sql).rowSet();
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getMessage().contains("SPL cannot be empty when querying spl table"));
+    }
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    String sql = "select min(linecount) from splunk._audit";
+    String plan = queryBuilder().sql(sql).explainJson();
+    int cnt = queryBuilder().physical(plan).singletonInt();
+    assertEquals("Counts should match", 1, cnt);
+  }
+}
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkQueryBuilderTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkQueryBuilderTest.java
new file mode 100644
index 0000000..e24e9e8
--- /dev/null
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkQueryBuilderTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.store.base.filter.ConstantHolder;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.store.base.filter.RelOp;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class SplunkQueryBuilderTest {
+
+  @Test
+  public void testSimpleQuery() {
+    SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
+    String query = builder.build();
+    assertEquals("search index=main | table *", query);
+  }
+
+  @Test
+  public void testAddSingleFieldQuery() {
+    SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
+    builder.addField("field1");
+    String query = builder.build();
+    assertEquals("search index=main | fields field1 | table field1", query);
+  }
+
+  @Test
+  public void testAddMultipleFieldQuery() {
+    SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
+    builder.addField("field1");
+    builder.addField("field2");
+    builder.addField("field3");
+    String query = builder.build();
+    assertEquals("search index=main | fields field1,field2,field3 | table field1,field2,field3", query);
+  }
+
+  @Test
+  public void testFieldsAndFiltersQuery() {
+    SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
+
+    Map<String, ExprNode.ColRelOpConstNode> filters = new HashMap<>();
+    filters.put("field1", new ExprNode.ColRelOpConstNode("field1", RelOp.EQ, new ConstantHolder(TypeProtos.MinorType.VARCHAR, "foo")));
+
+    builder.addField("field1");
+    builder.addField("field2");
+    builder.addField("field3");
+
+    builder.addFilters(filters);
+
+    String query = builder.build();
+    assertEquals("search index=main field1=\"foo\" | fields field1,field2,field3 | table field1,field2,field3", query);
+  }
+
+  @Test
+  public void testFieldsAndSourcetypeQuery() {
+    SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
+
+    Map<String, ExprNode.ColRelOpConstNode> filters = new HashMap<>();
+    filters.put("field1", new ExprNode.ColRelOpConstNode("field1", RelOp.EQ, new ConstantHolder(TypeProtos.MinorType.VARCHAR, "foo")));
+    filters.put("sourcetype", new ExprNode.ColRelOpConstNode("sourcetype", RelOp.EQ, new ConstantHolder(TypeProtos.MinorType.VARCHAR, "st")));
+
+    builder.addField("field1");
+    builder.addField("field2");
+    builder.addField("field3");
+
+    builder.addFilters(filters);
+
+    String query = builder.build();
+    assertEquals("search index=main sourcetype=\"st\" field1=\"foo\" | fields field1,field2,field3 | table field1,field2,field3", query);
+  }
+
+  @Test
+  public void testGTQuery() {
+    SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
+
+    Map<String, ExprNode.ColRelOpConstNode> filters = new HashMap<>();
+    filters.put("field1", new ExprNode.ColRelOpConstNode("field1", RelOp.GT, new ConstantHolder(TypeProtos.MinorType.INT, 5)));
+
+    builder.addField("field1");
+    builder.addFilters(filters);
+
+    String query = builder.build();
+    assertEquals("search index=main field1>5 | fields field1 | table field1", query);
+  }
+
+  @Test
+  public void testGEQuery() {
+    SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
+
+    Map<String, ExprNode.ColRelOpConstNode> filters = new HashMap<>();
+    filters.put("field1", new ExprNode.ColRelOpConstNode("field1", RelOp.GE, new ConstantHolder(TypeProtos.MinorType.INT, 5)));
+
+    builder.addField("field1");
+    builder.addFilters(filters);
+
+    String query = builder.build();
+    assertEquals("search index=main field1>=5 | fields field1 | table field1", query);
+  }
+
+  @Test
+  public void testLEQuery() {
+    SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
+
+    Map<String, ExprNode.ColRelOpConstNode> filters = new HashMap<>();
+    filters.put("field1", new ExprNode.ColRelOpConstNode("field1", RelOp.LE, new ConstantHolder(TypeProtos.MinorType.INT, 5)));
+
+    builder.addField("field1");
+    builder.addFilters(filters);
+
+    String query = builder.build();
+    assertEquals("search index=main field1<=5 | fields field1 | table field1", query);
+  }
+
+  @Test
+  public void testLTQuery() {
+    SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
+
+    Map<String, ExprNode.ColRelOpConstNode> filters = new HashMap<>();
+    filters.put("field1", new ExprNode.ColRelOpConstNode("field1", RelOp.LT, new ConstantHolder(TypeProtos.MinorType.INT, 5)));
+
+    builder.addField("field1");
+    builder.addFilters(filters);
+
+    String query = builder.build();
+    assertEquals("search index=main field1<5 | fields field1 | table field1", query);
+  }
+
+  @Test
+  public void testStarAndSourcetypeQuery() {
+    SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
+
+    Map<String, ExprNode.ColRelOpConstNode> filters = new HashMap<>();
+    filters.put("field1", new ExprNode.ColRelOpConstNode("field1", RelOp.EQ, new ConstantHolder(TypeProtos.MinorType.VARCHAR, "foo")));
+    filters.put("sourcetype", new ExprNode.ColRelOpConstNode("sourcetype", RelOp.EQ, new ConstantHolder(TypeProtos.MinorType.VARCHAR, "st")));
+
+    builder.addField("field1");
+    builder.addField("field2");
+    builder.addField("field3");
+
+    builder.addFilters(filters);
+
+    String query = builder.build();
+    assertEquals("search index=main sourcetype=\"st\" field1=\"foo\" | fields field1,field2,field3 | table field1,field2,field3", query);
+  }
+
+  @Test
+  public void testLimitQuery() {
+    SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
+    builder.addLimit(5);
+    String query = builder.build();
+    assertEquals("search index=main | head 5 | table *", query);
+  }
+
+  @Test
+  public void testAddSingleSourcetypeQuery() {
+    SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
+    builder.addSourceType("access_combined_wcookie");
+    String query = builder.build();
+    assertEquals("search index=main sourcetype=\"access_combined_wcookie\" | table *", query);
+  }
+
+  @Test
+  public void testSingleFilterQuery() {
+    SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
+    builder.addFilter("field1", "value1", SplunkQueryBuilder.EQUAL_OPERATOR);
+    String query = builder.build();
+    assertEquals("search index=main field1=\"value1\" | table *", query);
+  }
+
+  @Test
+  public void testMultipleFilterQuery() {
+    SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
+    builder.addFilter("field1", "value1", SplunkQueryBuilder.EQUAL_OPERATOR);
+    builder.addFilter("field2", "value2", SplunkQueryBuilder.EQUAL_OPERATOR);
+    builder.addFilter("field3", "value3", SplunkQueryBuilder.EQUAL_OPERATOR);
+    String query = builder.build();
+    assertEquals("search index=main field1=\"value1\" field2=\"value2\" field3=\"value3\" | table *", query);
+  }
+
+  @Test
+  public void testAddMultipleSourcetypeQuery() {
+    SplunkQueryBuilder builder = new SplunkQueryBuilder("main");
+    builder.addSourceType("access_combined_wcookie");
+    builder.addSourceType("sourcetype2");
+    builder.addSourceType("sourcetype3");
+
+    String query = builder.build();
+    assertEquals("search index=main (sourcetype=\"access_combined_wcookie\" OR sourcetype=\"sourcetype2\" OR sourcetype=\"sourcetype3\") | table *", query);
+  }
+}
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSplunkUtils.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSplunkUtils.java
new file mode 100644
index 0000000..f60dafb
--- /dev/null
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSplunkUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.categories.SlowTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category({SlowTest.class})
+public class SplunkTestSplunkUtils {
+
+  @Test
+  public void testIsSpecialField() {
+    assertTrue(SplunkUtils.isSpecialField("sourcetype"));
+    assertTrue(SplunkUtils.isSpecialField("earliestTime"));
+    assertTrue(SplunkUtils.isSpecialField("latestTime"));
+    assertTrue(SplunkUtils.isSpecialField("spl"));
+  }
+
+  @Test
+  public void testIsNotSpecialField() {
+    assertFalse(SplunkUtils.isSpecialField("bob"));
+    assertFalse(SplunkUtils.isSpecialField("ip_address"));
+    assertFalse(SplunkUtils.isSpecialField("mac_address"));
+    assertFalse(SplunkUtils.isSpecialField("latest_Time"));
+  }
+}
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
new file mode 100644
index 0000000..91fb89d
--- /dev/null
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+  SplunkConnectionTest.class,
+  SplunkQueryBuilderTest.class,
+  SplunkLimitPushDownTest.class,
+  SplunkIndexesTest.class,
+  SplunkPluginTest.class,
+  SplunkTestSplunkUtils.class
+})
+
+@Category({SlowTest.class})
+public class SplunkTestSuite extends ClusterTest {
+  private static final Logger logger = LoggerFactory.getLogger(SplunkTestSuite.class);
+
+  protected static SplunkPluginConfig SPLUNK_STORAGE_PLUGIN_CONFIG = null;
+  public static final String SPLUNK_LOGIN = "admin";
+  public static final String SPLUNK_PASS = "password";
+
+  private static volatile boolean runningSuite = true;
+  private static final AtomicInteger initCount = new AtomicInteger(0);
+  @ClassRule
+  public static GenericContainer<?> splunk = new GenericContainer<>(DockerImageName.parse("splunk/splunk:8.1"))
+          .withExposedPorts(8089)
+          .withEnv("SPLUNK_START_ARGS", "--accept-license")
+          .withEnv("SPLUNK_PASSWORD", SPLUNK_PASS);
+
+  @BeforeClass
+  public static void initSplunk() throws Exception {
+    synchronized (SplunkTestSuite.class) {
+      if (initCount.get() == 0) {
+        startCluster(ClusterFixture.builder(dirTestWatcher));
+        splunk.start();
+        String hostname = splunk.getHost();
+        Integer port = splunk.getFirstMappedPort();
+        StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
+        SPLUNK_STORAGE_PLUGIN_CONFIG = new SplunkPluginConfig(SPLUNK_LOGIN, SPLUNK_PASS, hostname, port, "1", "now");
+        SPLUNK_STORAGE_PLUGIN_CONFIG.setEnabled(true);
+        pluginRegistry.put(SplunkPluginConfig.NAME, SPLUNK_STORAGE_PLUGIN_CONFIG);
+        runningSuite = true;
+      }
+      initCount.incrementAndGet();
+      runningSuite = true;
+    }
+    logger.info("Initialized Splunk in Docker container");
+  }
+
+  @AfterClass
+  public static void tearDownCluster() {
+    synchronized (SplunkTestSuite.class) {
+      if (initCount.decrementAndGet() == 0) {
+        splunk.close();
+      }
+    }
+  }
+
+  public static boolean isRunningSuite() {
+    return runningSuite;
+  }
+}
\ No newline at end of file
diff --git a/contrib/storage-splunk/src/test/resources/logback-test.xml.bak b/contrib/storage-splunk/src/test/resources/logback-test.xml.bak
new file mode 100644
index 0000000..1d8f1e7
--- /dev/null
+++ b/contrib/storage-splunk/src/test/resources/logback-test.xml.bak
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+<!--
+  This file is provided in the event someone wishes to do additional work on this
+  plugin. To use it, simply rename it to logback-test.xml.
+-->
+<configuration>
+  <if condition='property("drill.lilith.enable").equalsIgnoreCase("true")'>
+    <then>
+      <appender name="SOCKET" class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender">
+        <Compressing>true</Compressing>
+        <ReconnectionDelay>10000</ReconnectionDelay>
+        <IncludeCallerData>true</IncludeCallerData>
+        <RemoteHosts>${LILITH_HOSTNAME:-localhost}</RemoteHosts>
+      </appender>
+
+      <logger name="org.apache.drill" additivity="false">
+        <level value="DEBUG"/>
+        <appender-ref ref="SOCKET"/>
+      </logger>
+
+      <logger name="query.logger" additivity="false">
+        <level value="ERROR"/>
+        <appender-ref ref="SOCKET"/>
+      </logger>
+      <logger name="org.apache.drill.exec.store.splunk">
+        <level value="DEBUG"/>
+        <appender-ref ref="SOCKET"/>
+      </logger>
+    </then>
+  </if>
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <!-- encoders are assigned the type
+         ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+  <logger name="org.apache.drill.exec.store.splunk" additivity="false">
+    <level value="DEBUG" />
+    <appender-ref ref="STDOUT" />
+  </logger>
+</configuration>
diff --git a/contrib/storage-splunk/src/test/resources/test_data.csv b/contrib/storage-splunk/src/test/resources/test_data.csv
new file mode 100644
index 0000000..ad07fc0
--- /dev/null
+++ b/contrib/storage-splunk/src/test/resources/test_data.csv
@@ -0,0 +1,11 @@
+JSESSIONID,action,bytes,categoryId,clientip,cookie,"date_hour","date_mday","date_minute","date_month","date_second","date_wday","date_year","date_zone",eventtype,file,host,ident,index,items,linecount,method,msg,other,productId,punct,q,referer,"referer_domain","req_time",root,source,sourcetype,"splunk_server","splunk_server_group",start,status,t,timeendpos,timestartpos,uri,"uri_domain","uri_path","uri_query",user,useragent,version,"_bkt","_cd","_eventtype_color","_indextime","_kv","_raw", [...]
+SD6SL9FF1ADFF4957,view,956,ACCESSORIES,"198.35.2.120",,14,2,45,may,44,saturday,2020,local,,"cart.do","web_application","-",main,,1,GET,,912,"WC-SH-A01","..._-_-_[//:::]_""_/.?=&=--&=__.""___""://../.?=""_""/.",,"http://www.buttercupgames.com/category.screen?categoryId=ACCESSORIES","http://www.buttercupgames.com","02/May/2020:14:45:44",,"access_30DAY.log","access_combined_wcookie","Charless-MacBook-Pro.local",,,200,,38,18,"/cart.do?action=view&productId=WC-SH-A01&JSESSIONID=SD6SL9FF1ADFF4 [...]
+main","access_combined_wcookie","2020-05-02 14:45:44.000 EDT"
+SD6SL9FF1ADFF4957,,2095,,"198.35.2.120",,14,2,45,may,33,saturday,2020,local,,"product.screen","web_application","-",main,,1,GET,,953,"SF-BVS-01","..._-_-_[//:::]_""_/.?=--&=__.""___""://../.?=--""_""/.",,"http://www.buttercupgames.com/product.screen?productId=SF-BVS-01","http://www.buttercupgames.com","02/May/2020:14:45:33",,"access_30DAY.log","access_combined_wcookie","Charless-MacBook-Pro.local",,,408,,38,18,"/product.screen?productId=SF-BVS-01&JSESSIONID=SD6SL9FF1ADFF4957",,"/product. [...]
+main","access_combined_wcookie","2020-05-02 14:45:33.000 EDT"
+SD6SL9FF1ADFF4957,view,2219,,"198.35.2.120",,14,2,45,may,21,saturday,2020,local,,"product.screen","web_application","-",main,,1,POST,,267,"MB-AG-G07","..._-_-_[//:::]_""_/.?=--&=__.""___""://../.?=&=--""_""",,"http://www.buttercupgames.com/cart.do?action=view&productId=MB-AG-G07","http://www.buttercupgames.com","02/May/2020:14:45:21",,"access_30DAY.log","access_combined_wcookie","Charless-MacBook-Pro.local",,,200,,38,18,"/product.screen?productId=MB-AG-G07&JSESSIONID=SD6SL9FF1ADFF4957",, [...]
+main","access_combined_wcookie","2020-05-02 14:45:21.000 EDT"
+SD6SL9FF1ADFF4957,,2729,STRATEGY,"198.35.2.120",,14,2,45,may,13,saturday,2020,local,,"category.screen","web_application","-",main,,1,POST,,439,,"..._-_-_[//:::]_""_/.?=&=__.""___""://../.?=""_""/._(__",,"http://www.buttercupgames.com/category.screen?categoryId=STRATEGY","http://www.buttercupgames.com","02/May/2020:14:45:13",,"access_30DAY.log","access_combined_wcookie","Charless-MacBook-Pro.local",,,200,,38,18,"/category.screen?categoryId=STRATEGY&JSESSIONID=SD6SL9FF1ADFF4957",,"/categor [...]
+main","access_combined_wcookie","2020-05-02 14:45:13.000 EDT"
+SD6SL9FF1ADFF4957,,3546,NULL,"198.35.2.120",,14,2,45,may,9,saturday,2020,local,,"product.screen","web_application","-",main,,1,GET,,575,"SF-BVS-01","..._-_-_[//:::]_""_/.?=--&=__.""___""://../.?=""_""/._(",,"http://www.buttercupgames.com/category.screen?categoryId=NULL","http://www.buttercupgames.com","02/May/2020:14:45:09",,"access_30DAY.log","access_combined_wcookie","Charless-MacBook-Pro.local",,,403,,38,18,"/product.screen?productId=SF-BVS-01&JSESSIONID=SD6SL9FF1ADFF4957",,"/product. [...]
+main","access_combined_wcookie","2020-05-02 14:45:09.000 EDT"
diff --git a/contrib/storage-splunk/src/test/splunk.md b/contrib/storage-splunk/src/test/splunk.md
new file mode 100644
index 0000000..45038f2
--- /dev/null
+++ b/contrib/storage-splunk/src/test/splunk.md
@@ -0,0 +1,31 @@
+# Set up Splunk Docker Env for testing
+   _Note: the `testcontainers` Maven library is currently used to start the Splunk Docker container automatically when the tests run._
+## Pull docker image
+``docker pull splunk/splunk``
+## Start container
+``docker run -d -p 8000:8000 -p 8089:8089 -e "SPLUNK_START_ARGS=--accept-license" -e "SPLUNK_PASSWORD=password" --name splunk splunk/splunk:latest``
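+## Verify the container is up (optional)
+   _Splunk may take a minute or two to finish starting. An illustrative check, assuming the `admin`/`password` credentials set above:_
+``
+curl -k -u admin:password https://localhost:8089/services/server/info
+``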
+## Get a session key using the /services/auth/login endpoint:
+``
+curl -k https://localhost:8089/services/auth/login --data-urlencode username=admin --data-urlencode password=password
+``
+   The response is your session key:
+```
+<response>
+  <sessionKey>192fd3e46a31246da7ea7f109e7f95fd</sessionKey>
+</response>
+```
+   
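+   The session key can then be passed on later REST calls in an `Authorization: Splunk <sessionKey>` header. As an illustrative check only (substitute the key returned above), a search can be run through the export endpoint:
+``
+curl -k -H "Authorization: Splunk 192fd3e46a31246da7ea7f109e7f95fd" https://localhost:8089/services/search/jobs/export --data-urlencode search="search index=_internal | head 5"
+``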
+## Open the bash console for the container:
+``
+docker exec -it {container_name} bash
+``
+   See more details: https://docs.splunk.com/Documentation/Splunk/8.1.1/RESTUM/RESTusing#Authentication_with_HTTP_Authorization_tokens
\ No newline at end of file
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 5a21c9d..1ec77af 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -344,6 +344,11 @@
         </dependency>
         <dependency>
           <groupId>org.apache.drill.contrib</groupId>
+          <artifactId>drill-storage-splunk</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.drill.contrib</groupId>
           <artifactId>drill-udfs</artifactId>
           <version>${project.version}</version>
         </dependency>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 2fce9f0..515c105 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -54,6 +54,7 @@
         <include>org.apache.drill.contrib:drill-format-spss:jar</include>
         <include>org.apache.drill.contrib:drill-jdbc-storage:jar</include>
         <include>org.apache.drill.contrib:drill-kudu-storage:jar</include>
+        <include>org.apache.drill.contrib:drill-storage-splunk:jar</include>
         <include>org.apache.drill.contrib:drill-storage-kafka:jar</include>
         <include>org.apache.drill.contrib:drill-storage-elasticsearch:jar</include>
         <include>org.apache.drill.contrib:drill-storage-cassandra:jar</include>
diff --git a/pom.xml b/pom.xml
index fb0412c..6b2bd1a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -216,6 +216,11 @@
       <id>jitpack.io</id>
       <url>https://jitpack.io</url>
     </repository>
+    <repository>
+      <id>splunk-artifactory</id>
+      <name>Splunk Releases</name>
+      <url>https://splunk.jfrog.io/artifactory/ext-releases-local</url>
+    </repository>
   </repositories>
 
   <issueManagement>