Posted to commits@drill.apache.org by cg...@apache.org on 2022/12/22 12:24:31 UTC

[drill] branch master updated: DRILL-8371: Add Write/Insert Capability to Splunk Plugin (#2722)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 83bf04cd81 DRILL-8371: Add Write/Insert Capability to Splunk Plugin (#2722)
83bf04cd81 is described below

commit 83bf04cd815e6c6d22d9ef04550cd8113bd9f905
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Thu Dec 22 07:24:23 2022 -0500

    DRILL-8371: Add Write/Insert Capability to Splunk Plugin (#2722)
---
 contrib/storage-splunk/README.md                   |   5 +
 .../exec/store/splunk/SplunkBatchInsertWriter.java |  38 +++
 .../drill/exec/store/splunk/SplunkBatchWriter.java | 356 +++++++++++++++++++++
 .../drill/exec/store/splunk/SplunkConnection.java  |  10 +-
 .../exec/store/splunk/SplunkInsertWriter.java      |  71 ++++
 .../splunk/SplunkInsertWriterBatchCreator.java     |  42 +++
 .../exec/store/splunk/SplunkPluginConfig.java      |  29 +-
 .../drill/exec/store/splunk/SplunkSchema.java      | 175 ++++++++++
 .../exec/store/splunk/SplunkSchemaFactory.java     |  90 ------
 .../exec/store/splunk/SplunkStoragePlugin.java     |  10 +
 .../drill/exec/store/splunk/SplunkWriter.java      |  84 +++++
 .../store/splunk/SplunkWriterBatchCreator.java     |  42 +++
 .../exec/store/splunk/SplunkConnectionTest.java    |   2 +-
 .../drill/exec/store/splunk/SplunkTestSuite.java   |  11 +-
 .../drill/exec/store/splunk/SplunkWriterTest.java  | 191 +++++++++++
 .../src/test/resources/schema_test.json            |  14 +
 .../src/test/resources/test_data.csvh              |  11 +
 .../src/test/resources/test_data2.csvh             |  11 +
 docs/dev/CreatingAWriter.md                        |  50 +--
 19 files changed, 1111 insertions(+), 131 deletions(-)

diff --git a/contrib/storage-splunk/README.md b/contrib/storage-splunk/README.md
index 09c179dea1..0fe032aa01 100644
--- a/contrib/storage-splunk/README.md
+++ b/contrib/storage-splunk/README.md
@@ -158,6 +158,11 @@ FROM splunk.spl
 WHERE spl='<your SPL query>'
 ```
 
+# Writing Data to Splunk
+As of Drill version 2.0 you can write and append data to Splunk. All fields, including complex objects, are sent as strings.
+
+You can create a new index with a `CREATE TABLE splunk.new_index AS ...` query.  Likewise, you can append to an existing index with an `INSERT INTO splunk.index SELECT...` query.
+
 # Testing the Plugin
 This plugin includes a series of unit tests in the `src/test/` directory; however, a few of these tests require an active Splunk installation.
 Simply follow the instructions below to test Splunk with Drill.
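
The README addition above shows the query forms for creating and appending to a Splunk index. As a hedged illustration only, the same statements can be issued programmatically through Drill's JDBC driver; the Drillbit address, the `splunk` plugin name, the index name, and the `cp.`employee.json` source below are assumptions for the sketch, not values taken from this commit.

```java
// Minimal sketch: running the CTAS and INSERT queries from the README over JDBC.
// Assumes a local Drillbit with the Splunk plugin enabled and writable=true.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class SplunkWriteExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
         Statement stmt = conn.createStatement()) {
      // Create a new Splunk index from a Drill-readable source.
      stmt.execute("CREATE TABLE splunk.new_index AS SELECT * FROM cp.`employee.json`");
      // Append more rows to the same index.
      stmt.execute("INSERT INTO splunk.new_index SELECT * FROM cp.`employee.json`");
    }
  }
}
```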
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchInsertWriter.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchInsertWriter.java
new file mode 100644
index 0000000000..b8107ec960
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchInsertWriter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
+import org.apache.drill.exec.record.VectorAccessible;
+
+import java.util.List;
+
+public class SplunkBatchInsertWriter extends SplunkBatchWriter {
+
+  public SplunkBatchInsertWriter(UserCredentials userCredentials, List<String> tableIdentifier, SplunkWriter config) {
+    super(userCredentials, tableIdentifier, config);
+    String indexName = tableIdentifier.get(0);
+    destinationIndex = splunkService.getIndexes().get(indexName);
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    // No op
+  }
+}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java
new file mode 100644
index 0000000000..83f6ded8fc
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+
+import com.splunk.Index;
+import com.splunk.IndexCollection;
+import com.splunk.ReceiverBehavior;
+import com.splunk.Service;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.util.Text;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class SplunkBatchWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(SplunkBatchWriter.class);
+  private static final String DEFAULT_SOURCETYPE = "drill";
+  private final UserCredentials userCredentials;
+  private final List<String> tableIdentifier;
+  private final SplunkWriter config;
+  protected final Service splunkService;
+  private JSONObject splunkEvent;
+  private final List<JSONObject> eventBuffer;
+  protected Index destinationIndex;
+  private int recordCount;
+
+
+  public SplunkBatchWriter(UserCredentials userCredentials, List<String> tableIdentifier, SplunkWriter config) {
+    this.config = config;
+    this.tableIdentifier = tableIdentifier;
+    this.userCredentials = userCredentials;
+    this.splunkEvent = new JSONObject();
+    eventBuffer = new ArrayList<>();
+    SplunkConnection connection = new SplunkConnection(config.getPluginConfig(), userCredentials.getUserName());
+    this.splunkService = connection.connect();
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+    // No op
+  }
+
+  /**
+   * Update the schema in the RecordWriter. Called before any records are written. In this case,
+   * we create the destination index in Splunk here. Splunk's API is sparse and provides little
+   * error checking or feedback if the operation fails.
+   *
+   * @param batch {@link VectorAccessible} The incoming batch
+   */
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    logger.debug("Updating schema for Splunk");
+
+    //Get the collection of indexes
+    IndexCollection indexes = splunkService.getIndexes();
+    try {
+      String indexName = tableIdentifier.get(0);
+      indexes.create(indexName);
+      destinationIndex = splunkService.getIndexes().get(indexName);
+    } catch (Exception e) {
+      // We have to catch a generic exception here, as Splunk's SDK does not really provide any kind of
+      // failure messaging.
+      throw UserException.systemError(e)
+        .message("Error creating new index in Splunk plugin: " + e.getMessage())
+        .build(logger);
+    }
+  }
+
+
+  @Override
+  public void startRecord() {
+    logger.debug("Starting record");
+    // Ensure that the new record is empty.
+    splunkEvent = new JSONObject();
+  }
+
+  @Override
+  public void endRecord() {
+    logger.debug("Ending record");
+    recordCount++;
+
+    // Put event in buffer
+    eventBuffer.add(splunkEvent);
+
+    // Write the event to the Splunk index
+    if (recordCount >= config.getPluginConfig().getWriterBatchSize()) {
+      try {
+        writeEvents();
+      } catch (IOException e) {
+        throw UserException.dataWriteError(e)
+            .message("Error writing data to Splunk: " + e.getMessage())
+            .build(logger);
+      }
+
+      // Reset record count
+      recordCount = 0;
+    }
+  }
+
+
+  @Override
+  public void abort() {
+    logger.debug("Aborting writing records to Splunk.");
+    // No op
+  }
+
+  @Override
+  public void cleanup() {
+    try {
+      writeEvents();
+    } catch (IOException e) {
+      throw UserException.dataWriteError(e)
+          .message("Error writing data to Splunk: " + e.getMessage())
+          .build(logger);
+    }
+  }
+
+  private void writeEvents() throws IOException {
+    // Open the socket and stream, set up a timestamp
+    destinationIndex.attachWith(new ReceiverBehavior() {
+      public void run(OutputStream stream) throws IOException {
+        String eventText;
+
+        for (JSONObject tempEvent : eventBuffer) {
+          eventText = tempEvent.toJSONString() + "\r\n";
+          stream.write(eventText.getBytes(StandardCharsets.UTF_8));
+        }
+      }
+    });
+
+    // Clear buffer
+    eventBuffer.clear();
+  }
+
+
+  @Override
+  public FieldConverter getNewNullableIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableTinyIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewTinyIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableFloat4Converter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewFloat4Converter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableFloat8Converter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewFloat8Converter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableVarDecimalConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewVarDecimalConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableDateConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewDateConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableTimeConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewTimeConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableVarCharConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new VarCharSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableTimeStampConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewTimeStampConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewVarCharConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new VarCharSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableBitConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewBitConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ComplexFieldConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewUnionConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ComplexFieldConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ComplexFieldConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ComplexFieldConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewDictConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ComplexFieldConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewRepeatedDictConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ComplexFieldConverter(fieldId, fieldName, reader);
+  }
+
+  public class VarCharSplunkConverter extends FieldConverter {
+
+    public VarCharSplunkConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      Text text = reader.readText();
+      if (text != null && text.getLength() > 0) {
+        byte[] bytes = text.copyBytes();
+        splunkEvent.put(fieldName, new String(bytes, StandardCharsets.UTF_8));
+      }
+    }
+  }
+
+  public class ScalarSplunkConverter extends FieldConverter {
+    public ScalarSplunkConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      splunkEvent.put(fieldName, String.valueOf(reader.readObject()));
+    }
+  }
+
+  public class ComplexFieldConverter extends FieldConverter {
+    public ComplexFieldConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() throws IOException {
+      splunkEvent.put(fieldName, reader.readObject());
+    }
+  }
+}
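
SplunkBatchWriter above buffers each record as a JSONObject and flushes the buffer to the destination index through the Splunk SDK's Index.attachWith() once writerBatchSize records have accumulated, with a final flush in cleanup(). The standalone sketch below shows the same buffer-and-flush pattern against the Splunk SDK, outside of Drill; the host, credentials, index name, and the batch size of 3 are placeholders, not values from the plugin.

```java
import com.splunk.Index;
import com.splunk.ReceiverBehavior;
import com.splunk.Service;
import com.splunk.ServiceArgs;
import org.json.simple.JSONObject;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SplunkBufferedWriteSketch {

  public static void main(String[] args) throws IOException {
    // Placeholder connection settings; the plugin resolves these from SplunkPluginConfig.
    ServiceArgs loginArgs = new ServiceArgs();
    loginArgs.setHost("localhost");
    loginArgs.setPort(8089);
    loginArgs.setUsername("admin");
    loginArgs.setPassword("changeme");
    Service service = Service.connect(loginArgs);

    Index index = service.getIndexes().get("drill_example");
    List<JSONObject> buffer = new ArrayList<>();
    int batchSize = 3; // the plugin reads this from writerBatchSize (default 1000)

    for (int i = 0; i < 10; i++) {
      JSONObject event = new JSONObject();
      event.put("row", String.valueOf(i)); // all field values are sent as strings
      buffer.add(event);
      if (buffer.size() >= batchSize) {
        flush(index, buffer);
      }
    }
    flush(index, buffer); // final flush, mirroring cleanup()
  }

  private static void flush(Index index, List<JSONObject> buffer) throws IOException {
    if (buffer.isEmpty()) {
      return;
    }
    // attachWith() opens a streaming socket to the index and closes it when run() returns.
    index.attachWith(new ReceiverBehavior() {
      @Override
      public void run(OutputStream stream) throws IOException {
        for (JSONObject event : buffer) {
          String line = event.toJSONString() + "\r\n";
          stream.write(line.getBytes(StandardCharsets.UTF_8));
        }
      }
    });
    buffer.clear();
  }
}
```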
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
index c9571c6d5b..02cb431e81 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
@@ -30,16 +30,15 @@ import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.TimeUnit;
-
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLSocketFactory;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
-
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
 import java.security.cert.X509Certificate;
+import java.util.concurrent.TimeUnit;
+
 import java.util.Optional;
 
 /**
@@ -80,7 +79,7 @@ public class SplunkConnection {
     this.owner = config.getOwner();
     this.token = config.getToken();
     this.cookie = config.getCookie();
-    this.validateCertificates = Optional.ofNullable(config.getValidateCertificates()).orElse(true);
+    this.validateCertificates = config.getValidateCertificates();
     this.connectionAttempts = config.getReconnectRetries();
     service = connect();
   }
@@ -107,9 +106,8 @@ public class SplunkConnection {
 
   /**
    * Connects to Splunk instance
-   * @return an active Splunk connection.
+   * @return an active Splunk {@link Service} connection.
    */
-
   public Service connect() {
     HttpService.setSslSecurityProtocol(SSLSecurityProtocol.TLSv1_2);
     HttpService.setValidateCertificates(validateCertificates);
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkInsertWriter.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkInsertWriter.java
new file mode 100644
index 0000000000..4bf28f3c3e
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkInsertWriter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import java.util.List;
+
+public class SplunkInsertWriter extends SplunkWriter {
+  public static final String OPERATOR_TYPE = "SPLUNK_INSERT_WRITER";
+
+  private final SplunkStoragePlugin plugin;
+  private final List<String> tableIdentifier;
+
+  @JsonCreator
+  public SplunkInsertWriter(
+      @JsonProperty("child") PhysicalOperator child,
+      @JsonProperty("tableIdentifier") List<String> tableIdentifier,
+      @JsonProperty("storage") SplunkPluginConfig storageConfig,
+      @JacksonInject StoragePluginRegistry pluginRegistry) {
+    super(child, tableIdentifier, pluginRegistry.resolve(storageConfig, SplunkStoragePlugin.class));
+    this.plugin = pluginRegistry.resolve(storageConfig, SplunkStoragePlugin.class);
+    this.tableIdentifier = tableIdentifier;
+  }
+
+  SplunkInsertWriter(PhysicalOperator child, List<String> tableIdentifier, SplunkStoragePlugin plugin) {
+    super(child, tableIdentifier, plugin);
+    this.tableIdentifier = tableIdentifier;
+    this.plugin = plugin;
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new SplunkInsertWriter(child, tableIdentifier, plugin);
+  }
+
+  public List<String> getTableIdentifier() {
+    return tableIdentifier;
+  }
+
+  @Override
+  public String getOperatorType() {
+    return OPERATOR_TYPE;
+  }
+
+  @JsonIgnore
+  public SplunkPluginConfig getPluginConfig() {
+    return this.plugin.getConfig();
+  }
+}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkInsertWriterBatchCreator.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkInsertWriterBatchCreator.java
new file mode 100644
index 0000000000..460745fe78
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkInsertWriterBatchCreator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.InsertWriterRecordBatch;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+@SuppressWarnings("unused")
+public class SplunkInsertWriterBatchCreator implements BatchCreator<SplunkInsertWriter> {
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, SplunkInsertWriter config, List<RecordBatch> children) {
+    assert children != null && children.size() == 1;
+
+    UserCredentials userCreds = context.getContextInformation().getQueryUserCredentials();
+
+    return new InsertWriterRecordBatch(config, children.iterator().next(), context,
+        new SplunkBatchInsertWriter(userCreds, config.getTableIdentifier(), config)
+    );
+  }
+}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
index 579edaf1b6..759c2a8d46 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
@@ -37,6 +37,7 @@ public class SplunkPluginConfig extends StoragePluginConfig {
 
   public static final String NAME = "splunk";
   public static final int DISABLED_RECONNECT_RETRIES = 1;
+  public static final int DEFAULT_WRITER_BATCH_SIZE = 1000;
 
   private final String scheme;
   private final String hostname;
@@ -47,8 +48,10 @@ public class SplunkPluginConfig extends StoragePluginConfig {
   private final String owner;
   private final String token;
   private final String cookie;
-  private final Boolean validateCertificates;
+  private final boolean validateCertificates;
   private final Integer reconnectRetries;
+  private final boolean writable;
+  private final Integer writerBatchSize;
 
   @JsonCreator
   public SplunkPluginConfig(@JsonProperty("username") String username,
@@ -60,12 +63,14 @@ public class SplunkPluginConfig extends StoragePluginConfig {
                             @JsonProperty("owner") String owner,
                             @JsonProperty("token") String token,
                             @JsonProperty("cookie") String cookie,
-                            @JsonProperty("validateCertificates") Boolean validateCertificates,
+                            @JsonProperty("validateCertificates") boolean validateCertificates,
                             @JsonProperty("earliestTime") String earliestTime,
                             @JsonProperty("latestTime") String latestTime,
                             @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider,
                             @JsonProperty("reconnectRetries") Integer reconnectRetries,
-                            @JsonProperty("authMode") String authMode) {
+                            @JsonProperty("authMode") String authMode,
+                            @JsonProperty("writable") boolean writable,
+                            @JsonProperty("writerBatchSize") Integer writerBatchSize) {
     super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
         credentialsProvider == null, AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER));
     this.scheme = scheme;
@@ -75,10 +80,12 @@ public class SplunkPluginConfig extends StoragePluginConfig {
     this.owner = owner;
     this.token = token;
     this.cookie = cookie;
+    this.writable = writable;
     this.validateCertificates = validateCertificates;
     this.earliestTime = earliestTime;
     this.latestTime = latestTime == null ? "now" : latestTime;
     this.reconnectRetries = reconnectRetries;
+    this.writerBatchSize = writerBatchSize;
   }
 
   private SplunkPluginConfig(SplunkPluginConfig that, CredentialsProvider credentialsProvider) {
@@ -89,11 +96,13 @@ public class SplunkPluginConfig extends StoragePluginConfig {
     this.app = that.app;
     this.owner = that.owner;
     this.token = that.token;
+    this.writable = that.writable;
     this.cookie = that.cookie;
     this.validateCertificates = that.validateCertificates;
     this.earliestTime = that.earliestTime;
     this.latestTime = that.latestTime;
     this.reconnectRetries = that.reconnectRetries;
+    this.writerBatchSize = that.writerBatchSize;
   }
 
   /**
@@ -154,6 +163,10 @@ public class SplunkPluginConfig extends StoragePluginConfig {
     return port;
   }
 
+  public boolean isWritable() {
+    return writable;
+  }
+
   @JsonProperty("app")
   public String getApp() {
     return app;
@@ -175,7 +188,7 @@ public class SplunkPluginConfig extends StoragePluginConfig {
   }
 
   @JsonProperty("validateCertificates")
-  public Boolean getValidateCertificates() {
+  public boolean getValidateCertificates() {
     return validateCertificates;
   }
 
@@ -194,6 +207,11 @@ public class SplunkPluginConfig extends StoragePluginConfig {
     return reconnectRetries != null ? reconnectRetries : DISABLED_RECONNECT_RETRIES;
   }
 
+  @JsonProperty("writerBatchSize")
+  public int getWriterBatchSize() {
+    return writerBatchSize != null ? writerBatchSize : DEFAULT_WRITER_BATCH_SIZE;
+  }
+
   private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) {
     return credentialsProvider != null ? credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
   }
@@ -211,6 +229,7 @@ public class SplunkPluginConfig extends StoragePluginConfig {
       Objects.equals(hostname, thatConfig.hostname) &&
       Objects.equals(port, thatConfig.port) &&
       Objects.equals(app, thatConfig.app) &&
+      Objects.equals(writable, thatConfig.writable) &&
       Objects.equals(owner, thatConfig.owner) &&
       Objects.equals(token, thatConfig.token) &&
       Objects.equals(cookie, thatConfig.cookie) &&
@@ -231,6 +250,7 @@ public class SplunkPluginConfig extends StoragePluginConfig {
       owner,
       token,
       cookie,
+      writable,
       validateCertificates,
       earliestTime,
       latestTime,
@@ -245,6 +265,7 @@ public class SplunkPluginConfig extends StoragePluginConfig {
       .field("scheme", scheme)
       .field("hostname", hostname)
       .field("port", port)
+      .field("writable", writable)
       .field("app", app)
       .field("owner", owner)
       .field("token", token)
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java
new file mode 100644
index 0000000000..875d052a31
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.splunk.IndexCollection;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Writer;
+import org.apache.drill.exec.planner.logical.CreateTableEntry;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.planner.logical.ModifyTableEntry;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.StorageStrategy;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class SplunkSchema extends AbstractSchema {
+  private final static Logger logger = LoggerFactory.getLogger(SplunkSchema.class);
+  private static final String SPL_TABLE_NAME = "spl";
+  private final Map<String, DynamicDrillTable> activeTables = new HashMap<>();
+  private final SplunkStoragePlugin plugin;
+  private final String queryUserName;
+
+  public SplunkSchema(SplunkStoragePlugin plugin, String queryUserName) {
+    super(Collections.emptyList(), plugin.getName());
+    this.plugin = plugin;
+    this.queryUserName = queryUserName;
+
+
+    registerIndexes();
+  }
+
+  @Override
+  public Table getTable(String name) {
+    DynamicDrillTable table = activeTables.get(name);
+    if (table != null) {
+      // If the table was found, return it.
+      return table;
+    } else if (activeTables.containsKey(name)) {
+      // Register the table if it is in the list of indexes.
+      return registerTable(name, new DynamicDrillTable(plugin, plugin.getName(),
+        new SplunkScanSpec(plugin.getName(), name, plugin.getConfig(), queryUserName)));
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public boolean showInInformationSchema() {
+    return true;
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    return Sets.newHashSet(activeTables.keySet());
+  }
+
+  private DynamicDrillTable registerTable(String name, DynamicDrillTable table) {
+    activeTables.put(name, table);
+    return table;
+  }
+
+  @Override
+  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns,
+    StorageStrategy strategy) {
+    if (!plugin.getConfig().isWritable()) {
+      throw UserException
+        .dataWriteError()
+        .message(plugin.getName() + " is not writable.")
+        .build(logger);
+    }
+
+    return new CreateTableEntry() {
+      @Override
+      public Writer getWriter(PhysicalOperator child) throws IOException {
+        return new SplunkWriter(child, Collections.singletonList(tableName),  plugin);
+      }
+
+      @Override
+      public List<String> getPartitionColumns() {
+        return Collections.emptyList();
+      }
+    };
+  }
+
+  /**
+   * Deletes an index from Splunk. The Splunk SDK gives no indication of whether the
+   * operation succeeded or failed.
+   * @param indexName The name of the index to be deleted.
+   */
+  @Override
+  public void dropTable(String indexName) {
+    SplunkConnection connection = initializeConnection();
+
+    //Get the collection of indexes
+    IndexCollection indexes = connection.connect().getIndexes();
+
+    // Drop the index
+    indexes.remove(indexName);
+  }
+
+  @Override
+  public ModifyTableEntry modifyTable(String tableName) {
+    return child -> new SplunkInsertWriter(child, Collections.singletonList(tableName), plugin);
+  }
+
+  @Override
+  public boolean isMutable() {
+    return plugin.getConfig().isWritable();
+  }
+
+  @Override
+  public String getTypeName() {
+    return SplunkPluginConfig.NAME;
+  }
+
+  private void registerIndexes() {
+    // Verify that the connection is successful.  If not, don't register any indexes,
+    // and throw an exception.
+    SplunkConnection connection = initializeConnection();
+
+    // Add default "spl" table to index list.
+    registerTable(SPL_TABLE_NAME, new DynamicDrillTable(plugin, plugin.getName(),
+      new SplunkScanSpec(plugin.getName(), SPL_TABLE_NAME, plugin.getConfig(), queryUserName)));
+
+    // Retrieve and add all other Splunk indexes
+    for (String indexName : connection.getIndexes().keySet()) {
+      logger.debug("Registering {}", indexName);
+      registerTable(indexName, new DynamicDrillTable(plugin, plugin.getName(),
+        new SplunkScanSpec(plugin.getName(), indexName, plugin.getConfig(), queryUserName)));
+    }
+  }
+
+  private SplunkConnection initializeConnection() {
+    // Verify that the connection is successful.  If not, don't register any indexes,
+    // and throw an exception.
+    SplunkPluginConfig config = plugin.getConfig();
+    SplunkConnection connection;
+    try {
+      connection = new SplunkConnection(config, queryUserName);
+      connection.connect();
+    } catch (Exception e) {
+      // Catch any connection errors that may happen.
+      throw UserException.connectionError()
+        .message("Unable to connect to Splunk: " +  plugin.getName() + " " + e.getMessage())
+        .build(logger);
+    }
+    return connection;
+  }
+}
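
SplunkSchema gates writes on the writable flag, creates a CTAS index via SplunkWriter and SplunkBatchWriter.updateSchema(), and removes an index in dropTable() without any success feedback from the SDK. Because IndexCollection exposes a Map view of the indexes (the code above already relies on get() and keySet()), a caller can guard both operations with containsKey(). A minimal sketch, assuming a connected com.splunk.Service as in the previous example:

```java
import com.splunk.IndexCollection;
import com.splunk.Service;

public class SplunkIndexGuardSketch {

  // Create the index only if it does not already exist.
  static void createIfAbsent(Service service, String indexName) {
    IndexCollection indexes = service.getIndexes();
    if (!indexes.containsKey(indexName)) {
      indexes.create(indexName); // same call used in SplunkBatchWriter.updateSchema()
    }
  }

  // Drop the index only if it is present.
  static void dropIfPresent(Service service, String indexName) {
    IndexCollection indexes = service.getIndexes();
    if (indexes.containsKey(indexName)) {
      indexes.remove(indexName); // same call used in SplunkSchema.dropTable()
    }
  }
}
```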
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
index 8270c45d58..aa070a4ff7 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
@@ -19,25 +19,11 @@
 package org.apache.drill.exec.store.splunk;
 
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Table;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.planner.logical.DynamicDrillTable;
-import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
 
 public class SplunkSchemaFactory extends AbstractSchemaFactory {
 
-  private static final Logger logger = LoggerFactory.getLogger(SplunkSchemaFactory.class);
-  private static final String SPL_TABLE_NAME = "spl";
   private final SplunkStoragePlugin plugin;
   private String queryUserName;
 
@@ -52,80 +38,4 @@ public class SplunkSchemaFactory extends AbstractSchemaFactory {
     SplunkSchema schema = new SplunkSchema(plugin, queryUserName);
     SchemaPlus plusOfThis = parent.add(schema.getName(), schema);
   }
-
-  static class SplunkSchema extends AbstractSchema {
-
-    private final Map<String, DynamicDrillTable> activeTables = new HashMap<>();
-    private final SplunkStoragePlugin plugin;
-    private final String queryUserName;
-
-    public SplunkSchema(SplunkStoragePlugin plugin, String queryUserName) {
-      super(Collections.emptyList(), plugin.getName());
-      this.plugin = plugin;
-      this.queryUserName = queryUserName;
-
-
-      registerIndexes();
-    }
-
-    @Override
-    public Table getTable(String name) {
-      DynamicDrillTable table = activeTables.get(name);
-      if (table != null) {
-        // If the table was found, return it.
-        return table;
-      } else {
-        // Register the table
-        return registerTable(name, new DynamicDrillTable(plugin, plugin.getName(),
-          new SplunkScanSpec(plugin.getName(), name, plugin.getConfig(), queryUserName)));
-      }
-    }
-
-    @Override
-    public boolean showInInformationSchema() {
-      return true;
-    }
-
-    @Override
-    public Set<String> getTableNames() {
-      return Sets.newHashSet(activeTables.keySet());
-    }
-
-    private DynamicDrillTable registerTable(String name, DynamicDrillTable table) {
-      activeTables.put(name, table);
-      return table;
-    }
-
-    @Override
-    public String getTypeName() {
-      return SplunkPluginConfig.NAME;
-    }
-
-    private void registerIndexes() {
-      // Verify that the connection is successful.  If not, don't register any indexes,
-      // and throw an exception.
-      SplunkPluginConfig config = plugin.getConfig();
-      SplunkConnection connection;
-      try {
-        connection = new SplunkConnection(config, queryUserName);
-        connection.connect();
-      } catch (Exception e) {
-        // Catch any connection errors that may happen.
-        throw UserException.connectionError()
-          .message("Unable to connect to Splunk: " +  plugin.getName() + " " + e.getMessage())
-          .build(logger);
-      }
-
-      // Add default "spl" table to index list.
-      registerTable(SPL_TABLE_NAME, new DynamicDrillTable(plugin, plugin.getName(),
-        new SplunkScanSpec(plugin.getName(), SPL_TABLE_NAME, plugin.getConfig(), queryUserName)));
-
-      // Retrieve and add all other Splunk indexes
-      for (String indexName : connection.getIndexes().keySet()) {
-        logger.debug("Registering {}", indexName);
-        registerTable(indexName, new DynamicDrillTable(plugin, plugin.getName(),
-          new SplunkScanSpec(plugin.getName(), indexName, config, queryUserName)));
-      }
-    }
-  }
 }
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java
index f04e25dc1a..e8eead6e1f 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java
@@ -65,6 +65,16 @@ public class SplunkStoragePlugin extends AbstractStoragePlugin {
     return true;
   }
 
+  @Override
+  public boolean supportsWrite() {
+    return config.isWritable();
+  }
+
+  @Override
+  public boolean supportsInsert() {
+    return config.isWritable();
+  }
+
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
     // Check to see if user translation is enabled.  If so, and creds are
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkWriter.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkWriter.java
new file mode 100644
index 0000000000..8925e12ac9
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkWriter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import java.util.List;
+
+public class SplunkWriter extends AbstractWriter {
+
+  public static final String OPERATOR_TYPE = "SPLUNK_WRITER";
+
+  private final SplunkStoragePlugin plugin;
+  private final List<String> tableIdentifier;
+
+  @JsonCreator
+  public SplunkWriter(
+    @JsonProperty("child") PhysicalOperator child,
+    @JsonProperty("tableIdentifier") List<String> tableIdentifier,
+    @JsonProperty("storage") SplunkPluginConfig storageConfig,
+    @JacksonInject StoragePluginRegistry engineRegistry) {
+    super(child);
+    this.plugin = engineRegistry.resolve(storageConfig, SplunkStoragePlugin.class);
+    this.tableIdentifier = tableIdentifier;
+
+  }
+
+  SplunkWriter(PhysicalOperator child, List<String> tableIdentifier, SplunkStoragePlugin plugin) {
+    super(child);
+    this.tableIdentifier = tableIdentifier;
+    this.plugin = plugin;
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new SplunkWriter(child, tableIdentifier, plugin);
+  }
+
+  public List<String> getTableIdentifier() {
+    return tableIdentifier;
+  }
+
+  @Override
+  public String getOperatorType() {
+    return OPERATOR_TYPE;
+  }
+
+  @JsonIgnore
+  public SplunkPluginConfig getPluginConfig() {
+    return this.plugin.getConfig();
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("tableName", tableIdentifier)
+      .field("storageStrategy", getStorageStrategy())
+      .toString();
+  }
+
+}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkWriterBatchCreator.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkWriterBatchCreator.java
new file mode 100644
index 0000000000..d6728965db
--- /dev/null
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkWriterBatchCreator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.splunk;
+
+import java.util.List;
+
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+
+@SuppressWarnings("unused")
+public class SplunkWriterBatchCreator implements BatchCreator<SplunkWriter> {
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, SplunkWriter config, List<RecordBatch> children) {
+    assert children != null && children.size() == 1;
+
+    UserCredentials userCreds = context.getContextInformation().getQueryUserCredentials();
+
+    return new WriterRecordBatch(config, children.iterator().next(), context,
+      new SplunkBatchWriter(userCreds, config.getTableIdentifier(), config)
+    );
+  }
+}
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
index 370c072cfd..c6ef9a123d 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
@@ -58,7 +58,7 @@ public class SplunkConnectionTest extends SplunkBaseTest {
         SPLUNK_STORAGE_PLUGIN_CONFIG.getLatestTime(),
         null,
         SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries(),
-        StoragePluginConfig.AuthMode.SHARED_USER.name()
+        StoragePluginConfig.AuthMode.SHARED_USER.name(), true, null
       );
       SplunkConnection sc = new SplunkConnection(invalidSplunkConfig, null);
       sc.connect();
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
index faa716ce4b..27024b8ea1 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
@@ -52,7 +52,8 @@ import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorT
   SplunkIndexesTest.class,
   SplunkPluginTest.class,
   SplunkTestSplunkUtils.class,
-  TestSplunkUserTranslation.class
+  TestSplunkUserTranslation.class,
+  SplunkWriterTest.class
 })
 
 @Category({SlowTest.class})
@@ -93,11 +94,11 @@ public class SplunkTestSuite extends ClusterTest {
         SPLUNK_STORAGE_PLUGIN_CONFIG = new SplunkPluginConfig(
           SPLUNK_LOGIN, SPLUNK_PASS,
           "http", hostname, port,
-          null, null, null, null, null, // app, owner, token, cookie, validateCertificates
+          null, null, null, null, false, // app, owner, token, cookie, validateCertificates
           "1", "now",
           null,
           4,
-          StoragePluginConfig.AuthMode.SHARED_USER.name()
+          StoragePluginConfig.AuthMode.SHARED_USER.name(), true, null
         );
         SPLUNK_STORAGE_PLUGIN_CONFIG.setEnabled(true);
         pluginRegistry.put(SplunkPluginConfig.NAME, SPLUNK_STORAGE_PLUGIN_CONFIG);
@@ -115,11 +116,11 @@ public class SplunkTestSuite extends ClusterTest {
         SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION = new SplunkPluginConfig(
           null, null, // username, password
           "http", hostname, port,
-          null, null, null, null, null, // app, owner, token, cookie, validateCertificates
+          null, null, null, null, false, // app, owner, token, cookie, validateCertificates
           "1", "now",
           credentialsProvider,
           4,
-          AuthMode.USER_TRANSLATION.name()
+          AuthMode.USER_TRANSLATION.name(), true, null
         );
         SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION.setEnabled(true);
         pluginRegistry.put("ut_splunk", SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION);
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java
new file mode 100644
index 0000000000..7c8e04b377
--- /dev/null
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.MethodSorters;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@FixMethodOrder(MethodSorters.JVM)
+@Category({SlowTest.class})
+public class SplunkWriterTest extends SplunkBaseTest {
+
+  @Test
+  public void testBasicCTAS() throws Exception {
+
+    // Verify that there is no index called t1 in Splunk
+    String sql = "SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_SCHEMA = 'splunk' AND TABLE_NAME LIKE 't1'";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    assertEquals(0, results.rowCount());
+    results.clear();
+
+    // Now create the table
+    sql = "CREATE TABLE `splunk`.`t1` AS SELECT * FROM cp.`test_data.csvh`";
+    QuerySummary summary = client.queryBuilder().sql(sql).run();
+    assertTrue(summary.succeeded());
+
+    // Verify that an index was created called t1 in Splunk
+    sql = "SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_SCHEMA = 'splunk' AND TABLE_NAME LIKE 't1'";
+    results = client.queryBuilder().sql(sql).rowSet();
+    assertEquals(1, results.rowCount());
+    results.clear();
+
+    // There seems to be some delay between the Drill query writing the data and the data being made
+    // accessible.
+    Thread.sleep(30000);
+
+    // Next verify that the results arrived.
+    sql = "SELECT clientip, categoryId FROM splunk.`t1`";
+    results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("clientip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("categoryId", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("198.35.2.120", "ACCESSORIES")
+      .addRow("198.35.2.120", null)
+      .addRow("198.35.2.120", null)
+      .addRow("198.35.2.120", "STRATEGY")
+      .addRow("198.35.2.120", "NULL")
+      .build();
+    RowSetUtilities.verify(expected, results);
+
+    // Now drop the index
+    sql = "DROP TABLE splunk.`t1`";
+    summary = client.queryBuilder().sql(sql).run();
+    assertTrue(summary.succeeded());
+
+    // Verify that the index was deleted.
+    sql = "SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_SCHEMA = 'splunk' AND TABLE_NAME LIKE 't1'";
+    results = client.queryBuilder().sql(sql).rowSet();
+    assertEquals(0, results.rowCount());
+    results.clear();
+  }
+
+  @Test
+  public void testBasicCTASWithScalarDataTypes() throws Exception {
+    String query = "CREATE TABLE splunk.t2 AS " +
+      "SELECT CAST(1 AS INTEGER) AS int_field," +
+      "CAST(2 AS BIGINT) AS bigint_field," +
+      "CAST(3.0 AS FLOAT) AS float4_field," +
+      "CAST(4.0 AS DOUBLE) AS float8_field," +
+      "'5.0' AS varchar_field," +
+      "CAST('2021-01-01' AS DATE) as date_field," +
+      "CAST('12:00:00' AS TIME) as time_field, " +
+      "CAST('2015-12-30 22:55:55.23' AS TIMESTAMP) as timestamp_field, true AS boolean_field " +
+      "FROM (VALUES(1))";
+    // Create the table and insert the values
+    QuerySummary insertResults = queryBuilder().sql(query).run();
+    assertTrue(insertResults.succeeded());
+    Thread.sleep(15000);
+
+    // Query the table to see if the insertion was successful
+    String testQuery = "SELECT int_field, bigint_field, float4_field, float8_field, varchar_field," +
+      "date_field, time_field, timestamp_field, boolean_field FROM splunk.t2";
+    DirectRowSet results = queryBuilder().sql(testQuery).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("int_field", MinorType.VARCHAR)
+      .addNullable("bigint_field", MinorType.VARCHAR)
+      .addNullable("float4_field", MinorType.VARCHAR)
+      .addNullable("float8_field", MinorType.VARCHAR)
+      .addNullable("varchar_field", MinorType.VARCHAR)
+      .addNullable("date_field", MinorType.VARCHAR)
+      .addNullable("time_field", MinorType.VARCHAR)
+      .addNullable("timestamp_field", MinorType.VARCHAR)
+      .addNullable("boolean_field", MinorType.VARCHAR)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("1", "2", "3.0", "4.0", "5.0", "2021-01-01", "12:00", "2015-12-30T22:55:55.230", "true")
+      .build();
+
+    RowSetUtilities.verify(expected, results);
+
+    // Now drop the table
+    String dropQuery = "DROP TABLE splunk.t2";
+    QuerySummary dropResults = queryBuilder().sql(dropQuery).run();
+    assertTrue(dropResults.succeeded());
+  }
+
+  @Test
+  public void testInsert() throws Exception {
+
+    // Create the table
+    String sql = "CREATE TABLE `splunk`.`t3` AS SELECT * FROM cp.`test_data.csvh`";
+    QuerySummary summary = client.queryBuilder().sql(sql).run();
+    assertTrue(summary.succeeded());
+
+    Thread.sleep(30000);
+
+    // Now insert more records
+    sql = "INSERT INTO `splunk`.`t3`  SELECT * FROM cp.`test_data2.csvh`";
+    summary = client.queryBuilder().sql(sql).run();
+    assertTrue(summary.succeeded());
+
+    // There seems to be some delay between the Drill query writing the data and the data being made
+    // accessible.
+    Thread.sleep(30000);
+
+    // Next verify that the results arrived.
+    sql = "SELECT COUNT(*) as row_count FROM splunk.`t3`";
+    long resultCount = client.queryBuilder().sql(sql).singletonLong();
+    assertEquals(15L, resultCount);
+
+    // Now drop the index
+    sql = "DROP TABLE splunk.`t3`";
+    summary = client.queryBuilder().sql(sql).run();
+    assertTrue(summary.succeeded());
+  }
+
+  @Test
+  public void testComplexFields() throws Exception {
+    String sql = "CREATE TABLE `splunk`.`t4` AS SELECT record FROM cp.`schema_test.json`";
+    QuerySummary summary = client.queryBuilder().sql(sql).run();
+    assertTrue(summary.succeeded());
+
+    Thread.sleep(30000);
+
+    sql = "SELECT COUNT(*) FROM splunk.t4";
+    long resultCount = client.queryBuilder().sql(sql).singletonLong();
+    assertEquals(1L, resultCount);
+
+    // Now drop the index
+    sql = "DROP TABLE splunk.`t4`";
+    summary = client.queryBuilder().sql(sql).run();
+    assertTrue(summary.succeeded());
+  }
+}
diff --git a/contrib/storage-splunk/src/test/resources/schema_test.json b/contrib/storage-splunk/src/test/resources/schema_test.json
new file mode 100644
index 0000000000..36d8b923d6
--- /dev/null
+++ b/contrib/storage-splunk/src/test/resources/schema_test.json
@@ -0,0 +1,14 @@
+{
+  "record" : {
+    "int_field": 1,
+    "double_field": 2.0,
+    "string_field": "My string",
+    "int_list": [1,2,3],
+    "double_list": [1.0,2.0,3.0],
+    "map": {
+      "nested_int_field" : 5,
+      "nested_double_field": 5.0,
+      "nested_string_field": "5.0"
+    }
+  }
+}
diff --git a/contrib/storage-splunk/src/test/resources/test_data.csvh b/contrib/storage-splunk/src/test/resources/test_data.csvh
new file mode 100644
index 0000000000..ad07fc0898
--- /dev/null
+++ b/contrib/storage-splunk/src/test/resources/test_data.csvh
@@ -0,0 +1,11 @@
+JSESSIONID,action,bytes,categoryId,clientip,cookie,"date_hour","date_mday","date_minute","date_month","date_second","date_wday","date_year","date_zone",eventtype,file,host,ident,index,items,linecount,method,msg,other,productId,punct,q,referer,"referer_domain","req_time",root,source,sourcetype,"splunk_server","splunk_server_group",start,status,t,timeendpos,timestartpos,uri,"uri_domain","uri_path","uri_query",user,useragent,version,"_bkt","_cd","_eventtype_color","_indextime","_kv","_raw", [...]
+SD6SL9FF1ADFF4957,view,956,ACCESSORIES,"198.35.2.120",,14,2,45,may,44,saturday,2020,local,,"cart.do","web_application","-",main,,1,GET,,912,"WC-SH-A01","..._-_-_[//:::]_""_/.?=&=--&=__.""___""://../.?=""_""/.",,"http://www.buttercupgames.com/category.screen?categoryId=ACCESSORIES","http://www.buttercupgames.com","02/May/2020:14:45:44",,"access_30DAY.log","access_combined_wcookie","Charless-MacBook-Pro.local",,,200,,38,18,"/cart.do?action=view&productId=WC-SH-A01&JSESSIONID=SD6SL9FF1ADFF4 [...]
+main","access_combined_wcookie","2020-05-02 14:45:44.000 EDT"
+SD6SL9FF1ADFF4957,,2095,,"198.35.2.120",,14,2,45,may,33,saturday,2020,local,,"product.screen","web_application","-",main,,1,GET,,953,"SF-BVS-01","..._-_-_[//:::]_""_/.?=--&=__.""___""://../.?=--""_""/.",,"http://www.buttercupgames.com/product.screen?productId=SF-BVS-01","http://www.buttercupgames.com","02/May/2020:14:45:33",,"access_30DAY.log","access_combined_wcookie","Charless-MacBook-Pro.local",,,408,,38,18,"/product.screen?productId=SF-BVS-01&JSESSIONID=SD6SL9FF1ADFF4957",,"/product. [...]
+main","access_combined_wcookie","2020-05-02 14:45:33.000 EDT"
+SD6SL9FF1ADFF4957,view,2219,,"198.35.2.120",,14,2,45,may,21,saturday,2020,local,,"product.screen","web_application","-",main,,1,POST,,267,"MB-AG-G07","..._-_-_[//:::]_""_/.?=--&=__.""___""://../.?=&=--""_""",,"http://www.buttercupgames.com/cart.do?action=view&productId=MB-AG-G07","http://www.buttercupgames.com","02/May/2020:14:45:21",,"access_30DAY.log","access_combined_wcookie","Charless-MacBook-Pro.local",,,200,,38,18,"/product.screen?productId=MB-AG-G07&JSESSIONID=SD6SL9FF1ADFF4957",, [...]
+main","access_combined_wcookie","2020-05-02 14:45:21.000 EDT"
+SD6SL9FF1ADFF4957,,2729,STRATEGY,"198.35.2.120",,14,2,45,may,13,saturday,2020,local,,"category.screen","web_application","-",main,,1,POST,,439,,"..._-_-_[//:::]_""_/.?=&=__.""___""://../.?=""_""/._(__",,"http://www.buttercupgames.com/category.screen?categoryId=STRATEGY","http://www.buttercupgames.com","02/May/2020:14:45:13",,"access_30DAY.log","access_combined_wcookie","Charless-MacBook-Pro.local",,,200,,38,18,"/category.screen?categoryId=STRATEGY&JSESSIONID=SD6SL9FF1ADFF4957",,"/categor [...]
+main","access_combined_wcookie","2020-05-02 14:45:13.000 EDT"
+SD6SL9FF1ADFF4957,,3546,NULL,"198.35.2.120",,14,2,45,may,9,saturday,2020,local,,"product.screen","web_application","-",main,,1,GET,,575,"SF-BVS-01","..._-_-_[//:::]_""_/.?=--&=__.""___""://../.?=""_""/._(",,"http://www.buttercupgames.com/category.screen?categoryId=NULL","http://www.buttercupgames.com","02/May/2020:14:45:09",,"access_30DAY.log","access_combined_wcookie","Charless-MacBook-Pro.local",,,403,,38,18,"/product.screen?productId=SF-BVS-01&JSESSIONID=SD6SL9FF1ADFF4957",,"/product. [...]
+main","access_combined_wcookie","2020-05-02 14:45:09.000 EDT"
diff --git a/contrib/storage-splunk/src/test/resources/test_data2.csvh b/contrib/storage-splunk/src/test/resources/test_data2.csvh
new file mode 100644
index 0000000000..d4cccc5136
--- /dev/null
+++ b/contrib/storage-splunk/src/test/resources/test_data2.csvh
@@ -0,0 +1,11 @@
+id,first_name,last_name,email,ip_address
+1,Jody,Sothern,jsothern0@hubpages.com,159.132.69.78
+2,Tobit,Sultan,tsultan1@zimbio.com,235.111.16.104
+3,Ottilie,Del Dello,odeldello2@list-manage.com,221.16.29.94
+4,Evy,Andrus,eandrus3@pbs.org,7.61.102.158
+5,Rosalyn,Norley,rnorley4@comsenz.com,227.107.95.194
+6,Pollyanna,Romme,promme5@wordpress.com,176.233.151.91
+7,Rona,Oppie,roppie6@fema.gov,7.249.35.7
+8,Lisabeth,Ibbison,libbison7@indiegogo.com,83.194.246.44
+9,Germana,Evens,gevens8@discuz.net,96.247.70.171
+10,Rhys,Loachhead,rloachhead9@hatena.ne.jp,200.55.203.228
diff --git a/docs/dev/CreatingAWriter.md b/docs/dev/CreatingAWriter.md
index c6b19dbb13..ce3aac8e82 100644
--- a/docs/dev/CreatingAWriter.md
+++ b/docs/dev/CreatingAWriter.md
@@ -1,13 +1,13 @@
 # Creating a Writer for a Storage Plugin
-This tutorial explains the mostly undocumented features of how to create a writer for a Drill storage plugin.  
+This tutorial explains the mostly undocumented process of creating a writer for a Drill storage plugin.
 
-Much has been written about creating a storage plugin in Drill [1], however all this focuses on the reader interface.  What if you want to write data back out to that storage 
-plugin?  At the time of writing , Drill only implemented writing for filesystems, and for the Kudu storage plugin, 
-however [DRILL-8005](https://github.com/apache/drill/pull/2327) adds this capability to the JDBC storage plugin as well. This will hopefully be merged in Drill 1.20.0. 
+Much has been written about creating a storage plugin in Drill [1]; however, all of it focuses on the reader interface.  What if you want to write data back out to that storage
+plugin?  At the time of writing, Drill only implemented writing for filesystems and for the Kudu storage plugin;
+however, [DRILL-8005](https://github.com/apache/drill/pull/2327) adds this capability to the JDBC storage plugin as well. This will hopefully be merged in Drill 1.20.0.
 
 ## Step One: Set the Flags to Enable Writing
-The absolute first thing you will need to do is set a `boolean` flag in whichever class extends the `AbstractStoragePlugin`.  This is accomplished by overwriting the  
-`supportsWrite()` and making sure that it returns `true`.  In the case of the JDBC plugin, it is configurable whether the individual connection is writable or not, so we are 
+The absolute first thing you will need to do is set a `boolean` flag in whichever class extends `AbstractStoragePlugin`.  This is accomplished by overriding the
+`supportsWrite()` method and making sure that it returns `true`.  In the case of the JDBC plugin, whether an individual connection is writable is configurable, so we are
 pulling the value from there; however, this could simply be `return true`.
 
 ```java
@@ -17,7 +17,7 @@ pulling the value from there, however this could simply be `return true`.
   }
 ```
 
-You will also need to ovewrite the `isMutable()` function as well.  
+You will also need to override the `isMutable()` function.
 
 ```java
   @Override
@@ -25,7 +25,7 @@ You will also need to ovewrite the `isMutable()` function as well.
     return plugin.getConfig().isWritable();
   }
 ```
-At this point, I would recommend creating a unit test something like the following.  For the JDBC plugin, there are unit tests for writable and unwritable plugin instances.  
+At this point, I would recommend creating a unit test, something like the following.  For the JDBC plugin, there are unit tests for both writable and unwritable plugin instances.
 You could just have these functions return `true`; however, in the case of JDBC, we wanted this controllable at the config level.
 
 ```java
@@ -48,7 +48,7 @@ At the time of writing, Drill only supports the following DDL statements:
 * `DROP TABLE`
 * `DROP TABLE IF EXISTS`
 
-In order to implement the logic to create a table, you'll have to find the classes in your storage plugin that extend the `AbstractSchema` class.  The first step is to 
+In order to implement the logic to create a table, you'll have to find the classes in your storage plugin that extend the `AbstractSchema` class.  The first step is to
 override the `createTableEntry()` function as shown below.
 
 ```java
@@ -76,18 +76,18 @@ overwrite the `createTableEntry()` function as shown below.
     };
   }
 ```
-This function should be overwritten in the lowest level class that extends `AbstractSchema`.  In the example above, the function first checks to see if the storage plugin is 
+This function should be overridden in the lowest-level class that extends `AbstractSchema`.  In the example above, the function first checks whether the storage plugin is
 writable and, if not, throws an exception.  This was done so that the user receives an understandable error message.
 
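+Concretely, the writability check described above can be as simple as the following sketch.  It assumes the schema class keeps references to the plugin (`plugin`) and an SLF4J `logger`; `UserException.dataWriteError()` is just one of several suitable error builders.
+
+```java
+  private void throwIfReadOnly() {
+    // Fail fast with a clear, user-facing error when the plugin config disallows writes.
+    if (!plugin.getConfig().isWritable()) {
+      throw UserException.dataWriteError()
+          .message("Storage plugin %s is not writable.", plugin.getName())
+          .build(logger);
+    }
+  }
+```
+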
 The JDBC implementation does not allow for partitions in inserts, so this function simply returns an empty collection.
 
-At this point, you should set a breakpoint in the `CreateTableEntry`, run the unit test and make sure that it is getting to the function. 
+At this point, you should set a breakpoint in `createTableEntry()`, run the unit test, and make sure that execution reaches the function.
 
 ## Step Three: Batching and Writing
-The next step you'll have to do is create a class which extends the `AbstractWriter` class. This is really just a serializable holder.  Take a look at the `JdbcWriter` class in 
-the JDBC Storage Plugin for an example. 
+The next step is to create a class which extends the `AbstractWriter` class. This is really just a serializable holder.  Take a look at the `JdbcWriter` class in
+the JDBC Storage Plugin for an example.
 
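+For reference, the skeleton of such a holder looks roughly like the following.  This is only a sketch: the class name, `tableName` field, and operator type string are illustrative, and the exact set of abstract methods (and their signatures) varies between Drill versions.
+
+```java
+@JsonTypeName("example-writer")
+public class ExampleWriter extends AbstractWriter {
+
+  private final String tableName;
+
+  @JsonCreator
+  public ExampleWriter(@JsonProperty("child") PhysicalOperator child,
+                       @JsonProperty("tableName") String tableName) {
+    super(child);
+    this.tableName = tableName;
+  }
+
+  @JsonProperty("tableName")
+  public String getTableName() {
+    return tableName;
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    // Re-create the operator with a new child; all other state is carried over.
+    return new ExampleWriter(child, tableName);
+  }
+
+  @Override
+  public String getOperatorType() {
+    // A unique name for this operator (older Drill versions return an int here).
+    return "EXAMPLE_WRITER";
+  }
+}
+```
+
+A real writer would also carry whatever the batch writer needs at execution time, typically the storage plugin config, as `JdbcWriter` does.
+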
-You'll also need to create a batch creator object which creates the actual batch writers. 
+You'll also need to create a batch creator object which creates the actual batch writers.
 
 ```java
 public class JdbcWriterBatchCreator implements BatchCreator<JdbcWriter> {
@@ -104,22 +104,22 @@ public class JdbcWriterBatchCreator implements BatchCreator<JdbcWriter> {
 ```
 The example above from the JDBC plugin is single threaded.  However, you could implement this such that multiple batch writers would be created.
 
-The final step is to implement a class which extends the `AbstractRecordWriter` interface.  This interface was originally meant for writing files, so not all of these methods 
+The final step is to implement a class which extends `AbstractRecordWriter`.  The underlying interface was originally meant for writing files, so not all of these methods
 will line up well with other storage plugins.  In any event, you will have to implement the following methods (a rough skeleton follows the list):
 
-* `init(Map<String, String> writerOptions)`:  This function is called once when the first RecordBatch is created.  This function could be used to establish connections or do 
-  other preparatory work. 
-* `updateSchema(VectorAccessible batch)`:  The `updateSchema()` function is also called once when the schema is actually created or updated.  It is unclear whether this 
-  function is called during each batch.  
-* `startRecord()`:  Called at the beginning of each record.  This corresponds to the beginning of a record row. 
-* `endRecord()`: Called at the end of each row record. 
+* `init(Map<String, String> writerOptions)`:  This function is called once when the first RecordBatch is created.  This function could be used to establish connections or do
+  other preparatory work.
+* `updateSchema(VectorAccessible batch)`:  The `updateSchema()` function is also called once when the schema is actually created or updated.  It is unclear whether this
+  function is called during each batch.
+* `startRecord()`:  Called at the beginning of each record.  This corresponds to the beginning of a row.
+* `endRecord()`: Called at the end of each row.
 * `abort()`:  Called in the event the writing did not succeed.
-* `cleanup()`:  This is called at the end of a successful batch.  For the JDBC plugin, this is where the actual INSERT query is executed. 
+* `cleanup()`:  This is called at the end of a successful batch.  For the JDBC plugin, this is where the actual INSERT query is executed.
 
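+To make the lifecycle concrete, here is a rough skeleton of such a class.  It simply restates the methods listed above with placeholder bodies; a real implementation will also need the converter methods discussed below.
+
+```java
+public class ExampleRecordWriter extends AbstractRecordWriter {
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+    // Called once, before the first batch; open connections or read options here.
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    // Inspect the incoming batch schema; create or validate the destination table.
+  }
+
+  @Override
+  public void startRecord() {
+    // A new row is starting; begin buffering it.
+  }
+
+  @Override
+  public void endRecord() {
+    // The current row is complete; add it to the pending batch.
+  }
+
+  @Override
+  public void abort() {
+    // The write failed; roll back or discard anything written so far.
+  }
+
+  @Override
+  public void cleanup() {
+    // The batch succeeded; flush pending rows (e.g. execute the INSERT) and release resources.
+  }
+}
+```
+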
-Once you've implemented these methods, you should put a breakpoint in each one, run your unit test and see that all the functions are being hit. 
+Once you've implemented these methods, you should put a breakpoint in each one, run your unit test and see that all the functions are being hit.
 
-Now the bad news, you'll need to create functions which overwrite the converters for all data types.  You can do this with a FreeMarker template, but you'll have to have one 
-function for each data type, and for each mode, `NULLABLE`, `REQUIRED`, and `REPEATED`.  Which data types you'll need to implement will depend on what data types are supported 
+Now the bad news: you'll need to create functions which override the converters for all data types.  You can do this with a FreeMarker template, but you'll need one
+function for each data type and for each mode: `NULLABLE`, `REQUIRED`, and `REPEATED`.  Which converters you'll need to implement will depend on what data types are supported
 in your source system.  If you don't implement a particular converter function, the user will receive an error stating that the particular data type is not supported.
 
 ```java