Posted to commits@drill.apache.org by cg...@apache.org on 2022/09/14 23:33:22 UTC

[drill] branch master updated: DRILL-8303: Add support for inserts into JDBC storage (#2646)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 975b7d60a6 DRILL-8303: Add support for inserts into JDBC storage (#2646)
975b7d60a6 is described below

commit 975b7d60a6520e1ff2ba17bbae7347739ed2e628
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Thu Sep 15 02:33:14 2022 +0300

    DRILL-8303: Add support for inserts into JDBC storage (#2646)
    
    * DRILL-8303: Add support for inserts into JDBC storage
---
 .../exec/store/jdbc/CapitalizingJdbcSchema.java    |  39 +-
 .../drill/exec/store/jdbc/DrillJdbcConvention.java |  41 +-
 .../drill/exec/store/jdbc/JdbcBatchReader.java     |  73 ++-
 .../{JdbcWriter.java => JdbcInsertWriter.java}     |  65 +-
 ...ator.java => JdbcInsertWriterBatchCreator.java} |  19 +-
 .../drill/exec/store/jdbc/JdbcRecordWriter.java    | 716 ++++-----------------
 .../drill/exec/store/jdbc/JdbcStoragePlugin.java   |  32 +-
 .../exec/store/jdbc/JdbcTableModifyWriter.java     |  35 +
 .../apache/drill/exec/store/jdbc/JdbcWriter.java   |  23 +-
 .../exec/store/jdbc/JdbcWriterBatchCreator.java    |   3 +-
 .../store/jdbc/utils/CreateTableStmtBuilder.java   | 164 ++---
 .../store/jdbc/utils/InsertStatementBuilder.java   |  73 +++
 .../exec/store/jdbc/utils/JdbcDDLQueryUtils.java   | 114 ----
 .../store/jdbc/TestCreateTableStmtBuilder.java     |  51 --
 .../exec/store/jdbc/TestJdbcInsertWithH2.java      | 247 +++++++
 .../exec/store/jdbc/TestJdbcInsertWithMySQL.java   | 281 ++++++++
 .../store/jdbc/TestJdbcInsertWithPostgres.java     | 253 ++++++++
 .../exec/store/jdbc/TestJdbcWriterWithH2.java      |  13 +-
 .../exec/store/jdbc/TestJdbcWriterWithMySQL.java   |  23 +-
 .../store/jdbc/TestJdbcWriterWithPostgres.java     |  17 +-
 .../org/apache/drill/exec/ops/QueryContext.java    |   2 +-
 .../physical/base/AbstractPhysicalVisitor.java     |   6 +
 .../drill/exec/physical/base/PhysicalVisitor.java  |   2 +
 .../drill/exec/physical/config/TableModify.java    |  56 ++
 .../physical/impl/InsertWriterRecordBatch.java     |  59 ++
 .../exec/physical/impl/WriterRecordBatch.java      |  30 +-
 .../apache/drill/exec/planner/PlannerPhase.java    |   6 +-
 .../drill/exec/planner/common/DrillRelOptUtil.java |   2 +-
 .../generators/NonCoveringIndexPlanGenerator.java  |   2 +-
 .../exec/planner/logical/DrillTableModify.java     |  66 ++
 .../exec/planner/logical/DrillTableModifyRule.java |  52 ++
 .../exec/planner/logical/ModifyTableEntry.java     |  29 +
 .../drill/exec/planner/physical/ScreenPrule.java   |   4 +-
 .../exec/planner/physical/TableModifyPrel.java     | 112 ++++
 .../{ScreenPrule.java => TableModifyPrule.java}    |  34 +-
 .../planner/physical/visitor/BasePrelVisitor.java  |   6 +
 .../exec/planner/physical/visitor/PrelVisitor.java |   2 +
 .../physical/visitor/PrelVisualizerVisitor.java    |   8 +
 .../drill/exec/planner/sql/DrillSqlWorker.java     |  21 +-
 .../exec/planner/sql/handlers/InsertHandler.java   |  61 ++
 .../planner/sql/handlers/SqlHandlerConfig.java     |  26 +-
 .../apache/drill/exec/store/AbstractSchema.java    |  13 +
 .../drill/exec/store/AbstractStoragePlugin.java    |   5 +
 .../org/apache/drill/exec/store/StoragePlugin.java |   5 +
 .../drill/common/logical/data/InsertWriter.java    |  31 +
 45 files changed, 1895 insertions(+), 1027 deletions(-)
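
Before the diff itself: this change teaches Drill to route INSERT statements through writable JDBC storage plugins. A minimal sketch of the user-facing capability, assuming a Drillbit on localhost and a writable MySQL plugin registered as mysql_plugin (the plugin, schema, and table names are placeholders, and the shape of the summary result is an assumption, not taken from this commit):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DrillJdbcInsertDemo {
  public static void main(String[] args) throws Exception {
    // Connect to a local Drillbit through Drill's own JDBC driver.
    try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
         Statement stmt = conn.createStatement()) {
      // With DRILL-8303, Drill plans this INSERT as a TableModify and pushes
      // the write down to the RDBMS behind the JDBC plugin.
      String sql = "INSERT INTO mysql_plugin.`test_db`.`target_table` "
          + "SELECT full_name, salary FROM cp.`employee.json` LIMIT 10";
      if (stmt.execute(sql)) {
        try (ResultSet rs = stmt.getResultSet()) {
          // Drill writers report a summary (e.g. number of records written);
          // the exact column layout here is an assumption.
          while (rs.next()) {
            System.out.println(rs.getObject(1));
          }
        }
      }
    }
  }
}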

diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
index 09b09873a9..713653e743 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
@@ -18,10 +18,10 @@
 package org.apache.drill.exec.store.jdbc;
 
 import javax.sql.DataSource;
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,14 +34,18 @@ import org.apache.calcite.schema.Function;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Writer;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
+import org.apache.drill.exec.planner.logical.ModifyTableEntry;
+import org.apache.drill.exec.planner.sql.parser.SqlDropTable;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.StorageStrategy;
-import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
-import org.apache.drill.exec.store.jdbc.utils.CreateTableStmtBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,9 +126,8 @@ public class CapitalizingJdbcSchema extends AbstractSchema {
     return new CreateTableEntry() {
 
       @Override
-      public Writer getWriter(PhysicalOperator child) throws IOException {
-        String tableWithSchema = CreateTableStmtBuilder.buildCompleteTableName(tableName, catalog, schema);
-        return new JdbcWriter(child, tableWithSchema, inner, plugin);
+      public Writer getWriter(PhysicalOperator child) {
+        return new JdbcWriter(child, getFullTablePath(tableName), inner, plugin);
       }
 
       @Override
@@ -134,6 +137,11 @@ public class CapitalizingJdbcSchema extends AbstractSchema {
     };
   }
 
+  @Override
+  public ModifyTableEntry modifyTable(String tableName) {
+    return child -> new JdbcInsertWriter(child, getFullTablePath(tableName), inner, plugin);
+  }
+
   @Override
   public void dropTable(String tableName) {
     if (! plugin.getConfig().isWritable()) {
@@ -143,10 +151,11 @@ public class CapitalizingJdbcSchema extends AbstractSchema {
         .build(logger);
     }
 
-    String tableWithSchema = CreateTableStmtBuilder.buildCompleteTableName(tableName, catalog, schema);
-    String dropTableQuery = String.format("DROP TABLE %s", tableWithSchema);
+    List<String> names = getFullTablePath(tableName);
     SqlDialect dialect = plugin.getDialect(inner.getDataSource());
-    dropTableQuery = JdbcDDLQueryUtils.cleanDDLQuery(dropTableQuery, dialect);
+    String dropTableQuery = SqlDropTable.OPERATOR.createCall(
+        SqlParserPos.ZERO, new SqlIdentifier(names, SqlParserPos.ZERO), SqlLiteral.createBoolean(false, SqlParserPos.ZERO))
+      .toSqlString(dialect).getSql();
 
     try (Connection conn = inner.getDataSource().getConnection();
          Statement stmt = conn.createStatement()) {
@@ -167,6 +176,18 @@ public class CapitalizingJdbcSchema extends AbstractSchema {
     }
   }
 
+  private List<String> getFullTablePath(String tableName) {
+    List<String> names = new ArrayList<>();
+    if (!StringUtils.isEmpty(catalog)) {
+      names.add(catalog);
+    }
+    if (!StringUtils.isEmpty(schema)) {
+      names.add(schema);
+    }
+    names.add(tableName);
+    return names;
+  }
+
   @Override
   public boolean isMutable() {
     return plugin.getConfig().isWritable();
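
The dropTable() change above stops concatenating SQL strings and instead builds the DROP TABLE call through Calcite's AST, so the target dialect applies its own identifier quoting. A self-contained sketch of that rendering, with an illustrative dialect and names (the expected output is an assumption):

import java.util.Arrays;

import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.dialect.MysqlSqlDialect;
import org.apache.calcite.sql.parser.SqlParserPos;

public class IdentifierRenderingDemo {
  public static void main(String[] args) {
    // getFullTablePath above produces [catalog, schema, table]; an SqlIdentifier
    // over that list is rendered with the dialect's own quoting rules.
    SqlIdentifier id = new SqlIdentifier(
        Arrays.asList("my_catalog", "my_schema", "my_table"), SqlParserPos.ZERO);
    // MySQL quotes with backticks; Postgres would use double quotes.
    System.out.println(id.toSqlString(MysqlSqlDialect.DEFAULT).getSql());
    // Expected output (assumption): `my_catalog`.`my_schema`.`my_table`
  }
}
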
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
index 3639628b2c..09ccb57238 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.jdbc;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.function.Consumer;
 
 import org.apache.calcite.adapter.jdbc.JdbcConvention;
 import org.apache.calcite.adapter.jdbc.JdbcRules;
@@ -31,7 +31,9 @@ import org.apache.calcite.linq4j.tree.ConstantUntypedNull;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelRule;
 import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.sql.SqlDialect;
 import org.apache.drill.exec.planner.RuleInstance;
 import org.apache.drill.exec.planner.logical.DrillRel;
@@ -40,6 +42,7 @@ import org.apache.drill.exec.store.enumerable.plan.DrillJdbcRuleBase;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
 import org.apache.drill.exec.store.jdbc.rules.JdbcLimitRule;
 import org.apache.drill.exec.store.jdbc.rules.JdbcSortRule;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 
 /**
@@ -55,22 +58,16 @@ public class DrillJdbcConvention extends JdbcConvention {
 
   private final ImmutableSet<RelOptRule> rules;
   private final JdbcStoragePlugin plugin;
-  private final String username;
 
   DrillJdbcConvention(SqlDialect dialect, String name, JdbcStoragePlugin plugin, String username) {
     super(dialect, ConstantUntypedNull.INSTANCE, name);
     this.plugin = plugin;
-    this.username = username;
-    List<RelOptRule> calciteJdbcRules = JdbcRules.rules(this, DrillRelFactories.LOGICAL_BUILDER).stream()
-        .filter(rule -> !EXCLUDED_CALCITE_RULES.contains(rule.getClass()))
-        .collect(Collectors.toList());
 
     List<RelTrait> inputTraits = Arrays.asList(
       Convention.NONE,
       DrillRel.DRILL_LOGICAL);
 
     ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.<RelOptRule>builder()
-      .addAll(calciteJdbcRules)
       .add(new JdbcIntermediatePrelConverterRule(this, username))
       .add(VertexDrelConverterRule.create(this))
       .add(RuleInstance.FILTER_SET_OP_TRANSPOSE_RULE)
@@ -81,6 +78,9 @@ public class DrillJdbcConvention extends JdbcConvention {
         .add(new DrillJdbcRuleBase.DrillJdbcFilterRule(inputTrait, this))
         .add(new JdbcSortRule(inputTrait, this))
         .add(new JdbcLimitRule(inputTrait, this));
+      rules(inputTrait, this).stream()
+        .filter(rule -> !EXCLUDED_CALCITE_RULES.contains(rule.getClass()))
+        .forEach(builder::add);
     }
 
     this.rules = builder.build();
@@ -98,4 +98,31 @@ public class DrillJdbcConvention extends JdbcConvention {
   public JdbcStoragePlugin getPlugin() {
     return plugin;
   }
+
+  private static List<RelOptRule> rules(
+    RelTrait inputTrait, JdbcConvention out) {
+    ImmutableList.Builder<RelOptRule> b = ImmutableList.builder();
+    foreachRule(out, r ->
+      b.add(r.config
+        .as(ConverterRule.Config.class)
+        .withConversion(r.getOperand().getMatchedClass(), inputTrait, out, r.config.description())
+        .withRelBuilderFactory(DrillRelFactories.LOGICAL_BUILDER)
+        .toRule()));
+    return b.build();
+  }
+
+  private static void foreachRule(JdbcConvention out,
+    Consumer<RelRule<?>> consumer) {
+    consumer.accept(JdbcToEnumerableConverterRule.create(out));
+    consumer.accept(JdbcRules.JdbcJoinRule.create(out));
+    consumer.accept(JdbcProjectRule.create(out));
+    consumer.accept(JdbcFilterRule.create(out));
+    consumer.accept(JdbcRules.JdbcAggregateRule.create(out));
+    consumer.accept(JdbcRules.JdbcSortRule.create(out));
+    consumer.accept(JdbcRules.JdbcUnionRule.create(out));
+    consumer.accept(JdbcRules.JdbcIntersectRule.create(out));
+    consumer.accept(JdbcRules.JdbcMinusRule.create(out));
+    consumer.accept(JdbcRules.JdbcTableModificationRule.create(out));
+    consumer.accept(JdbcRules.JdbcValuesRule.create(out));
+  }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchReader.java
index 178a929442..30270d6664 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchReader.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchReader.java
@@ -67,6 +67,7 @@ public class JdbcBatchReader implements ManagedReader<SchemaNegotiator> {
   private Connection connection;
   private PreparedStatement statement;
   private ResultSet resultSet;
+  private Integer updateCount;
   private RowSetLoader rowWriter;
   private CustomErrorContext errorContext;
   private List<JdbcColumnWriter> columnWriters;
@@ -130,10 +131,15 @@ public class JdbcBatchReader implements ManagedReader<SchemaNegotiator> {
     this.errorContext = negotiator.parentErrorContext();
     try {
       connection = source.getConnection();
+      TupleMetadata drillSchema;
       statement = connection.prepareStatement(sql);
-      resultSet = statement.executeQuery();
-
-      TupleMetadata drillSchema = buildSchema();
+      if (statement.execute()) {
+        resultSet = statement.getResultSet();
+        drillSchema = buildSchema();
+      } else {
+        updateCount = statement.getUpdateCount();
+        drillSchema = buildUpdateQuerySchema();
+      }
       negotiator.tableSchema(drillSchema, true);
       ResultSetLoader resultSetLoader = negotiator.build();
 
@@ -164,15 +170,11 @@ public class JdbcBatchReader implements ManagedReader<SchemaNegotiator> {
 
   private boolean processRow() {
     try {
-      if (!resultSet.next()) {
-        return false;
-      }
-      rowWriter.start();
-      // Process results
-      for (JdbcColumnWriter writer : columnWriters) {
-        writer.load(resultSet);
+      if (resultSet != null) {
+        return processResultSetRow();
+      } else {
+        return processUpdateRow();
       }
-      rowWriter.save();
     } catch (SQLException e) {
       throw UserException
         .dataReadError(e)
@@ -181,7 +183,29 @@ public class JdbcBatchReader implements ManagedReader<SchemaNegotiator> {
         .addContext(errorContext)
         .build(logger);
     }
+  }
+
+  private boolean processResultSetRow() throws SQLException {
+    if (!resultSet.next()) {
+      return false;
+    }
+    rowWriter.start();
+    // Process results
+    for (JdbcColumnWriter writer : columnWriters) {
+      writer.load(resultSet);
+    }
+    rowWriter.save();
+    return true;
+  }
 
+  private boolean processUpdateRow() {
+    if (updateCount == null) {
+      return false;
+    }
+    rowWriter.start();
+    rowWriter.scalar(columns.get(0).getRootSegmentPath()).setLong(updateCount);
+    rowWriter.save();
+    updateCount = null;
     return true;
   }
 
@@ -245,6 +269,33 @@ public class JdbcBatchReader implements ManagedReader<SchemaNegotiator> {
     return builder.buildSchema();
   }
 
+  private TupleMetadata buildUpdateQuerySchema() throws SQLException {
+    if (columns.size() != 1) {
+      throw UserException
+        .validationError()
+        .message(
+          "Expected columns count differs from the returned one.\n" +
+            "Expected columns: %s\n" +
+            "Returned columns count: %s",
+          columns, 1)
+        .addContext("Sql", sql)
+        .addContext(errorContext)
+        .build(logger);
+    }
+
+    String name = columns.get(0).getRootSegmentPath();
+    int width = DRILL_REL_DATATYPE_SYSTEM.getMaxNumericPrecision();
+    int scale = 0;
+
+    MinorType minorType = MinorType.BIGINT;
+
+    jdbcColumns = new ArrayList<>();
+    jdbcColumns.add(new JdbcColumn(name, minorType, 0, scale, width));
+    return new SchemaBuilder()
+      .addNullable(name, minorType, width, scale)
+      .buildSchema();
+  }
+
   private void populateWriterArray() {
     columnWriters = new ArrayList<>();
 
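JdbcBatchReader above now distinguishes row-returning queries from pushed-down DML using the standard JDBC contract: Statement.execute() returns true when a ResultSet is available and false when only an update count is. A generic sketch of that branch (not Drill code; the DataSource and SQL are placeholders):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import javax.sql.DataSource;

public class ExecuteBranchDemo {
  static void run(DataSource source, String sql) throws Exception {
    try (Connection conn = source.getConnection();
         PreparedStatement stmt = conn.prepareStatement(sql)) {
      if (stmt.execute()) {
        // SELECT-like statement: iterate the rows.
        try (ResultSet rs = stmt.getResultSet()) {
          while (rs.next()) {
            System.out.println(rs.getObject(1));
          }
        }
      } else {
        // DML such as INSERT: only the affected-row count is available,
        // which the reader above surfaces as a single BIGINT row.
        System.out.println(stmt.getUpdateCount() + " rows affected");
      }
    }
  }
}
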
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriter.java
similarity index 51%
copy from contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriter.java
copy to contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriter.java
index 05924a1d80..e691f3b328 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriter.java
@@ -17,47 +17,31 @@
  */
 package org.apache.drill.exec.store.jdbc;
 
-import java.io.IOException;
-
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
-import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public class JdbcWriter extends AbstractWriter {
-
-  public static final String OPERATOR_TYPE = "JDBC_WRITER";
+import java.util.List;
 
-  private final JdbcStoragePlugin plugin;
-  private final String tableName;
-  private final JdbcSchema inner;
+public class JdbcInsertWriter extends JdbcWriter {
+  public static final String OPERATOR_TYPE = "JDBC_INSERT_WRITER";
 
   @JsonCreator
-  public JdbcWriter(
+  public JdbcInsertWriter(
     @JsonProperty("child") PhysicalOperator child,
-    @JsonProperty("name") String name,
+    @JsonProperty("tableIdentifier") List<String> tableIdentifier,
     @JsonProperty("storage") StoragePluginConfig storageConfig,
     @JacksonInject JdbcSchema inner,
-    @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
-    super(child);
-    this.plugin = engineRegistry.resolve(storageConfig, JdbcStoragePlugin.class);
-    this.tableName = name;
-    this.inner = inner;
+    @JacksonInject StoragePluginRegistry engineRegistry) {
+    super(child, tableIdentifier, inner, engineRegistry.resolve(storageConfig, JdbcStoragePlugin.class));
   }
 
-  JdbcWriter(PhysicalOperator child, String name, JdbcSchema inner, JdbcStoragePlugin plugin) {
-    super(child);
-    this.tableName = name;
-    this.plugin = plugin;
-    this.inner = inner;
+  JdbcInsertWriter(PhysicalOperator child, List<String> tableIdentifier, JdbcSchema inner, JdbcStoragePlugin plugin) {
+    super(child, tableIdentifier, inner, plugin);
   }
 
   @Override
@@ -67,30 +51,7 @@ public class JdbcWriter extends AbstractWriter {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new JdbcWriter(child, tableName, inner, plugin);
+    return new JdbcInsertWriter(child, getTableIdentifier(), getInner(), getPlugin());
   }
 
-  public String getTableName() {
-    return tableName;
-  }
-
-  public StoragePluginConfig getStorage() {
-    return plugin.getConfig();
-  }
-
-  @JsonIgnore
-  public JdbcSchema getInner() { return inner; }
-
-  @JsonIgnore
-  public JdbcStoragePlugin getPlugin() {
-    return plugin;
-  }
-
-  @Override
-  public String toString() {
-    return new PlanStringBuilder(this)
-      .field("tableName", tableName)
-      .field("storageStrategy", getStorageStrategy())
-      .toString();
-  }
 }
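
The @JsonCreator constructor above mixes plan-serialized properties (child, tableIdentifier, storage) with @JacksonInject dependencies that exist only at runtime (the JdbcSchema and StoragePluginRegistry). A minimal sketch of that Jackson pattern, using illustrative stand-in types rather than Drill's:

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;

public class InjectDemo {
  static class Registry { final String name = "registry"; }

  static class Op {
    final String table;
    final Registry registry;

    @JsonCreator
    Op(@JsonProperty("table") String table, @JacksonInject Registry registry) {
      this.table = table;
      this.registry = registry;
    }
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // Runtime-only dependencies (like Drill's StoragePluginRegistry) are
    // injected at deserialization time, not serialized into the plan JSON.
    mapper.setInjectableValues(new InjectableValues.Std()
        .addValue(Registry.class, new Registry()));
    Op op = mapper.readValue("{\"table\":\"t\"}", Op.class);
    System.out.println(op.table + " / " + op.registry.name);
  }
}
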
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriterBatchCreator.java
similarity index 83%
copy from contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
copy to contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriterBatchCreator.java
index 182c104714..1c86e6447d 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriterBatchCreator.java
@@ -17,22 +17,23 @@
  */
 package org.apache.drill.exec.store.jdbc;
 
-import java.util.List;
-
-import javax.sql.DataSource;
-
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.physical.impl.InsertWriterRecordBatch;
 import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 
-public class JdbcWriterBatchCreator implements BatchCreator<JdbcWriter> {
+import javax.sql.DataSource;
+import java.util.List;
+
+@SuppressWarnings("unused")
+public class JdbcInsertWriterBatchCreator implements BatchCreator<JdbcInsertWriter> {
 
   @Override
-  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, JdbcWriter config, List<RecordBatch> children)
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
+    JdbcInsertWriter config, List<RecordBatch> children)
     throws ExecutionSetupException {
     assert children != null && children.size() == 1;
 
@@ -44,11 +45,11 @@ public class JdbcWriterBatchCreator implements BatchCreator<JdbcWriter> {
         config.getPlugin().getName()
       )));
 
-    return new WriterRecordBatch(
+    return new InsertWriterRecordBatch(
       config,
       children.iterator().next(),
       context,
-      new JdbcRecordWriter(ds, null, config.getTableName(), config)
+      new JdbcTableModifyWriter(ds, config.getTableIdentifier(), config)
     );
   }
 }
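
The JdbcRecordWriter rewrite that follows replaces hand-rolled value formatting (typed holders, SimpleDateFormat, literal "null" strings) with Calcite SqlLiteral values that are rendered per dialect. A sketch of why that helps with quoting and temporal values, using an illustrative dialect (the printed outputs are assumptions):

import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.dialect.PostgresqlSqlDialect;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.DateString;

public class LiteralRenderingDemo {
  public static void main(String[] args) {
    // Character literals are escaped by the dialect, so embedded quotes are safe.
    SqlLiteral name = SqlLiteral.createCharString("O'Brien", SqlParserPos.ZERO);
    System.out.println(name.toSqlString(PostgresqlSqlDialect.DEFAULT).getSql());
    // Expected output (assumption): 'O''Brien'

    // Dates travel as typed literals rather than pre-formatted strings.
    SqlLiteral date = SqlLiteral.createDate(
        DateString.fromDaysSinceEpoch(19000), SqlParserPos.ZERO);
    System.out.println(date.toSqlString(PostgresqlSqlDialect.DEFAULT).getSql());
    // Expected output (assumption): DATE '2022-01-08'
  }
}
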
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
index e403e4f09d..ebe907392a 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
@@ -19,62 +19,32 @@
 package org.apache.drill.exec.store.jdbc;
 
 import org.apache.calcite.sql.SqlDialect;
-import org.apache.calcite.sql.SqlDialect.DatabaseProduct;
-import org.apache.calcite.sql.util.SqlBuilder;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.DateString;
+import org.apache.calcite.util.TimeString;
+import org.apache.calcite.util.TimestampString;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
-import org.apache.drill.exec.expr.holders.BigIntHolder;
-import org.apache.drill.exec.expr.holders.BitHolder;
-import org.apache.drill.exec.expr.holders.DateHolder;
-import org.apache.drill.exec.expr.holders.Float4Holder;
-import org.apache.drill.exec.expr.holders.Float8Holder;
-import org.apache.drill.exec.expr.holders.IntHolder;
-import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
-import org.apache.drill.exec.expr.holders.NullableBitHolder;
-import org.apache.drill.exec.expr.holders.NullableDateHolder;
-import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
-import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
-import org.apache.drill.exec.expr.holders.NullableIntHolder;
-import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
-import org.apache.drill.exec.expr.holders.NullableTimeHolder;
-import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
-import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
-import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
-import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
-import org.apache.drill.exec.expr.holders.SmallIntHolder;
-import org.apache.drill.exec.expr.holders.TimeHolder;
-import org.apache.drill.exec.expr.holders.TimeStampHolder;
-import org.apache.drill.exec.expr.holders.TinyIntHolder;
-import org.apache.drill.exec.expr.holders.VarCharHolder;
-import org.apache.drill.exec.expr.holders.VarDecimalHolder;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.store.AbstractRecordWriter;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
-import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
 import org.apache.drill.exec.store.jdbc.utils.CreateTableStmtBuilder;
-import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.store.jdbc.utils.InsertStatementBuilder;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import org.apache.parquet.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.sql.DataSource;
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.text.Format;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
+import java.time.ZoneOffset;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -83,51 +53,19 @@ public class JdbcRecordWriter extends AbstractRecordWriter {
 
   private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
 
-  private final String tableName;
-  private Connection connection;
-  private final SqlDialect dialect;
-  private final List<Object> rowList;
-  private final List<JdbcWriterField> fields;
-  private final String rawTableName;
+  private final List<String> tableIdentifier;
+  private final Connection connection;
+  protected final SqlDialect dialect;
+  private final InsertStatementBuilder insertStatementBuilder;
   private final JdbcWriter config;
-  private SqlBuilder insertQueryBuilder;
-  private boolean firstRecord;
   private int recordCount;
 
-  /*
-   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
-   * is a Drill equivalent, then do the mapping as expected.
-   *
-   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
-   * mapped to VARBINARY.
-   */
-  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
-      .put(MinorType.FLOAT8, java.sql.Types.DOUBLE)
-      .put(MinorType.FLOAT4, java.sql.Types.FLOAT)
-      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
-      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
-      .put(MinorType.INT, java.sql.Types.INTEGER)
-      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
-      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
-      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
-      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
-      .put(MinorType.DATE, java.sql.Types.DATE)
-      .put(MinorType.TIME, java.sql.Types.TIME)
-      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
-      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
-      .build();
-
-  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
-    this.tableName = JdbcDDLQueryUtils.addBackTicksToTable(name);
-    this.rowList = new ArrayList<>();
+  public JdbcRecordWriter(DataSource source, List<String> tableIdentifier, JdbcWriter config) {
+    this.tableIdentifier = tableIdentifier;
     this.dialect = config.getPlugin().getDialect(source);
     this.config = config;
-    this.rawTableName = name;
-    this.fields = new ArrayList<>();
-    this.firstRecord = true;
     this.recordCount = 0;
-
-    this.insertQueryBuilder = initializeInsertQuery();
+    this.insertStatementBuilder = getInsertStatementBuilder(tableIdentifier);
 
     try {
       this.connection = source.getConnection();
@@ -139,6 +77,10 @@ public class JdbcRecordWriter extends AbstractRecordWriter {
     }
   }
 
+  protected InsertStatementBuilder getInsertStatementBuilder(List<String> tableIdentifier) {
+    return new InsertStatementBuilder(tableIdentifier, dialect);
+  }
+
   @Override
   public void init(Map<String, String> writerOptions) {
     // Nothing to see here...
@@ -147,35 +89,20 @@ public class JdbcRecordWriter extends AbstractRecordWriter {
   @Override
   public void updateSchema(VectorAccessible batch) {
     BatchSchema schema = batch.getSchema();
-    String columnName;
-    MinorType type;
-    String sql;
-    boolean nullable = false;
-    CreateTableStmtBuilder queryBuilder = new CreateTableStmtBuilder(tableName, dialect);
-
+    CreateTableStmtBuilder queryBuilder = new CreateTableStmtBuilder(tableIdentifier, dialect);
     for (MaterializedField field : schema) {
-      columnName = JdbcDDLQueryUtils.addBackTicksToField(field.getName());
-      type = field.getType().getMinorType();
-      logger.debug("Adding column {} of type {}.", columnName, type);
+      logger.debug("Adding column {} of type {}.", field.getName(), field.getType().getMinorType());
 
       if (field.getType().getMode() == DataMode.REPEATED) {
         throw UserException.dataWriteError()
-          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .message("Drill does not yet support writing arrays to JDBC. " + field.getName() + " is an array.")
           .build(logger);
       }
 
-      if (field.getType().getMode() == DataMode.OPTIONAL) {
-        nullable = true;
-      }
-
-      int precision = field.getPrecision();
-      int scale = field.getScale();
-
-      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+      queryBuilder.addColumn(field);
     }
 
-    sql = queryBuilder.build().getCreateTableQuery();
-    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    String sql = queryBuilder.build();
     logger.debug("Final query: {}", sql);
 
     // Execute the query to build the schema
@@ -192,83 +119,26 @@ public class JdbcRecordWriter extends AbstractRecordWriter {
 
   @Override
   public void startRecord() {
-    rowList.clear();
+    insertStatementBuilder.resetRow();
 
-    if (!firstRecord) {
-      insertQueryBuilder.append(",");
-    }
-    insertQueryBuilder.append("(");
     logger.debug("Start record");
   }
 
   @Override
   public void endRecord() throws IOException {
     logger.debug("Ending record");
-
-    // Add values to rowString
-    for (int i = 0; i < rowList.size(); i++) {
-      if (i > 0) {
-        insertQueryBuilder.append(",");
-      }
-
-      // Add null value to rowstring
-      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
-        insertQueryBuilder.append("null");
-        continue;
-      }
-
-      JdbcWriterField currentField = fields.get(i);
-      if (currentField.getDataType() == MinorType.VARCHAR) {
-        String value = null;
-        // Get the string value
-        if (currentField.getMode() == DataMode.REQUIRED) {
-          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
-          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
-        } else {
-          try {
-            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
-            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
-          } catch (ClassCastException e) {
-            logger.error("Unable to read field: {}",  rowList.get(i));
-          }
-        }
-
-        // Add to value string
-        insertQueryBuilder.literal(value);
-      } else if (currentField.getDataType() == MinorType.DATE) {
-        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
-        insertQueryBuilder.literal(dateString);
-      } else if (currentField.getDataType() == MinorType.TIME) {
-        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
-        insertQueryBuilder.literal(timeString);
-      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
-        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
-        insertQueryBuilder.literal(timeString);
-      } else {
-        if (Strings.isNullOrEmpty(rowList.get(i).toString())) {
-          insertQueryBuilder.append("null");
-        } else {
-          insertQueryBuilder.append(rowList.get(i).toString());
-        }
-      }
-    }
+    insertStatementBuilder.endRecord();
 
     recordCount++;
-    firstRecord = false;
-    insertQueryBuilder.append(")");
 
     if (recordCount >= config.getPlugin().getConfig().getWriterBatchSize()) {
-      // Execute the insert query
-      String insertQuery = insertQueryBuilder.toString();
-      executeInsert(insertQuery);
+      executeInsert(insertStatementBuilder.buildInsertQuery());
 
       // Reset the batch
       recordCount = 0;
-      firstRecord = true;
-      insertQueryBuilder = initializeInsertQuery();
     }
 
-    rowList.clear();
+    insertStatementBuilder.resetRow();
   }
 
   @Override
@@ -279,10 +149,8 @@ public class JdbcRecordWriter extends AbstractRecordWriter {
   @Override
   public void cleanup() throws IOException {
     logger.debug("Cleanup record");
-    // Execute last query
-    String insertQuery = insertQueryBuilder.toString();
     if (recordCount != 0) {
-      executeInsert(insertQuery);
+      executeInsert(insertStatementBuilder.buildInsertQuery());
     }
     AutoCloseables.closeSilently(connection);
   }
@@ -301,600 +169,254 @@ public class JdbcRecordWriter extends AbstractRecordWriter {
     }
   }
 
-  private SqlBuilder initializeInsertQuery() {
-    SqlBuilder builder = new SqlBuilder(this.dialect);
-
-    // Apache Phoenix does not support INSERT but does support UPSERT using the same syntax
-    if (dialect == DatabaseProduct.PHOENIX.getDialect()) {
-      builder.append("UPSERT INTO ");
-    } else {
-      builder.append("INSERT INTO ");
-    }
-
-    JdbcDDLQueryUtils.addTableToInsertQuery(builder, rawTableName);
-    builder.append (" VALUES ");
-    return builder;
-  }
-
-  /**
-   * Drill returns longs for date values. This function converts longs into dates formatted
-   * in YYYY-MM-dd format for insertion into a database.
-   * @param dateVal long representing a naive date
-   * @return A date string formatted YYYY-MM-dd
-   */
-  private String formatDateForInsertQuery(Long dateVal) {
-    Date date=new Date(dateVal);
-    SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd");
-    return df2.format(date);
-  }
-
-  /**
-   * Drill returns longs for time values. This function converts longs into times formatted
-   * in HH:mm:ss format for insertion into a database.
-   * @param millis Milliseconds since the epoch.
-   * @return A time string formatted for insertion into a database.
-   */
-  private String formatTimeForInsertQuery(Integer millis) {
-    return String.format("%02d:%02d:%02d", TimeUnit.MILLISECONDS.toHours(millis),
-      TimeUnit.MILLISECONDS.toMinutes(millis) % TimeUnit.HOURS.toMinutes(1),
-      TimeUnit.MILLISECONDS.toSeconds(millis) % TimeUnit.MINUTES.toSeconds(1));
-  }
-
-  /**
-   * Drill returns longs for date times. This function converts
-   * @param time An input long that represents a timestamp
-   * @return A ISO formatted timestamp.
-   */
-  private String formatTimeStampForInsertQuery(Long time) {
-    Date date = new Date(time);
-    Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-    return format.format(date);
-  }
+  public class NullableJdbcConverter extends FieldConverter {
+    private final FieldConverter delegate;
 
-  @Override
-  public FieldConverter getNewNullableIntConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new NullableIntJDBCConverter(fieldId, fieldName, reader, fields);
-  }
-
-  public class NullableIntJDBCConverter extends FieldConverter {
-    private final NullableIntHolder holder = new NullableIntHolder();
-
-    public NullableIntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.INT, DataMode.OPTIONAL));
+    public NullableJdbcConverter(int fieldId, String fieldName, FieldReader reader, FieldConverter delegate) {
+      super(fieldId, fieldName, reader);
+      this.delegate = delegate;
     }
 
     @Override
-    public void writeField() {
-      if (!reader.isSet()) {
-        rowList.add("null");
-        return;
+    public void writeField() throws IOException {
+      if (reader.isSet()) {
+        delegate.writeField();
+      } else {
+        insertStatementBuilder.addRowValue(SqlLiteral.createNull(SqlParserPos.ZERO));
       }
-      reader.read(holder);
-      rowList.add(holder.value);
     }
   }
 
-  @Override
-  public FieldConverter getNewIntConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new IntJDBCConverter(fieldId, fieldName, reader, fields);
-  }
-
-  public class IntJDBCConverter extends FieldConverter {
-    private final IntHolder holder = new IntHolder();
+  public class ExactNumericJdbcConverter extends FieldConverter {
 
-    public IntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.INT, DataMode.REQUIRED));
+    public ExactNumericJdbcConverter(int fieldId, String fieldName, FieldReader reader) {
+      super(fieldId, fieldName, reader);
     }
 
     @Override
     public void writeField() {
-      reader.read(holder);
-      rowList.add(holder.value);
+      insertStatementBuilder.addRowValue(
+        SqlLiteral.createExactNumeric(String.valueOf(reader.readObject()),
+          SqlParserPos.ZERO));
     }
   }
 
-  @Override
-  public FieldConverter getNewNullableBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new NullableBigIntJDBCConverter(fieldId, fieldName, reader, fields);
-  }
-
-  public class NullableBigIntJDBCConverter extends FieldConverter {
-    private final NullableBigIntHolder holder = new NullableBigIntHolder();
+  public class ApproxNumericJdbcConverter extends FieldConverter {
 
-    public NullableBigIntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.BIGINT, DataMode.OPTIONAL));
+    public ApproxNumericJdbcConverter(int fieldId, String fieldName, FieldReader reader) {
+      super(fieldId, fieldName, reader);
     }
 
     @Override
     public void writeField() {
-      if (!reader.isSet()) {
-        rowList.add("null");
-        return;
-      }
-      reader.read(holder);
-      rowList.add(holder.value);
+      insertStatementBuilder.addRowValue(
+        SqlLiteral.createApproxNumeric(String.valueOf(reader.readObject()),
+          SqlParserPos.ZERO));
     }
   }
 
   @Override
-  public FieldConverter getNewBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new BigIntJDBCConverter(fieldId, fieldName, reader, fields);
+  public FieldConverter getNewNullableIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableJdbcConverter(fieldId, fieldName, reader,
+      new ExactNumericJdbcConverter(fieldId, fieldName, reader));
   }
 
-  public class BigIntJDBCConverter extends FieldConverter {
-    private final BigIntHolder holder = new BigIntHolder();
-
-    public BigIntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.BIGINT, DataMode.REQUIRED));
-    }
-
-    @Override
-    public void writeField() {
-      if (!reader.isSet()) {
-        rowList.add("null");
-        return;
-      }
-      reader.read(holder);
-      rowList.add(holder.value);
-    }
+  @Override
+  public FieldConverter getNewIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ExactNumericJdbcConverter(fieldId, fieldName, reader);
   }
 
   @Override
-  public FieldConverter getNewNullableSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new NullableSmallIntJDBCConverter(fieldId, fieldName, reader, fields);
+  public FieldConverter getNewNullableBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableJdbcConverter(fieldId, fieldName, reader,
+      new ExactNumericJdbcConverter(fieldId, fieldName, reader));
   }
 
-  public class NullableSmallIntJDBCConverter extends FieldConverter {
-    private final NullableSmallIntHolder holder = new NullableSmallIntHolder();
-
-    public NullableSmallIntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.SMALLINT, DataMode.OPTIONAL));
-    }
-
-    @Override
-    public void writeField() {
-      if (!reader.isSet()) {
-        rowList.add("null");
-        return;
-      }
-      reader.read(holder);
-      rowList.add(holder.value);
-    }
+  @Override
+  public FieldConverter getNewBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ExactNumericJdbcConverter(fieldId, fieldName, reader);
   }
 
   @Override
-  public FieldConverter getNewSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new SmallIntJDBCConverter(fieldId, fieldName, reader, fields);
+  public FieldConverter getNewNullableSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableJdbcConverter(fieldId, fieldName, reader,
+      new ExactNumericJdbcConverter(fieldId, fieldName, reader));
   }
 
-  public class SmallIntJDBCConverter extends FieldConverter {
-    private final SmallIntHolder holder = new SmallIntHolder();
-
-    public SmallIntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.SMALLINT, DataMode.REQUIRED));
-    }
-
-    @Override
-    public void writeField() {
-      reader.read(holder);
-      rowList.add(holder.value);
-    }
+  @Override
+  public FieldConverter getNewSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ExactNumericJdbcConverter(fieldId, fieldName, reader);
   }
 
   @Override
   public FieldConverter getNewNullableTinyIntConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new NullableTinyIntJDBCConverter(fieldId, fieldName, reader, fields);
-  }
-
-  public class NullableTinyIntJDBCConverter extends FieldConverter {
-    private final NullableTinyIntHolder holder = new NullableTinyIntHolder();
-
-    public NullableTinyIntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.TINYINT, DataMode.OPTIONAL));
-    }
-
-    @Override
-    public void writeField() {
-      if (!reader.isSet()) {
-        rowList.add("null");
-        return;
-      }
-      reader.read(holder);
-      rowList.add(holder.value);
-    }
+    return new NullableJdbcConverter(fieldId, fieldName, reader,
+      new ExactNumericJdbcConverter(fieldId, fieldName, reader));
   }
 
   @Override
   public FieldConverter getNewTinyIntConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new TinyIntJDBCConverter(fieldId, fieldName, reader, fields);
-  }
-
-  public class TinyIntJDBCConverter extends FieldConverter {
-    private final TinyIntHolder holder = new TinyIntHolder();
-
-    public TinyIntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.TINYINT, DataMode.REQUIRED));
-    }
-
-    @Override
-    public void writeField() {
-      reader.read(holder);
-      rowList.add(holder.value);
-    }
+    return new ExactNumericJdbcConverter(fieldId, fieldName, reader);
   }
 
   @Override
   public FieldConverter getNewNullableFloat4Converter(int fieldId, String fieldName, FieldReader reader) {
-    return new NullableFloat4JDBCConverter(fieldId, fieldName, reader, fields);
-  }
-
-  public class NullableFloat4JDBCConverter extends FieldConverter {
-    private final NullableFloat4Holder holder = new NullableFloat4Holder();
-
-    public NullableFloat4JDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.FLOAT4, DataMode.OPTIONAL));
-    }
-
-    @Override
-    public void writeField() {
-      if (!reader.isSet()) {
-        rowList.add("null");
-        return;
-      }
-      reader.read(holder);
-      rowList.add(holder.value);
-    }
+    return new NullableJdbcConverter(fieldId, fieldName, reader,
+      new ApproxNumericJdbcConverter(fieldId, fieldName, reader));
   }
 
   @Override
   public FieldConverter getNewFloat4Converter(int fieldId, String fieldName, FieldReader reader) {
-    return new Float4JDBCConverter(fieldId, fieldName, reader, fields);
-  }
-
-  public class Float4JDBCConverter extends FieldConverter {
-    private final Float4Holder holder = new Float4Holder();
-
-    public Float4JDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.FLOAT4, DataMode.REQUIRED));
-    }
-
-    @Override
-    public void writeField() {
-      reader.read(holder);
-      rowList.add(holder.value);
-    }
+    return new ApproxNumericJdbcConverter(fieldId, fieldName, reader);
   }
 
   @Override
   public FieldConverter getNewNullableFloat8Converter(int fieldId, String fieldName, FieldReader reader) {
-    return new NullableFloat8JDBCConverter(fieldId, fieldName, reader, fields);
-  }
-
-  public class NullableFloat8JDBCConverter extends FieldConverter {
-    private final NullableFloat8Holder holder = new NullableFloat8Holder();
-
-    public NullableFloat8JDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.FLOAT8, DataMode.OPTIONAL));
-    }
-
-    @Override
-    public void writeField() {
-      if (!reader.isSet()) {
-        rowList.add("null");
-        return;
-      }
-      reader.read(holder);
-      rowList.add(holder.value);
-    }
+    return new NullableJdbcConverter(fieldId, fieldName, reader,
+      new ApproxNumericJdbcConverter(fieldId, fieldName, reader));
   }
 
   @Override
   public FieldConverter getNewFloat8Converter(int fieldId, String fieldName, FieldReader reader) {
-    return new Float8JDBCConverter(fieldId, fieldName, reader, fields);
-  }
-
-  public class Float8JDBCConverter extends FieldConverter {
-    private final Float8Holder holder = new Float8Holder();
-
-    public Float8JDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.FLOAT8, DataMode.REQUIRED));
-    }
-
-    @Override
-    public void writeField() {
-      reader.read(holder);
-      rowList.add(holder.value);
-    }
+    return new ApproxNumericJdbcConverter(fieldId, fieldName, reader);
   }
 
   @Override
   public FieldConverter getNewNullableVarDecimalConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new NullableVardecimalJDBCConverter(fieldId, fieldName, reader, fields);
-  }
-
-  public class NullableVardecimalJDBCConverter extends FieldConverter {
-    private final NullableVarDecimalHolder holder = new NullableVarDecimalHolder();
-
-    public NullableVardecimalJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.VARDECIMAL, DataMode.OPTIONAL));
-    }
-
-    @Override
-    public void writeField() {
-      if (!reader.isSet()) {
-        rowList.add("null");
-        return;
-      }
-      reader.read(holder);
-      BigDecimal value = DecimalUtility.getBigDecimalFromDrillBuf(holder.buffer,
-        holder.start, holder.end - holder.start, holder.scale);
-      rowList.add(value);
-    }
+    return new NullableJdbcConverter(fieldId, fieldName, reader,
+      new ExactNumericJdbcConverter(fieldId, fieldName, reader));
   }
 
   @Override
   public FieldConverter getNewVarDecimalConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new VardecimalJDBCConverter(fieldId, fieldName, reader, fields);
-  }
-
-  public class VardecimalJDBCConverter extends FieldConverter {
-    private final VarDecimalHolder holder = new VarDecimalHolder();
-
-    public VardecimalJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.VARDECIMAL, DataMode.REQUIRED));
-    }
-
-    @Override
-    public void writeField() {
-      reader.read(holder);
-      BigDecimal value = DecimalUtility.getBigDecimalFromDrillBuf(holder.buffer,
-        holder.start, holder.end - holder.start, holder.scale);
-      rowList.add(value);
-    }
+    return new ExactNumericJdbcConverter(fieldId, fieldName, reader);
   }
 
   @Override
   public FieldConverter getNewNullableVarCharConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new NullableVarCharJDBCConverter(fieldId, fieldName, reader, fields);
-  }
-
-  public class NullableVarCharJDBCConverter extends FieldConverter {
-    private final NullableVarCharHolder holder = new NullableVarCharHolder();
-
-    public NullableVarCharJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.VARCHAR, DataMode.OPTIONAL));
-    }
-
-    @Override
-    public void writeField() {
-      reader.read(holder);
-      if (reader.isSet()) {
-        byte[] bytes = new byte[holder.end - holder.start];
-        holder.buffer.getBytes(holder.start, bytes);
-      }
-      rowList.add(holder);
-    }
+    return new NullableJdbcConverter(fieldId, fieldName, reader,
+      new VarCharJDBCConverter(fieldId, fieldName, reader));
   }
 
   @Override
   public FieldConverter getNewVarCharConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new VarCharJDBCConverter(fieldId, fieldName, reader, fields);
+    return new VarCharJDBCConverter(fieldId, fieldName, reader);
   }
 
   public class VarCharJDBCConverter extends FieldConverter {
-    private final VarCharHolder holder = new VarCharHolder();
 
-    public VarCharJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
+    public VarCharJDBCConverter(int fieldID, String fieldName, FieldReader reader) {
       super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.VARCHAR, DataMode.REQUIRED));
     }
 
     @Override
     public void writeField() {
-      reader.read(holder);
-      if (reader.isSet()) {
-        byte[] bytes = new byte[holder.end - holder.start];
-        holder.buffer.getBytes(holder.start, bytes);
-        rowList.add(holder);
-      }
+      byte[] bytes = reader.readText().copyBytes();
+      insertStatementBuilder.addRowValue(
+        SqlLiteral.createCharString(new String(bytes),
+          SqlParserPos.ZERO));
     }
   }
 
   @Override
   public FieldConverter getNewNullableDateConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new NullableDateJDBCConverter(fieldId, fieldName, reader, fields);
-  }
-
-  public class NullableDateJDBCConverter extends FieldConverter {
-    private final NullableDateHolder holder = new NullableDateHolder();
-
-    public NullableDateJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.DATE, DataMode.OPTIONAL));
-    }
-
-    @Override
-    public void writeField() {
-      if (!reader.isSet()) {
-        rowList.add("null");
-        return;
-      }
-      reader.read(holder);
-      rowList.add(holder.value);
-    }
+    return new NullableJdbcConverter(fieldId, fieldName, reader,
+      new DateJDBCConverter(fieldId, fieldName, reader));
   }
 
   @Override
   public FieldConverter getNewDateConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new DateJDBCConverter(fieldId, fieldName, reader, fields);
+    return new DateJDBCConverter(fieldId, fieldName, reader);
   }
 
   public class DateJDBCConverter extends FieldConverter {
-    private final DateHolder holder = new DateHolder();
 
-    public DateJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
+    public DateJDBCConverter(int fieldID, String fieldName, FieldReader reader) {
       super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.DATE, DataMode.REQUIRED));
     }
 
     @Override
     public void writeField() {
-      reader.read(holder);
-      rowList.add(holder.value);
+      insertStatementBuilder.addRowValue(
+        SqlLiteral.createDate(DateString.fromDaysSinceEpoch((int) reader.readLocalDate().toEpochDay()),
+          SqlParserPos.ZERO));
     }
   }
 
   @Override
   public FieldConverter getNewNullableTimeConverter(int fieldId, String fieldName, FieldReader reader) {
-      return new NullableTimeJDBCConverter(fieldId, fieldName, reader, fields);
+      return new NullableJdbcConverter(fieldId, fieldName, reader,
+        new TimeJDBCConverter(fieldId, fieldName, reader));
     }
 
-    public class NullableTimeJDBCConverter extends FieldConverter {
-    private final NullableTimeHolder holder = new NullableTimeHolder();
-
-    public NullableTimeJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.TIME, DataMode.OPTIONAL));
-    }
-
-    @Override
-    public void writeField() {
-      if (!reader.isSet()) {
-        rowList.add("null");
-        return;
-      }
-      reader.read(holder);
-      rowList.add(holder.value);
-    }
-  }
-
   @Override
   public FieldConverter getNewTimeConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new TimeJDBCConverter(fieldId, fieldName, reader, fields);
+    return new TimeJDBCConverter(fieldId, fieldName, reader);
   }
 
   public class TimeJDBCConverter extends FieldConverter {
-    private final TimeHolder holder = new TimeHolder();
 
-    public TimeJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
+    public TimeJDBCConverter(int fieldID, String fieldName, FieldReader reader) {
       super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.TIME, DataMode.REQUIRED));
     }
 
     @Override
     public void writeField() {
-      reader.read(holder);
-      rowList.add(holder.value);
+      insertStatementBuilder.addRowValue(
+        SqlLiteral.createTime(TimeString.fromMillisOfDay(
+            (int) (reader.readLocalTime().toNanoOfDay() / TimeUnit.MILLISECONDS.toNanos(1))),
+          Types.DEFAULT_TIMESTAMP_PRECISION,
+          SqlParserPos.ZERO));
     }
   }
 
   @Override
   public FieldConverter getNewNullableTimeStampConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new NullableTimeStampJDBCConverter(fieldId, fieldName, reader, fields);
-  }
-
-  public class NullableTimeStampJDBCConverter extends FieldConverter {
-    private final NullableTimeStampHolder holder = new NullableTimeStampHolder();
-
-    public NullableTimeStampJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.TIMESTAMP, DataMode.OPTIONAL));
-    }
-
-    @Override
-    public void writeField() {
-      if (!reader.isSet()) {
-        rowList.add("null");
-        return;
-      }
-      reader.read(holder);
-      rowList.add(holder.value);
-    }
+    return new NullableJdbcConverter(fieldId, fieldName, reader,
+      new TimeStampJDBCConverter(fieldId, fieldName, reader));
   }
 
   @Override
   public FieldConverter getNewTimeStampConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new TimeStampJDBCConverter(fieldId, fieldName, reader, fields);
+    return new TimeStampJDBCConverter(fieldId, fieldName, reader);
   }
 
   public class TimeStampJDBCConverter extends FieldConverter {
-    private final TimeStampHolder holder = new TimeStampHolder();
 
-    public TimeStampJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
+    public TimeStampJDBCConverter(int fieldID, String fieldName, FieldReader reader) {
       super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.TIMESTAMP, DataMode.REQUIRED));
     }
 
     @Override
     public void writeField() {
-      reader.read(holder);
-      rowList.add(holder.value);
+      insertStatementBuilder.addRowValue(
+        SqlLiteral.createTimestamp(
+          TimestampString.fromMillisSinceEpoch(reader.readLocalDateTime().toInstant(ZoneOffset.UTC).toEpochMilli()),
+          Types.DEFAULT_TIMESTAMP_PRECISION,
+          SqlParserPos.ZERO));
     }
   }
 
   @Override
   public FieldConverter getNewNullableBitConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new NullableBitJDBCConverter(fieldId, fieldName, reader, fields);
+    return new NullableJdbcConverter(fieldId, fieldName, reader,
+      new BitJDBCConverter(fieldId, fieldName, reader));
   }
 
-  public class NullableBitJDBCConverter extends FieldConverter {
-    private final NullableBitHolder holder = new NullableBitHolder();
-
-    public NullableBitJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
-      super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.BIT, DataMode.OPTIONAL));
-    }
-
-    @Override
-    public void writeField() {
-      if (!reader.isSet()) {
-        rowList.add("null");
-        return;
-      }
-      reader.read(holder);
-      String booleanValue = "false";
-      if (holder.value == 1) {
-        booleanValue = "true";
-      }
-      rowList.add(booleanValue);
-    }
-  }
   @Override
   public FieldConverter getNewBitConverter(int fieldId, String fieldName, FieldReader reader) {
-    return new BitJDBCConverter(fieldId, fieldName, reader, fields);
+    return new BitJDBCConverter(fieldId, fieldName, reader);
   }
 
   public class BitJDBCConverter extends FieldConverter {
-    private final BitHolder holder = new BitHolder();
 
-    public BitJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
+    public BitJDBCConverter(int fieldID, String fieldName, FieldReader reader) {
       super(fieldID, fieldName, reader);
-      fields.add(new JdbcWriterField(fieldName, MinorType.BIT, DataMode.REQUIRED));
     }
 
     @Override
     public void writeField() {
-      reader.read(holder);
-      String booleanValue = "false";
-      if (holder.value == 1) {
-        booleanValue = "true";
-      }
-      rowList.add(booleanValue);
+      insertStatementBuilder.addRowValue(SqlLiteral.createBoolean(reader.readBoolean(), SqlParserPos.ZERO));
     }
   }
 }
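
A note on the converter refactor above: the separate Nullable*JDBCConverter classes are replaced by a single NullableJdbcConverter decorator that wraps the matching required-mode converter and handles the null check in one place (the removed classes each called rowList.add("null") for unset values; the wrapper presumably emits a SQL NULL literal instead). A minimal sketch of the pattern, using hypothetical stand-in types rather than Drill's real FieldReader and FieldConverter:

    // Stand-in for Drill's FieldReader; only the null check matters here.
    interface Reader {
      boolean isSet();
    }

    // Stand-in for Drill's FieldConverter.
    abstract class Converter {
      protected final Reader reader;

      Converter(Reader reader) {
        this.reader = reader;
      }

      abstract void writeField();
    }

    // One decorator replaces a nullable subclass per data type: it checks
    // isSet() once, then either writes NULL or delegates to the wrapped
    // required-mode converter.
    class NullableConverter extends Converter {
      private final Converter delegate;

      NullableConverter(Reader reader, Converter delegate) {
        super(reader);
        this.delegate = delegate;
      }

      @Override
      void writeField() {
        if (!reader.isSet()) {
          // In JdbcRecordWriter this would add a NULL literal to the row.
          return;
        }
        delegate.writeField();
      }
    }

This keeps one copy of each type's conversion logic instead of roughly two classes per supported type.
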
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
index 64d89ecb79..8489087465 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
@@ -35,6 +35,7 @@ import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -92,7 +93,7 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
         getName(),
         userCredentials.getUserName()
       );
-      return Optional.<DataSource>empty();
+      return Optional.empty();
     }
 
     // Missing creds is valid under SHARED_USER (e.g. unsecured DBs, BigQuery's OAuth)
@@ -138,15 +139,30 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public Set<RelOptRule> getPhysicalOptimizerRules(OptimizerRulesContext context) {
-    UserCredentials userCreds = context.getContextInformation().getQueryUserCredentials();
-    Optional<DataSource> dataSource = getDataSource(userCreds);
+  public boolean supportsInsert() {
+    return jdbcStorageConfig.isWritable();
+  }
 
-    if (!dataSource.isPresent()) {
-      return ImmutableSet.of();
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(
+    OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+    switch (phase) {
+      case LOGICAL:
+      case PHYSICAL: {
+        UserCredentials userCreds = optimizerContext.getContextInformation().getQueryUserCredentials();
+
+        String userName = userCreds.getUserName();
+        return getDataSource(userCreds)
+          .map(dataSource -> getConvention(getDialect(dataSource), userName).getRules())
+          .orElse(ImmutableSet.of());
+      }
+      case LOGICAL_PRUNE_AND_JOIN:
+      case LOGICAL_PRUNE:
+      case PARTITION_PRUNING:
+      case JOIN_PLANNING:
+      default:
+        return ImmutableSet.of();
     }
-
-    return getConvention( getDialect(dataSource.get()), userCreds.getUserName() ).getRules();
   }
 
   @Override
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcTableModifyWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcTableModifyWriter.java
new file mode 100644
index 0000000000..3895a087cd
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcTableModifyWriter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.exec.record.VectorAccessible;
+
+import javax.sql.DataSource;
+import java.util.List;
+
+public class JdbcTableModifyWriter extends JdbcRecordWriter {
+
+  public JdbcTableModifyWriter(DataSource source, List<String> tableIdentifier, JdbcWriter config) {
+    super(source, tableIdentifier, config);
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    // no-op
+  }
+}
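
The no-op updateSchema above is what separates INSERT from CTAS: for CTAS the writer must first create the destination table from the incoming batch's schema, while an INSERT targets a table that already exists, so there is nothing to create. A rough illustration of that split, with hypothetical stand-in types, not Drill's classes:

    abstract class WriterSketch {
      // CTAS path: derive and execute a CREATE TABLE from the batch schema.
      void updateSchema(String batchSchema) {
        System.out.println("CREATE TABLE derived from: " + batchSchema);
      }

      void writeRows() {
        // Shared path: turn incoming rows into INSERT statements.
      }
    }

    class TableModifySketch extends WriterSketch {
      @Override
      void updateSchema(String batchSchema) {
        // no-op: INSERT writes into an existing table
      }
    }
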
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriter.java
index 05924a1d80..dadd85086a 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriter.java
@@ -17,11 +17,10 @@
  */
 package org.apache.drill.exec.store.jdbc;
 
-import java.io.IOException;
+import java.util.List;
 
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -37,25 +36,25 @@ public class JdbcWriter extends AbstractWriter {
   public static final String OPERATOR_TYPE = "JDBC_WRITER";
 
   private final JdbcStoragePlugin plugin;
-  private final String tableName;
+  private final List<String> tableIdentifier;
   private final JdbcSchema inner;
 
   @JsonCreator
   public JdbcWriter(
     @JsonProperty("child") PhysicalOperator child,
-    @JsonProperty("name") String name,
+    @JsonProperty("tableIdentifier") List<String> tableIdentifier,
     @JsonProperty("storage") StoragePluginConfig storageConfig,
     @JacksonInject JdbcSchema inner,
-    @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
+    @JacksonInject StoragePluginRegistry engineRegistry) {
     super(child);
     this.plugin = engineRegistry.resolve(storageConfig, JdbcStoragePlugin.class);
-    this.tableName = name;
+    this.tableIdentifier = tableIdentifier;
     this.inner = inner;
   }
 
-  JdbcWriter(PhysicalOperator child, String name, JdbcSchema inner, JdbcStoragePlugin plugin) {
+  JdbcWriter(PhysicalOperator child, List<String> tableIdentifier, JdbcSchema inner, JdbcStoragePlugin plugin) {
     super(child);
-    this.tableName = name;
+    this.tableIdentifier = tableIdentifier;
     this.plugin = plugin;
     this.inner = inner;
   }
@@ -67,11 +66,11 @@ public class JdbcWriter extends AbstractWriter {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new JdbcWriter(child, tableName, inner, plugin);
+    return new JdbcWriter(child, tableIdentifier, inner, plugin);
   }
 
-  public String getTableName() {
-    return tableName;
+  public List<String> getTableIdentifier() {
+    return tableIdentifier;
   }
 
   public StoragePluginConfig getStorage() {
@@ -89,7 +88,7 @@ public class JdbcWriter extends AbstractWriter {
   @Override
   public String toString() {
     return new PlanStringBuilder(this)
-      .field("tableName", tableName)
+      .field("tableName", tableIdentifier)
       .field("storageStrategy", getStorageStrategy())
       .toString();
   }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
index 182c104714..4373ba6303 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 
+@SuppressWarnings("unused")
 public class JdbcWriterBatchCreator implements BatchCreator<JdbcWriter> {
 
   @Override
@@ -48,7 +49,7 @@ public class JdbcWriterBatchCreator implements BatchCreator<JdbcWriter> {
       config,
       children.iterator().next(),
       context,
-      new JdbcRecordWriter(ds, null, config.getTableName(), config)
+      new JdbcRecordWriter(ds, config.getTableIdentifier(), config)
     );
   }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/CreateTableStmtBuilder.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/CreateTableStmtBuilder.java
index f05ded75f0..015bd7aff6 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/CreateTableStmtBuilder.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/CreateTableStmtBuilder.java
@@ -18,126 +18,131 @@
 
 package org.apache.drill.exec.store.jdbc.utils;
 
+import org.apache.calcite.schema.ColumnStrategy;
+import org.apache.calcite.sql.SqlBasicTypeNameSpec;
+import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.ddl.SqlDdlNodes;
 import org.apache.calcite.sql.dialect.PostgresqlSqlDialect;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.store.jdbc.JdbcRecordWriter;
-import org.apache.parquet.Strings;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.MaterializedField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.JDBCType;
+import java.util.List;
 
 public class CreateTableStmtBuilder {
   private static final Logger logger = LoggerFactory.getLogger(CreateTableStmtBuilder.class);
   public static final int DEFAULT_VARCHAR_PRECISION = 100;
 
-  private static final String CREATE_TABLE_QUERY = "CREATE TABLE %s (";
-  private StringBuilder createTableQuery;
-  private final String tableName;
+  private final List<String> tableIdentifier;
   private final SqlDialect dialect;
-  private StringBuilder columns;
+  private final SqlNodeList sqlColumns = new SqlNodeList(SqlParserPos.ZERO);
 
-  public CreateTableStmtBuilder(String tableName, SqlDialect dialect) {
-    if (Strings.isNullOrEmpty(tableName)) {
+  public CreateTableStmtBuilder(List<String> tableIdentifier, SqlDialect dialect) {
+    if (CollectionUtils.isEmpty(tableIdentifier)) {
       throw new UnsupportedOperationException("Table name cannot be empty");
     }
-    this.tableName = tableName;
+    this.tableIdentifier = tableIdentifier;
     this.dialect = dialect;
-    columns = new StringBuilder();
   }
 
   /**
    * Adds a column to the CREATE TABLE statement
-   * @param colName The column to be added to the table
-   * @param type The Drill MinorType of the column
-   * @param nullable If the column is nullable or not.
-   * @param precision The precision, or overall length of a column
-   * @param scale The scale, or number of digits after the decimal
    */
-  public void addColumn(String colName, MinorType type, boolean nullable, int precision, int scale) {
-    StringBuilder queryText = new StringBuilder();
-    String jdbcColTypeName = "";
-    try {
-      Integer jdbcColType = JdbcRecordWriter.JDBC_TYPE_MAPPINGS.get(type);
-      jdbcColTypeName = JDBCType.valueOf(jdbcColType).getName();
-
-      if (dialect instanceof PostgresqlSqlDialect) {
-        // pg data type name special case
-        if (jdbcColType.equals(java.sql.Types.DOUBLE)) {
-          // TODO: Calcite will incorrectly output DOUBLE instead of DOUBLE PRECISION under the pg dialect
-          jdbcColTypeName = "FLOAT";
-        }
+  public void addColumn(MaterializedField field) {
+    TypeProtos.MajorType majorType = populateScaleAndPrecisionIfRequired(field.getType());
+    int jdbcType = Types.getJdbcTypeCode(Types.getSqlTypeName(majorType));
+    if (dialect instanceof PostgresqlSqlDialect) {
+      // pg data type name special case
+      if (jdbcType == java.sql.Types.DOUBLE) {
+        // TODO: Calcite will incorrectly output DOUBLE instead of DOUBLE PRECISION under the pg dialect
+        jdbcType = java.sql.Types.FLOAT;
       }
-    } catch (NullPointerException e) {
-      // JDBC Does not support writing complex fields to databases
+    }
+    SqlTypeName sqlTypeName = SqlTypeName.getNameForJdbcType(jdbcType);
+
+    if (sqlTypeName == null) {
       throw UserException.dataWriteError()
         .message("Drill does not support writing complex fields to JDBC data sources.")
-        .addContext(colName + " is a complex type.")
+        .addContext(field.getName() + " is a complex type.")
         .build(logger);
     }
 
-    queryText.append(colName).append(" ").append(jdbcColTypeName);
+    int precision = majorType.hasPrecision()
+      ? majorType.getPrecision()
+      : -1;
 
-    // Add precision or scale if applicable
-    if (jdbcColTypeName.equals("VARCHAR")) {
-      int max_precision = Math.max(precision, DEFAULT_VARCHAR_PRECISION);
-      queryText.append("(").append(max_precision).append(")");
-    }
+    int scale = majorType.hasScale()
+      ? majorType.getScale()
+      : -1;
 
-    if (!nullable) {
-      queryText.append(" NOT NULL");
-    }
+    SqlBasicTypeNameSpec typeNameSpec = new SqlBasicTypeNameSpec(
+      sqlTypeName, precision, scale, SqlParserPos.ZERO);
 
-    if (! Strings.isNullOrEmpty(columns.toString())) {
-      columns.append(",\n");
-    }
+    SqlDataTypeSpec sqlDataTypeSpec = new SqlDataTypeSpec(
+      typeNameSpec,
+      SqlParserPos.ZERO).withNullable(field.isNullable());
+
+    ColumnStrategy columnStrategy = field.isNullable()
+      ? ColumnStrategy.NULLABLE
+      : ColumnStrategy.NOT_NULLABLE;
+
+    SqlNode sqlColumnDeclaration = SqlDdlNodes.column(SqlParserPos.ZERO,
+      new SqlIdentifier(field.getName(), SqlParserPos.ZERO),
+      sqlDataTypeSpec,
+      null,
+      columnStrategy);
 
-    columns.append(queryText);
+    sqlColumns.add(sqlColumnDeclaration);
   }
 
   /**
    * Generates the CREATE TABLE query.
    * @return The create table query.
    */
-  public CreateTableStmtBuilder build() {
-    createTableQuery = new StringBuilder();
-    createTableQuery.append(String.format(CREATE_TABLE_QUERY, tableName));
-    createTableQuery.append(columns);
-    createTableQuery.append("\n)");
-    return this;
-  }
+  public String build() {
+    SqlIdentifier sqlIdentifier = new SqlIdentifier(tableIdentifier, SqlParserPos.ZERO);
+    SqlNode createTable = SqlDdlNodes.createTable(SqlParserPos.ZERO,
+      false, false, sqlIdentifier, sqlColumns, null);
 
-  public String getCreateTableQuery() {
-    return createTableQuery != null ? createTableQuery.toString() : null;
+    return createTable.toSqlString(dialect, true).getSql();
   }
 
-  @Override
-  public String toString() {
-    return getCreateTableQuery();
-  }
-
-  /**
-   * This function adds the appropriate catalog, schema and table for the FROM clauses for INSERT queries
-   * @param table The table
-   * @param catalog The database catalog
-   * @param schema The database schema
-   * @return The table with catalog and schema added, if present
-   */
-  public static String buildCompleteTableName(String table, String catalog, String schema) {
-    logger.debug("Building complete table.");
-    StringBuilder completeTable = new StringBuilder();
-    if (! Strings.isNullOrEmpty(catalog)) {
-      completeTable.append(catalog);
-      completeTable.append(".");
-    }
-
-    if (! Strings.isNullOrEmpty(schema)) {
-      completeTable.append(schema);
-      completeTable.append(".");
+  private static TypeProtos.MajorType populateScaleAndPrecisionIfRequired(TypeProtos.MajorType type) {
+    switch (type.getMinorType()) {
+      case VARDECIMAL:
+        if (!type.hasPrecision()) {
+          type = type.toBuilder().setPrecision(Types.maxPrecision(type.getMinorType())).build();
+        }
+        if (!type.hasScale()) {
+          type = type.toBuilder().setScale(0).build();
+        }
+        break;
+      case FIXEDCHAR:
+      case FIXED16CHAR:
+      case VARCHAR:
+      case VAR16CHAR:
+      case VARBINARY:
+        if (!type.hasPrecision()) {
+          type = type.toBuilder().setPrecision(DEFAULT_VARCHAR_PRECISION).build();
+        }
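+        // falls through to the TIMESTAMP/TIME check, which is a no-op once precision is set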
+      case TIMESTAMP:
+      case TIME:
+        if (!type.hasPrecision()) {
+          type = type.toBuilder().setPrecision(Types.DEFAULT_TIMESTAMP_PRECISION).build();
+        }
+      default:
     }
-    completeTable.append(table);
-    return JdbcDDLQueryUtils.addBackTicksToTable(completeTable.toString());
+    return type;
   }
 }
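
For reference, a minimal usage sketch of the rewritten builder. It assumes Drill's MaterializedField.create and Types factory methods plus Calcite's PostgresqlSqlDialect.DEFAULT; the SQL shown in the comment is approximate, since quoting and type names vary by dialect:

    import java.util.Arrays;

    import org.apache.calcite.sql.dialect.PostgresqlSqlDialect;
    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.common.types.Types;
    import org.apache.drill.exec.record.MaterializedField;
    import org.apache.drill.exec.store.jdbc.utils.CreateTableStmtBuilder;

    public class CreateTableStmtBuilderSketch {
      public static void main(String[] args) {
        CreateTableStmtBuilder builder = new CreateTableStmtBuilder(
            Arrays.asList("public", "my_table"), PostgresqlSqlDialect.DEFAULT);

        // Columns come from Drill's materialized schema; the VARCHAR column
        // picks up DEFAULT_VARCHAR_PRECISION (100) because none is set.
        builder.addColumn(MaterializedField.create("id", Types.required(MinorType.INT)));
        builder.addColumn(MaterializedField.create("name", Types.optional(MinorType.VARCHAR)));

        // Roughly: CREATE TABLE "public"."my_table"
        //          ("id" INTEGER NOT NULL, "name" VARCHAR(100))
        System.out.println(builder.build());
      }
    }
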
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/InsertStatementBuilder.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/InsertStatementBuilder.java
new file mode 100644
index 0000000000..63be096fa7
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/InsertStatementBuilder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc.utils;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.fun.SqlInternalOperators;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InsertStatementBuilder {
+
+  private final List<SqlNode> sqlRows = new ArrayList<>();
+  private final List<SqlNode> sqlRowValues = new ArrayList<>();
+  private final SqlDialect dialect;
+  private final List<String> tableIdentifier;
+
+  public InsertStatementBuilder(List<String> tableIdentifier, SqlDialect dialect) {
+    this.dialect = dialect;
+    this.tableIdentifier = tableIdentifier;
+  }
+
+  public void addRowValue(SqlNode value) {
+    sqlRowValues.add(value);
+  }
+
+  public void endRecord() {
+    sqlRows.add(SqlInternalOperators.ANONYMOUS_ROW.createCall(
+      SqlParserPos.ZERO, sqlRowValues.toArray(new SqlNode[0])));
+    resetRow();
+  }
+
+  public void resetRow() {
+    sqlRowValues.clear();
+  }
+
+  public String buildInsertQuery() {
+    SqlCall values = SqlStdOperatorTable.VALUES.createCall(
+      SqlParserPos.ZERO, sqlRows.toArray(new SqlNode[0]));
+    resetRow();
+    sqlRows.clear();
+    SqlInsert sqlInsert = new SqlInsert(
+      SqlParserPos.ZERO,
+      SqlNodeList.EMPTY,
+      new SqlIdentifier(tableIdentifier, SqlParserPos.ZERO),
+      values,
+      null);
+
+    return sqlInsert.toSqlString(dialect).getSql();
+  }
+}
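
A companion sketch for the new insert builder: row values are queued as Calcite literals, endRecord() closes out a row, and buildInsertQuery() renders a multi-row INSERT in the target dialect. The dialect choice and the rendered output shown here are illustrative:

    import java.util.Arrays;

    import org.apache.calcite.sql.SqlLiteral;
    import org.apache.calcite.sql.dialect.AnsiSqlDialect;
    import org.apache.calcite.sql.parser.SqlParserPos;
    import org.apache.drill.exec.store.jdbc.utils.InsertStatementBuilder;

    public class InsertStatementBuilderSketch {
      public static void main(String[] args) {
        InsertStatementBuilder builder = new InsertStatementBuilder(
            Arrays.asList("drill_h2_test", "test_table"), AnsiSqlDialect.DEFAULT);

        // First row: (1, 'first')
        builder.addRowValue(SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO));
        builder.addRowValue(SqlLiteral.createCharString("first", SqlParserPos.ZERO));
        builder.endRecord();

        // Second row: (2, 'second')
        builder.addRowValue(SqlLiteral.createExactNumeric("2", SqlParserPos.ZERO));
        builder.addRowValue(SqlLiteral.createCharString("second", SqlParserPos.ZERO));
        builder.endRecord();

        // Roughly: INSERT INTO "drill_h2_test"."test_table"
        //          VALUES (1, 'first'), (2, 'second')
        System.out.println(builder.buildInsertQuery());
      }
    }
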
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcDDLQueryUtils.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcDDLQueryUtils.java
deleted file mode 100644
index 95b927c6b5..0000000000
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcDDLQueryUtils.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.store.jdbc.utils;
-
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.sql.SqlDialect;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.util.SqlBuilder;
-import org.apache.calcite.sql.validate.SqlConformanceEnum;
-import org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class JdbcDDLQueryUtils {
-
-  private static final Logger logger = LoggerFactory.getLogger(JdbcDDLQueryUtils.class);
-  /**
-   * Converts a given SQL query from the generic dialect to the destination system dialect.  Returns
-   * null if the original query is not valid.  This should only be used for CTAS queries.
-   *
-   * @param query An ANSI SQL statement (CTAS Only)
-   * @param dialect The destination system dialect
-   * @return A representation of the original query in the destination dialect
-   */
-  public static String cleanDDLQuery(String query, SqlDialect dialect) {
-    SqlParser.Config sqlParserConfig = SqlParser.configBuilder()
-      .setParserFactory(SqlDdlParserImpl.FACTORY)
-      .setConformance(SqlConformanceEnum.STRICT_2003)
-      .setCaseSensitive(true)
-      .setLex(Lex.MYSQL)
-      .build();
-
-    try {
-      SqlNode node = SqlParser.create(query, sqlParserConfig).parseQuery();
-
-      return node.toSqlString(dialect).getSql();
-    } catch (SqlParseException e) {
-      logger.error(e.getMessage());
-      return null;
-    }
-  }
-
-  /**
-   * This function adds backticks around table names.  If the table name already has backticks,
-   * it does nothing.
-   * @param inputTable The table name with or without backticks
-   * @return The table name with backticks added.
-   */
-  public static String addBackTicksToTable(String inputTable) {
-    String[] queryParts = inputTable.split("\\.");
-    StringBuilder cleanQuery = new StringBuilder();
-
-    int counter = 0;
-    for (String part : queryParts) {
-      if (counter > 0) {
-        cleanQuery.append(".");
-      }
-
-      if (part.startsWith("`") && part.endsWith("`")) {
-        cleanQuery.append(part);
-      } else {
-        cleanQuery.append("`").append(part).append("`");
-      }
-      counter++;
-    }
-
-    return cleanQuery.toString();
-  }
-
-  public static void addTableToInsertQuery(SqlBuilder builder, String rawEscapedTables) {
-    final String TABLE_REGEX = "(?:`(.+?)`)+";
-    Pattern pattern = Pattern.compile(TABLE_REGEX);
-    Matcher matcher = pattern.matcher(rawEscapedTables);
-
-    int matchCount = 0;
-    while (matcher.find()) {
-      if (matchCount > 0) {
-        builder.append(".");
-      }
-      builder.identifier(matcher.group(1));
-      matchCount++;
-    }
-  }
-
-  public static String addBackTicksToField(String field) {
-    if (field.startsWith("`") && field.endsWith("`")) {
-      return field;
-    } else {
-      StringBuilder cleanField = new StringBuilder();
-      return cleanField.append("`").append(field).append("`").toString();
-    }
-  }
-}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestCreateTableStmtBuilder.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestCreateTableStmtBuilder.java
deleted file mode 100644
index 9d827a642a..0000000000
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestCreateTableStmtBuilder.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.store.jdbc;
-
-import org.apache.drill.exec.store.jdbc.utils.CreateTableStmtBuilder;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestCreateTableStmtBuilder {
-
-  @Test
-  public void testSimpleTable() {
-    String table = "table";
-    String schema = "schema";
-    String catalog = "catalog";
-
-    String completeTable = CreateTableStmtBuilder.buildCompleteTableName(table, catalog, schema);
-    assertEquals("`catalog`.`schema`.`table`", completeTable);
-    assertEquals("`catalog`.`table`", CreateTableStmtBuilder.buildCompleteTableName(table, catalog, ""));
-    assertEquals("`catalog`.`table`", CreateTableStmtBuilder.buildCompleteTableName(table, catalog, null));
-  }
-
-  @Test
-  public void testTablesWithSpaces() {
-    String table = "table with spaces";
-    String schema = "schema with spaces";
-    String catalog = "catalog with spaces";
-
-    String completeTable = CreateTableStmtBuilder.buildCompleteTableName(table, catalog, schema);
-    assertEquals("`catalog with spaces`.`schema with spaces`.`table with spaces`", completeTable);
-    assertEquals("`catalog with spaces`.`table with spaces`", CreateTableStmtBuilder.buildCompleteTableName(table, catalog, ""));
-    assertEquals("`catalog with spaces`.`table with spaces`", CreateTableStmtBuilder.buildCompleteTableName(table, catalog, null));
-  }
-}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithH2.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithH2.java
new file mode 100644
index 0000000000..ab0276d2ac
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithH2.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.store.enumerable.plan.EnumMockPlugin;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.h2.tools.RunScript;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.FileReader;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@Category(JdbcStorageTest.class)
+public class TestJdbcInsertWithH2 extends ClusterTest {
+
+  @BeforeClass
+  public static void init() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+    // Force timezone to UTC for these tests.
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    dirTestWatcher.copyResourceToRoot(Paths.get(""));
+
+    Class.forName("org.h2.Driver");
+    String connString = "jdbc:h2:" + dirTestWatcher.getTmpDir().getCanonicalPath();
+    URL scriptFile = TestJdbcPluginWithH2IT.class.getClassLoader().getResource("h2-test-data.sql");
+    assertNotNull("Script for test tables generation 'h2-test-data.sql' cannot be found in test resources", scriptFile);
+    try (Connection connection = DriverManager.getConnection(connString, "root", "root");
+         FileReader fileReader = new FileReader(scriptFile.getFile())) {
+      RunScript.execute(connection, fileReader);
+    }
+
+    Map<String, String> credentials = new HashMap<>();
+    credentials.put("username", "root");
+    credentials.put("password", "root");
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(credentials);
+
+    Map<String, Object> sourceParameters = new HashMap<>();
+    sourceParameters.put("minimumIdle", 1);
+    JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("org.h2.Driver", connString,
+      "root", "root", true, true, sourceParameters, credentialsProvider, AuthMode.SHARED_USER.name(), 10000);
+    jdbcStorageConfig.setEnabled(true);
+
+    JdbcStorageConfig jdbcStorageConfigNoWrite = new JdbcStorageConfig("org.h2.Driver", connString,
+      "root", "root", true, false, sourceParameters, credentialsProvider, AuthMode.SHARED_USER.name(), 10000);
+    jdbcStorageConfigNoWrite.setEnabled(true);
+
+    cluster.defineStoragePlugin("h2", jdbcStorageConfig);
+    cluster.defineStoragePlugin("h2_unwritable", jdbcStorageConfigNoWrite);
+
+    EnumMockPlugin.EnumMockStoragePluginConfig config = new EnumMockPlugin.EnumMockStoragePluginConfig();
+    config.setEnabled(true);
+    cluster.defineStoragePlugin("mocked_enum", config);
+  }
+
+  @Test
+  public void testInsertValues() throws Exception {
+    String tableName = "h2.tmp.drill_h2_test.`test_table`";
+    try {
+      String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+      // Create the table and insert the values
+      QuerySummary insertResults = queryBuilder()
+        .sql(query, tableName)
+        .run();
+      assertTrue(insertResults.succeeded());
+
+      String insertQuery = "insert into %s(ID, NAME) VALUES (3,4)";
+      queryBuilder()
+        .sql(insertQuery, tableName)
+        .planMatcher()
+        .include("Jdbc\\(sql=\\[INSERT INTO")
+        .match();
+
+      testBuilder()
+        .sqlQuery(insertQuery, tableName)
+        .unOrdered()
+        .baselineColumns("ROWCOUNT")
+        .baselineValues(1L)
+        .go();
+
+      testBuilder()
+        .sqlQuery("select * from %s", tableName)
+        .unOrdered()
+        .baselineColumns("ID", "NAME")
+        .baselineValues(1, 2)
+        .baselineValues(3, 4)
+        .go();
+    } finally {
+      queryBuilder()
+        .sql("DROP TABLE IF EXISTS %s", tableName)
+        .run();
+    }
+  }
+
+  @Test
+  public void testInsertSelectValues() throws Exception {
+    String tableName = "h2.tmp.drill_h2_test.`test_table`";
+    try {
+      String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+      // Create the table and insert the values
+      QuerySummary insertResults = queryBuilder()
+        .sql(query, tableName)
+        .run();
+      assertTrue(insertResults.succeeded());
+
+      String insertQuery = "INSERT INTO %s SELECT * FROM (VALUES(1,2), (3,4))";
+      queryBuilder()
+        .sql(insertQuery, tableName)
+        .planMatcher()
+        .include("Jdbc\\(sql=\\[INSERT INTO")
+        .match();
+
+      testBuilder()
+        .sqlQuery(insertQuery, tableName)
+        .unOrdered()
+        .baselineColumns("ROWCOUNT")
+        .baselineValues(2L)
+        .go();
+
+      testBuilder()
+        .sqlQuery("select * from %s", tableName)
+        .unOrdered()
+        .baselineColumns("ID", "NAME")
+        .baselineValues(1, 2)
+        .baselineValues(1, 2)
+        .baselineValues(3, 4)
+        .go();
+    } finally {
+      queryBuilder()
+        .sql("DROP TABLE IF EXISTS %s", tableName)
+        .run();
+    }
+  }
+
+  @Test
+  public void testInsertSelectFromJdbcTable() throws Exception {
+    String tableName = "h2.tmp.drill_h2_test.`test_table`";
+    try {
+      String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2), (3,4))";
+      // Create the table and insert the values
+      QuerySummary insertResults = queryBuilder()
+        .sql(query, tableName)
+        .run();
+      assertTrue(insertResults.succeeded());
+
+      String insertQuery = "INSERT INTO %s SELECT * FROM %s";
+      queryBuilder()
+        .sql(insertQuery, tableName, tableName)
+        .planMatcher()
+        .include("Jdbc\\(sql=\\[INSERT INTO")
+        .match();
+
+      testBuilder()
+        .sqlQuery(insertQuery, tableName, tableName)
+        .unOrdered()
+        .baselineColumns("ROWCOUNT")
+        .baselineValues(2L)
+        .go();
+
+      testBuilder()
+        .sqlQuery("select * from %s", tableName)
+        .unOrdered()
+        .baselineColumns("ID", "NAME")
+        .baselineValues(1, 2)
+        .baselineValues(3, 4)
+        .baselineValues(1, 2)
+        .baselineValues(3, 4)
+        .go();
+    } finally {
+      queryBuilder()
+        .sql("DROP TABLE IF EXISTS %s", tableName)
+        .run();
+    }
+  }
+
+  @Test
+  public void testInsertSelectFromNonJdbcTable() throws Exception {
+    String tableName = "h2.tmp.drill_h2_test.`test_table`";
+    try {
+      String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+      // Create the table and insert the values
+      QuerySummary insertResults = queryBuilder()
+        .sql(query, tableName)
+        .run();
+      assertTrue(insertResults.succeeded());
+
+      String insertQuery = "INSERT INTO %s SELECT n_nationkey, n_regionkey FROM cp.`tpch/nation.parquet` limit 3";
+      queryBuilder()
+        .sql(insertQuery, tableName)
+        .planMatcher()
+        .exclude("Jdbc\\(sql=\\[INSERT INTO") // INSERT from a non-JDBC source cannot be pushed down
+        .match();
+
+      testBuilder()
+        .sqlQuery(insertQuery, tableName)
+        .unOrdered()
+        .baselineColumns("ROWCOUNT")
+        .baselineValues(3L)
+        .go();
+
+      testBuilder()
+        .sqlQuery("select * from %s", tableName)
+        .unOrdered()
+        .baselineColumns("ID", "NAME")
+        .baselineValues(1, 2)
+        .baselineValues(0, 0)
+        .baselineValues(1, 1)
+        .baselineValues(2, 1)
+        .go();
+    } finally {
+      queryBuilder()
+        .sql("DROP TABLE IF EXISTS %s", tableName)
+        .run();
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithMySQL.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithMySQL.java
new file mode 100644
index 0000000000..dadcf0b5bb
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithMySQL.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.ext.ScriptUtils;
+import org.testcontainers.jdbc.JdbcDatabaseDelegate;
+import org.testcontainers.utility.DockerImageName;
+
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+@Category(JdbcStorageTest.class)
+public class TestJdbcInsertWithMySQL extends ClusterTest {
+  private static final String DOCKER_IMAGE_MYSQL = "mysql:5.7.27";
+  private static final String DOCKER_IMAGE_MARIADB = "mariadb:10.6.0";
+  private static final Logger logger = LoggerFactory.getLogger(TestJdbcInsertWithMySQL.class);
+  private static JdbcDatabaseContainer<?> jdbcContainer;
+
+  @BeforeClass
+  public static void initMysql() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+    dirTestWatcher.copyResourceToRoot(Paths.get(""));
+
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    String osName = System.getProperty("os.name").toLowerCase();
+    String mysqlDBName = "drill_mysql_test";
+
+    DockerImageName imageName;
+    if (osName.startsWith("linux") && "aarch64".equals(System.getProperty("os.arch"))) {
+      imageName = DockerImageName.parse(DOCKER_IMAGE_MARIADB).asCompatibleSubstituteFor("mysql");
+    } else {
+      imageName = DockerImageName.parse(DOCKER_IMAGE_MYSQL);
+    }
+
+    jdbcContainer = new MySQLContainer<>(imageName)
+      .withExposedPorts(3306)
+      .withConfigurationOverride("mysql_config_override")
+      .withUsername("mysqlUser")
+      .withPassword("mysqlPass")
+      .withDatabaseName(mysqlDBName)
+      .withUrlParam("serverTimezone", "UTC")
+      .withUrlParam("useJDBCCompliantTimezoneShift", "true")
+      .withInitScript("mysql-test-data.sql");
+    jdbcContainer.start();
+
+    if (osName.startsWith("linux")) {
+      JdbcDatabaseDelegate databaseDelegate = new JdbcDatabaseDelegate(jdbcContainer, "");
+      ScriptUtils.runInitScript(databaseDelegate, "mysql-test-data-linux.sql");
+    }
+
+    Map<String, Object> sourceParameters = new HashMap<>();
+    sourceParameters.put("maximumPoolSize", "1");
+    sourceParameters.put("idleTimeout", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+    sourceParameters.put("keepaliveTime", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+    sourceParameters.put("maxLifetime", String.valueOf(TimeUnit.SECONDS.toMillis(20)));
+    sourceParameters.put("minimumIdle", "0");
+
+    String jdbcUrl = jdbcContainer.getJdbcUrl();
+    logger.debug("JDBC URL: {}", jdbcUrl);
+    JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
+      jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, true, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
+    jdbcStorageConfig.setEnabled(true);
+
+    cluster.defineStoragePlugin("mysql", jdbcStorageConfig);
+
+    JdbcStorageConfig jdbcStorageConfigNoWrite = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
+      jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, false, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
+    jdbcStorageConfigNoWrite.setEnabled(true);
+
+    cluster.defineStoragePlugin("mysql_no_write", jdbcStorageConfigNoWrite);
+
+    if (osName.startsWith("linux")) {
+      // Adds a storage plugin with case-insensitive table names.
+      JdbcStorageConfig jdbcCaseInsensitiveStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
+        jdbcContainer.getUsername(), jdbcContainer.getPassword(), true, true, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
+      jdbcCaseInsensitiveStorageConfig.setEnabled(true);
+      cluster.defineStoragePlugin("mysqlCaseInsensitive", jdbcCaseInsensitiveStorageConfig);
+    }
+  }
+
+  @Test
+  public void testInsertValues() throws Exception {
+    String tableName = "mysql.`drill_mysql_test`.`test_table`";
+    try {
+      String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+      // Create the table and insert the values
+      QuerySummary insertResults = queryBuilder()
+        .sql(query, tableName)
+        .run();
+      assertTrue(insertResults.succeeded());
+
+      String insertQuery = "insert into %s(ID, NAME) VALUES (3,4)";
+      queryBuilder()
+        .sql(insertQuery, tableName)
+        .planMatcher()
+        .include("Jdbc\\(sql=\\[INSERT INTO")
+        .match();
+
+      testBuilder()
+        .sqlQuery(insertQuery, tableName)
+        .unOrdered()
+        .baselineColumns("ROWCOUNT")
+        .baselineValues(1L)
+        .go();
+
+      testBuilder()
+        .sqlQuery("select * from %s", tableName)
+        .unOrdered()
+        .baselineColumns("ID", "NAME")
+        .baselineValues(1, 2)
+        .baselineValues(3, 4)
+        .go();
+    } finally {
+      queryBuilder()
+        .sql("DROP TABLE IF EXISTS %s", tableName)
+        .run();
+    }
+  }
+
+  @Test
+  public void testInsertSelectValues() throws Exception {
+    String tableName = "mysql.`drill_mysql_test`.`test_table`";
+    try {
+      String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+      // Create the table and insert the values
+      QuerySummary insertResults = queryBuilder()
+        .sql(query, tableName)
+        .run();
+      assertTrue(insertResults.succeeded());
+
+      String insertQuery = "INSERT INTO %s SELECT * FROM (VALUES(1,2), (3,4))";
+      queryBuilder()
+        .sql(insertQuery, tableName)
+        .planMatcher()
+        .include("Jdbc\\(sql=\\[INSERT INTO")
+        .match();
+
+      testBuilder()
+        .sqlQuery(insertQuery, tableName)
+        .unOrdered()
+        .baselineColumns("ROWCOUNT")
+        .baselineValues(2L)
+        .go();
+
+      testBuilder()
+        .sqlQuery("select * from %s", tableName)
+        .unOrdered()
+        .baselineColumns("ID", "NAME")
+        .baselineValues(1, 2)
+        .baselineValues(1, 2)
+        .baselineValues(3, 4)
+        .go();
+    } finally {
+      queryBuilder()
+        .sql("DROP TABLE IF EXISTS %s", tableName)
+        .run();
+    }
+  }
+
+  @Test
+  public void testInsertSelectFromJdbcTable() throws Exception {
+    String tableName = "mysql.`drill_mysql_test`.`test_table`";
+    try {
+      String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2), (3,4))";
+      // Create the table and insert the values
+      QuerySummary insertResults = queryBuilder()
+        .sql(query, tableName)
+        .run();
+      assertTrue(insertResults.succeeded());
+
+      String insertQuery = "INSERT INTO %s SELECT * FROM %s";
+      queryBuilder()
+        .sql(insertQuery, tableName, tableName)
+        .planMatcher()
+        .include("Jdbc\\(sql=\\[INSERT INTO")
+        .match();
+
+      testBuilder()
+        .sqlQuery(insertQuery, tableName, tableName)
+        .unOrdered()
+        .baselineColumns("ROWCOUNT")
+        .baselineValues(2L)
+        .go();
+
+      testBuilder()
+        .sqlQuery("select * from %s", tableName)
+        .unOrdered()
+        .baselineColumns("ID", "NAME")
+        .baselineValues(1, 2)
+        .baselineValues(3, 4)
+        .baselineValues(1, 2)
+        .baselineValues(3, 4)
+        .go();
+    } finally {
+      queryBuilder()
+        .sql("DROP TABLE IF EXISTS %s", tableName)
+        .run();
+    }
+  }
+
+  @Test
+  public void testInsertSelectFromNonJdbcTable() throws Exception {
+    String tableName = "mysql.`drill_mysql_test`.`test_table`";
+    try {
+      String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+      // Create the table and insert the values
+      QuerySummary insertResults = queryBuilder()
+        .sql(query, tableName)
+        .run();
+      assertTrue(insertResults.succeeded());
+
+      String insertQuery = "INSERT INTO %s SELECT n_nationkey, n_regionkey FROM cp.`tpch/nation.parquet` limit 3";
+      queryBuilder()
+        .sql(insertQuery, tableName)
+        .planMatcher()
+        .exclude("Jdbc\\(sql=\\[INSERT INTO") // INSERT from a non-JDBC source cannot be pushed down
+        .match();
+
+      testBuilder()
+        .sqlQuery(insertQuery, tableName)
+        .unOrdered()
+        .baselineColumns("ROWCOUNT")
+        .baselineValues(3L)
+        .go();
+
+      testBuilder()
+        .sqlQuery("select * from %s", tableName)
+        .unOrdered()
+        .baselineColumns("ID", "NAME")
+        .baselineValues(1, 2)
+        .baselineValues(0, 0)
+        .baselineValues(1, 1)
+        .baselineValues(2, 1)
+        .go();
+    } finally {
+      queryBuilder()
+        .sql("DROP TABLE IF EXISTS %s", tableName)
+        .run();
+    }
+  }
+
+  @AfterClass
+  public static void stopMysql() {
+    if (jdbcContainer != null) {
+      jdbcContainer.stop();
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithPostgres.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithPostgres.java
new file mode 100644
index 0000000000..fcb6659948
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithPostgres.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+@Category(JdbcStorageTest.class)
+public class TestJdbcInsertWithPostgres extends ClusterTest {
+
+  private static final String DOCKER_IMAGE_POSTGRES_X86 = "postgres:12.8-alpine3.14";
+  private static JdbcDatabaseContainer<?> jdbcContainer;
+
+  @BeforeClass
+  public static void initPostgres() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+    dirTestWatcher.copyResourceToRoot(Paths.get(""));
+
+    String postgresDBName = "drill_postgres_test";
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+
+    DockerImageName imageName = DockerImageName.parse(DOCKER_IMAGE_POSTGRES_X86);
+    jdbcContainer = new PostgreSQLContainer<>(imageName)
+      .withUsername("postgres")
+      .withPassword("password")
+      .withDatabaseName(postgresDBName)
+      .withInitScript("postgres-test-data.sql");
+    jdbcContainer.start();
+
+    Map<String, Object> sourceParameters = new HashMap<>();
+    sourceParameters.put("maximumPoolSize", "16");
+    sourceParameters.put("idleTimeout", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+    sourceParameters.put("keepaliveTime", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+    sourceParameters.put("maxLifetime", String.valueOf(TimeUnit.SECONDS.toMillis(20)));
+    sourceParameters.put("minimumIdle", "0");
+
+    JdbcStorageConfig jdbcStorageConfig =
+      new JdbcStorageConfig("org.postgresql.Driver",
+        jdbcContainer.getJdbcUrl(), jdbcContainer.getUsername(), jdbcContainer.getPassword(),
+        true, true, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
+    jdbcStorageConfig.setEnabled(true);
+    cluster.defineStoragePlugin("pg", jdbcStorageConfig);
+
+    JdbcStorageConfig unWritableJdbcStorageConfig =
+      new JdbcStorageConfig("org.postgresql.Driver",
+        jdbcContainer.getJdbcUrl(), jdbcContainer.getUsername(), jdbcContainer.getPassword(),
+        true, false, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
+    unWritableJdbcStorageConfig.setEnabled(true);
+    cluster.defineStoragePlugin("pg_unwritable", unWritableJdbcStorageConfig);
+  }
+
+  @AfterClass
+  public static void stopPostgres() {
+    if (jdbcContainer != null) {
+      jdbcContainer.stop();
+    }
+  }
+
+  @Test
+  public void testInsertValues() throws Exception {
+    String tableName = "`pg`.`public`.`test_table`";
+    try {
+      String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+      // Create the table and insert the values
+      QuerySummary insertResults = queryBuilder()
+        .sql(query, tableName)
+        .run();
+      assertTrue(insertResults.succeeded());
+
+      String insertQuery = "insert into %s(ID, NAME) VALUES (3,4)";
+      queryBuilder()
+        .sql(insertQuery, tableName)
+        .planMatcher()
+        .include("Jdbc\\(sql=\\[INSERT INTO")
+        .match();
+
+      testBuilder()
+        .sqlQuery(insertQuery, tableName)
+        .unOrdered()
+        .baselineColumns("ROWCOUNT")
+        .baselineValues(1L)
+        .go();
+
+      testBuilder()
+        .sqlQuery("select * from %s", tableName)
+        .unOrdered()
+        .baselineColumns("ID", "NAME")
+        .baselineValues(1, 2)
+        .baselineValues(3, 4)
+        .go();
+    } finally {
+      queryBuilder()
+        .sql("DROP TABLE IF EXISTS %s", tableName)
+        .run();
+    }
+  }
+
+  @Test
+  public void testInsertSelectValues() throws Exception {
+    String tableName = "`pg`.`public`.`test_table`";
+    try {
+      String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+      // Create the table and insert the values
+      QuerySummary insertResults = queryBuilder()
+        .sql(query, tableName)
+        .run();
+      assertTrue(insertResults.succeeded());
+
+      String insertQuery = "INSERT INTO %s SELECT * FROM (VALUES(1,2), (3,4))";
+      queryBuilder()
+        .sql(insertQuery, tableName)
+        .planMatcher()
+        .include("Jdbc\\(sql=\\[INSERT INTO")
+        .match();
+
+      testBuilder()
+        .sqlQuery(insertQuery, tableName)
+        .unOrdered()
+        .baselineColumns("ROWCOUNT")
+        .baselineValues(2L)
+        .go();
+
+      testBuilder()
+        .sqlQuery("select * from %s", tableName)
+        .unOrdered()
+        .baselineColumns("ID", "NAME")
+        .baselineValues(1, 2)
+        .baselineValues(1, 2)
+        .baselineValues(3, 4)
+        .go();
+    } finally {
+      queryBuilder()
+        .sql("DROP TABLE IF EXISTS %s", tableName)
+        .run();
+    }
+  }
+
+  @Test
+  public void testInsertSelectFromJdbcTable() throws Exception {
+    String tableName = "`pg`.`public`.`test_table`";
+    try {
+      String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2), (3,4))";
+      // Create the table and insert the values
+      QuerySummary insertResults = queryBuilder()
+        .sql(query, tableName)
+        .run();
+      assertTrue(insertResults.succeeded());
+
+      String insertQuery = "INSERT INTO %s SELECT * FROM %s";
+      queryBuilder()
+        .sql(insertQuery, tableName, tableName)
+        .planMatcher()
+        .include("Jdbc\\(sql=\\[INSERT INTO")
+        .match();
+
+      testBuilder()
+        .sqlQuery(insertQuery, tableName, tableName)
+        .unOrdered()
+        .baselineColumns("ROWCOUNT")
+        .baselineValues(2L)
+        .go();
+
+      testBuilder()
+        .sqlQuery("select * from %s", tableName)
+        .unOrdered()
+        .baselineColumns("ID", "NAME")
+        .baselineValues(1, 2)
+        .baselineValues(3, 4)
+        .baselineValues(1, 2)
+        .baselineValues(3, 4)
+        .go();
+    } finally {
+      queryBuilder()
+        .sql("DROP TABLE IF EXISTS %s", tableName)
+        .run();
+    }
+  }
+
+  @Test
+  public void testInsertSelectFromNonJdbcTable() throws Exception {
+    String tableName = "`pg`.`public`.`test_table`";
+    try {
+      String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+      // Create the table and insert the values
+      QuerySummary insertResults = queryBuilder()
+        .sql(query, tableName)
+        .run();
+      assertTrue(insertResults.succeeded());
+
+      String insertQuery = "INSERT INTO %s SELECT n_nationkey, n_regionkey FROM cp.`tpch/nation.parquet` limit 3";
+      queryBuilder()
+        .sql(insertQuery, tableName)
+        .planMatcher()
+        .exclude("Jdbc\\(sql=\\[INSERT INTO") // insert cannot be pushed down
+        .match();
+
+      testBuilder()
+        .sqlQuery(insertQuery, tableName)
+        .unOrdered()
+        .baselineColumns("ROWCOUNT")
+        .baselineValues(3L)
+        .go();
+
+      testBuilder()
+        .sqlQuery("select * from %s", tableName)
+        .unOrdered()
+        .baselineColumns("ID", "NAME")
+        .baselineValues(1, 2)
+        .baselineValues(0, 0)
+        .baselineValues(1, 1)
+        .baselineValues(2, 1)
+        .go();
+    } finally {
+      queryBuilder()
+        .sql("DROP TABLE IF EXISTS %s", tableName)
+        .run();
+    }
+  }
+}
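
The class above also registers a read-only `pg_unwritable` plugin. A minimal sketch of how an
insert against it should fail, based on the validation error added in InsertHandler further
down (test name and imports are illustrative, not part of the patch):

  @Test
  public void testInsertIntoUnwritablePlugin() throws Exception {
    String sql = "INSERT INTO `pg_unwritable`.`public`.`test_table` (ID, NAME) VALUES (5, 6)";
    try {
      queryBuilder().sql(sql).run();
      fail();
    } catch (UserRemoteException e) {
      // Message raised by InsertHandler when supportsInsert() returns false.
      MatcherAssert.assertThat(e.getMessage(),
        CoreMatchers.containsString("is immutable or doesn't support inserts"));
    }
  }
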
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
index 0335e8a58b..b0ce8caa08 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
@@ -35,6 +35,8 @@ import org.apache.drill.test.QueryBuilder.QuerySummary;
 import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.apache.hadoop.fs.Path;
 import org.h2.tools.RunScript;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -159,15 +161,15 @@ public class TestJdbcWriterWithH2 extends ClusterTest {
         .addNullable("bigint_field", MinorType.BIGINT, 38)
         .addNullable("float4_field", MinorType.FLOAT4, 38)
         .addNullable("float8_field", MinorType.FLOAT8, 38)
-        .addNullable("varchar_field", MinorType.VARCHAR, 38)
+        .addNullable("varchar_field", MinorType.VARCHAR, 3)
         .addNullable("date_field", MinorType.DATE, 10)
-        .addNullable("time_field", MinorType.TIME, 8)
-        .addNullable("timestamp_field", MinorType.TIMESTAMP, 26, 6)
+        .addNullable("time_field", MinorType.TIME, 12, 3)
+        .addNullable("timestamp_field", MinorType.TIMESTAMP, 23, 3)
         .addNullable("boolean_field", MinorType.BIT, 1)
         .buildSchema();
 
       RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .addRow(1, 2L, 3.0, 4.0, "5.0", LocalDate.parse("2021-01-01"), LocalTime.parse("12:00"), 1451516155000L, true)
+        .addRow(1, 2L, 3.0, 4.0, "5.0", LocalDate.parse("2021-01-01"), LocalTime.parse("12:00"), 1451516155230L, true)
         .build();
 
       RowSetUtilities.verify(expected, results);
@@ -388,7 +390,8 @@ public class TestJdbcWriterWithH2 extends ClusterTest {
       queryBuilder().sql(sql).run();
       fail();
     } catch (UserRemoteException e) {
-      assertTrue(e.getMessage().contains("DATA_WRITE ERROR: Drill does not yet support writing arrays to JDBC. `repeated_field` is an array."));
+      MatcherAssert.assertThat(e.getMessage(),
+        CoreMatchers.containsString("DATA_WRITE ERROR: Drill does not yet support writing arrays to JDBC. repeated_field is an array."));
     }
   }
 
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java
index 4b6b38d0d0..95158989a1 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java
@@ -33,6 +33,8 @@ import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.QueryBuilder.QuerySummary;
 import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.apache.hadoop.fs.Path;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -59,11 +61,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-/**
- * JDBC storage plugin tests against MySQL.
- * Note: it requires libaio1.so library on Linux
- */
-
 @Category(JdbcStorageTest.class)
 public class TestJdbcWriterWithMySQL extends ClusterTest {
   private static final String DOCKER_IMAGE_MYSQL = "mysql:5.7.27";
@@ -283,15 +280,15 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
       .addNullable("bigint_field", MinorType.BIGINT, 19)
       .addNullable("float4_field", MinorType.FLOAT8, 12)
       .addNullable("float8_field", MinorType.FLOAT8, 22)
-      .addNullable("varchar_field", MinorType.VARCHAR, 38)
+      .addNullable("varchar_field", MinorType.VARCHAR, 3)
       .addNullable("date_field", MinorType.DATE, 10)
-      .addNullable("time_field", MinorType.TIME, 10)
-      .addNullable("timestamp_field", MinorType.TIMESTAMP, 19)
-      .addNullable("boolean_field", MinorType.BIT)
+      .addNullable("time_field", MinorType.TIME, 14)
+      .addNullable("timestamp_field", MinorType.TIMESTAMP, 23)
+      .addNullable("boolean_field", MinorType.BIT, 1)
       .buildSchema();
 
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-      .addRow(1, 2L, 3.0, 4.0, "5.0", LocalDate.parse("2021-01-01"), LocalTime.parse("12:00"), 1451516155000L, true)
+      .addRow(1, 2L, 3.0, 4.0, "5.0", LocalDate.parse("2021-01-01"), LocalTime.parse("12:00"), 1451516155230L, true)
       .build();
 
     RowSetUtilities.verify(expected, results);
@@ -435,7 +432,8 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
       queryBuilder().sql(sql).run();
       fail();
     } catch (UserRemoteException e) {
-      assertTrue(e.getMessage().contains("DATA_WRITE ERROR: Drill does not support writing complex fields to JDBC data sources."));
+      MatcherAssert.assertThat(e.getMessage(),
+        CoreMatchers.containsString("DATA_WRITE ERROR: Drill does not support writing complex fields to JDBC data sources."));
     }
   }
 
@@ -447,7 +445,8 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
       queryBuilder().sql(sql).run();
       fail();
     } catch (UserRemoteException e) {
-      assertTrue(e.getMessage().contains("DATA_WRITE ERROR: Drill does not yet support writing arrays to JDBC. `repeated_field` is an array."));
+      MatcherAssert.assertThat(e.getMessage(),
+        CoreMatchers.containsString("DATA_WRITE ERROR: Drill does not yet support writing arrays to JDBC. repeated_field is an array."));
     }
   }
 
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
index 8c30691976..43ca9a9928 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
@@ -33,6 +33,8 @@ import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.QueryBuilder.QuerySummary;
 import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.apache.hadoop.fs.Path;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -185,15 +187,15 @@ public class TestJdbcWriterWithPostgres extends ClusterTest {
       .addNullable("bigint_field", MinorType.BIGINT, 19)
       .addNullable("float4_field", MinorType.FLOAT8, 17, 17)
       .addNullable("float8_field", MinorType.FLOAT8, 17, 17)
-      .addNullable("varchar_field", MinorType.VARCHAR, 38)
-      .addNullable("date_field", MinorType.DATE, 10)
-      .addNullable("time_field", MinorType.TIME, 10)
-      .addNullable("timestamp_field", MinorType.TIMESTAMP, 19)
-      .addNullable("boolean_field", MinorType.BIT)
+      .addNullable("varchar_field", MinorType.VARCHAR, 3)
+      .addNullable("date_field", MinorType.DATE, 13)
+      .addNullable("time_field", MinorType.TIME, 12, 3)
+      .addNullable("timestamp_field", MinorType.TIMESTAMP, 26, 3)
+      .addNullable("boolean_field", MinorType.BIT, 1)
       .buildSchema();
 
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-      .addRow(1, 2L, 3.0, 4.0, "5.0", LocalDate.parse("2021-01-01"), LocalTime.parse("12:00"), 1451516155000L, true)
+      .addRow(1, 2L, 3.0, 4.0, "5.0", LocalDate.parse("2021-01-01"), LocalTime.parse("12:00"), 1451516155230L, true)
       .build();
 
     RowSetUtilities.verify(expected, results);
@@ -459,7 +461,8 @@ public class TestJdbcWriterWithPostgres extends ClusterTest {
       queryBuilder().sql(sql).run();
       fail();
     } catch (UserRemoteException e) {
-      assertTrue(e.getMessage().contains("DATA_WRITE ERROR: Drill does not yet support writing arrays to JDBC. `repeated_field` is an array."));
+      MatcherAssert.assertThat(e.getMessage(),
+        CoreMatchers.containsString("DATA_WRITE ERROR: Drill does not yet support writing arrays to JDBC. repeated_field is an array."));
     }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 57ba2da6ba..fadc8a348d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -68,7 +68,7 @@ import io.netty.buffer.DrillBuf;
 // in fragment contexts
 public class QueryContext implements AutoCloseable, OptimizerRulesContext, SchemaConfigInfoProvider {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
-  public enum SqlStatementType {OTHER, ANALYZE, CTAS, EXPLAIN, DESCRIBE_TABLE, DESCRIBE_SCHEMA, REFRESH, SELECT, SETOPTION};
+  public enum SqlStatementType {OTHER, ANALYZE, CTAS, EXPLAIN, DESCRIBE_TABLE, DESCRIBE_SCHEMA, REFRESH, SELECT, SETOPTION, INSERT};
 
   private final DrillbitContext drillbitContext;
   private final UserSession session;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index be6eae8b2d..ea2221abaf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StatisticsAggregate;
 import org.apache.drill.exec.physical.config.StatisticsMerge;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.TableModify;
 import org.apache.drill.exec.physical.config.Trace;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.physical.config.UnnestPOP;
@@ -230,6 +231,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
     return visitOp(op, value);
   }
 
+  @Override
+  public T visitTableModify(TableModify op, X value) throws E {
+    return visitOp(op, value);
+  }
+
   @Override
   public T visitOp(PhysicalOperator op, X value) throws E {
     throw new UnsupportedOperationException(String.format(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 1a63538edd..d1b1479bb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StatisticsAggregate;
 import org.apache.drill.exec.physical.config.StatisticsMerge;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.TableModify;
 import org.apache.drill.exec.physical.config.Trace;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.physical.config.UnnestPOP;
@@ -94,4 +95,5 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
   public RETURN visitLateralJoin(LateralJoinPOP lateralJoinPOP, EXTRA value) throws EXCEP;
 
   public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value) throws EXCEP;
+  public RETURN visitTableModify(TableModify op, EXTRA value) throws EXCEP;
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TableModify.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TableModify.java
new file mode 100644
index 0000000000..d900075e5d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TableModify.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.record.BatchSchema;
+
+@JsonTypeName("table_modify")
+public class TableModify extends AbstractSingle {
+  public static final String OPERATOR_TYPE = "TABLE_MODIFY";
+
+  @JsonCreator
+  public TableModify(@JsonProperty("child") PhysicalOperator child) {
+    super(child);
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new TableModify(child);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitTableModify(this, value);
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode getSVMode() {
+    return child.getSVMode();
+  }
+
+  @Override
+  public String getOperatorType() {
+    return OPERATOR_TYPE;
+  }
+}
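
Because TableModify dispatches through the new visitTableModify hook, callers can special-case
it without touching visitOp. A sketch of a detector that walks a physical plan for table-modify
operators (assuming PhysicalOperator's usual iteration over its children):

  // Returns true if the operator subtree contains a TableModify.
  class TableModifyDetector extends AbstractPhysicalVisitor<Boolean, Void, RuntimeException> {
    @Override
    public Boolean visitTableModify(TableModify op, Void value) {
      return true;
    }

    @Override
    public Boolean visitOp(PhysicalOperator op, Void value) {
      for (PhysicalOperator child : op) {
        if (child.accept(this, null)) {
          return true;
        }
      }
      return false;
    }
  }
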
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/InsertWriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/InsertWriterRecordBatch.java
new file mode 100644
index 0000000000..5e9a7efaaf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/InsertWriterRecordBatch.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.Writer;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.BigIntVector;
+
+public class InsertWriterRecordBatch extends WriterRecordBatch {
+
+  public InsertWriterRecordBatch(Writer writer, RecordBatch incoming,
+    FragmentContext context, RecordWriter recordWriter) throws OutOfMemoryException {
+    super(writer, incoming, context, recordWriter);
+  }
+
+  @Override
+  protected void addOutputContainerData() {
+    BigIntVector rowcountVector = (BigIntVector) container.getValueAccessorById(BigIntVector.class,
+        container.getValueVectorId(SchemaPath.getSimplePath("ROWCOUNT")).getFieldIds())
+      .getValueVector();
+    AllocationHelper.allocate(rowcountVector, 1, 8);
+    rowcountVector.getMutator().setSafe(0, counter);
+    rowcountVector.getMutator().setValueCount(1);
+
+    container.setRecordCount(1);
+  }
+
+  @Override
+  protected void addOutputSchema() {
+    // Create vector for ROWCOUNT - number of records written.
+    final MaterializedField rowcountField =
+      MaterializedField.create("ROWCOUNT",
+        Types.required(TypeProtos.MinorType.BIGINT));
+
+    container.addOrGet(rowcountField);
+  }
+}
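
Client-visible effect: an INSERT returns a single BIGINT column named ROWCOUNT instead of the
Fragment / "Number of records written" pair used for CTAS, so tests can read the count as a
scalar. A sketch, assuming QueryBuilder's scalar accessor is available in this harness:

  long rowCount = queryBuilder()
    .sql("INSERT INTO %s (ID, NAME) VALUES (7, 8)", tableName)
    .singletonLong(); // reads the single ROWCOUNT value
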
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 3c2ba083dc..307d5b269c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -47,7 +47,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
 
   private EventBasedRecordWriter eventBasedRecordWriter;
   private RecordWriter recordWriter;
-  private long counter;
+  protected long counter;
   private final RecordBatch incoming;
   private boolean processed;
   private final String fragmentUniqueId;
@@ -134,7 +134,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
     return IterOutcome.OK_NEW_SCHEMA;
   }
 
-  private void addOutputContainerData() {
+  protected void addOutputContainerData() {
     final VarCharVector fragmentIdVector = (VarCharVector) container.getValueAccessorById(
         VarCharVector.class,
         container.getValueVectorId(SchemaPath.getSimplePath("Fragment")).getFieldIds())
@@ -163,17 +163,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
           .addContext("Failure updating record writer schema")
           .build(logger);
       }
-      // Create two vectors for:
-      //   1. Fragment unique id.
-      //   2. Summary: currently contains number of records written.
-      final MaterializedField fragmentIdField =
-          MaterializedField.create("Fragment", Types.required(MinorType.VARCHAR));
-      final MaterializedField summaryField =
-          MaterializedField.create("Number of records written",
-              Types.required(MinorType.BIGINT));
-
-      container.addOrGet(fragmentIdField);
-      container.addOrGet(summaryField);
+      addOutputSchema();
       container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
     } finally {
       stats.stopSetup();
@@ -190,6 +180,20 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
     schema = container.getSchema();
   }
 
+  protected void addOutputSchema() {
+    // Create two vectors for:
+    //   1. Fragment unique id.
+    //   2. Summary: currently contains number of records written.
+    final MaterializedField fragmentIdField =
+        MaterializedField.create("Fragment", Types.required(MinorType.VARCHAR));
+    final MaterializedField summaryField =
+        MaterializedField.create("Number of records written",
+            Types.required(MinorType.BIGINT));
+
+    container.addOrGet(fragmentIdField);
+    container.addOrGet(summaryField);
+  }
+
   /** Clean up needs to be performed before closing writer. Partially written data will be removed. */
   private void closeWriter() {
     if (recordWriter == null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 524400dfc3..a0f8024fea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -20,9 +20,11 @@ package org.apache.drill.exec.planner;
 import org.apache.drill.exec.planner.logical.ConvertMetadataAggregateToDirectScanRule;
 import org.apache.drill.exec.planner.logical.DrillDistinctJoinToSemiJoinRule;
 import org.apache.drill.exec.planner.logical.DrillReduceExpressionsRule;
+import org.apache.drill.exec.planner.logical.DrillTableModifyRule;
 import org.apache.drill.exec.planner.physical.MetadataAggPrule;
 import org.apache.drill.exec.planner.physical.MetadataControllerPrule;
 import org.apache.drill.exec.planner.physical.MetadataHandlerPrule;
+import org.apache.drill.exec.planner.physical.TableModifyPrule;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet.Builder;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -370,7 +372,8 @@ public enum PlannerPhase {
       DrillUnionAllRule.INSTANCE,
       DrillValuesRule.INSTANCE,
       DrillUnnestRule.INSTANCE,
-      DrillCorrelateRule.INSTANCE
+      DrillCorrelateRule.INSTANCE,
+      DrillTableModifyRule.INSTANCE
       ).build();
 
   /**
@@ -552,6 +555,7 @@ public enum PlannerPhase {
 
     ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_PROJECT);
     ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_SCAN);
+    ruleList.add(TableModifyPrule.INSTANCE);
 
     if (ps.isHashAggEnabled()) {
       ruleList.add(HashAggPrule.INSTANCE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
index adfddd5a51..bf1fd6f789 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
@@ -690,7 +690,7 @@ public abstract class DrillRelOptUtil {
     }
   }
 
-  public static DrillTable getDrillTable(final TableScan scan) {
+  public static DrillTable getDrillTable(RelNode scan) {
     DrillTable drillTable = scan.getTable().unwrap(DrillTable.class);
     if (drillTable == null) {
       DrillTranslatableTable transTable = scan.getTable().unwrap(DrillTranslatableTable.class);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
index c1bcf68616..ec8899a839 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
@@ -188,7 +188,7 @@ public class NonCoveringIndexPlanGenerator extends AbstractIndexPlanGenerator {
     if (indexDesc.getCollation() != null &&
          !settings.isIndexForceSortNonCovering()) {
       collation = IndexPlanUtils.buildCollationNonCoveringIndexScan(indexDesc, indexScanRowType, dbscanRowType, indexContext);
-      if (restrictedScanTraitSet.contains(RelCollationTraitDef.INSTANCE)) { // replace existing trait
+      if (restrictedScanTraitSet.getTrait(RelCollationTraitDef.INSTANCE) != null) { // replace existing trait
         restrictedScanTraitSet = restrictedScanTraitSet.plus(partition).replace(collation);
       } else {  // add new one
         restrictedScanTraitSet = restrictedScanTraitSet.plus(partition).plus(collation);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTableModify.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTableModify.java
new file mode 100644
index 0000000000..d86b808f5b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTableModify.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.logical.data.InsertWriter;
+import org.apache.drill.common.logical.data.LogicalOperator;
+
+import java.util.List;
+
+public class DrillTableModify extends TableModify implements DrillRel {
+
+  protected DrillTableModify(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table,
+    Prepare.CatalogReader catalogReader, RelNode input, Operation operation,
+    List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
+    super(cluster, traitSet, table, catalogReader, input, operation, updateColumnList,
+      sourceExpressionList, flattened);
+  }
+
+  @Override
+  public LogicalOperator implement(DrillImplementor implementor) {
+    LogicalOperator childOp = implementor.visitChild(this, 0, getInput());
+
+    InsertWriter insertWriter = new InsertWriter(getTable());
+    insertWriter.setInput(childOp);
+    return insertWriter;
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new DrillTableModify(getCluster(), traitSet, getTable(), getCatalogReader(),
+      inputs.get(0), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    double rowCount = mq.getRowCount(this);
+    double inputRowCount = mq.getRowCount(getInput());
+    double dIo = inputRowCount + 1; // ensure non-zero cost
+    return planner.getCostFactory().makeCost(rowCount, 0, dIo).multiplyBy(10);
+  }
+}
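
For concreteness, a worked example of the cost above: with mq.getRowCount(this) = 100 and an
input of 100 rows, dIo = 101 and the node reports

  makeCost(100, 0, 101).multiplyBy(10)  // rowCount = 1000, cpu = 0, io = 1010

The tenfold factor reads as a deliberate penalty on modify plans relative to plain reads,
though the patch does not state the rationale.
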
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTableModifyRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTableModifyRule.java
new file mode 100644
index 0000000000..7a177c2e1c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTableModifyRule.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+
+public class DrillTableModifyRule extends RelOptRule {
+  public static final RelOptRule INSTANCE = new DrillTableModifyRule();
+
+  private final RelTrait out;
+
+  private DrillTableModifyRule() {
+    super(RelOptHelper.any(LogicalTableModify.class, Convention.NONE),
+      DrillRelFactories.LOGICAL_BUILDER, "DrillTableModifyRule");
+    this.out = DrillRel.DRILL_LOGICAL;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    TableModify tableModify = call.rel(0);
+    RelNode input = tableModify.getInput();
+    RelTraitSet traits = tableModify.getTraitSet().plus(out);
+    RelNode convertedInput = convert(input, input.getTraitSet().plus(out).simplify());
+
+    call.transformTo(new DrillTableModify(
+      tableModify.getCluster(), traits, tableModify.getTable(),
+      tableModify.getCatalogReader(), convertedInput, tableModify.getOperation(),
+      tableModify.getUpdateColumnList(), tableModify.getSourceExpressionList(), tableModify.isFlattened()));
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ModifyTableEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ModifyTableEntry.java
new file mode 100644
index 0000000000..1f77c79e13
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ModifyTableEntry.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Writer;
+
+import java.io.IOException;
+
+public interface ModifyTableEntry {
+
+  Writer getWriter(PhysicalOperator child) throws IOException;
+
+}
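
Since ModifyTableEntry declares a single abstract method, implementations can be supplied as
lambdas. A sketch, where ExampleInsertWriter is a hypothetical stand-in for a plugin's writer
class:

  // Hypothetical: hand the child operator to the plugin's insert writer.
  ModifyTableEntry entry = child -> new ExampleInsertWriter(child);
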
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java
index 937c2e5837..6d9fbc3971 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java
@@ -36,7 +36,7 @@ public class ScreenPrule extends Prule{
 
   @Override
   public void onMatch(RelOptRuleCall call) {
-    final DrillScreenRelBase screen = (DrillScreenRelBase) call.rel(0);
+    final DrillScreenRelBase screen = call.rel(0);
     final RelNode input = call.rel(1);
 
     final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
@@ -44,6 +44,4 @@ public class ScreenPrule extends Prule{
     DrillScreenRelBase newScreen = new ScreenPrel(screen.getCluster(), screen.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), convertedInput);
     call.transformTo(newScreen);
   }
-
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TableModifyPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TableModifyPrel.java
new file mode 100644
index 0000000000..8e7e917641
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TableModifyPrel.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.logical.ModifyTableEntry;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
+import org.apache.drill.exec.record.BatchSchema;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public class TableModifyPrel extends TableModify implements Prel {
+
+  protected TableModifyPrel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table,
+    Prepare.CatalogReader catalogReader, RelNode input, Operation operation,
+    List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
+    super(cluster, traitSet, table, catalogReader, input, operation, updateColumnList,
+      sourceExpressionList, flattened);
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new TableModifyPrel(getCluster(), traitSet, getTable(), getCatalogReader(),
+      inputs.get(0), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+    Prel child = (Prel) this.getInput();
+
+    List<String> tablePath = getTable().getQualifiedName();
+    List<String> schemaPath = tablePath.size() > 1
+      ? tablePath.subList(0, tablePath.size() - 1)
+      : Collections.emptyList();
+    String tableName = tablePath.get(tablePath.size() - 1);
+    SchemaPlus schema = ((CalciteCatalogReader) getTable().getRelOptSchema()).getRootSchema().plus();
+
+    ModifyTableEntry modifyTableEntry = SchemaUtilites.resolveToDrillSchema(schema, schemaPath)
+      .modifyTable(tableName);
+
+    PhysicalOperator childPOP = child.getPhysicalOperator(creator);
+
+    PhysicalOperator p = modifyTableEntry.getWriter(childPOP);
+    return creator.addMetadata(this, p);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitTableModify(this, value);
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
+    return BatchSchema.SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode getEncoding() {
+    return BatchSchema.SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return false;
+  }
+
+  @NotNull
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getInput());
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    double rowCount = mq.getRowCount(this);
+    double inputRowCount = mq.getRowCount(getInput());
+    double dIo = inputRowCount + 1; // ensure non-zero cost
+    return planner.getCostFactory().makeCost(rowCount, 0, dIo).multiplyBy(10);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TableModifyPrule.java
similarity index 55%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TableModifyPrule.java
index 937c2e5837..2b73d21728 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TableModifyPrule.java
@@ -17,33 +17,31 @@
  */
 package org.apache.drill.exec.planner.physical;
 
-import org.apache.drill.exec.planner.common.DrillScreenRelBase;
-import org.apache.drill.exec.planner.logical.DrillRel;
-import org.apache.drill.exec.planner.logical.DrillScreenRel;
-import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.logical.DrillTableModify;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
 
-public class ScreenPrule extends Prule{
-  public static final RelOptRule INSTANCE = new ScreenPrule();
-
+public class TableModifyPrule extends Prule {
+  public static final RelOptRule INSTANCE = new TableModifyPrule();
 
-  public ScreenPrule() {
-    super(RelOptHelper.some(DrillScreenRel.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(RelNode.class)), "Prel.ScreenPrule");
+  private TableModifyPrule() {
+    super(RelOptHelper.some(DrillTableModify.class, RelOptHelper.any(RelNode.class)), "TableModifyPrule");
   }
 
   @Override
   public void onMatch(RelOptRuleCall call) {
-    final DrillScreenRelBase screen = (DrillScreenRelBase) call.rel(0);
-    final RelNode input = call.rel(1);
-
-    final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
-    final RelNode convertedInput = convert(input, traits);
-    DrillScreenRelBase newScreen = new ScreenPrel(screen.getCluster(), screen.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), convertedInput);
-    call.transformTo(newScreen);
-  }
+    DrillTableModify tableModify = call.rel(0);
+    RelNode input = tableModify.getInput();
 
+    RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+    RelNode convertedInput = convert(input, traits);
 
+    call.transformTo(new TableModifyPrel(
+      tableModify.getCluster(), traits, tableModify.getTable(),
+      tableModify.getCatalogReader(), convertedInput, tableModify.getOperation(),
+      tableModify.getUpdateColumnList(), tableModify.getSourceExpressionList(), tableModify.isFlattened()));
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java
index dd11d03cca..f268da3ffc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.planner.physical.ScreenPrel;
+import org.apache.drill.exec.planner.physical.TableModifyPrel;
 import org.apache.drill.exec.planner.physical.WriterPrel;
 import org.apache.drill.exec.planner.physical.UnnestPrel;
 import org.apache.drill.exec.planner.physical.LateralJoinPrel;
@@ -86,4 +87,9 @@ public class BasePrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> implements
   public RETURN visitLeaf(LeafPrel prel, EXTRA value) throws EXCEP {
     return visitPrel(prel, value);
   }
+
+  @Override
+  public RETURN visitTableModify(TableModifyPrel prel, EXTRA value) throws EXCEP {
+    return visitPrel(prel, value);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
index a97c10f507..b956b0b35a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.planner.physical.ScreenPrel;
+import org.apache.drill.exec.planner.physical.TableModifyPrel;
 import org.apache.drill.exec.planner.physical.WriterPrel;
 import org.apache.drill.exec.planner.physical.UnnestPrel;
 import org.apache.drill.exec.planner.physical.LateralJoinPrel;
@@ -44,4 +45,5 @@ public interface PrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
   RETURN visitUnnest(UnnestPrel prel, EXTRA value) throws EXCEP;
   RETURN visitLateral(LateralJoinPrel prel, EXTRA value) throws EXCEP;
   RETURN visitLeaf(LeafPrel prel, EXTRA value) throws EXCEP;
+  RETURN visitTableModify(TableModifyPrel prel, EXTRA value) throws EXCEP;
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java
index 3dfc030f2d..fa90c5fff8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.planner.physical.ScreenPrel;
+import org.apache.drill.exec.planner.physical.TableModifyPrel;
 import org.apache.drill.exec.planner.physical.WriterPrel;
 import org.apache.drill.exec.planner.physical.UnnestPrel;
 import org.apache.drill.exec.planner.physical.LateralJoinPrel;
@@ -253,4 +254,11 @@ public class PrelVisualizerVisitor
     visitPrel(prel, value);
     return null;
   }
+
+  @Override
+  public Void visitTableModify(TableModifyPrel prel, VisualizationState value) throws Exception {
+    visitBasePrel(prel, value);
+    endNode(prel, value);
+    return null;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index e71ff894a5..1d75e6c0aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.DescribeSchemaHandler;
 import org.apache.drill.exec.planner.sql.handlers.DescribeTableHandler;
 import org.apache.drill.exec.planner.sql.handlers.ExplainHandler;
+import org.apache.drill.exec.planner.sql.handlers.InsertHandler;
 import org.apache.drill.exec.planner.sql.handlers.MetastoreAnalyzeTableHandler;
 import org.apache.drill.exec.planner.sql.handlers.RefreshMetadataHandler;
 import org.apache.drill.exec.planner.sql.handlers.ResetOptionHandler;
@@ -50,7 +51,6 @@ import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
 import org.apache.drill.exec.planner.sql.parser.DrillSqlCall;
 import org.apache.drill.exec.planner.sql.parser.DrillSqlDescribeTable;
 import org.apache.drill.exec.planner.sql.parser.DrillSqlResetOption;
-import org.apache.drill.exec.planner.sql.parser.DrillSqlSetOption;
 import org.apache.drill.exec.planner.sql.parser.SqlSchema;
 import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
 import org.apache.drill.exec.testing.ControlsInjector;
@@ -215,16 +215,11 @@ public class DrillSqlWorker {
         context.setSQLStatementType(SqlStatementType.EXPLAIN);
         break;
       case SET_OPTION:
-        if (sqlNode instanceof DrillSqlSetOption) {
-          handler = new SetOptionHandler(context);
-          context.setSQLStatementType(SqlStatementType.SETOPTION);
-          break;
-        }
-        if (sqlNode instanceof DrillSqlResetOption) {
-          handler = new ResetOptionHandler(context);
-          context.setSQLStatementType(SqlStatementType.SETOPTION);
-          break;
-        }
+        handler = sqlNode instanceof DrillSqlResetOption
+          ? new ResetOptionHandler(context)
+          : new SetOptionHandler(context);
+        context.setSQLStatementType(SqlStatementType.SETOPTION);
+        break;
       case DESCRIBE_TABLE:
         if (sqlNode instanceof DrillSqlDescribeTable) {
           handler = new DescribeTableHandler(config);
@@ -246,6 +241,10 @@ public class DrillSqlWorker {
         handler = ((DrillSqlCall) sqlNode).getSqlHandler(config, textPlan);
         context.setSQLStatementType(SqlStatementType.CTAS);
         break;
+      case INSERT:
+        handler = new InsertHandler(config, textPlan);
+        context.setSQLStatementType(SqlStatementType.INSERT);
+        break;
       case SELECT:
         handler = new DefaultSqlHandler(config, textPlan);
         context.setSQLStatementType(SqlStatementType.SELECT);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/InsertHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/InsertHandler.java
new file mode 100644
index 0000000000..c3d23e5b9f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/InsertHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Constructs plan to be executed for inserting data into the table.
+ */
+public class InsertHandler extends DefaultSqlHandler {
+  private static final Logger logger = LoggerFactory.getLogger(InsertHandler.class);
+
+  public InsertHandler(SqlHandlerConfig config, Pointer<String> textPlan) {
+    super(config, textPlan);
+  }
+
+  @Override
+  protected ConvertedRelNode validateAndConvert(SqlNode sqlNode)
+    throws ForemanSetupException, RelConversionException, ValidationException {
+    ConvertedRelNode convertedRelNode = super.validateAndConvert(sqlNode);
+
+    String storageName = SchemaUtilites.getSchemaPathAsList(
+      convertedRelNode.getConvertedNode().getTable().getQualifiedName().iterator().next()).iterator().next();
+    try {
+      if (!context.getStorage().getPlugin(storageName).supportsInsert()) {
+        throw UserException.validationError()
+          .message("Storage plugin [%s] is immutable or doesn't support inserts", storageName)
+          .build(logger);
+      }
+    } catch (StoragePluginRegistry.PluginException e) {
+      throw new DrillRuntimeException(e);
+    }
+
+    return convertedRelNode;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java
index c2dd077fbe..b6614e9482 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java
@@ -24,7 +24,9 @@ import java.util.Optional;
 
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableModify;
 import org.apache.calcite.tools.RuleSet;
 import org.apache.drill.common.util.function.CheckedSupplier;
 import org.apache.drill.exec.ops.QueryContext;
@@ -72,16 +74,34 @@ public class SqlHandlerConfig {
 
     @Override
     public RelNode visit(TableScan scan) {
+      collectPlugins(scan);
+      return scan;
+    }
+
+    @Override
+    public RelNode visit(LogicalTableModify modify) {
+      collectPlugins(modify);
+      return visitChildren(modify);
+    }
+
+    @Override
+    public RelNode visit(RelNode other) {
+      if (other instanceof TableModify) {
+        collectPlugins(other);
+      }
+      return super.visit(other);
+    }
+
+    private void collectPlugins(RelNode relNode) {
       String pluginName = SchemaUtilites.getSchemaPathAsList(
-        scan.getTable().getQualifiedName().iterator().next()).iterator().next();
+        relNode.getTable().getQualifiedName().iterator().next()).iterator().next();
       CheckedSupplier<StoragePlugin, StoragePluginRegistry.PluginException> pluginsProvider =
         () -> storagePlugins.getPlugin(pluginName);
 
-      StoragePlugin storagePlugin = Optional.ofNullable(DrillRelOptUtil.getDrillTable(scan))
+      StoragePlugin storagePlugin = Optional.ofNullable(DrillRelOptUtil.getDrillTable(relNode))
         .map(DrillTable::getPlugin)
         .orElseGet(pluginsProvider);
       plugins.add(storagePlugin);
-      return scan;
     }
 
     public List<StoragePlugin> getPlugins() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index 28aee4e682..b8d68537b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -30,6 +30,7 @@ import java.util.stream.IntStream;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.ModifyTableEntry;
 import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
 import org.apache.drill.exec.record.metadata.schema.SchemaProviderFactory;
 import org.apache.drill.exec.store.table.function.TableParamDef;
@@ -188,6 +189,18 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
         .build(logger);
   }
 
+  /**
+   * Returns a table entry, resolved by table name, used to insert records into the table.
+   *
+   * @param tableName name of the table to insert into
+   * @return insert table entry
+   */
+  public ModifyTableEntry modifyTable(String tableName) {
+    throw UserException.unsupportedError()
+        .message("Modifying tables is not supported in schema [%s]", getSchemaPath())
+        .build(logger);
+  }
+
   /**
    * Creates table entry using table name and list of partition columns if any.
    * Table folder and files will be created using persistent storage strategy.
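
Mutable schemas opt in by overriding the new hook. A minimal sketch, assuming a hypothetical schema subclass and a hypothetical ModifyTableEntry implementation (neither ExampleMutableSchema nor ExampleModifyTableEntry is part of this commit):

    import java.util.List;
    import org.apache.drill.exec.planner.logical.ModifyTableEntry;
    import org.apache.drill.exec.store.AbstractSchema;

    public class ExampleMutableSchema extends AbstractSchema {

      public ExampleMutableSchema(List<String> parentSchemaPath, String name) {
        super(parentSchemaPath, name);
      }

      @Override
      public ModifyTableEntry modifyTable(String tableName) {
        // Instead of inheriting the default (which throws an
        // unsupported-operation UserException), return an entry that
        // tells the planner how to write INSERTed rows into tableName.
        return new ExampleModifyTableEntry(tableName); // hypothetical impl
      }
    }
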
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index 485e25b158..2015002f54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -59,6 +59,11 @@ public abstract class AbstractStoragePlugin implements StoragePlugin {
     return false;
   }
 
+  @Override
+  public boolean supportsInsert() {
+    return false;
+  }
+
   /**
    * @deprecated Marking for deprecation in next major version release. Use
    *             {@link #getOptimizerRules(org.apache.drill.exec.ops.OptimizerRulesContext, org.apache.drill.exec.planner.PlannerPhase)}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index d8a1a0bfc4..db4ea3a659 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -55,6 +55,11 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
    */
   boolean supportsWrite();
 
+  /**
+   * Indicates whether Drill can insert into a table in this plugin.
+   */
+  boolean supportsInsert();
+
   /**
    * Method returns a Jackson serializable object that extends a StoragePluginConfig.
    *
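
A plugin advertises insert support by overriding the default from AbstractStoragePlugin. A minimal sketch of such an override inside a plugin class; `config` and isWritable() are illustrative stand-ins for whatever configuration the plugin actually exposes, not part of this interface:

    @Override
    public boolean supportsInsert() {
      // Opt in only when the underlying store is configured as writable;
      // plugins that never accept INSERTs simply keep the inherited false.
      return config.isWritable();
    }
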
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/InsertWriter.java b/logical/src/main/java/org/apache/drill/common/logical/data/InsertWriter.java
new file mode 100644
index 0000000000..eb7298c7ea
--- /dev/null
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/InsertWriter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.logical.data;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("insert_writer")
+public class InsertWriter extends Writer {
+
+  @JsonCreator
+  public InsertWriter(@JsonProperty("createTableEntry") Object createTableEntry) {
+    super(createTableEntry);
+  }
+}
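
The new logical operator reuses Writer's plumbing; the @JsonTypeName annotation means it serializes under the "insert_writer" key in logical plans. A hedged construction sketch (the call site and the modifyTableEntry variable, holding whatever the schema's modifyTable() returned, are illustrative):

    // During planning, the entry produced by AbstractSchema.modifyTable()
    // is wrapped in the insert-specific writer rather than the CTAS one.
    Writer writer = new InsertWriter(modifyTableEntry);
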