Posted to commits@drill.apache.org by ar...@apache.org on 2019/03/19 18:29:46 UTC

[drill] 04/04: DRILL-7086: Output schema for row set mechanism

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit a6474c5a732f3eaf0a8a772c80a4932a7bf2d937
Author: Paul Rogers <pr...@cloudera.com>
AuthorDate: Sat Mar 9 19:33:04 2019 -0800

    DRILL-7086: Output schema for row set mechanism
    
    Enhances the row set mechanism to take an "output schema" that describes the vectors to
    create. The "input schema" describes the types that the reader would like to write. A
    conversion mechanism inserts a shim that converts from the input type to the output type.
    
    Provides a set of implicit type conversions, including string-to-date/time conversions
    which use the new format property stored in column metadata. Includes unit tests for
    the new functionality.
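    
    A minimal usage sketch (not from this patch: SchemaBuilder, MetadataUtils
    and the allocator are assumed from Drill's test utilities; what the patch
    itself adds are setSchemaTransform(), SchemaTransformerImpl and the
    column "format" property):
    
        // Output schema: the vector types we want, plus a date format.
        TupleMetadata outputSchema = new SchemaBuilder()
            .add("start_date", MinorType.DATE)
            .buildSchema();
        outputSchema.metadata("start_date").setFormat("yyyy-MM-dd");
    
        // Ask the loader to convert reader types to the output types.
        ResultSetOptions options = new OptionBuilder()
            .setSchemaTransform(new SchemaTransformerImpl(outputSchema))
            .build();
        ResultSetLoader loader = new ResultSetLoaderImpl(allocator, options);
    
        // The reader declares VARCHAR (the input schema) and writes a
        // string; the inserted shim parses it into the DATE vector.
        RowSetLoader writer = loader.writer();
        writer.addColumn(MetadataUtils.newScalar("start_date",
            Types.required(MinorType.VARCHAR)));
        loader.startBatch();
        writer.start();
        writer.scalar("start_date").setString("2019-03-09");
        writer.save();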
    
    closes #1690
---
 .../drill/exec/physical/impl/scan/ReaderState.java |  23 +
 .../exec/physical/rowSet/impl/ColumnBuilder.java   | 240 ++++++++--
 .../exec/physical/rowSet/impl/ContainerState.java  |   2 +-
 .../rowSet/impl/DefaultSchemaTransformer.java      |  73 +++
 .../exec/physical/rowSet/impl/LoaderInternals.java |   2 +
 .../exec/physical/rowSet/impl/OptionBuilder.java   |  21 +-
 .../physical/rowSet/impl/ResultSetLoaderImpl.java  |  17 +
 .../physical/rowSet/impl/SchemaTransformer.java    |  44 ++
 .../rowSet/impl/SchemaTransformerImpl.java         | 197 ++++++++
 .../physical/rowSet/model/MetadataProvider.java    |   6 +-
 .../rowSet/model/single/BaseWriterBuilder.java     |  12 +-
 .../rowSet/project/RequestedTupleImpl.java         |  15 +-
 .../record/metadata/AbstractColumnMetadata.java    | 150 +++---
 .../record/metadata/PrimitiveColumnMetadata.java   | 139 ++----
 .../drill/exec/record/metadata/TupleSchema.java    |  21 +-
 .../metadata/schema/parser/SchemaVisitor.java      |   4 +-
 .../java/org/apache/drill/TestSchemaCommands.java  |  14 +-
 .../physical/rowSet/impl/TestProjectedTuple.java   |   7 +-
 .../rowSet/impl/TestResultSetLoaderProjection.java | 187 ++++++++
 .../rowSet/impl/TestResultSetLoaderProtocol.java   |  53 ---
 .../impl/TestResultSetLoaderTypeConversion.java    | 149 ++++++
 .../record/metadata/TestMetadataProperties.java    | 229 +++++++++
 .../record/{ => metadata}/TestTupleSchema.java     |  17 +-
 .../record/metadata/schema/TestSchemaProvider.java |  13 +-
 .../metadata/schema/parser/TestSchemaParser.java   |  14 +-
 .../store/easy/text/compliant/BaseCsvTest.java     |   1 -
 .../org/apache/drill/test/rowSet/DirectRowSet.java |  11 +-
 .../apache/drill/test/rowSet/RowSetBuilder.java    |  17 +-
 .../drill/test/rowSet/test/DummyWriterTest.java    |  10 +-
 .../drill/test/rowSet/test/PerformanceTool.java    |   2 +-
 .../test/rowSet/test/TestColumnConverter.java      | 510 ++++++++++++++++++++-
 exec/jdbc-all/pom.xml                              |   2 +-
 .../main/codegen/templates/BasicTypeHelper.java    |  47 +-
 .../main/codegen/templates/ColumnAccessors.java    |  95 +++-
 .../exec/record/metadata/AbstractPropertied.java   |  72 +++
 .../drill/exec/record/metadata/ColumnMetadata.java | 104 ++---
 .../drill/exec/record/metadata/Propertied.java     |  46 ++
 .../exec/record/metadata/PropertyAccessor.java     |  62 +++
 .../drill/exec/record/metadata/TupleMetadata.java  |  13 +-
 .../apache/drill/exec/vector/DateUtilities.java    |  15 +-
 .../vector/accessor/InvalidConversionError.java    |  55 +++
 .../drill/exec/vector/accessor/ScalarWriter.java   |   6 +
 .../accessor/UnsupportedConversionError.java       |  16 +-
 .../AbstractWriteConverter.java                    |  23 +-
 .../{ => convert}/ColumnConversionFactory.java     |  15 +-
 .../accessor/convert/ConvertStringToDate.java      |  57 +++
 .../accessor/convert/ConvertStringToDouble.java    |  46 ++
 .../accessor/convert/ConvertStringToInt.java       |  47 ++
 .../accessor/convert/ConvertStringToInterval.java  |  49 ++
 .../accessor/convert/ConvertStringToLong.java      |  46 ++
 .../accessor/convert/ConvertStringToTime.java      |  56 +++
 .../accessor/convert/ConvertStringToTimeStamp.java |  55 +++
 .../accessor/convert/StandardConversions.java      | 262 +++++++++++
 .../exec/vector/accessor/convert/package-info.java |  23 +
 .../accessor/writer/AbstractObjectWriter.java      |  16 +
 .../accessor/writer/AbstractScalarWriter.java      |  17 +-
 .../accessor/writer/AbstractScalarWriterImpl.java  |  19 +-
 .../vector/accessor/writer/BaseScalarWriter.java   |  18 +
 .../accessor/writer/ColumnWriterFactory.java       |  47 +-
 .../accessor/writer/NullableScalarWriter.java      |  31 +-
 .../vector/accessor/writer/ScalarArrayWriter.java  |  11 +-
 .../vector/accessor/writer/UnionVectorShim.java    |   2 +-
 .../accessor/writer/dummy/DummyScalarWriter.java   |  12 +
 63 files changed, 3080 insertions(+), 505 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
index b366d34..600a9d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.physical.impl.scan;
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.accessor.InvalidConversionError;
+import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
 
 /**
  * Manages a row batch reader through its lifecycle. Created when the reader
@@ -223,6 +225,17 @@ class ReaderState {
       // Throw user exceptions as-is
 
       throw e;
+    } catch (UnsupportedConversionError e) {
+
+      // Occurs if the provided schema asks to convert a reader-provided
+      // schema in a way that Drill (or the reader) cannot support.
+      // Example: implicit conversion of a FLOAT to an INTERVAL.
+      // In such a case there are no "natural" conversion rules; the reader
+      // would have to provide ad-hoc rules, or no conversion is possible.
+
+      throw UserException.validationError(e)
+        .message("Invalid runtime type conversion")
+        .build(logger);
     } catch (Throwable t) {
 
       // Wrap all others in a user exception.
@@ -387,6 +400,16 @@ class ReaderState {
       }
     } catch (UserException e) {
       throw e;
+    } catch (InvalidConversionError e) {
+
+      // Occurs when a specific data value to be converted to another type
+      // is not valid for that conversion. For example, providing the value
+      // "foo" to a string-to-int conversion.
+
+      throw UserException.unsupportedError(e)
+        .message("Invalid data value for automatic type conversion")
+        .addContext("Read failed for reader", reader.name())
+        .build(logger);
     } catch (Throwable t) {
       throw UserException.executionError(t)
         .addContext("Read failed for reader", reader.name())
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java
index 19d8645..aae8fad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.rowSet.impl;
 
 import java.util.ArrayList;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
@@ -26,6 +27,7 @@ import org.apache.drill.exec.physical.rowSet.impl.ColumnState.PrimitiveColumnSta
 import org.apache.drill.exec.physical.rowSet.impl.ListState.ListVectorState;
 import org.apache.drill.exec.physical.rowSet.impl.RepeatedListState.RepeatedListColumnState;
 import org.apache.drill.exec.physical.rowSet.impl.RepeatedListState.RepeatedListVectorState;
+import org.apache.drill.exec.physical.rowSet.impl.SchemaTransformer.ColumnTransform;
 import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState;
 import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.SimpleVectorState;
 import org.apache.drill.exec.physical.rowSet.impl.TupleState.MapArrayState;
@@ -39,10 +41,13 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata;
 import org.apache.drill.exec.record.metadata.ProjectionType;
+import org.apache.drill.exec.record.metadata.PropertyAccessor;
 import org.apache.drill.exec.record.metadata.VariantMetadata;
 import org.apache.drill.exec.vector.NullableVector;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.convert.AbstractWriteConverter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
@@ -74,43 +79,96 @@ import org.apache.drill.exec.vector.complex.UnionVector;
  * The single exception is the case of a list with exactly one type: in this case
  * the list metadata must contain that one type so the code knows how to build
  * the nullable array writer for that column.
+ * <p>
+ * Merges the project list with the column to be built. If the column is not
+ * projected (not in the list), then creates a dummy writer. Issues an error if
+ * the column is projected, but the implied projection type is incompatible with
+ * the actual type. (Such as trying to project an INT as x[0].)
  */
 public class ColumnBuilder {
 
-  private ColumnBuilder() { }
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnBuilder.class);
+
+  /**
+   * Default column transform for an unprojected column. No type conversion
+   * is needed, unprojected columns just "free-wheel": they are along for the
+   * ride, but don't do anything. They do not cause a vector to be materialized.
+   * The client, however, can still write to them, though the data is ignored.
+   */
+  public static class NoOpTransform implements ColumnTransform {
+
+    private final ColumnMetadata columnSchema;
+
+    public NoOpTransform(ColumnMetadata columnSchema) {
+      this.columnSchema = columnSchema;
+    }
+
+    @Override
+    public AbstractWriteConverter newWriter(ScalarWriter baseWriter) {
+      assert false; // Should never be materialized
+      return null;
+    }
+
+    @Override
+    public ProjectionType projectionType() { return ProjectionType.UNPROJECTED; }
+
+    @Override
+    public ColumnMetadata inputSchema() { return columnSchema; }
+
+    @Override
+    public ColumnMetadata outputSchema() { return columnSchema; }
+  }
+
+  private final SchemaTransformer schemaTransformer;
+
+  public ColumnBuilder(SchemaTransformer schemaTransformer) {
+    this.schemaTransformer = schemaTransformer;
+  }
 
   /**
    * Implementation of the work to add a new column to this tuple given a
    * schema description of the column.
    *
    * @param parent container
-   * @param columnSchema schema of the column
+   * @param columnSchema schema of the column as provided by the client
+   * using the result set loader. This is the schema of the data to be
+   * loaded
    * @return writer for the new column
    */
-  public static ColumnState buildColumn(ContainerState parent, ColumnMetadata columnSchema) {
+  public ColumnState buildColumn(ContainerState parent, ColumnMetadata columnSchema) {
 
     // Indicate projection in the metadata.
 
-    columnSchema.setProjected(parent.projectionType(columnSchema.name()) != ProjectionType.UNPROJECTED);
+    ProjectionType projType = parent.projectionType(columnSchema.name());
+    ColumnTransform outputCol;
+    if (projType == ProjectionType.UNPROJECTED) {
+      PropertyAccessor.set(columnSchema, ColumnMetadata.PROJECTED_PROP, false);
+      outputCol = new NoOpTransform(columnSchema);
+    } else {
+
+      // Transform the column from input to output type.
+
+      outputCol = schemaTransformer.transform(columnSchema, projType);
+    }
 
     // Build the column
 
-    switch (columnSchema.structureType()) {
+    switch (outputCol.outputSchema().structureType()) {
     case TUPLE:
-      return buildMap(parent, columnSchema);
+      return buildMap(parent, outputCol);
     case VARIANT:
       // Variant: UNION or (non-repeated) LIST
       if (columnSchema.isArray()) {
         // (non-repeated) LIST (somewhat like a repeated UNION)
-        return buildList(parent, columnSchema);
+        return buildList(parent, outputCol);
       } else {
         // (Non-repeated) UNION
-        return buildUnion(parent, columnSchema);
+        return buildUnion(parent, outputCol);
       }
     case MULTI_ARRAY:
-      return buildRepeatedList(parent, columnSchema);
+      return buildRepeatedList(parent, outputCol);
     default:
-      return buildPrimitive(parent, columnSchema);
+      return buildPrimitive(parent, outputCol);
     }
   }
 
@@ -121,12 +179,38 @@ public class ColumnBuilder {
    * and manages the column.
    *
    * @param columnSchema schema of the new primitive column
+   * @param outputCol output schema and projection type for the column
    * @return column state for the new column
    */
 
-  private static ColumnState buildPrimitive(ContainerState parent, ColumnMetadata columnSchema) {
+  private ColumnState buildPrimitive(ContainerState parent, ColumnTransform outputCol) {
+    ProjectionType projType = outputCol.projectionType();
+    ColumnMetadata columnSchema = outputCol.outputSchema();
+
+    // Enforce correspondence between implied type from the projection list
+    // and the actual type of the column.
+
+    switch (projType) {
+    case ARRAY:
+      if (! columnSchema.isArray()) {
+        incompatibleProjection(projType, columnSchema);
+      }
+      break;
+    case TUPLE:
+    case TUPLE_ARRAY:
+      incompatibleProjection(projType, columnSchema);
+      break;
+    default:
+      break;
+    }
+
     ValueVector vector;
-    if (columnSchema.isProjected()) {
+    if (projType == ProjectionType.UNPROJECTED) {
+
+      // Column is not projected. No materialized backing for the column.
+
+      vector = null;
+    } else {
 
       // Create the vector for the column.
 
@@ -138,16 +222,12 @@ public class ColumnBuilder {
       if (parent.vectorCache().isPermissive() && ! vector.getField().isEquivalent(columnSchema.schema())) {
         columnSchema = ((PrimitiveColumnMetadata) columnSchema).mergeWith(vector.getField());
       }
-    } else {
-
-      // Column is not projected. No materialized backing for the column.
-
-      vector = null;
     }
 
     // Create the writer.
 
-    final AbstractObjectWriter colWriter = ColumnWriterFactory.buildColumnWriter(columnSchema, vector);
+    final AbstractObjectWriter colWriter = ColumnWriterFactory.buildColumnWriter(
+        columnSchema, outputCol, vector);
 
     // Build the vector state which manages the vector.
 
@@ -171,6 +251,32 @@ public class ColumnBuilder {
         vectorState);
   }
 
+  private void incompatibleProjection(ProjectionType projType,
+      ColumnMetadata columnSchema) {
+    StringBuilder buf = new StringBuilder()
+      .append("Projection of type ");
+    switch (projType) {
+    case ARRAY:
+      buf.append("array (a[n])");
+      break;
+    case TUPLE:
+      buf.append("tuple (a.x)");
+      break;
+    case TUPLE_ARRAY:
+      buf.append("tuple array (a[n].x");
+      break;
+    default:
+      throw new IllegalStateException("Unexpected projection type: " + projType);
+    }
+    buf.append(" is not compatible with column `")
+      .append(columnSchema.name())
+      .append("` of type ")
+      .append(Types.getSqlTypeName(columnSchema.majorType()));
+    throw UserException.validationError()
+      .message(buf.toString())
+      .build(logger);
+  }
+
   /**
    * Build a new map (single or repeated) column. Except for maps nested inside
    * of unions, no map vector is created
@@ -181,7 +287,8 @@ public class ColumnBuilder {
    * @return column state for the map column
    */
 
-  private static ColumnState buildMap(ContainerState parent, ColumnMetadata columnSchema) {
+  private ColumnState buildMap(ContainerState parent, ColumnTransform outputCol) {
+    ColumnMetadata columnSchema = outputCol.outputSchema();
 
     // When dynamically adding columns, must add the (empty)
     // map by itself, then add columns to the map via separate
@@ -193,16 +300,30 @@ public class ColumnBuilder {
     // Create the vector, vector state and writer.
 
     if (columnSchema.isArray()) {
-      return buildMapArray(parent, columnSchema);
+      return buildMapArray(parent, outputCol);
     } else {
-      return buildSingleMap(parent, columnSchema);
+      return buildSingleMap(parent, outputCol);
     }
   }
 
-  private static ColumnState buildSingleMap(ContainerState parent, ColumnMetadata columnSchema) {
+  private ColumnState buildSingleMap(ContainerState parent, ColumnTransform outputCol) {
+    ProjectionType projType = outputCol.projectionType();
+    ColumnMetadata columnSchema = outputCol.outputSchema();
+
+    switch (projType) {
+    case ARRAY:
+    case TUPLE_ARRAY:
+      incompatibleProjection(projType, columnSchema);
+      break;
+    default:
+      break;
+    }
     MapVector vector;
     VectorState vectorState;
-    if (columnSchema.isProjected()) {
+    if (projType == ProjectionType.UNPROJECTED) {
+      vector = null;
+      vectorState = new NullVectorState();
+    } else {
 
       // Don't get the map vector from the vector cache. Map vectors may
       // have content that varies from batch to batch. Only the leaf
@@ -211,9 +332,6 @@ public class ColumnBuilder {
       assert columnSchema.mapSchema().isEmpty();
       vector = new MapVector(columnSchema.schema(), parent.loader().allocator(), null);
       vectorState = new MapVectorState(vector, new NullVectorState());
-    } else {
-      vector = null;
-      vectorState = new NullVectorState();
     }
     final TupleObjectWriter mapWriter = MapWriter.buildMap(columnSchema, vector, new ArrayList<>());
     final SingleMapState mapState = new SingleMapState(parent.loader(),
@@ -222,13 +340,18 @@ public class ColumnBuilder {
     return new MapColumnState(mapState, mapWriter, vectorState, parent.isVersioned());
   }
 
-  private static ColumnState buildMapArray(ContainerState parent, ColumnMetadata columnSchema) {
+  private ColumnState buildMapArray(ContainerState parent, ColumnTransform outputCol) {
+    ProjectionType projType = outputCol.projectionType();
+    ColumnMetadata columnSchema = outputCol.outputSchema();
 
     // Create the map's offset vector.
 
     RepeatedMapVector mapVector;
     UInt4Vector offsetVector;
-    if (columnSchema.isProjected()) {
+    if (projType == ProjectionType.UNPROJECTED) {
+      mapVector = null;
+      offsetVector = null;
+    } else {
 
       // Creating the map vector will create its contained vectors if we
       // give it a materialized field with children. So, instead pass a clone
@@ -244,9 +367,6 @@ public class ColumnBuilder {
       mapVector = new RepeatedMapVector(mapColSchema.schema(),
           parent.loader().allocator(), null);
       offsetVector = mapVector.getOffsetVector();
-    } else {
-      mapVector = null;
-      offsetVector = null;
     }
 
     // Create the writer using the offset vector
@@ -257,13 +377,13 @@ public class ColumnBuilder {
     // Wrap the offset vector in a vector state
 
     VectorState offsetVectorState;
-    if (columnSchema.isProjected()) {
+    if (projType == ProjectionType.UNPROJECTED) {
+      offsetVectorState = new NullVectorState();
+    } else {
       offsetVectorState = new OffsetVectorState(
           (((AbstractArrayWriter) writer.array()).offsetWriter()),
           offsetVector,
           writer.array().entry().events());
-    } else {
-      offsetVectorState = new NullVectorState();
     }
     final VectorState mapVectorState = new MapVectorState(mapVector, offsetVectorState);
 
@@ -292,12 +412,22 @@ public class ColumnBuilder {
    * @param columnSchema column schema
    * @return column
    */
-  private static ColumnState buildUnion(ContainerState parent, ColumnMetadata columnSchema) {
+  private ColumnState buildUnion(ContainerState parent, ColumnTransform outputCol) {
+    ProjectionType projType = outputCol.projectionType();
+    ColumnMetadata columnSchema = outputCol.outputSchema();
     assert columnSchema.isVariant() && ! columnSchema.isArray();
 
-    if (! columnSchema.isProjected()) {
+    switch (projType) {
+    case ARRAY:
+    case TUPLE:
+    case TUPLE_ARRAY:
+      incompatibleProjection(projType, columnSchema);
+      break;
+    case UNPROJECTED:
       throw new UnsupportedOperationException("Drill does not currently support unprojected union columns: " +
           columnSchema.name());
+    default:
+      break;
     }
 
     // Create the union vector.
@@ -331,7 +461,18 @@ public class ColumnBuilder {
     return new UnionColumnState(parent.loader(), writer, vectorState, unionState);
   }
 
-  private static ColumnState buildList(ContainerState parent, ColumnMetadata columnSchema) {
+  private ColumnState buildList(ContainerState parent, ColumnTransform outputCol) {
+    ProjectionType projType = outputCol.projectionType();
+    ColumnMetadata columnSchema = outputCol.outputSchema();
+
+    switch (projType) {
+    case TUPLE:
+    case TUPLE_ARRAY:
+      incompatibleProjection(projType, columnSchema);
+      break;
+    default:
+      break;
+    }
 
     // If the list has declared a single type, and has indicated that this
     // is the only type expected, then build the list as a nullable array
@@ -341,12 +482,12 @@ public class ColumnBuilder {
     final VariantMetadata variant = columnSchema.variantSchema();
     if (variant.isSimple()) {
       if (variant.size() == 1) {
-        return buildSimpleList(parent, columnSchema);
+        return buildSimpleList(parent, outputCol);
       } else if (variant.size() == 0) {
         throw new IllegalArgumentException("Size of a non-expandable list can't be zero");
       }
     }
-    return buildUnionList(parent, columnSchema);
+    return buildUnionList(parent, outputCol);
   }
 
   /**
@@ -364,7 +505,8 @@ public class ColumnBuilder {
    * @return the column state for the list
    */
 
-  private static ColumnState buildSimpleList(ContainerState parent, ColumnMetadata columnSchema) {
+  private ColumnState buildSimpleList(ContainerState parent, ColumnTransform outputCol) {
+    ColumnMetadata columnSchema = outputCol.outputSchema();
 
     // The variant must have the one and only type.
 
@@ -423,7 +565,8 @@ public class ColumnBuilder {
    * @return the column state for the list
    */
 
-  private static ColumnState buildUnionList(ContainerState parent, ColumnMetadata columnSchema) {
+  private ColumnState buildUnionList(ContainerState parent, ColumnTransform outputCol) {
+    ColumnMetadata columnSchema = outputCol.outputSchema();
 
     // The variant must start out empty.
 
@@ -470,8 +613,10 @@ public class ColumnBuilder {
         listWriter, vectorState, listState);
   }
 
-  private static ColumnState buildRepeatedList(ContainerState parent,
-      ColumnMetadata columnSchema) {
+  private ColumnState buildRepeatedList(ContainerState parent,
+      ColumnTransform outputCol) {
+    ProjectionType projType = outputCol.projectionType();
+    ColumnMetadata columnSchema = outputCol.outputSchema();
 
     assert columnSchema.type() == MinorType.LIST;
     assert columnSchema.mode() == DataMode.REPEATED;
@@ -481,6 +626,15 @@ public class ColumnBuilder {
 
     assert columnSchema.childSchema() == null;
 
+    switch (projType) {
+    case TUPLE:
+    case TUPLE_ARRAY:
+      incompatibleProjection(projType, columnSchema);
+      break;
+    default:
+      break;
+    }
+
     // Build the repeated vector.
 
     final RepeatedListVector vector = new RepeatedListVector(
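
To make the projection checks concrete: if the projection list implies an
array (a[0]) but the reader declares a non-array INT, buildPrimitive()
raises the validation error built by incompatibleProjection(). A sketch,
assuming Drill's RowSetTestUtils and MetadataUtils test helpers:

    ResultSetOptions options = new OptionBuilder()
        .setProjection(RowSetTestUtils.projectList("a[0]"))
        .build();
    ResultSetLoader loader = new ResultSetLoaderImpl(allocator, options);
    try {
      loader.writer().addColumn(
          MetadataUtils.newScalar("a", Types.required(MinorType.INT)));
    } catch (UserException e) {
      // "Projection of type array (a[n]) is not compatible with
      //  column `a` of type INT"
    }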
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ContainerState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ContainerState.java
index fb4bb32..c28f7ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ContainerState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ContainerState.java
@@ -90,7 +90,7 @@ public abstract class ContainerState {
 
     // Create the vector, writer and column state
 
-    ColumnState colState = ColumnBuilder.buildColumn(this, columnSchema);
+    ColumnState colState = loader.columnBuilder().buildColumn(this, columnSchema);
 
     // Add the column to this container
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/DefaultSchemaTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/DefaultSchemaTransformer.java
new file mode 100644
index 0000000..5853bf6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/DefaultSchemaTransformer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.convert.AbstractWriteConverter;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+
+/**
+ * Default schema transformer that maps input types to output types and
+ * simply passes along the input schema and projection type. Provides
+ * support for an ad-hoc column conversion factory (to create type
+ * conversion shims), such as those used in unit tests.
+ */
+public class DefaultSchemaTransformer implements SchemaTransformer {
+
+  public class DefaultColumnTransformer implements ColumnTransform {
+
+    private final ColumnMetadata columnSchema;
+    private final ProjectionType projType;
+
+    public DefaultColumnTransformer(ColumnMetadata inputSchema, ProjectionType projType) {
+      columnSchema = inputSchema;
+      this.projType = projType;
+    }
+
+    @Override
+    public AbstractWriteConverter newWriter(ScalarWriter baseWriter) {
+      if (conversionFactory == null) {
+        return null;
+      }
+      return conversionFactory.newWriter(baseWriter);
+    }
+
+    @Override
+    public ProjectionType projectionType() { return projType; }
+
+    @Override
+    public ColumnMetadata inputSchema() { return columnSchema; }
+
+    @Override
+    public ColumnMetadata outputSchema() { return columnSchema; }
+  }
+
+  private final ColumnConversionFactory conversionFactory;
+
+  public DefaultSchemaTransformer(ColumnConversionFactory conversionFactory) {
+    this.conversionFactory = conversionFactory;
+  }
+
+  @Override
+  public ColumnTransform transform(ColumnMetadata inputSchema,
+      ProjectionType projType) {
+    return new DefaultColumnTransformer(inputSchema, projType);
+  }
+}
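
Since ColumnConversionFactory exposes the single method newWriter(), an
ad-hoc factory can be a lambda. A sketch of the unit-test usage mentioned
above (TestConverter is a hypothetical AbstractWriteConverter subclass):

    SchemaTransformer transformer = new DefaultSchemaTransformer(
        baseWriter -> new TestConverter(baseWriter));
    ResultSetOptions options = new OptionBuilder()
        .setSchemaTransform(transformer)
        .build();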
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/LoaderInternals.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/LoaderInternals.java
index 5baab21..f559c43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/LoaderInternals.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/LoaderInternals.java
@@ -108,4 +108,6 @@ interface LoaderInternals {
    */
 
   boolean writeable();
+
+  ColumnBuilder columnBuilder();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
index 12f5f9e..0a6ac35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
@@ -25,10 +25,11 @@ import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetO
 import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 /**
  * Builder for the options for the row set loader. Reasonable defaults
- * are provided for all options; use these options for test code or
+ * are provided for all options; use the default options for test code or
  * for clients that don't need special settings.
  */
 
@@ -40,8 +41,10 @@ public class OptionBuilder {
   protected ResultVectorCache vectorCache;
   protected TupleMetadata schema;
   protected long maxBatchSize;
+  protected SchemaTransformer schemaTransformer;
 
   public OptionBuilder() {
+    // Start with the default option values.
     ResultSetOptions options = new ResultSetOptions();
     vectorSizeLimit = options.vectorSizeLimit;
     rowCountLimit = options.rowCountLimit;
@@ -52,8 +55,7 @@ public class OptionBuilder {
    * Specify the maximum number of rows per batch. Defaults to
    * {@link BaseValueVector#INITIAL_VALUE_ALLOCATION}. Batches end either
    * when this limit is reached, or when a vector overflows, whichever
-   * occurs first. The limit is capped at
-   * {@link ValueVector#MAX_ROW_COUNT}.
+   * occurs first. The limit is capped at {@link ValueVector#MAX_ROW_COUNT}.
    *
    * @param limit the row count limit
    * @return this builder
@@ -129,10 +131,23 @@ public class OptionBuilder {
     return this;
   }
 
+  /**
+   * Provide an optional higher-level schema transformer which can convert
+   * columns from one type to another.
+   *
+   * @param transform the column conversion factory
+   * @return this builder
+   */
+  public OptionBuilder setSchemaTransform(SchemaTransformer transform) {
+    schemaTransformer = transform;
+    return this;
+  }
+
   // TODO: No setter for vector length yet: is hard-coded
   // at present in the value vector.
 
   public ResultSetOptions build() {
+    Preconditions.checkArgument(projection == null || projectionSet == null);
     return new ResultSetOptions(this);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
index 7b037ca..201bd7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
@@ -51,6 +51,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
     public final RequestedTuple projectionSet;
     public final TupleMetadata schema;
     public final long maxBatchSize;
+    public final SchemaTransformer schemaTransformer;
 
     public ResultSetOptions() {
       vectorSizeLimit = ValueVector.MAX_BUFFER_SIZE;
@@ -59,6 +60,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
       vectorCache = null;
       schema = null;
       maxBatchSize = -1;
+      schemaTransformer = null;
     }
 
     public ResultSetOptions(OptionBuilder builder) {
@@ -67,6 +69,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
       vectorCache = builder.vectorCache;
       schema = builder.schema;
       maxBatchSize = builder.maxBatchSize;
+      schemaTransformer = builder.schemaTransformer;
 
       // If projection, build the projection map.
       // The caller might have already built the map. If so,
@@ -179,6 +182,12 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
   final BufferAllocator allocator;
 
   /**
+   * Builds columns (vector, writer, state).
+   */
+
+  final ColumnBuilder columnBuilder;
+
+  /**
    * Internal structure used to work with the vectors (real or dummy) used
    * by this loader.
    */
@@ -275,6 +284,11 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
     this.options = options;
     targetRowCount = options.rowCountLimit;
     writerIndex = new WriterIndexImpl(this);
+    SchemaTransformer schemaTransformer = options.schemaTransformer;
+    if (schemaTransformer == null) {
+      schemaTransformer = new DefaultSchemaTransformer(null);
+    }
+    columnBuilder = new ColumnBuilder(schemaTransformer);
 
     // Set the projections
 
@@ -829,4 +843,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
   public int rowIndex() {
     return writerIndex().vectorIndex();
   }
+
+  @Override
+  public ColumnBuilder columnBuilder() { return columnBuilder; }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformer.java
new file mode 100644
index 0000000..619994d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+
+/**
+ * Interface to a mechanism that transforms the schema desired by
+ * a reader (or other client of the result set loader) to the schema
+ * desired for the output batch. Automates conversions across types,
+ * such as parsing a VARCHAR into a DATE or an INT. The actual
+ * conversion policy is provided by the implementation.
+ */
+public interface SchemaTransformer {
+
+  /**
+   * Describes how to transform a column from input type to output type,
+   * including the associated projection type.
+   */
+  public interface ColumnTransform extends ColumnConversionFactory {
+    ProjectionType projectionType();
+    ColumnMetadata inputSchema();
+    ColumnMetadata outputSchema();
+  }
+
+  ColumnTransform transform(ColumnMetadata inputSchema, ProjectionType projType);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformerImpl.java
new file mode 100644
index 0000000..d17ef39
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformerImpl.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.convert.AbstractWriteConverter;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+import org.apache.drill.exec.vector.accessor.convert.StandardConversions;
+import org.apache.drill.exec.vector.accessor.convert.StandardConversions.ConversionDefn;
+
+/**
+ * Base class for plugin-specific type transforms. Handles basic type
+ * checking. Assumes a type conversion is needed only if the output
+ * column is defined and has a type or mode different from the input;
+ * otherwise assumes no transform is needed. Subclasses can change or enhance
+ * this policy. The subclass provides the actual per-column transform logic.
+ */
+
+public class SchemaTransformerImpl implements SchemaTransformer {
+
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(SchemaTransformerImpl.class);
+
+  /**
+   * A no-op transform that simply keeps the input column schema and
+   * writer without any changes.
+   */
+  public static class PassThroughColumnTransform implements ColumnTransform {
+
+    private final ColumnMetadata colDefn;
+    private final ProjectionType projType;
+
+    public PassThroughColumnTransform(ColumnMetadata colDefn, ProjectionType projType) {
+      this.colDefn = colDefn;
+      this.projType = projType;
+    }
+
+    @Override
+    public AbstractWriteConverter newWriter(ScalarWriter baseWriter) {
+      return null;
+    }
+
+    @Override
+    public ProjectionType projectionType() { return projType; }
+
+    @Override
+    public ColumnMetadata inputSchema() { return colDefn; }
+
+    @Override
+    public ColumnMetadata outputSchema() { return colDefn; }
+  }
+
+  /**
+   * Full column transform that has separate input and output types
+   * and provides a type conversion writer to convert between the
+   * two. The conversion writer factory is provided via composition,
+   * not by subclassing this class.
+   */
+  public static class ColumnTransformImpl implements ColumnTransform {
+
+    private final ColumnMetadata inputSchema;
+    private final ColumnMetadata outputSchema;
+    private final ProjectionType projType;
+    private final ColumnConversionFactory conversionFactory;
+
+    public ColumnTransformImpl(ColumnMetadata inputSchema, ColumnMetadata outputSchema,
+        ProjectionType projType, ColumnConversionFactory conversionFactory) {
+      this.inputSchema = inputSchema;
+      this.outputSchema = outputSchema;
+      this.projType = projType;
+      this.conversionFactory = conversionFactory;
+    }
+
+    @Override
+    public AbstractWriteConverter newWriter(ScalarWriter baseWriter) {
+      if (conversionFactory == null) {
+        return null;
+      }
+      return conversionFactory.newWriter(baseWriter);
+    }
+
+    @Override
+    public ProjectionType projectionType() { return projType; }
+
+    @Override
+    public ColumnMetadata inputSchema() { return inputSchema; }
+
+    @Override
+    public ColumnMetadata outputSchema() { return outputSchema; }
+  }
+
+  protected final TupleMetadata outputSchema;
+
+  public SchemaTransformerImpl(TupleMetadata outputSchema) {
+    this.outputSchema = outputSchema;
+  }
+
+  /**
+   * Creates a "null" or "no-op" transform: just uses the input schema
+   * as the output schema.
+   *
+   * @param inputSchema the input schema from the reader
+   * @param projType projection type
+   * @return a no-op transform
+   */
+  protected ColumnTransform noOpTransform(ColumnMetadata inputSchema,
+      ProjectionType projType) {
+    return new PassThroughColumnTransform(inputSchema, projType);
+  }
+
+  /**
+   * Implement a basic policy to pass through input columns for which there
+   * is no matching output column, and to do a type conversion only if types
+   * and modes differ.
+   * <p>
+   * Subclasses can change this behavior if, say, they want to do conversion
+   * even if the types are the same (such as parsing a VARCHAR field to produce
+ * another VARCHAR).
+   */
+  @Override
+  public ColumnTransform transform(ColumnMetadata inputSchema,
+      ProjectionType projType) {
+
+    // Should never get an unprojected column; should be handled
+    // by the caller.
+
+    assert projType != ProjectionType.UNPROJECTED;
+
+    // If no matching column, assume a pass-through transform
+
+    ColumnMetadata outputCol = outputSchema.metadata(inputSchema.name());
+    if (outputCol == null) {
+      return noOpTransform(inputSchema, projType);
+    }
+
+    ConversionDefn defn = StandardConversions.analyze(inputSchema, outputCol);
+    ColumnConversionFactory factory = customTransform(inputSchema, outputCol, defn);
+    if (factory == null) {
+      switch (defn.type) {
+      case NONE:
+      case IMPLICIT:
+        return noOpTransform(inputSchema, projType);
+      case EXPLICIT:
+        if (defn.conversionClass == null) {
+          throw UserException.validationError()
+            .message("Runtime type conversion not available")
+            .addContext("Input type", inputSchema.typeString())
+            .addContext("Output type", outputCol.typeString())
+            .build(logger);
+        }
+        factory = StandardConversions.factory(defn.conversionClass);
+        break;
+      default:
+        throw new IllegalStateException("Unexpected conversion type: " + defn.type);
+      }
+    }
+    return new ColumnTransformImpl(inputSchema, outputCol, projType, factory);
+  }
+
+  /**
+   * Override to provide a custom conversion between input and output types.
+   *
+   * @param inputDefn the column schema for the input column which the
+   * client code (e.g. reader) wants to produce
+   * @param outputDefn the column schema for the output vector to be produced
+   * by this operator
+   * @param defn a description of the required conversion. This method is
+   * required to do nothing if the conversion type is EXPLICIT and the
+   * conversion class is null, meaning that no standard conversion
+   * is available
+   * @return a column transformer factory to implement a custom conversion,
+   * or null to use the standard conversion
+   */
+  protected ColumnConversionFactory customTransform(ColumnMetadata inputDefn,
+      ColumnMetadata outputDefn, ConversionDefn defn) {
+    return null;
+  }
+}
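
A sketch of the intended per-plugin subclassing (LogFormatConverter is
hypothetical; the contract is the one documented on customTransform()):

    public class LogSchemaTransformer extends SchemaTransformerImpl {

      public LogSchemaTransformer(TupleMetadata outputSchema) {
        super(outputSchema);
      }

      @Override
      protected ColumnConversionFactory customTransform(
          ColumnMetadata inputDefn, ColumnMetadata outputDefn,
          ConversionDefn defn) {
        // Convert even when the types match: say, trim VARCHAR fields.
        if (inputDefn.type() == MinorType.VARCHAR &&
            outputDefn.type() == MinorType.VARCHAR) {
          return baseWriter -> new LogFormatConverter(baseWriter);
        }
        return null;   // Fall back to the standard conversions
      }
    }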
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java
index fea7ecd..a48b80a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java
@@ -27,8 +27,10 @@ import org.apache.drill.exec.record.metadata.VariantMetadata;
 import org.apache.drill.exec.record.metadata.VariantSchema;
 
 /**
- * Interface for retrieving and/or creating metadata given
- * a vector.
+ * Interface for retrieving and/or creating metadata given a vector.
+ * Subclasses either generate metadata to match an existing schema
+ * (such as in a vector batch), or walk a metadata schema to drive
+ * writer creation.
  */
 
 public interface MetadataProvider {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseWriterBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseWriterBuilder.java
index 086632c..e1de60a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseWriterBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseWriterBuilder.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.physical.rowSet.model.MetadataProvider;
 import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.VectorDescrip;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
 import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
 import org.apache.drill.exec.vector.accessor.writer.ListWriterImpl;
@@ -54,7 +55,14 @@ import org.apache.drill.exec.vector.complex.UnionVector;
 
 public abstract class BaseWriterBuilder {
 
-  protected List<AbstractObjectWriter> buildContainerChildren(VectorContainer container, MetadataProvider mdProvider) {
+  private final ColumnConversionFactory conversionFactory;
+
+  protected BaseWriterBuilder(ColumnConversionFactory conversionFactory) {
+    this.conversionFactory = conversionFactory;
+  }
+
+  protected List<AbstractObjectWriter> buildContainerChildren(VectorContainer container,
+      MetadataProvider mdProvider) {
     final List<AbstractObjectWriter> writers = new ArrayList<>();
     for (int i = 0; i < container.getNumberOfColumns(); i++) {
       final ValueVector vector = container.getValueVector(i).getValueVector();
@@ -79,7 +87,7 @@ public abstract class BaseWriterBuilder {
       return buildList(vector, descrip);
 
     default:
-      return ColumnWriterFactory.buildColumnWriter(descrip.metadata, vector);
+      return ColumnWriterFactory.buildColumnWriter(descrip.metadata, conversionFactory, vector);
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
index 4643c57..b93c0c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
@@ -257,9 +257,10 @@ public class RequestedTupleImpl implements RequestedTuple {
     map.parseSegment(nameSeg.getChild());
   }
 
-  private void parseArray(NameSegment arraySeg) {
-    String name = arraySeg.getPath();
-    int index = ((ArraySegment) arraySeg.getChild()).getIndex();
+  private void parseArray(NameSegment nameSeg) {
+    String name = nameSeg.getPath();
+    ArraySegment arraySeg = ((ArraySegment) nameSeg.getChild());
+    int index = arraySeg.getIndex();
     RequestedColumnImpl member = getImpl(name);
     if (member == null) {
       member = new RequestedColumnImpl(this, name);
@@ -279,6 +280,14 @@ public class RequestedTupleImpl implements RequestedTuple {
         .build(logger);
     }
     member.addIndex(index);
+
+    // Drill's SQL parser does not support map arrays (a[0].c),
+    // but SchemaPath does support them, so there is no harm in
+    // parsing them here.
+
+    if (! arraySeg.isLastPath()) {
+      parseInternal(nameSeg);
+    }
   }
 
   @Override
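
With the change above, the projection parser accepts map-array paths. A
sketch, assuming the RowSetTestUtils projection-list helper:

    // a[0].c parses into column `a` carrying both an index and a
    // member tuple containing `c`.
    RequestedTuple projSet = RequestedTupleImpl.parse(
        RowSetTestUtils.projectList("a[0].c"));
    RequestedColumn a = projSet.get("a");
    // a.isArray() and a.isTuple() are both true.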
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
index 521a787..00077bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
@@ -27,10 +27,9 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.schema.parser.SchemaExprParser;
-import org.apache.drill.exec.vector.accessor.ColumnConversionFactory;
-import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
 
-import java.util.LinkedHashMap;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -53,10 +52,10 @@ import java.util.stream.Collectors;
   setterVisibility = JsonAutoDetect.Visibility.NONE)
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 @JsonPropertyOrder({"name", "type", "mode", "format", "default", "properties"})
-public abstract class AbstractColumnMetadata implements ColumnMetadata {
+public abstract class AbstractColumnMetadata extends AbstractPropertied implements ColumnMetadata {
 
   // Capture the key schema information. We cannot use the MaterializedField
-  // or MajorType because then encode child information that we encode here
+  // or MajorType because they encode child information that we encode here
   // as a child schema. Keeping the two in sync is nearly impossible.
 
   protected final String name;
@@ -64,26 +63,13 @@ public abstract class AbstractColumnMetadata implements ColumnMetadata {
   protected final DataMode mode;
   protected final int precision;
   protected final int scale;
-  protected boolean projected = true;
-
-  /**
-   * Predicted number of elements per array entry. Default is
-   * taken from the often hard-coded value of 10.
-   */
-
-  protected int expectedElementCount = 1;
-  protected final Map<String, String> properties = new LinkedHashMap<>();
 
   @JsonCreator
   public static AbstractColumnMetadata createColumnMetadata(@JsonProperty("name") String name,
                                                             @JsonProperty("type") String type,
                                                             @JsonProperty("mode") DataMode mode,
-                                                            @JsonProperty("format") String formatValue,
-                                                            @JsonProperty("default") String defaultValue,
                                                             @JsonProperty("properties") Map<String, String> properties) {
     ColumnMetadata columnMetadata = SchemaExprParser.parseColumn(name, type, mode);
-    columnMetadata.setFormatValue(formatValue);
-    columnMetadata.setDefaultFromString(defaultValue);
     columnMetadata.setProperties(properties);
     return (AbstractColumnMetadata) columnMetadata;
   }
@@ -95,9 +81,6 @@ public abstract class AbstractColumnMetadata implements ColumnMetadata {
     mode = majorType.getMode();
     precision = majorType.getPrecision();
     scale = majorType.getScale();
-    if (isArray()) {
-      expectedElementCount = DEFAULT_ARRAY_SIZE;
-    }
   }
 
   public AbstractColumnMetadata(String name, MinorType type, DataMode mode) {
@@ -106,18 +89,15 @@ public abstract class AbstractColumnMetadata implements ColumnMetadata {
     this.mode = mode;
     precision = 0;
     scale = 0;
-    if (isArray()) {
-      expectedElementCount = DEFAULT_ARRAY_SIZE;
-    }
   }
 
   public AbstractColumnMetadata(AbstractColumnMetadata from) {
+    super(from);
     name = from.name;
     type = from.type;
     mode = from.mode;
     precision = from.precision;
     scale = from.scale;
-    expectedElementCount = from.expectedElementCount;
   }
 
   @Override
@@ -204,63 +184,80 @@ public abstract class AbstractColumnMetadata implements ColumnMetadata {
     // makes an error.
 
     if (isArray()) {
-      expectedElementCount = Math.max(1, childCount);
+      PropertyAccessor.set(this, EXPECTED_CARDINALITY_PROP, Math.max(1, childCount));
     }
   }
 
   @Override
-  public int expectedElementCount() { return expectedElementCount; }
+  public int expectedElementCount() {
+    if (isArray()) {
+      // Not set means default size
+      return PropertyAccessor.getInt(this, EXPECTED_CARDINALITY_PROP, DEFAULT_ARRAY_SIZE);
+    } else {
+      // Cardinality is always 1 for required and optional modes
+      return 1;
+    }
+  }
 
   @Override
   public void setProjected(boolean projected) {
-    this.projected = projected;
+    if (projected) {
+      // Projected is the default
+      setProperty(PROJECTED_PROP, null);
+    } else {
+      PropertyAccessor.set(this, PROJECTED_PROP, projected);
+    }
   }
 
   @Override
-  public boolean isProjected() { return projected; }
-
-  @Override
-  public void setFormatValue(String value) { }
+  public boolean isProjected() {
+    return PropertyAccessor.getBoolean(this, PROJECTED_PROP, true);
+  }
 
-  @JsonProperty("format")
   @Override
-  public String formatValue() { return null; }
+  public void setFormat(String value) {
+    setProperty(FORMAT_PROP, value);
+  }
 
   @Override
-  public void setDefaultValue(Object value) { }
+  public String format() {
+    return property(FORMAT_PROP);
+  }
 
   @Override
-  public Object defaultValue() { return null; }
+  public DateTimeFormatter dateTimeFormatter() {
+    throw new UnsupportedOperationException("Date/time not supported for non-scalar columns");
+  }
 
   @Override
-  public void setDefaultFromString(String value) { }
+  public void setDefaultValue(String value) {
+    setProperty(DEFAULT_VALUE_PROP, value);
+  }
 
-  @JsonProperty("default")
   @Override
-  public String defaultStringValue() {
-    return null;
+  public String defaultValue() {
+    return property(DEFAULT_VALUE_PROP);
   }
 
   @Override
-  public void setTypeConverter(ColumnConversionFactory factory) {
-    throw new UnsupportedConversionError("Type conversion not supported for non-scalar writers");
+  public Object decodeDefaultValue() {
+    return valueFromString(defaultValue());
   }
 
   @Override
-  public ColumnConversionFactory typeConverter() { return null; }
+  public Object valueFromString(String value) {
+    throw new UnsupportedOperationException("Value conversion not supported for non-scalar columns");
+  }
 
   @Override
-  public void setProperties(Map<String, String> properties) {
-    if (properties == null) {
-      return;
-    }
-    this.properties.putAll(properties);
+  public String valueToString(Object value) {
+    throw new UnsupportedOperationException("Value conversion not supported for non-scalar columns");
   }
 
   @JsonProperty("properties")
   @Override
   public Map<String, String> properties() {
-    return properties;
+    return super.properties();
   }
 
   @Override
@@ -269,14 +266,7 @@ public abstract class AbstractColumnMetadata implements ColumnMetadata {
         .append("[")
         .append(getClass().getSimpleName())
         .append(" ")
-        .append(schema().toString())
-        .append(", ")
-        .append(projected ? "" : "not ")
-        .append("projected");
-    if (isArray()) {
-      buf.append(", cardinality: ")
-         .append(expectedElementCount);
-    }
+        .append(schema().toString());
     if (variantSchema() != null) {
       buf.append(", variant: ")
          .append(variantSchema().toString());
@@ -285,15 +275,7 @@ public abstract class AbstractColumnMetadata implements ColumnMetadata {
       buf.append(", schema: ")
          .append(mapSchema().toString());
     }
-    if (formatValue() != null) {
-      buf.append(", format: ")
-        .append(formatValue());
-    }
-    if (defaultValue() != null) {
-      buf.append(", default: ")
-        .append(defaultStringValue());
-    }
-    if (!properties().isEmpty()) {
+    if (hasProperties()) {
       buf.append(", properties: ")
         .append(properties());
     }
@@ -320,21 +302,27 @@ public abstract class AbstractColumnMetadata implements ColumnMetadata {
       builder.append(" NOT NULL");
     }
 
-    if (formatValue() != null) {
-      builder.append(" FORMAT '").append(formatValue()).append("'");
-    }
-
-    if (defaultValue() != null) {
-      builder.append(" DEFAULT '").append(defaultStringValue()).append("'");
-    }
-
-    if (!properties().isEmpty()) {
-      builder.append(" PROPERTIES { ");
-      builder.append(properties().entrySet()
-        .stream()
-        .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
-        .collect(Collectors.joining(", ")));
-      builder.append(" }");
+    if (hasProperties()) {
+      if (format() != null) {
+        builder.append(" FORMAT '").append(format()).append("'");
+      }
+
+      if (defaultValue() != null) {
+        builder.append(" DEFAULT '").append(defaultValue()).append("'");
+      }
+
+      Map<String, String> copy = new HashMap<>();
+      copy.putAll(properties());
+      copy.remove(FORMAT_PROP);
+      copy.remove(DEFAULT_VALUE_PROP);
+      if (!copy.isEmpty()) {
+        builder.append(" PROPERTIES { ");
+        builder.append(copy.entrySet()
+          .stream()
+          .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
+          .collect(Collectors.joining(", ")));
+        builder.append(" }");
+      }
     }
 
     return builder.toString();
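
For orientation, a minimal sketch (not part of this patch) of how the property-backed hints above behave; it assumes only the PropertyAccessor helpers and the ColumnMetadata property-name constants that this commit introduces:

    import org.apache.drill.common.types.TypeProtos.DataMode;
    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.record.metadata.ColumnMetadata;
    import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata;

    public class PropertyHintSketch {
      public static void main(String[] args) {
        ColumnMetadata col =
            new PrimitiveColumnMetadata("a", MinorType.VARCHAR, DataMode.REQUIRED);

        // Hints are now stored in the column's property map, encoded as strings.
        col.setExpectedWidth(42);
        System.out.println(col.property(ColumnMetadata.EXPECTED_WIDTH_PROP)); // 42

        // Projected is the default, so setting it adds no property entry;
        // only the non-default value is recorded.
        col.setProjected(true);
        System.out.println(col.property(ColumnMetadata.PROJECTED_PROP));      // null
        col.setProjected(false);
        System.out.println(col.property(ColumnMetadata.PROJECTED_PROP));      // false
      }
    }

Because the hints are plain string properties, they ride along in the same JSON properties map exercised in the TestSchemaProvider changes below.
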
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java
index 21ac093..4be120c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java
@@ -20,10 +20,8 @@ package org.apache.drill.exec.record.metadata;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.vector.accessor.ColumnConversionFactory;
 import org.joda.time.Period;
 
 import java.math.BigDecimal;
@@ -53,43 +51,12 @@ public class PrimitiveColumnMetadata extends AbstractColumnMetadata {
 
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PrimitiveColumnMetadata.class);
 
-  /**
-   * Expected (average) width for variable-width columns.
-   */
-
-  private int expectedWidth;
-
-  private String formatValue;
-
-  /**
-   * Default value to use for filling a vector when no real data is
-   * available, such as for columns added in new files but which does not
-   * exist in existing files. The ultimate default value is the SQL null
-   * value, which works only for nullable columns.
-   */
-
-  private Object defaultValue;
-
-  /**
-   * Factory for an optional shim writer that translates from the type of
-   * data available to the code that creates the vectors on the one hand,
-   * and the actual type of the column on the other. For example, a shim
-   * might parse a string form of a date into the form stored in vectors.
-   * <p>
-   * The default is to use the "natural" type: that is, to insert no
-   * conversion shim.
-   */
-
-  private ColumnConversionFactory shimFactory;
-
   public PrimitiveColumnMetadata(MaterializedField schema) {
     super(schema);
-    expectedWidth = estimateWidth(schema.getType());
   }
 
   public PrimitiveColumnMetadata(String name, MinorType type, DataMode mode) {
     super(name, type, mode);
-    expectedWidth = estimateWidth(Types.withMode(type, mode));
   }
 
   private int estimateWidth(MajorType majorType) {
@@ -116,7 +83,6 @@ public class PrimitiveColumnMetadata extends AbstractColumnMetadata {
 
   public PrimitiveColumnMetadata(PrimitiveColumnMetadata from) {
     super(from);
-    expectedWidth = from.expectedWidth;
   }
 
   @Override
@@ -128,7 +94,16 @@ public class PrimitiveColumnMetadata extends AbstractColumnMetadata {
   public ColumnMetadata.StructureType structureType() { return ColumnMetadata.StructureType.PRIMITIVE; }
 
   @Override
-  public int expectedWidth() { return expectedWidth; }
+  public int expectedWidth() {
+
+    // If the property is not set, estimate width from the type.
+
+    int width = PropertyAccessor.getInt(this, EXPECTED_WIDTH_PROP);
+    if (width == 0) {
+      width = estimateWidth(majorType());
+    }
+    return width;
+  }
 
   @Override
   public int precision() { return precision; }
@@ -143,56 +118,44 @@ public class PrimitiveColumnMetadata extends AbstractColumnMetadata {
     // makes an error.
 
     if (isVariableWidth()) {
-      expectedWidth = Math.max(1, width);
+      PropertyAccessor.set(this, EXPECTED_WIDTH_PROP, Math.max(1, width));
     }
   }
 
   @Override
-  public void setFormatValue(String value) {
-    formatValue = value;
-  }
-
-  @Override
-  public String formatValue() {
-    return formatValue;
-  }
-
-  @Override
-  public void setDefaultValue(Object value) {
-    defaultValue = value;
-  }
-
-  @Override
-  public Object defaultValue() { return defaultValue; }
-
-  @Override
-  public void setDefaultFromString(String value) {
-    this.defaultValue = valueFromString(value);
-  }
-
-  @Override
-  public String defaultStringValue() {
-    return valueToString(defaultValue);
-  }
-
-  @Override
-  public void setTypeConverter(ColumnConversionFactory factory) {
-    shimFactory = factory;
+  public DateTimeFormatter dateTimeFormatter() {
+    String formatValue = format();
+    try {
+      switch (type) {
+        case TIME:
+          return formatValue == null
+            ? DateTimeFormatter.ISO_TIME.withZone(ZoneOffset.UTC) : DateTimeFormatter.ofPattern(formatValue);
+        case DATE:
+          return formatValue == null
+            ? DateTimeFormatter.ISO_DATE.withZone(ZoneOffset.UTC) : DateTimeFormatter.ofPattern(formatValue);
+        case TIMESTAMP:
+          return formatValue == null
+            ? DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneOffset.UTC) : DateTimeFormatter.ofPattern(formatValue);
+        default:
+          throw new IllegalArgumentException("Column is not a date/time type: " + type.toString());
+      }
+    } catch (IllegalArgumentException | DateTimeParseException e) {
+      throw new IllegalArgumentException(String.format("The format \"%s\" is not valid for type %s",
+          formatValue, type), e);
+    }
   }
 
   @Override
-  public ColumnConversionFactory typeConverter() { return shimFactory; }
-
-  @Override
   public ColumnMetadata cloneEmpty() {
     return new PrimitiveColumnMetadata(this);
   }
 
   public ColumnMetadata mergeWith(MaterializedField field) {
     PrimitiveColumnMetadata merged = new PrimitiveColumnMetadata(field);
-    merged.setExpectedElementCount(expectedElementCount);
-    merged.setExpectedWidth(Math.max(expectedWidth, field.getPrecision()));
-    merged.setProjected(projected);
+    merged.setExpectedWidth(Math.max(expectedWidth(), field.getPrecision()));
+    merged.setProperties(properties());
     return merged;
   }
 
@@ -266,7 +229,8 @@ public class PrimitiveColumnMetadata extends AbstractColumnMetadata {
    * @param value value in string literal form
    * @return Object instance
    */
-  private Object valueFromString(String value) {
+  @Override
+  public Object valueFromString(String value) {
     if (value == null) {
       return null;
     }
@@ -288,28 +252,22 @@ public class PrimitiveColumnMetadata extends AbstractColumnMetadata {
         case VARBINARY:
           return value;
         case TIME:
-          DateTimeFormatter timeFormatter = formatValue == null
-            ? DateTimeFormatter.ISO_TIME.withZone(ZoneOffset.UTC) : DateTimeFormatter.ofPattern(formatValue);
-          return LocalTime.parse(value, timeFormatter);
+          return LocalTime.parse(value, dateTimeFormatter());
         case DATE:
-          DateTimeFormatter dateFormatter = formatValue == null
-            ? DateTimeFormatter.ISO_DATE.withZone(ZoneOffset.UTC) : DateTimeFormatter.ofPattern(formatValue);
-          return LocalDate.parse(value, dateFormatter);
+          return LocalDate.parse(value, dateTimeFormatter());
         case TIMESTAMP:
-          DateTimeFormatter dateTimeFormatter = formatValue == null
-            ? DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneOffset.UTC) : DateTimeFormatter.ofPattern(formatValue);
-          return ZonedDateTime.parse(value, dateTimeFormatter);
+          return ZonedDateTime.parse(value, dateTimeFormatter());
         case INTERVAL:
         case INTERVALDAY:
         case INTERVALYEAR:
           return Period.parse(value);
         default:
-          logger.warn("Unsupported type {} for default value {}, ignore and return null", type, value);
-          return null;
+          throw new IllegalArgumentException("Unsupported conversion: " + type.toString());
       }
     } catch (IllegalArgumentException | DateTimeParseException e) {
-      logger.warn("Error while parsing type {} default value {}, ignore and return null", type, value, e);
-      return null;
+      logger.warn("Error while parsing type {} default value {}", type, value, e);
+      throw new IllegalArgumentException(String.format("The string \"%s\" is not valid for type %s",
+          value, type), e);
     }
   }
 
@@ -319,26 +277,29 @@ public class PrimitiveColumnMetadata extends AbstractColumnMetadata {
    * @param value value instance
    * @return value in string literal representation
    */
-  private String valueToString(Object value) {
+  @Override
+  public String valueToString(Object value) {
     if (value == null) {
       return null;
     }
     switch (type) {
       case TIME:
+        String formatValue = format();
         DateTimeFormatter timeFormatter = formatValue == null
           ? DateTimeFormatter.ISO_TIME.withZone(ZoneOffset.UTC) : DateTimeFormatter.ofPattern(formatValue);
         return timeFormatter.format((LocalTime) value);
       case DATE:
+        formatValue = format();
         DateTimeFormatter dateFormatter = formatValue == null
           ? DateTimeFormatter.ISO_DATE.withZone(ZoneOffset.UTC) : DateTimeFormatter.ofPattern(formatValue);
         return dateFormatter.format((LocalDate) value);
       case TIMESTAMP:
+        formatValue = format();
         DateTimeFormatter dateTimeFormatter = formatValue == null
           ? DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneOffset.UTC) : DateTimeFormatter.ofPattern(formatValue);
         return dateTimeFormatter.format((ZonedDateTime) value);
       default:
         return value.toString();
     }
   }
-
 }
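
For orientation, a minimal sketch (not part of this patch) of the format-driven parse and print path above; it assumes the setFormat, valueFromString and valueToString methods this file now exposes:

    import org.apache.drill.common.types.TypeProtos.DataMode;
    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata;

    public class DateFormatSketch {
      public static void main(String[] args) {
        PrimitiveColumnMetadata col =
            new PrimitiveColumnMetadata("d", MinorType.DATE, DataMode.OPTIONAL);

        // With no format property set, ISO_DATE is assumed.
        System.out.println(col.valueFromString("2018-12-31")); // 2018-12-31

        // The format property drives both parsing and printing.
        col.setFormat("dd/MM/yyyy");
        Object decoded = col.valueFromString("31/12/2018");    // a LocalDate
        System.out.println(col.valueToString(decoded));        // 31/12/2018
      }
    }

Note that a bad pattern or value now raises IllegalArgumentException rather than being silently mapped to null, per the valueFromString change above.
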
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
index 283ee64..7e90ff8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
@@ -28,7 +28,6 @@ import org.apache.drill.exec.record.MaterializedField;
 
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -47,14 +46,12 @@ import java.util.stream.Collectors;
   setterVisibility = JsonAutoDetect.Visibility.NONE)
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 @JsonPropertyOrder({"columns", "properties"})
-public class TupleSchema implements TupleMetadata {
+public class TupleSchema extends AbstractPropertied implements TupleMetadata {
 
   private MapColumnMetadata parentMap;
   private final TupleNameSpace<ColumnMetadata> nameSpace = new TupleNameSpace<>();
-  private final Map<String, String> properties = new LinkedHashMap<>();
 
-  public TupleSchema() {
-  }
+  public TupleSchema() { }
 
   @JsonCreator
   public TupleSchema(@JsonProperty("columns") List<AbstractColumnMetadata> columns,
@@ -218,28 +215,20 @@ public class TupleSchema implements TupleMetadata {
       .map(ColumnMetadata::toString)
       .collect(Collectors.joining(", ")));
 
-    if (!properties.isEmpty()) {
+    if (hasProperties()) {
       if (!nameSpace.entries().isEmpty()) {
         builder.append(", ");
       }
-      builder.append("properties: ").append(properties);
+      builder.append("properties: ").append(properties());
     }
 
     builder.append("]");
     return builder.toString();
   }
 
-  @Override
-  public void setProperties(Map<String, String> properties) {
-    if (properties == null) {
-      return;
-    }
-    this.properties.putAll(properties);
-  }
-
   @JsonProperty("properties")
   @Override
   public Map<String, String> properties() {
-    return properties;
+    return super.properties();
   }
 }
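
For orientation, a minimal sketch (not part of this patch) showing that tuple-level properties now share the lazy AbstractPropertied behavior; the property name below is purely illustrative:

    import org.apache.drill.exec.record.metadata.TupleSchema;

    public class TuplePropsSketch {
      public static void main(String[] args) {
        TupleSchema schema = new TupleSchema();
        System.out.println(schema.hasProperties());     // false: map not yet created

        schema.setProperty("my.prop", "myValue");       // hypothetical property name
        System.out.println(schema.property("my.prop")); // myValue

        schema.setProperty("my.prop", null);            // null removes the entry
        System.out.println(schema.hasProperties());     // false again
      }
    }
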
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaVisitor.java
index 6ea682b..62bcbf5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaVisitor.java
@@ -95,10 +95,10 @@ public class SchemaVisitor extends SchemaParserBaseVisitor<TupleMetadata> {
       ColumnMetadata columnMetadata = ctx.simple_type().accept(new TypeVisitor(name, mode));
       StringValueVisitor stringValueVisitor = new StringValueVisitor();
       if (ctx.format_value() != null) {
-        columnMetadata.setFormatValue(stringValueVisitor.visit(ctx.format_value().string_value()));
+        columnMetadata.setFormat(stringValueVisitor.visit(ctx.format_value().string_value()));
       }
       if (ctx.default_value() != null) {
-        columnMetadata.setDefaultFromString(stringValueVisitor.visit(ctx.default_value().string_value()));
+        columnMetadata.setDefaultValue(stringValueVisitor.visit(ctx.default_value().string_value()));
       }
       return columnMetadata;
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java
index f4b1e69..92a7b27 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java
@@ -371,15 +371,15 @@ public class TestSchemaCommands extends ClusterTest {
       assertEquals(3, schema.size());
 
       ColumnMetadata a = schema.metadata("a");
-      assertTrue(a.defaultValue() instanceof Integer);
-      assertEquals(10, a.defaultValue());
-      assertEquals("10", a.defaultStringValue());
+      assertTrue(a.decodeDefaultValue() instanceof Integer);
+      assertEquals(10, a.decodeDefaultValue());
+      assertEquals("10", a.defaultValue());
 
       ColumnMetadata b = schema.metadata("b");
-      assertTrue(b.defaultValue() instanceof LocalDate);
-      assertEquals("yyyy-MM-dd", b.formatValue());
-      assertEquals(LocalDate.parse("2017-01-31"), b.defaultValue());
-      assertEquals("2017-01-31", b.defaultStringValue());
+      assertTrue(b.decodeDefaultValue() instanceof LocalDate);
+      assertEquals("yyyy-MM-dd", b.format());
+      assertEquals(LocalDate.parse("2017-01-31"), b.decodeDefaultValue());
+      assertEquals("2017-01-31", b.defaultValue());
 
       ColumnMetadata c = schema.metadata("c");
       Map<String, String> properties = new LinkedHashMap<>();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java
index 5fb046e..424ee6f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java
@@ -352,10 +352,9 @@ public class TestProjectedTuple {
     assertEquals(ProjectionType.UNPROJECTED, projSet.projectionType("foo"));
   }
 
-  //@Test
-  //@Ignore("Drill syntax does not support map arrays")
-  @SuppressWarnings("unused")
-  private void testMapArray() {
+  @Test
+  // Drill SQL syntax does not support map arrays, but the projection parser does
+  public void testMapArray() {
     RequestedTuple projSet = RequestedTupleImpl.parse(
         RowSetTestUtils.projectList("a[1].x"));
     List<RequestedColumn> cols = projSet.projections();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
index 6629512..035ae75 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
@@ -19,20 +19,24 @@ package org.apache.drill.exec.physical.rowSet.impl;
 
 import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
 import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
 import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -144,6 +148,58 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
   }
 
   @Test
+  public void testArrayProjection() {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("a1", "a2[0]");
+    TupleMetadata schema = new SchemaBuilder()
+        .addArray("a1", MinorType.INT)
+        .addArray("a2", MinorType.INT)
+        .addArray("a3", MinorType.INT)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Verify the projected columns
+
+    TupleMetadata actualSchema = rootWriter.tupleSchema();
+    ColumnMetadata a1Md = actualSchema.metadata("a1");
+    assertTrue(a1Md.isArray());
+    assertTrue(a1Md.isProjected());
+
+    ColumnMetadata a2Md = actualSchema.metadata("a2");
+    assertTrue(a2Md.isArray());
+    assertTrue(a2Md.isProjected());
+
+    ColumnMetadata a3Md = actualSchema.metadata("a3");
+    assertTrue(a3Md.isArray());
+    assertFalse(a3Md.isProjected());
+
+    // Write a couple of rows.
+
+    rsLoader.startBatch();
+    rootWriter.start();
+    rootWriter
+      .addRow(intArray(10, 100), intArray(20, 200), intArray(30, 300))
+      .addRow(intArray(11, 101), intArray(21, 201), intArray(31, 301));
+
+    // Verify. Only the projected columns appear in the result set.
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("a1", MinorType.INT)
+        .addArray("a2", MinorType.INT)
+        .buildSchema();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+      .addRow(intArray(10, 100), intArray(20, 200))
+      .addRow(intArray(11, 101), intArray(21, 201))
+      .build();
+    RowSetUtilities.verify(expected, fixture.wrap(rsLoader.harvest()));
+    rsLoader.close();
+  }
+
+  @Test
   public void testMapProjection() {
     List<SchemaPath> selection = RowSetTestUtils.projectList("m1", "m2.d");
     TupleMetadata schema = new SchemaBuilder()
@@ -399,4 +455,135 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
 
     rsLoader.close();
   }
+
+  @Test
+  public void testScalarArrayConflict() {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("col[0]");
+    TupleMetadata schema = new SchemaBuilder()
+        .add("col", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    try {
+      new ResultSetLoaderImpl(fixture.allocator(), options);
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getErrorType() == ErrorType.VALIDATION);
+    }
+  }
+
+  @Test
+  public void testScalarMapConflict() {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("col.child");
+    TupleMetadata schema = new SchemaBuilder()
+        .add("col", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    try {
+      new ResultSetLoaderImpl(fixture.allocator(), options);
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getErrorType() == ErrorType.VALIDATION);
+    }
+  }
+
+  @Test
+  public void testScalarMapArrayConflict() {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("col[0].child");
+    TupleMetadata schema = new SchemaBuilder()
+        .add("col", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    try {
+      new ResultSetLoaderImpl(fixture.allocator(), options);
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getErrorType() == ErrorType.VALIDATION);
+    }
+  }
+
+  @Test
+  public void testArrayMapConflict() {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("col.child");
+    TupleMetadata schema = new SchemaBuilder()
+        .addArray("col", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    try {
+      new ResultSetLoaderImpl(fixture.allocator(), options);
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getErrorType() == ErrorType.VALIDATION);
+    }
+  }
+
+  @Test
+  public void testArrayMapArrayConflict() {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("col[0].child");
+    TupleMetadata schema = new SchemaBuilder()
+        .addArray("col", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    try {
+      new ResultSetLoaderImpl(fixture.allocator(), options);
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getErrorType() == ErrorType.VALIDATION);
+    }
+  }
+
+  @Test
+  public void testMapArrayConflict() {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("col[0]");
+    TupleMetadata schema = new SchemaBuilder()
+        .addMap("col")
+          .add("child", MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    try {
+      new ResultSetLoaderImpl(fixture.allocator(), options);
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getErrorType() == ErrorType.VALIDATION);
+    }
+  }
+
+  @Test
+  public void testMapMapArrayConflict() {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("col[0].child");
+    TupleMetadata schema = new SchemaBuilder()
+        .addMap("col")
+          .add("child", MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    try {
+      new ResultSetLoaderImpl(fixture.allocator(), options);
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getErrorType() == ErrorType.VALIDATION);
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProtocol.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProtocol.java
index dc0236d..eb342da 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProtocol.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProtocol.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.rowSet.impl;
 
-import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
 import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -45,7 +44,6 @@ import org.apache.drill.exec.vector.accessor.TupleWriter.UndefinedColumnExceptio
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.test.TestColumnConverter.TestConverter;
 import org.apache.drill.test.rowSet.RowSetReader;
 import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.Test;
@@ -607,55 +605,4 @@ public class TestResultSetLoaderProtocol extends SubOperatorTest {
 
     rsLoader.close();
   }
-
-  /**
-   * Test the use of a column type converter in the result set loader for
-   * required, nullable and repeated columns.
-   */
-
-  @Test
-  public void testTypeConversion() {
-    TupleMetadata schema = new SchemaBuilder()
-        .add("n1", MinorType.INT)
-        .addNullable("n2", MinorType.INT)
-        .addArray("n3", MinorType.INT)
-        .buildSchema();
-
-    // Add a type converter. Passed in as a factory
-    // since we must create a new one for each row set writer.
-
-    schema.metadata("n1").setTypeConverter(TestConverter.factory());
-    schema.metadata("n2").setTypeConverter(TestConverter.factory());
-    schema.metadata("n3").setTypeConverter(TestConverter.factory());
-
-    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
-        .setSchema(schema)
-        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
-        .build();
-    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
-    rsLoader.startBatch();
-
-    // Write data as both a string as an integer
-
-    RowSetLoader rootWriter = rsLoader.writer();
-    rootWriter.addRow("123", "12", strArray("123", "124"));
-    rootWriter.addRow(234, 23, intArray(234, 235));
-    RowSet actual = fixture.wrap(rsLoader.harvest());
-
-    // Build the expected vector without a type converter.
-
-    TupleMetadata expectedSchema = new SchemaBuilder()
-        .add("n1", MinorType.INT)
-        .addNullable("n2", MinorType.INT)
-        .addArray("n3", MinorType.INT)
-        .buildSchema();
-    final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow(123, 12, intArray(123, 124))
-        .addRow(234, 23, intArray(234, 235))
-        .build();
-
-    // Compare
-
-    RowSetUtilities.verify(expected, actual);
-  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderTypeConversion.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderTypeConversion.java
new file mode 100644
index 0000000..a46ef9e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderTypeConversion.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.test.TestColumnConverter;
+import org.apache.drill.test.rowSet.test.TestColumnConverter.ConverterFactory;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(RowSetTests.class)
+public class TestResultSetLoaderTypeConversion extends SubOperatorTest {
+
+  /**
+   * Test the use of a column type converter in the result set loader for
+   * required, nullable and repeated columns.
+   * <p>
+   * This tests the simple case: keeping the same column type, just
+   * inserting a conversion "shim" on top.
+   */
+
+  @Test
+  public void testConversionShim() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("n1", MinorType.INT)
+        .addNullable("n2", MinorType.INT)
+        .addArray("n3", MinorType.INT)
+        .buildSchema();
+
+    // Mark the columns for conversion via a column property.
+    // The converter factory itself is supplied to the schema transform below.
+
+    TestColumnConverter.setConverterProp(schema.metadata("n1"),
+        TestColumnConverter.CONVERT_TO_INT);
+    TestColumnConverter.setConverterProp(schema.metadata("n2"),
+        TestColumnConverter.CONVERT_TO_INT);
+    TestColumnConverter.setConverterProp(schema.metadata("n3"),
+        TestColumnConverter.CONVERT_TO_INT);
+
+    SchemaTransformer schemaTransform = new DefaultSchemaTransformer(new ConverterFactory());
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchemaTransform(schemaTransform)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    rsLoader.startBatch();
+
+    // Write data both as a string and as an integer
+
+    RowSetLoader rootWriter = rsLoader.writer();
+    rootWriter.addRow("123", "12", strArray("123", "124"));
+    rootWriter.addRow(234, 23, intArray(234, 235));
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+
+    // Build the expected vector without a type converter.
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("n1", MinorType.INT)
+        .addNullable("n2", MinorType.INT)
+        .addArray("n3", MinorType.INT)
+        .buildSchema();
+    final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(123, 12, intArray(123, 124))
+        .addRow(234, 23, intArray(234, 235))
+        .build();
+
+    // Compare
+
+    RowSetUtilities.verify(expected, actual);
+  }
+
+  /**
+   * Test full-blown type conversion using the standard Drill properties.
+   */
+
+  @Test
+  public void testTypeConversion() {
+    TupleMetadata outputSchema = new SchemaBuilder()
+        .add("n1", MinorType.INT)
+        .addNullable("n2", MinorType.INT)
+        .addArray("n3", MinorType.INT)
+        .buildSchema();
+
+    TupleMetadata inputSchema = new SchemaBuilder()
+        .add("n1", MinorType.VARCHAR)
+        .addNullable("n2", MinorType.VARCHAR)
+        .addArray("n3", MinorType.VARCHAR)
+        .buildSchema();
+
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(inputSchema)
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchemaTransform(new SchemaTransformerImpl(outputSchema))
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    rsLoader.startBatch();
+
+    // Write data both as a string and as an integer
+
+    RowSetLoader rootWriter = rsLoader.writer();
+    rootWriter.addRow("123", "12", strArray("123", "124"));
+    rootWriter.addRow(234, 23, intArray(234, 235));
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+
+    // Build the expected vector without a type converter.
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("n1", MinorType.INT)
+        .addNullable("n2", MinorType.INT)
+        .addArray("n3", MinorType.INT)
+        .buildSchema();
+    final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(123, 12, intArray(123, 124))
+        .addRow(234, 23, intArray(234, 235))
+        .build();
+
+    // Compare
+
+    RowSetUtilities.verify(expected, actual);
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestMetadataProperties.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestMetadataProperties.java
new file mode 100644
index 0000000..0ac3638
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestMetadataProperties.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record.metadata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.BasicTypeHelper;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(RowSetTests.class)
+public class TestMetadataProperties {
+
+  @Test
+  public void testBasics() {
+
+    AbstractPropertied props = new AbstractPropertied();
+    assertFalse(props.hasProperties());
+
+    // Copy constructor works
+
+    AbstractPropertied second = new AbstractPropertied(props);
+    assertFalse(second.hasProperties());
+
+    // Getting a property does not change state
+
+    assertNull(props.property("foo"));
+    assertFalse(props.hasProperties());
+
+    // Clearing a property does not change state
+
+    props.setProperty("foo", null);
+    assertNull(props.property("foo"));
+    assertFalse(props.hasProperties());
+
+    // Getting all properties does not change state (though it
+    // does materialize properties)
+
+    assertNotNull(props.properties());
+    assertTrue(props.properties().isEmpty());
+    assertFalse(props.hasProperties());
+
+    // Setting a property works as expected
+
+    props.setProperty("foo", "bar");
+    assertEquals("bar", props.property("foo"));
+    assertTrue(props.hasProperties());
+    assertEquals("bar", props.properties().get("foo"));
+
+    // As does clearing a property
+
+    props.setProperty("foo", null);
+    assertNull(props.property("foo"));
+    assertFalse(props.hasProperties());
+
+    // Setting multiple properties overwrites duplicates,
+    // leaves others.
+
+    props.setProperty("foo", "bar");
+    props.setProperty("fred", "wilma");
+
+    Map<String, String> other = new HashMap<>();
+    other.put("fred", "pebbles");
+    other.put("barney", "bambam");
+
+    props.setProperties(other);
+    assertTrue(props.hasProperties());
+    assertEquals(3, props.properties().size());
+    assertEquals("bar", props.property("foo"));
+    assertEquals("pebbles", props.property("fred"));
+    assertEquals("bambam", props.property("barney"));
+
+    // Copy constructor works
+
+    second = new AbstractPropertied(props);
+    assertTrue(second.hasProperties());
+    assertEquals(3, second.properties().size());
+  }
+
+  @Test
+  public void testAccessor() {
+    AbstractPropertied props = new AbstractPropertied();
+
+    // Accessors with default
+
+    assertEquals("bar", PropertyAccessor.getString(props, "foo", "bar"));
+    assertEquals(10, PropertyAccessor.getInt(props, "foo", 10));
+    assertEquals(true, PropertyAccessor.getBoolean(props, "foo", true));
+
+    // Accessors without default
+
+    assertNull(props.property("foo"));
+    assertEquals(0, PropertyAccessor.getInt(props, "foo"));
+    assertEquals(false, PropertyAccessor.getBoolean(props, "foo"));
+
+    // Set, then get, property
+
+    props.setProperty("str", "value");
+    assertEquals("value", props.property("str"));
+
+    PropertyAccessor.set(props, "int", 20);
+    assertEquals("20", props.property("int"));
+    assertEquals(20, PropertyAccessor.getInt(props, "int"));
+
+    PropertyAccessor.set(props, "bool", true);
+    assertEquals("true", props.property("bool"));
+    assertEquals(true, PropertyAccessor.getBoolean(props, "bool"));
+  }
+
+  @Test
+  public void testWidth() {
+    PrimitiveColumnMetadata col = new PrimitiveColumnMetadata("c", MinorType.VARCHAR, DataMode.OPTIONAL);
+    AbstractPropertied props = col;
+
+    // Width is not set by default
+    assertFalse(props.hasProperties());
+
+    // But is estimated on demand
+
+    assertEquals(BasicTypeHelper.WIDTH_ESTIMATE, col.expectedWidth());
+
+    // Set an explicit width
+
+    col.setExpectedWidth(20);
+    assertTrue(props.hasProperties());
+    assertEquals(20, col.expectedWidth());
+    assertEquals("20", col.property(ColumnMetadata.EXPECTED_WIDTH_PROP));
+
+    // Clear the width
+
+    col.setProperty(ColumnMetadata.EXPECTED_WIDTH_PROP, null);
+    assertFalse(props.hasProperties());
+    assertEquals(BasicTypeHelper.WIDTH_ESTIMATE, col.expectedWidth());
+  }
+
+  @Test
+  public void testProjected() {
+    PrimitiveColumnMetadata col = new PrimitiveColumnMetadata("c", MinorType.INT, DataMode.OPTIONAL);
+    AbstractPropertied props = col;
+    assertTrue(col.isProjected());
+    col.setProjected(true);
+    assertTrue(col.isProjected());
+
+    // Projected is the default, so no properties set.
+    assertFalse(props.hasProperties());
+
+    col.setProjected(false);
+    assertFalse(col.isProjected());
+    assertTrue(props.hasProperties());
+
+    // Sanity check that the expected prop was set
+
+    assertEquals("false", col.property(ColumnMetadata.PROJECTED_PROP));
+  }
+
+  @Test
+  public void testFormat() {
+    PrimitiveColumnMetadata col = new PrimitiveColumnMetadata("c", MinorType.INT, DataMode.OPTIONAL);
+    AbstractPropertied props = col;
+    assertNull(col.format());
+    col.setFormat("###");
+    assertEquals("###", col.format());
+    assertTrue(props.hasProperties());
+    assertEquals("###", col.property(ColumnMetadata.FORMAT_PROP));
+    col.setFormat(null);
+    assertFalse(props.hasProperties());
+    assertNull(col.format());
+  }
+
+  @Test
+  public void testDefault() {
+    PrimitiveColumnMetadata col = new PrimitiveColumnMetadata("c", MinorType.VARCHAR, DataMode.OPTIONAL);
+    AbstractPropertied props = col;
+    assertNull(col.defaultValue());
+    col.setDefaultValue("empty");
+    assertEquals("empty", col.defaultValue());
+    assertTrue(props.hasProperties());
+    assertEquals("empty", col.property(ColumnMetadata.DEFAULT_VALUE_PROP));
+    col.setDefaultValue(null);
+    assertFalse(props.hasProperties());
+    assertNull(col.defaultValue());
+  }
+
+  @Test
+  public void testStringEncode() {
+    PrimitiveColumnMetadata col = new PrimitiveColumnMetadata("c", MinorType.VARCHAR, DataMode.OPTIONAL);
+    String encoded = col.valueToString("foo");
+    assertEquals("foo", encoded);
+    col.setDefaultValue(encoded);
+    assertEquals("foo", col.decodeDefaultValue());
+  }
+
+  @Test
+  public void testIntEncode() {
+    PrimitiveColumnMetadata col = new PrimitiveColumnMetadata("c", MinorType.INT, DataMode.OPTIONAL);
+    String encoded = col.valueToString(20);
+    assertEquals("20", encoded);
+    assertEquals(20, col.valueFromString(encoded));
+    col.setDefaultValue(encoded);
+    assertEquals(20, col.decodeDefaultValue());
+  }
+
+  // TODO: Test encode/decode for other types
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestTupleSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java
similarity index 97%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/record/TestTupleSchema.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java
index a59a6c1..a33d8d4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestTupleSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.record;
+package org.apache.drill.exec.record.metadata;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -32,25 +32,20 @@ import java.util.List;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.metadata.ColumnBuilder;
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.ColumnMetadata.StructureType;
-import org.apache.drill.exec.record.metadata.MapColumnMetadata;
-import org.apache.drill.exec.record.metadata.MetadataUtils;
-import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.record.metadata.TupleSchema;
-import org.apache.drill.exec.record.metadata.VariantColumnMetadata;
-import org.apache.drill.exec.record.metadata.VariantMetadata;
 import org.apache.drill.test.SubOperatorTest;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 /**
  * Test the tuple and column metadata, including extended attributes.
  */
+@Category(RowSetTests.class)
 public class TestTupleSchema extends SubOperatorTest {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java
index 435ec0d..0654493 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java
@@ -184,7 +184,9 @@ public class TestSchemaProvider {
       + "        \"name\" : \"i\",\n"
       + "        \"type\" : \"INT\",\n"
       + "        \"mode\" : \"REQUIRED\",\n"
-      + "        \"default\" : \"10\"\n"
+      + "        \"properties\" : {\n"
+      + "          \"drill.default\" : \"10\"\n"
+      + "        }\n"
       + "      },\n"
       + "      {\n"
       + "        \"name\" : \"a\",\n"
@@ -199,7 +201,9 @@ public class TestSchemaProvider {
       + "        \"name\" : \"t\",\n"
       + "        \"type\" : \"DATE\",\n"
       + "        \"mode\" : \"OPTIONAL\",\n"
-      + "        \"format\" : \"yyyy-mm-dd\"\n"
+      + "        \"properties\" : {\n"
+      + "          \"drill.format\" : \"yyyy-mm-dd\"\n"
+      + "        }\n"
       + "      }\n"
       + "    ],\n"
       + "    \"properties\" : {\n"
@@ -227,7 +231,7 @@ public class TestSchemaProvider {
     ColumnMetadata i = metadata.metadata("i");
     assertEquals(TypeProtos.MinorType.INT, i.type());
     assertEquals(TypeProtos.DataMode.REQUIRED, i.mode());
-    assertEquals(10, i.defaultValue());
+    assertEquals(10, i.decodeDefaultValue());
 
     ColumnMetadata a = metadata.metadata("a");
     assertEquals(TypeProtos.MinorType.VARCHAR, a.type());
@@ -240,7 +244,7 @@ public class TestSchemaProvider {
     ColumnMetadata t = metadata.metadata("t");
     assertEquals(TypeProtos.MinorType.DATE, t.type());
     assertEquals(TypeProtos.DataMode.OPTIONAL, t.mode());
-    assertEquals("yyyy-mm-dd", t.formatValue());
+    assertEquals("yyyy-mm-dd", t.format());
 
     assertTrue(schemaContainer.getVersion().isUndefined());
   }
@@ -279,5 +283,4 @@ public class TestSchemaProvider {
     assertTrue(provider.exists());
     assertNotNull(provider.read());
   }
-
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java
index 09b4c8a..42542b0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java
@@ -257,7 +257,7 @@ public class TestSchemaParser {
     String value = "`a` DATE NOT NULL FORMAT 'yyyy-MM-dd'";
     TupleMetadata schema = SchemaExprParser.parseSchema(value);
     ColumnMetadata columnMetadata = schema.metadata("a");
-    assertEquals("yyyy-MM-dd", columnMetadata.formatValue());
+    assertEquals("yyyy-MM-dd", columnMetadata.format());
     assertEquals(value, columnMetadata.columnString());
   }
 
@@ -266,9 +266,9 @@ public class TestSchemaParser {
     String value = "`a` INT NOT NULL DEFAULT '12'";
     TupleMetadata schema = SchemaExprParser.parseSchema(value);
     ColumnMetadata columnMetadata = schema.metadata("a");
-    assertTrue(columnMetadata.defaultValue() instanceof Integer);
-    assertEquals(12, columnMetadata.defaultValue());
-    assertEquals("12", columnMetadata.defaultStringValue());
+    assertTrue(columnMetadata.decodeDefaultValue() instanceof Integer);
+    assertEquals(12, columnMetadata.decodeDefaultValue());
+    assertEquals("12", columnMetadata.defaultValue());
     assertEquals(value, columnMetadata.columnString());
   }
 
@@ -277,9 +277,9 @@ public class TestSchemaParser {
     String value = "`a` DATE NOT NULL FORMAT 'yyyy-MM-dd' DEFAULT '2018-12-31'";
     TupleMetadata schema = SchemaExprParser.parseSchema(value);
     ColumnMetadata columnMetadata = schema.metadata("a");
-    assertTrue(columnMetadata.defaultValue() instanceof LocalDate);
-    assertEquals(LocalDate.of(2018, 12, 31), columnMetadata.defaultValue());
-    assertEquals("2018-12-31", columnMetadata.defaultStringValue());
+    assertTrue(columnMetadata.decodeDefaultValue() instanceof LocalDate);
+    assertEquals(LocalDate.of(2018, 12, 31), columnMetadata.decodeDefaultValue());
+    assertEquals("2018-12-31", columnMetadata.defaultValue());
     assertEquals(value, columnMetadata.columnString());
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
index 056b8e4..2e176c3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
@@ -97,5 +97,4 @@ public class BaseCsvTest extends ClusterTest {
       }
     }
   }
-
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
index acde6f9..236e39a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
 import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
 import org.apache.drill.test.rowSet.RowSetWriterImpl.WriterIndexImpl;
 
@@ -46,6 +47,10 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS
 
   public static class RowSetWriterBuilder extends BaseWriterBuilder {
 
+    public RowSetWriterBuilder(ColumnConversionFactory conversionFactory) {
+      super(conversionFactory);
+    }
+
     public RowSetWriter buildWriter(DirectRowSet rowSet) {
       WriterIndexImpl index = new WriterIndexImpl();
       TupleMetadata schema = rowSet.schema();
@@ -100,11 +105,15 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS
 
   @Override
   public RowSetWriter writer(int initialRowCount) {
+    return writer(initialRowCount, null);
+  }
+
+  public RowSetWriter writer(int initialRowCount, ColumnConversionFactory conversionFactory) {
     if (container().hasRecordCount()) {
       throw new IllegalStateException("Row set already contains data");
     }
     allocate(initialRowCount);
-    return new RowSetWriterBuilder().buildWriter(this);
+    return new RowSetWriterBuilder(conversionFactory).buildWriter(this);
   }
 
   @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
index ff9b4f4..ae120d8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
@@ -22,6 +22,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 
 import java.util.Set;
@@ -52,16 +53,26 @@ public final class RowSetBuilder {
    */
   @Deprecated
   public RowSetBuilder(BufferAllocator allocator, BatchSchema schema) {
-    this(allocator, MetadataUtils.fromFields(schema), 10);
+    this(allocator, MetadataUtils.fromFields(schema), 10, null);
   }
 
   public RowSetBuilder(BufferAllocator allocator, TupleMetadata schema) {
-    this(allocator, schema, 10);
+    this(allocator, schema, 10, null);
+  }
+
+  public RowSetBuilder(BufferAllocator allocator, TupleMetadata schema,
+      ColumnConversionFactory conversionFactory) {
+    this(allocator, schema, 10, conversionFactory);
   }
 
   public RowSetBuilder(BufferAllocator allocator, TupleMetadata schema, int capacity) {
+    this(allocator, schema, capacity, null);
+  }
+
+  public RowSetBuilder(BufferAllocator allocator, TupleMetadata schema,
+      int capacity, ColumnConversionFactory conversionFactory) {
     rowSet = DirectRowSet.fromSchema(allocator, schema);
-    writer = rowSet.writer(capacity);
+    writer = rowSet.writer(capacity, conversionFactory);
   }
 
   public RowSetWriter writer() { return writer; }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/DummyWriterTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/DummyWriterTest.java
index e353e6d..1329a86 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/DummyWriterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/DummyWriterTest.java
@@ -68,8 +68,8 @@ public class DummyWriterTest extends SubOperatorTest {
 
     // We provide no vector. Factory should build us "dummy" writers.
 
-    writers.add(ColumnWriterFactory.buildColumnWriter(schema.metadata("a"), null));
-    writers.add(ColumnWriterFactory.buildColumnWriter(schema.metadata("b"), null));
+    writers.add(ColumnWriterFactory.buildColumnWriter(schema.metadata("a"), null, null));
+    writers.add(ColumnWriterFactory.buildColumnWriter(schema.metadata("b"), null, null));
     AbstractTupleWriter rootWriter = new RootWriterFixture(schema, writers);
 
     // Events are ignored.
@@ -141,8 +141,8 @@ public class DummyWriterTest extends SubOperatorTest {
       schema.metadata("m1").setProjected(false);
       TupleMetadata mapSchema = schema.metadata("m1").mapSchema();
       List<AbstractObjectWriter> members = new ArrayList<>();
-      members.add(ColumnWriterFactory.buildColumnWriter(mapSchema.metadata("a"), null));
-      members.add(ColumnWriterFactory.buildColumnWriter(mapSchema.metadata("b"), null));
+      members.add(ColumnWriterFactory.buildColumnWriter(mapSchema.metadata("a"), null, null));
+      members.add(ColumnWriterFactory.buildColumnWriter(mapSchema.metadata("b"), null, null));
       writers.add(MapWriter.buildMapWriter(schema.metadata("m1"), null, members));
     }
 
@@ -150,7 +150,7 @@ public class DummyWriterTest extends SubOperatorTest {
       schema.metadata("m2").setProjected(false);
       TupleMetadata mapSchema = schema.metadata("m2").mapSchema();
       List<AbstractObjectWriter> members = new ArrayList<>();
-      members.add(ColumnWriterFactory.buildColumnWriter(mapSchema.metadata("c"), null));
+      members.add(ColumnWriterFactory.buildColumnWriter(mapSchema.metadata("c"), null, null));
       writers.add(MapWriter.buildMapWriter(schema.metadata("m2"), null, members));
     }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
index 906bc05..fa92c09 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
@@ -251,7 +251,7 @@ public class PerformanceTool {
         vector.allocateNew(ROW_COUNT, 5 * ROW_COUNT);
         IntColumnWriter colWriter = new IntColumnWriter(vector.getDataVector());
         ColumnMetadata colSchema = MetadataUtils.fromField(vector.getField());
-        ArrayObjectWriter arrayWriter = ScalarArrayWriter.build(colSchema, vector, colWriter);
+        ArrayObjectWriter arrayWriter = ScalarArrayWriter.build(colSchema, vector, colWriter, null);
         TestWriterIndex index = new TestWriterIndex();
         arrayWriter.events().bindIndex(index);
         arrayWriter.events().startWrite();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestColumnConverter.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestColumnConverter.java
index f989b0c..c9fdb18 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestColumnConverter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestColumnConverter.java
@@ -17,36 +17,57 @@
  */
 package org.apache.drill.test.rowSet.test;
 
+import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
 import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 import org.apache.drill.categories.RowSetTests;
-
-import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
-
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.vector.accessor.ColumnConversionFactory;
+import org.apache.drill.exec.vector.accessor.InvalidConversionError;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.drill.exec.vector.accessor.writer.AbstractWriteConverter;
-import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
+import org.apache.drill.exec.vector.accessor.convert.AbstractWriteConverter;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+import org.apache.drill.exec.vector.accessor.convert.StandardConversions;
+import org.apache.drill.exec.vector.accessor.convert.StandardConversions.ConversionDefn;
+import org.apache.drill.exec.vector.accessor.convert.StandardConversions.ConversionType;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+import org.joda.time.Period;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetUtilities;
-import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 /**
  * Tests the column type converter feature of the column metadata
  * and of the RowSetWriter.
+ * <p>
+ * Also tests the set of standard conversions.
+ * <p>
+ * TODO: At present, the set is incomplete: it handles the most
+ * common conversions, but others (such as Decimal) are not yet supported.
  */
 
 @Category(RowSetTests.class)
 public class TestColumnConverter extends SubOperatorTest {
 
+  public static final String CONVERTER_PROP = "test.conversion";
+  public static final String CONVERT_TO_INT = "int";
+
+  public static void setConverterProp(ColumnMetadata colSchema, String value) {
+    colSchema.setProperty(CONVERTER_PROP, value);
+  }
+
   /**
    * Simple type converter that allows string-to-int conversions.
    * Inherits usual int value support from the base writer.
@@ -61,18 +82,35 @@ public class TestColumnConverter extends SubOperatorTest {
     public void setString(String value) {
       setInt(Integer.parseInt(value));
     }
+  }
 
-    public static ColumnConversionFactory factory() {
-      return new ColumnConversionFactory() {
-        @Override
-        public AbstractScalarWriter newWriter(ColumnMetadata colDefn,
-            ScalarWriter baseWriter) {
-           return new TestConverter(baseWriter);
-        }
-      };
+  /**
+   * Mock conversion factory that uses a property on the column metadata
+   * to indicate that a converter should be inserted. This is primarily a
+   * proof-of-concept: that the conversion factory works, and that it can
+   * use properties. Also verifies that the plumbing works for the
+   * three data modes.
+   */
+  public static class ConverterFactory implements ColumnConversionFactory {
+
+    @Override
+    public AbstractWriteConverter newWriter(ScalarWriter baseWriter) {
+      String value = baseWriter.schema().property(CONVERTER_PROP);
+      if (value == null) {
+        return null;
+      }
+      if (value.equals(CONVERT_TO_INT)) {
+        return new TestConverter(baseWriter);
+      }
+      return null;
     }
   }
 
+  /**
+   * Test doing type conversion using (ad-hoc) properties on the
+   * column metadata to drive conversion. Verifies that the properties
+   * are available to the converter.
+   */
   @Test
   public void testScalarConverter() {
 
@@ -86,12 +124,13 @@ public class TestColumnConverter extends SubOperatorTest {
     // Add a type converter. Passed in as a factory
     // since we must create a new one for each row set writer.
 
-    schema.metadata("n1").setTypeConverter(TestConverter.factory());
-    schema.metadata("n2").setTypeConverter(TestConverter.factory());
+    setConverterProp(schema.metadata("n1"), CONVERT_TO_INT);
+    setConverterProp(schema.metadata("n2"), CONVERT_TO_INT);
 
     // Write data both as a string and as an integer
 
-    RowSet actual = new RowSetBuilder(fixture.allocator(), schema)
+    ConverterFactory conversionFactory = new ConverterFactory();
+    RowSet actual = new RowSetBuilder(fixture.allocator(), schema, conversionFactory)
         .addRow("123", "12")
         .addRow(234, 23)
         .build();
@@ -124,11 +163,12 @@ public class TestColumnConverter extends SubOperatorTest {
     // Add a type converter. Passed in as a factory
     // since we must create a new one for each row set writer.
 
-    schema.metadata("n").setTypeConverter(TestConverter.factory());
+    setConverterProp(schema.metadata("n"), CONVERT_TO_INT);
 
     // Write data both as a string and as an integer
 
-    RowSet actual = new RowSetBuilder(fixture.allocator(), schema)
+    ConverterFactory conversionFactory = new ConverterFactory();
+    RowSet actual = new RowSetBuilder(fixture.allocator(), schema, conversionFactory)
         .addSingleCol(strArray("123", "124"))
         .addSingleCol(intArray(234, 235))
         .build();
@@ -147,4 +187,434 @@ public class TestColumnConverter extends SubOperatorTest {
 
     RowSetUtilities.verify(expected, actual);
   }
+
+  /**
+   * Mock column conversion factory that takes an input schema, matches it against
+   * the given writer, and inserts a standard type conversion shim.
+   */
+  private static class ConversionTestFixture implements ColumnConversionFactory {
+
+    private TupleMetadata inputSchema;
+
+    public ConversionTestFixture(TupleMetadata inputSchema) {
+      this.inputSchema = inputSchema;
+    }
+
+    @Override
+    public AbstractWriteConverter newWriter(ScalarWriter baseWriter) {
+      ColumnMetadata inputCol = inputSchema.metadata(baseWriter.schema().name());
+      assertNotNull(inputCol);
+      ConversionDefn defn = StandardConversions.analyze(inputCol, baseWriter.schema());
+      assertNotNull(defn.conversionClass);
+      return StandardConversions.newInstance(defn.conversionClass, baseWriter);
+    }
+  }
+
+  /**
+   * Test the standard string-to-type conversion using an ad-hoc conversion
+   * from the input type (the type used by the row set builder) to the output
+   * (vector) type.
+   */
+  @Test
+  public void testStringToNumberConversion() {
+
+    // Create the schema
+
+    TupleMetadata outputSchema = new SchemaBuilder()
+        .add("ti", MinorType.TINYINT)
+        .add("si", MinorType.SMALLINT)
+        .add("int", MinorType.INT)
+        .add("bi", MinorType.BIGINT)
+        .add("fl", MinorType.FLOAT4)
+        .add("db", MinorType.FLOAT8)
+        .buildSchema();
+    TupleMetadata inputSchema = new SchemaBuilder()
+        .add("ti", MinorType.VARCHAR)
+        .add("si", MinorType.VARCHAR)
+        .add("int", MinorType.VARCHAR)
+        .add("bi", MinorType.VARCHAR)
+        .add("fl", MinorType.VARCHAR)
+        .add("db", MinorType.VARCHAR)
+        .buildSchema();
+
+    RowSet actual = new RowSetBuilder(fixture.allocator(), outputSchema,
+        new ConversionTestFixture(inputSchema))
+        .addRow("11", "12", "13", "14", "15.5", "16.25")
+        .addRow("127", "32757", Integer.toString(Integer.MAX_VALUE),
+            Long.toString(Long.MAX_VALUE), "10E6", "10E200")
+        .build();
+
+    // Build the expected vector without a type converter.
+
+    final SingleRowSet expected = fixture.rowSetBuilder(outputSchema)
+        .addRow(11, 12, 13, 14L, 15.5F, 16.25D)
+        .addRow(127, 32757, Integer.MAX_VALUE, Long.MAX_VALUE, 10E6F, 10E200D)
+        .build();
+
+    // Compare
+
+    RowSetUtilities.verify(expected, actual);
+  }
+
+  @Test
+  public void testStringToNumberConversionError() {
+
+    TupleMetadata outputSchema = new SchemaBuilder()
+       .add("int", MinorType.INT)
+       .buildSchema();
+    TupleMetadata inputSchema = new SchemaBuilder()
+       .add("int", MinorType.VARCHAR)
+       .buildSchema();
+
+    RowSetBuilder builder = new RowSetBuilder(fixture.allocator(), outputSchema,
+        new ConversionTestFixture(inputSchema));
+    try {
+      builder.addRow("foo");
+      fail();
+    } catch (InvalidConversionError e) {
+      // Expected
+    } finally {
+      builder.build().clear();
+    }
+  }
+
+  /**
+   * Tests the implicit conversions provided by the column writer itself.
+   * No conversion mechanism is needed in this case.
+   */
+  @Test
+  public void testImplicitConversion() {
+
+    TupleMetadata schema = new SchemaBuilder()
+        .add("ti", MinorType.TINYINT)
+        .add("si", MinorType.SMALLINT)
+        .add("int", MinorType.INT)
+        .add("bi", MinorType.BIGINT)
+        .add("fl", MinorType.FLOAT4)
+        .add("db", MinorType.FLOAT8)
+        .buildSchema();
+
+    // Test allowed implicit conversions.
+
+    RowSet actual = new RowSetBuilder(fixture.allocator(), schema)
+        .addRow(11,  12,  13,  14,  15,  16)  // int
+        .addRow(21L, 22L, 23L, 24L, 25L, 26L) // long
+        .addRow(31F, 32F, 33F, 34F, 35F, 36F) // float
+        .addRow(41D, 42D, 43D, 44D, 45D, 46D) // double
+        .build();
+
+    // Build the expected vector without a type converter.
+
+    final SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(11, 12, 13, 14L, 15F, 16D)
+        .addRow(21, 22, 23, 24L, 25F, 26D)
+        .addRow(31, 32, 33, 34L, 35F, 36D)
+        .addRow(41, 42, 43, 44L, 45F, 46D)
+        .build();
+
+    // Compare
+
+    RowSetUtilities.verify(expected, actual);
+  }
+
+  /**
+   * The column accessors provide only int setters. For performance, the int value is
+   * assumed to be of the correct range for the target column. If not, truncation of
+   * the highest bytes occurs.
+   * <p>
+   * The assumption is that, if the reader or other code expects that overflow
+   * might occur, the handling should be implemented in the client (or in a
+   * type conversion shim), leaving the normal code path optimized for the 99%
+   * of cases where the value is in the proper range.
+   */
+  @Test
+  public void testImplicitConversionIntTruncation() {
+
+    TupleMetadata schema = new SchemaBuilder()
+        .add("ti", MinorType.TINYINT)
+        .add("si", MinorType.SMALLINT)
+        .buildSchema();
+
+    // Test allowed implicit conversions.
+    RowSet actual = new RowSetBuilder(fixture.allocator(), schema)
+        .addRow(Byte.MAX_VALUE + 1, Short.MAX_VALUE + 1)
+        .addRow(Byte.MAX_VALUE + 2, Short.MAX_VALUE + 2)
+        .build();
+
+    // Build the expected vector without a type converter.
+
+    final SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(Byte.MIN_VALUE, Short.MIN_VALUE)
+        .addRow(Byte.MIN_VALUE + 1, Short.MIN_VALUE + 1)
+        .build();
+
+    // Compare
+
+    RowSetUtilities.verify(expected, actual);
+  }
+
+  /**
+   * Overflow from double-to-int is detected.
+   */
+  @Test
+  public void testImplicitConversionIntOverflow() {
+
+    TupleMetadata schema = new SchemaBuilder()
+        .add("int", MinorType.INT)
+        .buildSchema();
+
+    {
+      RowSetBuilder builder = new RowSetBuilder(fixture.allocator(), schema);
+      try {
+        builder.addRow((long) Integer.MAX_VALUE + 1);
+        fail();
+      } catch (InvalidConversionError e) {
+        // Expected
+      } finally {
+        builder.build().clear();
+      }
+    }
+    {
+      RowSetBuilder builder = new RowSetBuilder(fixture.allocator(), schema);
+      try {
+        builder.addRow((double) Integer.MAX_VALUE + 1);
+        fail();
+      } catch (InvalidConversionError e) {
+        // Expected
+      } finally {
+        builder.build().clear();
+      }
+    }
+  }
+
+  /**
+   * Implicit conversion from double (or float) follows the Java Math.round
+   * rules: round to the closest long value. Readers that want other behavior
+   * should insert a type-conversion shim to implement the preferred rules.
+   */
+  @Test
+  public void testImplicitConversionDoubleClamp() {
+
+    TupleMetadata schema = new SchemaBuilder()
+        .add("bi", MinorType.BIGINT)
+        .buildSchema();
+
+    RowSet actual = new RowSetBuilder(fixture.allocator(), schema)
+        .addRow(Long.MAX_VALUE * 10D)
+        .addRow(Double.NaN)
+        .addRow(Double.MAX_VALUE)
+        .addRow(Double.MIN_VALUE)
+        .addRow(Double.POSITIVE_INFINITY)
+        .addRow(Double.NEGATIVE_INFINITY)
+        .build();
+
+    final SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(Long.MAX_VALUE)
+        .addRow(0)
+        .addRow(Long.MAX_VALUE)
+        .addRow(0)
+        .addRow(Long.MAX_VALUE)
+        .addRow(Long.MIN_VALUE)
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+  }
+
+  /**
+   * Conversion from String to period (interval) using the default ISO
+   * format.
+   */
+  @Test
+  public void testStringToInterval() {
+
+    TupleMetadata outputSchema = new SchemaBuilder()
+        .add("id", MinorType.INTERVALDAY)
+        .add("iy", MinorType.INTERVALYEAR)
+        .add("int", MinorType.INTERVAL)
+        .buildSchema();
+
+    TupleMetadata inputSchema = new SchemaBuilder()
+        .add("id", MinorType.VARCHAR)
+        .add("iy", MinorType.VARCHAR)
+        .add("int", MinorType.VARCHAR)
+        .buildSchema();
+
+    RowSet actual = new RowSetBuilder(fixture.allocator(), outputSchema,
+        new ConversionTestFixture(inputSchema))
+        .addRow("P2DT3H4M5S", "P9Y8M", "P9Y8M2DT3H4M5S")
+        .build();
+
+    Period p1 = Period.days(2).plusHours(3).plusMinutes(4).plusSeconds(5);
+    Period p2 = Period.years(9).plusMonths(8);
+    Period p3 = p1.plus(p2);
+    final SingleRowSet expected = fixture.rowSetBuilder(outputSchema)
+        .addRow(p1, p2, p3)
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+  }
+
+  /**
+   * Test VARCHAR to DATE, TIME and TIMESTAMP conversion
+   * using default ISO formats.
+   */
+  @Test
+  public void testStringToDateTimeDefault() {
+
+    TupleMetadata outputSchema = new SchemaBuilder()
+        .add("date", MinorType.DATE)
+        .add("time", MinorType.TIME)
+        .add("ts", MinorType.TIMESTAMP)
+        .buildSchema();
+
+    TupleMetadata inputSchema = new SchemaBuilder()
+        .add("date", MinorType.VARCHAR)
+        .add("time", MinorType.VARCHAR)
+        .add("ts", MinorType.VARCHAR)
+        .buildSchema();
+
+    RowSet actual = new RowSetBuilder(fixture.allocator(), outputSchema,
+        new ConversionTestFixture(inputSchema))
+        .addRow("2019-03-28", "12:34:56", "2019-03-28T12:34:56")
+        .addRow("2019-03-28", "12:34:56", "2019-03-28T12:34:56")
+        .build();
+
+    LocalTime lt = new LocalTime(12, 34, 56);
+    LocalDate ld = new LocalDate(2019, 3, 28);
+    Instant ts = ld.toDateTime(lt, DateTimeZone.UTC).toInstant();
+    final SingleRowSet expected = fixture.rowSetBuilder(outputSchema)
+        .addRow(ld, lt, ts)
+        .addRow(ld.toDateTimeAtStartOfDay(DateTimeZone.UTC).toInstant().getMillis(),
+                lt.getMillisOfDay(), ts.getMillis())
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+  }
+
+  @Test
+  public void testStringToDateTimeCustom() {
+
+    TupleMetadata outputSchema = new SchemaBuilder()
+        .add("date", MinorType.DATE)
+        .add("time", MinorType.TIME)
+        .add("ts", MinorType.TIMESTAMP)
+        .buildSchema();
+
+    outputSchema.metadata("date").setFormat("M/d/yyyy");
+    outputSchema.metadata("time").setFormat("hh:mm:ss a");
+    outputSchema.metadata("ts").setFormat("M/d/yyyy hh:mm:ss a X");
+
+    TupleMetadata inputSchema = new SchemaBuilder()
+        .add("date", MinorType.VARCHAR)
+        .add("time", MinorType.VARCHAR)
+        .add("ts", MinorType.VARCHAR)
+        .buildSchema();
+
+    RowSet actual = new RowSetBuilder(fixture.allocator(), outputSchema,
+        new ConversionTestFixture(inputSchema))
+        .addRow("3/28/2019", "12:34:56 PM", "3/28/2019 12:34:56 PM Z")
+        .addRow("3/28/2019", "12:34:56 PM", "3/28/2019 12:34:56 PM Z")
+        .build();
+
+    LocalTime lt = new LocalTime(12, 34, 56);
+    LocalDate ld = new LocalDate(2019, 3, 28);
+    Instant ts = ld.toDateTime(lt, DateTimeZone.UTC).toInstant();
+    final SingleRowSet expected = fixture.rowSetBuilder(outputSchema)
+        .addRow(ld, lt, ts)
+        .addRow(ld.toDateTimeAtStartOfDay(DateTimeZone.UTC).toInstant().getMillis(),
+                lt.getMillisOfDay(), ts.getMillis())
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+  }
+
+  private static void expect(ConversionType type, ConversionDefn defn) {
+    assertEquals(type, defn.type);
+  }
+
+  /**
+   * Test the conversion type for a subset of type pairs.
+   */
+  @Test
+  public void testImplicitConversionType() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("ti", MinorType.TINYINT)
+        .add("si", MinorType.SMALLINT)
+        .add("int", MinorType.INT)
+        .add("bi", MinorType.BIGINT)
+        .add("fl", MinorType.FLOAT4)
+        .add("db", MinorType.FLOAT8)
+        .add("dec", MinorType.VARDECIMAL)
+        .buildSchema();
+    ColumnMetadata tinyIntCol = schema.metadata("ti");
+    ColumnMetadata smallIntCol = schema.metadata("si");
+    ColumnMetadata intCol = schema.metadata("int");
+    ColumnMetadata bigIntCol = schema.metadata("bi");
+    ColumnMetadata float4Col = schema.metadata("fl");
+    ColumnMetadata float8Col = schema.metadata("db");
+    ColumnMetadata decimalCol = schema.metadata("dec");
+
+    // TinyInt --> x
+    expect(ConversionType.NONE, StandardConversions.analyze(tinyIntCol, tinyIntCol));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, smallIntCol));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, intCol));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, bigIntCol));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, float4Col));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, float8Col));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, decimalCol));
+
+    // SmallInt --> x
+    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(smallIntCol, tinyIntCol));
+    expect(ConversionType.NONE, StandardConversions.analyze(smallIntCol, smallIntCol));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(smallIntCol, intCol));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(smallIntCol, bigIntCol));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(smallIntCol, float4Col));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(smallIntCol, float8Col));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(smallIntCol, decimalCol));
+
+    // Int --> x
+    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(intCol, tinyIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(intCol, smallIntCol));
+    expect(ConversionType.NONE, StandardConversions.analyze(intCol, intCol));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(intCol, bigIntCol));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(intCol, float4Col));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(intCol, float8Col));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(intCol, decimalCol));
+
+    // BigInt --> x
+    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(bigIntCol, tinyIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(bigIntCol, smallIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(bigIntCol, intCol));
+    expect(ConversionType.NONE, StandardConversions.analyze(bigIntCol, bigIntCol));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(bigIntCol, float4Col));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(bigIntCol, float8Col));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(bigIntCol, decimalCol));
+
+    // Float4 --> x
+    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float4Col, tinyIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float4Col, smallIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float4Col, intCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float4Col, bigIntCol));
+    expect(ConversionType.NONE, StandardConversions.analyze(float4Col, float4Col));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(float4Col, float8Col));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(float4Col, decimalCol));
+
+    // Float8 --> x
+    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float8Col, tinyIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float8Col, smallIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float8Col, intCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float8Col, bigIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float8Col, float4Col));
+    expect(ConversionType.NONE, StandardConversions.analyze(float8Col, float8Col));
+    expect(ConversionType.IMPLICIT, StandardConversions.analyze(float8Col, decimalCol));
+
+    // Decimal --> x
+    expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, tinyIntCol));
+    expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, smallIntCol));
+    expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, intCol));
+    expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, bigIntCol));
+    expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, float4Col));
+    expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, float8Col));
+    expect(ConversionType.NONE, StandardConversions.analyze(decimalCol, decimalCol));
+  }
 }
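
Taken together, the tests above trace the pattern a reader can follow: analyze the (input, output) column pair, then wrap the base writer in a conversion shim only when one is needed. A hedged sketch using the methods exercised above; inputSchema, outputSchema, and baseWriter are assumed to exist:

    ColumnMetadata inputCol = inputSchema.metadata("int");   // e.g. VARCHAR
    ColumnMetadata outputCol = outputSchema.metadata("int"); // e.g. INT
    ConversionDefn defn = StandardConversions.analyze(inputCol, outputCol);
    ScalarWriter writer;
    if (defn.conversionClass == null) {
      writer = baseWriter; // no shim needed (NONE or IMPLICIT)
    } else {
      // Insert the shim chosen by the analysis (e.g. EXPLICIT conversions)
      writer = StandardConversions.newInstance(defn.conversionClass, baseWriter);
    }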
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index 69fadc9..64a3833 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -518,7 +518,7 @@
                  This is likely due to adding new dependencies to java-exec without updating the excludes in this module. This is important because it minimizes the size of the dependency for Drill application users.
 
                   </message>
-                  <maxsize>40000000</maxsize>
+                  <maxsize>41000000</maxsize>
                   <minsize>15000000</minsize>
                   <files>
                    <file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>
diff --git a/exec/vector/src/main/codegen/templates/BasicTypeHelper.java b/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
index f1de685..b35e189 100644
--- a/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
+++ b/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
@@ -35,13 +35,16 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+
 /*
  * This class is generated using freemarker and the ${.template_name} template.
  */
 public class BasicTypeHelper {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicTypeHelper.class);
 
-  private static final int WIDTH_ESTIMATE = 50;
+  @VisibleForTesting
+  public static final int WIDTH_ESTIMATE = 50;
 
   protected static String buildErrorMessage(final String operation, final MinorType type, final DataMode mode) {
     return String.format("Unable to %s for minor type [%s] and mode [%s]", operation, type, mode);
@@ -61,11 +64,11 @@ public class BasicTypeHelper {
                                minor.class?substring(0, 3) == "MSG"> + WIDTH_ESTIMATE</#if>;
   </#list>
 </#list>
-      case FIXEDCHAR: return major.getPrecision();
-      case FIXED16CHAR: return major.getPrecision();
-      case FIXEDBINARY: return major.getPrecision();
-      case NULL:
-        return 0;
+    case FIXEDCHAR: return major.getPrecision();
+    case FIXED16CHAR: return major.getPrecision();
+    case FIXEDBINARY: return major.getPrecision();
+    case NULL:
+      return 0;
     }
     throw new UnsupportedOperationException(buildErrorMessage("get size", major));
   }
@@ -82,7 +85,7 @@ public class BasicTypeHelper {
       case REPEATED:
         return RepeatedMapVector.class;
       }
-      
+
     case LIST:
       switch (mode) {
       case REPEATED:
@@ -91,7 +94,7 @@ public class BasicTypeHelper {
       case OPTIONAL:
         return ListVector.class;
       }
-    
+
 <#list vv.types as type>
   <#list type.minor as minor>
       case ${minor.class?upper_case}:
@@ -123,7 +126,7 @@ public class BasicTypeHelper {
           return SingleMapReaderImpl.class;
         else
           return SingleLikeRepeatedMapReaderImpl.class;
-      case REPEATED: 
+      case REPEATED:
           return RepeatedMapReaderImpl.class;
       }
     case LIST:
@@ -133,7 +136,7 @@ public class BasicTypeHelper {
       case REPEATED:
         return RepeatedListReaderImpl.class;
       }
-      
+
 <#list vv.types as type>
   <#list type.minor as minor>
       case ${minor.class?upper_case}:
@@ -152,7 +155,7 @@ public class BasicTypeHelper {
       }
       throw new UnsupportedOperationException(buildErrorMessage("get reader class name", type, mode));
   }
-  
+
   public static Class<?> getWriterInterface( MinorType type, DataMode mode){
     switch (type) {
     case UNION: return UnionWriter.class;
@@ -168,7 +171,7 @@ public class BasicTypeHelper {
       }
       throw new UnsupportedOperationException(buildErrorMessage("get writer interface", type, mode));
   }
-  
+
   public static Class<?> getWriterImpl( MinorType type, DataMode mode){
     switch (type) {
     case UNION:
@@ -189,7 +192,7 @@ public class BasicTypeHelper {
       case REPEATED:
         return RepeatedListWriter.class;
       }
-      
+
 <#list vv.types as type>
   <#list type.minor as minor>
       case ${minor.class?upper_case}:
@@ -238,7 +241,7 @@ public class BasicTypeHelper {
   }
 
   public static Class<?> getHolderReaderImpl(MinorType type, DataMode mode) {
-    switch (type) {      
+    switch (type) {
 <#list vv.types as type>
   <#list type.minor as minor>
       case ${minor.class?upper_case}:
@@ -259,27 +262,27 @@ public class BasicTypeHelper {
       }
       throw new UnsupportedOperationException(buildErrorMessage("get holder reader implementation", type, mode));
   }
-  
+
   public static ValueVector getNewVector(String name, BufferAllocator allocator, MajorType type, CallBack callback) {
     MaterializedField field = MaterializedField.create(name, type);
     return getNewVector(field, allocator, callback);
   }
-  
+
   public static ValueVector getNewVector(MaterializedField field, BufferAllocator allocator){
     return getNewVector(field, allocator, null);
   }
-  
+
   public static ValueVector getNewVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) {
     return getNewVector(field, field.getType(), allocator, callBack);
   }
-  
+
   // Creates an internal or external vector. Internal vectors may have
   // types that disagree with their materialized field.
-  
+
   public static ValueVector getNewVector(MaterializedField field, MajorType type, BufferAllocator allocator, CallBack callBack) {
 
     switch (type.getMinorType()) {
-    
+
     case UNION:
       return new UnionVector(field, allocator, callBack);
 
@@ -444,8 +447,8 @@ public class BasicTypeHelper {
 <#list vv.types as type>
   <#list type.minor as minor>
     case ${minor.class?upper_case} :
-      if ( ((${minor.class}Vector) v1).getAccessor().get(v1index) == 
-           ((${minor.class}Vector) v2).getAccessor().get(v2index) ) 
+      if ( ((${minor.class}Vector) v1).getAccessor().get(v1index) ==
+           ((${minor.class}Vector) v2).getAccessor().get(v2index) )
         return true;
       break;
   </#list>
diff --git a/exec/vector/src/main/codegen/templates/ColumnAccessors.java b/exec/vector/src/main/codegen/templates/ColumnAccessors.java
index 216a616..8f81960 100644
--- a/exec/vector/src/main/codegen/templates/ColumnAccessors.java
+++ b/exec/vector/src/main/codegen/templates/ColumnAccessors.java
@@ -84,13 +84,16 @@ import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 
 import io.netty.buffer.DrillBuf;
 
+import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 /**
- * Basic accessors for most Drill vector types and modes. These are bare-bones
- * accessors: they do only the most rudimentary type conversions. For all,
- * there is only one way to get/set values; they don't convert from, say,
- * a double to an int or visa-versa.
+ * Basic accessors for most Drill vector types and modes. Each class provides a
+ * bare-bones accessor that converts from the "native" Drill type to the vector.
+ * Many classes also have "convenience" methods that convert from other Java types.
  * <p>
  * Writers work only with single vectors. Readers work with either single
  * vectors or a "hyper vector": a collection of vectors indexed together.
@@ -117,7 +120,7 @@ public class ColumnAccessors {
     </#if>
     <#assign varWidth = drillType == "VarChar" || drillType == "Var16Char" || drillType == "VarBinary"  || drillType == "VarDecimal"/>
     <#assign decimal = drillType == "Decimal9" || drillType == "Decimal18" ||
-                       drillType == "Decimal28Sparse" || drillType == "Decimal38Sparse"  || drillType == "VarDecimal"/>
+                       drillType == "Decimal28Sparse" || drillType == "Decimal38Sparse" || drillType == "VarDecimal"/>
     <#if varWidth>
       <#assign accessorType = "byte[]">
       <#assign label = "Bytes">
@@ -345,7 +348,7 @@ public class ColumnAccessors {
       setBytes(bytes, bytes.length);
     }
 
-    <#elseif drillType = "VarDecimal">
+    <#elseif drillType == "VarDecimal">
 
     @Override
     public final void setDecimal(final BigDecimal bd) {
@@ -353,6 +356,86 @@ public class ColumnAccessors {
       int len = barr.length;
       setBytes(barr, len);
     }
+    <#elseif drillType == "TinyInt" || drillType == "SmallInt" || drillType == "Int">
+
+    @Override
+    public final void setLong(long value) {
+      try {
+        // Catches int overflow. Does not catch overflow for smaller types.
+        setInt(Math.toIntExact(value));
+      } catch (ArithmeticException e) {
+        throw InvalidConversionError.writeError(schema(), value, e);
+      }
+    }
+
+    @Override
+    public final void setDouble(double value) {
+      try {
+        // Catches int overflow. Does not catch overflow from
+        // double. See Math.round for details.
+        setInt(Math.toIntExact(Math.round(value)));
+      } catch (ArithmeticException e) {
+        throw InvalidConversionError.writeError(schema(), value, e);
+      }
+    }
+    <#elseif drillType == "BigInt">
+
+    @Override
+    public final void setInt(int value) {
+      setLong(value);
+    }
+
+    @Override
+    public final void setDouble(double value) {
+      // Does not catch overflow from
+      // double. See Math.round for details.
+      setLong(Math.round(value));
+    }
+    <#elseif drillType == "Float4" || drillType == "Float8">
+
+    @Override
+    public final void setInt(int value) {
+      setDouble(value);
+    }
+
+    @Override
+    public final void setLong(long value) {
+      setDouble(value);
+    }
+    <#elseif decimal>
+
+    @Override
+    public final void setInt(int value) {
+      setDecimal(BigDecimal.valueOf(value));
+    }
+
+    @Override
+    public final void setLong(long value) {
+      setDecimal(BigDecimal.valueOf(value));
+    }
+
+    @Override
+    public final void setDouble(double value) {
+      setDecimal(BigDecimal.valueOf(value));
+    }
+    <#elseif drillType == "Date">
+
+    @Override
+    public final void setDate(LocalDate value) {
+      setLong(value.toDateTimeAtStartOfDay(DateTimeZone.UTC).toInstant().getMillis());
+    }
+    <#elseif drillType == "Time">
+
+    @Override
+    public final void setTime(LocalTime value) {
+      setInt(value.getMillisOfDay());
+    }
+    <#elseif drillType == "TimeStamp">
+
+    @Override
+    public final void setTimestamp(Instant value) {
+      setLong(value.getMillis());
+    }
     </#if>
   }
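
In short, the generated setters above give each fixed-width writer a consistent cross-type contract: widening calls delegate to the native setter, narrowing calls are range-checked, and doubles go through Math.round. A behavioral sketch for a writer over an INT vector; the writer variable is hypothetical:

    writer.setInt(10);                      // native type, stored as-is
    writer.setLong(10L);                    // narrowing: range-checked, then stored
    writer.setDouble(10.6);                 // rounded to 11 via Math.round
    writer.setLong(1L + Integer.MAX_VALUE); // overflow: throws InvalidConversionError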
 
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java
new file mode 100644
index 0000000..c508814
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record.metadata;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Base class for an object with properties. Defers property map creation
+ * until needed, since most instances may not need properties.
+ */
+public class AbstractPropertied implements Propertied {
+
+  private Map<String, String> properties;
+
+  protected AbstractPropertied() { }
+
+  protected AbstractPropertied(AbstractPropertied from) {
+    setProperties(from.properties);
+  }
+
+  protected boolean hasProperties() {
+    return properties != null && ! properties.isEmpty();
+  }
+
+  @Override
+  public void setProperties(Map<String, String> properties) {
+    if (properties != null && ! properties.isEmpty()) {
+      properties().putAll(properties);
+    }
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    if (properties == null) {
+      properties = new LinkedHashMap<>();
+    }
+    return properties;
+  }
+
+  @Override
+  public String property(String key) {
+    if (properties == null) {
+      return null;
+    }
+    return properties.get(key);
+  }
+
+  @Override
+  public void setProperty(String key, String value) {
+    if (value != null) {
+      properties().put(key, value);
+    } else if (properties != null) {
+      properties.remove(key);
+    }
+  }
+}
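
A short sketch of the resulting behavior; the column and property key are hypothetical:

    ColumnMetadata col = schema.metadata("a"); // any Propertied object
    col.setProperty("myapp.hint", "fast");     // map allocated on first use
    assert "fast".equals(col.property("myapp.hint"));
    col.setProperty("myapp.hint", null);       // a null value removes the key
    assert col.property("myapp.hint") == null;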
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
index 1cdb927..6a2c874 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
@@ -17,19 +17,46 @@
  */
 package org.apache.drill.exec.record.metadata;
 
+import java.time.format.DateTimeFormatter;
+
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.vector.accessor.ColumnConversionFactory;
-
-import java.util.Map;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
 
 /**
  * Metadata description of a column including names, types and structure
  * information.
  */
-public interface ColumnMetadata {
+public interface ColumnMetadata extends Propertied {
+
+  /**
+   * Predicted number of elements per array entry. Default is
+   * taken from the often hard-coded value of 10.
+   */
+  public static final String EXPECTED_CARDINALITY_PROP = DRILL_PROP_PREFIX + "cardinality";
+
+  /**
+   * Default value represented as a string.
+   */
+  public static final String DEFAULT_VALUE_PROP = DRILL_PROP_PREFIX + "default";
+
+  /**
+   * Expected (average) width for variable-width columns.
+   */
+  public static final String EXPECTED_WIDTH_PROP = DRILL_PROP_PREFIX + "width";
+
+  /**
+   * Optional format to use when converting to/from string values.
+   */
+  public static final String FORMAT_PROP = DRILL_PROP_PREFIX + "format";
+
+  /**
+   * Indicates if the column is projected. Used only for internal
+   * reader-provided schemas.
+   */
+  public static final String PROJECTED_PROP = DRILL_PROP_PREFIX + "projected";
 
   /**
    * Rough characterization of Drill types into metadata categories.
@@ -184,63 +211,46 @@ public interface ColumnMetadata {
 
   int expectedElementCount();
 
-  void setFormatValue(String value);
+  void setFormat(String value);
 
-  String formatValue();
+  String format();
 
   /**
-   * Set the default value to use for filling a vector when no real data is
-   * available, such as for columns added in new files but which does not
-   * exist in existing files. The "default default" is null, which works
-   * only for nullable columns.
+   * Returns the formatter to use for date/time values. Only valid for
+   * date/time columns.
    *
-   * @param value column value, represented as a Java object, acceptable
-   * to the {@link ColumnWriter#setObject()} method for this column's writer.
+   * @return the formatter to use for this column's date/time values
    */
-  void setDefaultValue(Object value);
+  DateTimeFormatter dateTimeFormatter();
 
   /**
-   * Returns the default value for this column.
-   *
-   * @return the default value, or null if no default value has been set
-   */
-  Object defaultValue();
-
-  /**
-   * Parses default value from String based on literal value into Object instance based on {@link MinorType} value.
-   * Sets default value to use for filling a vector when no real data is available.
+   * Sets the default value property using the string-encoded form of the value.
+   * The default value is used for filling a vector when no real data is available.
    *
    * @param value the default value in String representation
    */
-  void setDefaultFromString(String value);
+  void setDefaultValue(String value);
 
   /**
    * Returns the default value for this column in String literal representation.
    *
-   * @return the default value in String literal representation, or null if no default value has been set
-   */
-  String defaultStringValue();
-
-  /**
-   * Set the factory for an optional shim writer that translates from the type of
-   * data available to the code that creates the vectors on the one hand,
-   * and the actual type of the column on the other. For example, a shim
-   * might parse a string form of a date into the form stored in vectors.
-   * <p>
-   * The shim must write to the base vector for this column using one of
-   * the supported base writer "set" methods.
-   * <p>
-   * The default is to use the "natural" type: that is, to insert no
-   * conversion shim.
+   * @return the default value in String literal representation, or null if no
+   * default value has been set
    */
-  void setTypeConverter(ColumnConversionFactory factory);
+  String defaultValue();
 
   /**
-   * Returns the type conversion shim for this column.
+   * Returns the default value decoded into object form. This is the same as:
+   * <pre><code>valueFromString(defaultValue());
+   * </code></pre>
    *
-   * @return the type conversion factory, or null if none is set
+   * @return the default value decoded as an object that can be passed to
+   * the {@link ColumnWriter#setObject()} method.
    */
-  ColumnConversionFactory typeConverter();
+  Object decodeDefaultValue();
+
+  String valueToString(Object value);
+  Object valueFromString(String value);
 
   /**
    * Create an empty version of this column. If the column is a scalar,
@@ -253,15 +263,6 @@ public interface ColumnMetadata {
   ColumnMetadata cloneEmpty();
 
   /**
-   * Sets column properties if not null.
-   *
-   * @param properties column properties
-   */
-  void setProperties(Map<String, String> properties);
-
-  Map<String, String> properties();
-
-  /**
    * Reports whether, in this context, the column is projected outside
    * of the context. (That is, whether the column is backed by an actual
    * value vector.)
@@ -292,5 +293,4 @@ public interface ColumnMetadata {
    * @return column metadata string representation
    */
   String columnString();
-
 }
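
A sketch of how the string-encoded properties replace the former object-valued setters; the column name is hypothetical:

    ColumnMetadata dateCol = schema.metadata("date");
    dateCol.setFormat("M/d/yyyy");              // stored under FORMAT_PROP
    dateCol.setDefaultValue("3/28/2019");       // stored under DEFAULT_VALUE_PROP
    String fmt = dateCol.format();              // "M/d/yyyy"
    Object dflt = dateCol.decodeDefaultValue(); // decoded per the column type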
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java
new file mode 100644
index 0000000..8ff1d0a
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record.metadata;
+
+import java.util.Map;
+
+/**
+ * Interface for an object that defines properties. Used in conjunction with
+ * {@link PropertyAccessor}.
+ */
+public interface Propertied {
+
+  /**
+   * Base name for properties which Drill itself defines. Provides a
+   * separate namespace from user-defined properties, which should
+   * have some other prefix.
+   */
+  public static final String DRILL_PROP_PREFIX = "drill.";
+
+  /**
+   * Sets schema properties if not null.
+   *
+   * @param properties schema properties
+   */
+  void setProperties(Map<String, String> properties);
+
+  Map<String, String> properties();
+
+  String property(String key);
+  void setProperty(String key, String value);
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/PropertyAccessor.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/PropertyAccessor.java
new file mode 100644
index 0000000..0cf0a32
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/PropertyAccessor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record.metadata;
+
+/**
+ * Helper utilities to get/set typed values within a propertied object
+ */
+public class PropertyAccessor {
+
+  private PropertyAccessor() { }
+
+  public static String getString(Propertied props, String key, String defaultValue) {
+    final String value = props.property(key);
+    return value == null ? defaultValue : value;
+  }
+
+  public static int getInt(Propertied props, String key, int defaultValue) {
+    final String value = props.property(key);
+    try {
+      return value == null ? defaultValue : Integer.parseInt(value);
+    } catch (final NumberFormatException e) {
+      throw new IllegalStateException(String.format(
+          "Invalid int property %s: %s", key, value), e);
+    }
+  }
+
+  public static int getInt(Propertied props, String key) {
+    return getInt(props, key, 0);
+  }
+
+  public static boolean getBoolean(Propertied props, String key, boolean defaultValue) {
+    final String value = props.property(key);
+    return value == null ? defaultValue : Boolean.parseBoolean(value);
+  }
+
+  public static boolean getBoolean(Propertied props, String key) {
+    return getBoolean(props, key, false);
+  }
+
+  public static void set(Propertied props, String key, int value) {
+    props.setProperty(key, Integer.toString(value));
+  }
+
+  public static void set(Propertied props, String key, boolean value) {
+    props.setProperty(key, Boolean.toString(value));
+  }
+}
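
Usage is a thin typed layer over the string-valued property map; for example (the col variable is hypothetical):

    PropertyAccessor.set(col, ColumnMetadata.EXPECTED_WIDTH_PROP, 42);
    int width = PropertyAccessor.getInt(col, ColumnMetadata.EXPECTED_WIDTH_PROP, 50);
    boolean projected = PropertyAccessor.getBoolean(col, ColumnMetadata.PROJECTED_PROP, true);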
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleMetadata.java
index 6bfe138..1c417b3 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleMetadata.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.record.metadata;
 
 import java.util.List;
-import java.util.Map;
 
 import org.apache.drill.exec.record.MaterializedField;
 
@@ -44,7 +43,7 @@ import org.apache.drill.exec.record.MaterializedField;
  * In the future, this structure will also gather metadata useful
  * for vector processing such as expected widths and so on.
  */
-public interface TupleMetadata extends Iterable<ColumnMetadata> {
+public interface TupleMetadata extends Propertied, Iterable<ColumnMetadata> {
 
   /**
    * Add a new column to the schema.
@@ -94,14 +93,4 @@ public interface TupleMetadata extends Iterable<ColumnMetadata> {
 
   String fullName(ColumnMetadata column);
   String fullName(int index);
-
-  /**
-   * Sets schema properties if not null.
-   *
-   * @param properties schema properties
-   */
-  void setProperties(Map<String, String> properties);
-
-  Map<String, String> properties();
-
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/DateUtilities.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/DateUtilities.java
index 9925f7c..d3fb6f7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/DateUtilities.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/DateUtilities.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.vector;
 
+import java.time.LocalTime;
+
 import org.joda.time.Period;
 
 /**
@@ -105,7 +107,7 @@ public class DateUtilities {
     final int seconds = millis / (secondsToMillis);
     millis %= (secondsToMillis);
 
-    StringBuilder buf = new StringBuilder()
+    final StringBuilder buf = new StringBuilder()
             .append(days)
             .append(pluralify("day", days))
             .append(" ")
@@ -152,7 +154,7 @@ public class DateUtilities {
     final int seconds = millis / secondsToMillis;
     millis %= secondsToMillis;
 
-    StringBuilder buf = new StringBuilder()
+    final StringBuilder buf = new StringBuilder()
            .append(years)
            .append(pluralify("year", years))
            .append(" ")
@@ -187,4 +189,13 @@ public class DateUtilities {
             seconds) * 1000 +
            millis;
   }
+
+  /**
+   * Convert from Java LocalTime to the ms-since-midnight format which Drill uses.
+   * @param localTime Java local time
+   * @return Drill form of the time
+   */
+  public static int toTime(LocalTime localTime) {
+    return (int) ((localTime.toNanoOfDay() + 500_000L) / 1_000_000L); // round to milliseconds
+  }
 }
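
Worked example of the half-up rounding in toTime(): 12:34:56 is 45,296 seconds into the day, so a nano-of-day just below the half-millisecond boundary rounds down while the boundary itself rounds up (java.time.LocalTime, per the new import):

    int a = DateUtilities.toTime(LocalTime.of(12, 34, 56, 789_499_999)); // 45_296_789
    int b = DateUtilities.toTime(LocalTime.of(12, 34, 56, 789_500_000)); // 45_296_790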
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/InvalidConversionError.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/InvalidConversionError.java
new file mode 100644
index 0000000..aec60ae
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/InvalidConversionError.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+
+/**
+ * Raised when a conversion from one type to another is supported at
+ * setup time, but a value provided at runtime is not valid for that
+ * conversion. Example: trying to convert the string "foo" to an INT
+ * column.
+ *
+ * @see UnsupportedConversionError for a setup-time exception where the conversion
+ * is not supported for any values.
+ */
+
+public class InvalidConversionError extends UnsupportedOperationException {
+
+  private static final long serialVersionUID = 1L;
+
+  public InvalidConversionError(String message) {
+    super(message);
+  }
+
+  public InvalidConversionError(String message, Exception e) {
+    super(message, e);
+  }
+
+  public static InvalidConversionError writeError(ColumnMetadata schema, Object value) {
+    return writeError(schema, value, null);
+  }
+
+  public static InvalidConversionError writeError(ColumnMetadata schema, Object value, Exception e) {
+    return new InvalidConversionError(
+        String.format("Illegal conversion: Column `%s` of type %s, Illegal value `%s`",
+            schema.name(), schema.type().name(),
+            value == null ? "null" : value.toString()),
+        e);
+  }
+}
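
A converter typically wraps the parse and rethrows through these helpers. A sketch of a setString override inside a write converter; baseWriter and its schema() accessor follow the converter classes shown elsewhere in this commit:

    @Override
    public void setString(String value) {
      try {
        baseWriter.setInt(Integer.parseInt(value));
      } catch (NumberFormatException e) {
        // The conversion is supported; this particular value is not.
        throw InvalidConversionError.writeError(baseWriter.schema(), value, e);
      }
    }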
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
index db86570..c89f754 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
@@ -19,6 +19,9 @@ package org.apache.drill.exec.vector.accessor;
 
 import java.math.BigDecimal;
 
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.joda.time.Period;
 
 /**
@@ -60,4 +63,7 @@ public interface ScalarWriter extends ColumnWriter {
   void setBytes(byte[] value, int len);
   void setDecimal(BigDecimal value);
   void setPeriod(Period value);
+  void setDate(LocalDate value);
+  void setTime(LocalTime value);
+  void setTimestamp(Instant value);
 }
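
A sketch of the new setters in use, with the Joda types matching the accessor templates above; the writer variables are hypothetical:

    dateWriter.setDate(new LocalDate(2019, 3, 28));               // DATE column
    timeWriter.setTime(new LocalTime(12, 34, 56));                // TIME column
    tsWriter.setTimestamp(Instant.parse("2019-03-28T12:34:56Z")); // TIMESTAMP column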
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/UnsupportedConversionError.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/UnsupportedConversionError.java
index 68ed5e0..fbf3703 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/UnsupportedConversionError.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/UnsupportedConversionError.java
@@ -22,6 +22,16 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata;
 /**
  * Raised when a column accessor reads or writes the value using the wrong
 * Java type (which may indicate a data inconsistency in the input data).
+ * <p>
+ * Also raised during setup if no conversion is available between input
+ * and output types.
+ * <p>
+ * This exception means that there is no conversion <i>in principle</i>:
+ * it is a static error due to the schema provided or the implementation
+ * of the code.
+ *
+ * @see InvalidConversionError for a runtime exception where the conversion
+ * is supported, but a specific value is invalid for that conversion.
  */
 
 public class UnsupportedConversionError extends UnsupportedOperationException {
@@ -32,6 +42,10 @@ public class UnsupportedConversionError extends UnsupportedOperationException {
     super(message);
   }
 
+  public UnsupportedConversionError(String message, Exception e) {
+    super(message, e);
+  }
+
   public static UnsupportedConversionError readError(ColumnMetadata schema, String javaType) {
     return new UnsupportedConversionError(
         String.format("Column `%s`: Unsupported conversion from Drill type %s to Java type %s",
@@ -41,7 +55,7 @@ public class UnsupportedConversionError extends UnsupportedOperationException {
   public static UnsupportedConversionError writeError(ColumnMetadata schema, String javaType) {
     return new UnsupportedConversionError(
         String.format("Column `%s`: Unsupported conversion from Java type %s to Drill type %s",
-            schema.name(), schema.type().name(), javaType));
+            schema.name(), javaType, schema.type().name()));
   }
 
   public static UnsupportedConversionError nullError(ColumnMetadata schema) {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractWriteConverter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
similarity index 83%
rename from exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractWriteConverter.java
rename to exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
index 55cd991..503c0c4 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractWriteConverter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.vector.accessor.writer;
+package org.apache.drill.exec.vector.accessor.convert;
 
 import java.math.BigDecimal;
 
@@ -23,6 +23,10 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.joda.time.Period;
 
 /**
@@ -38,7 +42,7 @@ import org.joda.time.Period;
 
 public class AbstractWriteConverter extends AbstractScalarWriter {
 
-  private final ScalarWriter baseWriter;
+  protected final ScalarWriter baseWriter;
 
   public AbstractWriteConverter(ScalarWriter baseWriter) {
     this.baseWriter = baseWriter;
@@ -103,4 +107,19 @@ public class AbstractWriteConverter extends AbstractScalarWriter {
   public void setPeriod(Period value) {
     baseWriter.setPeriod(value);
   }
+
+  @Override
+  public void setDate(LocalDate value) {
+    baseWriter.setDate(value);
+  }
+
+  @Override
+  public void setTime(LocalTime value) {
+    baseWriter.setTime(value);
+  }
+
+  @Override
+  public void setTimestamp(Instant value) {
+    baseWriter.setTimestamp(value);
+  }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnConversionFactory.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ColumnConversionFactory.java
similarity index 80%
rename from exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnConversionFactory.java
rename to exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ColumnConversionFactory.java
index 691dcf1..18c80d5 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnConversionFactory.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ColumnConversionFactory.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.vector.accessor;
+package org.apache.drill.exec.vector.accessor.convert;
 
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
 
 /**
  * Create a column type converter for the given column and base writer.
@@ -26,14 +26,18 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata;
  * client requires to the type required by the underlying vector as
  * represented by the base writer.
  */
+
 public interface ColumnConversionFactory {
+
   /**
    * Create a type conversion writer for the given column, converting data
-   * to the type needed by the base writer.
+   * to the type needed by the base writer. The caller will bind the
+   * converter to the base column.
+   *
-   * @param colDefn column metadata definition
    * @param baseWriter base column writer for the column's vector
    * @return a new scalar writer to insert between the client and
-   * the base vector
+   * the base vector, or null if no conversion is needed
    */
-  ScalarWriter newWriter(ColumnMetadata colDefn, ScalarWriter baseWriter);
+
+  AbstractWriteConverter newWriter(ScalarWriter baseWriter);
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToDate.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToDate.java
new file mode 100644
index 0000000..9eb9341
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToDate.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.convert;
+
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+
+import org.apache.drill.exec.vector.accessor.InvalidConversionError;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+/**
+ * Convert a VARCHAR column to a DATE column following the Java rules
+ * for parsing a date time, optionally using the formatter provided in
+ * the column schema.
+ */
+public class ConvertStringToDate extends AbstractWriteConverter {
+
+  private static final ZoneId UTC = ZoneId.of("Z");
+  private final DateTimeFormatter dateTimeFormatter;
+
+  public ConvertStringToDate(ScalarWriter baseWriter) {
+    super(baseWriter);
+    dateTimeFormatter = baseWriter.schema().dateTimeFormatter();
+  }
+
+  @Override
+  public void setString(String value) {
+    if (value == null) {
+      baseWriter.setNull();
+    } else {
+      try {
+        final LocalDate dt = LocalDate.parse(value, dateTimeFormatter);
+        baseWriter.setLong(dt.atStartOfDay(UTC).toInstant().toEpochMilli());
+      }
+      catch (final DateTimeParseException e) {
+        throw InvalidConversionError.writeError(schema(), value, e);
+      }
+    }
+  }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToDouble.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToDouble.java
new file mode 100644
index 0000000..ac89240
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToDouble.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.convert;
+
+import org.apache.drill.exec.vector.accessor.InvalidConversionError;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+/**
+ * Convert a VARCHAR column to a DOUBLE column following the Java rules
+ * for parsing doubles (i.e. no formatting.)
+ */
+public class ConvertStringToDouble extends AbstractWriteConverter {
+
+  public ConvertStringToDouble(ScalarWriter baseWriter) {
+    super(baseWriter);
+  }
+
+  @Override
+  public void setString(String value) {
+    if (value == null) {
+      baseWriter.setNull();
+    } else {
+      try {
+        baseWriter.setDouble(Double.parseDouble(value));
+      }
+      catch (final NumberFormatException e) {
+        throw InvalidConversionError.writeError(schema(), value, e);
+      }
+    }
+  }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToInt.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToInt.java
new file mode 100644
index 0000000..dd9b962
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToInt.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.convert;
+
+import org.apache.drill.exec.vector.accessor.InvalidConversionError;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+/**
+ * Convert a VARCHAR column to an INT column following the Java rules
+ * for parsing integers (i.e. no formatting.) This conversion works
+ * for any int-based column type including SMALLINT and TINYINT.
+ */
+public class ConvertStringToInt extends AbstractWriteConverter {
+
+  public ConvertStringToInt(ScalarWriter baseWriter) {
+    super(baseWriter);
+  }
+
+  @Override
+  public void setString(String value) {
+    if (value == null) {
+      baseWriter.setNull();
+    } else {
+      try {
+        baseWriter.setInt(Integer.parseInt(value));
+      }
+      catch (final NumberFormatException e) {
+        throw InvalidConversionError.writeError(schema(), value, e);
+      }
+    }
+  }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToInterval.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToInterval.java
new file mode 100644
index 0000000..9e23140
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToInterval.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.convert;
+
+import java.time.format.DateTimeParseException;
+
+import org.apache.drill.exec.vector.accessor.InvalidConversionError;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.joda.time.Period;
+
+/**
+ * Convert a VARCHAR column to an INTERVAL column following the Java rules
+ * for parsing a period.
+ */
+public class ConvertStringToInterval extends AbstractWriteConverter {
+
+  public ConvertStringToInterval(ScalarWriter baseWriter) {
+    super(baseWriter);
+  }
+
+  @Override
+  public void setString(String value) {
+    if (value == null) {
+      baseWriter.setNull();
+    } else {
+      try {
+        baseWriter.setPeriod(Period.parse(value));
+      }
+      catch (final DateTimeParseException e) {
+        throw InvalidConversionError.writeError(schema(), value, e);
+      }
+    }
+  }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToLong.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToLong.java
new file mode 100644
index 0000000..a6c26f8
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToLong.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.convert;
+
+import org.apache.drill.exec.vector.accessor.InvalidConversionError;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+/**
+ * Convert a VARCHAR column to a BIGINT column following the Java rules
+ * for parsing longs (i.e. no formatting.)
+ */
+public class ConvertStringToLong extends AbstractWriteConverter {
+
+  public ConvertStringToLong(ScalarWriter baseWriter) {
+    super(baseWriter);
+  }
+
+  @Override
+  public void setString(String value) {
+    if (value == null) {
+      baseWriter.setNull();
+    } else {
+      try {
+        baseWriter.setLong(Long.parseLong(value));
+      }
+      catch (final NumberFormatException e) {
+        throw InvalidConversionError.writeError(schema(), value, e);
+      }
+    }
+  }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToTime.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToTime.java
new file mode 100644
index 0000000..155d6ae
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToTime.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.convert;
+
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+
+import org.apache.drill.exec.vector.DateUtilities;
+import org.apache.drill.exec.vector.accessor.InvalidConversionError;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+/**
+ * Convert a VARCHAR column to a TIME column following the Java rules
+ * for parsing a date time, optionally using the formatter provided in
+ * the column schema.
+ */
+public class ConvertStringToTime extends AbstractWriteConverter {
+
+  private final DateTimeFormatter dateTimeFormatter;
+
+  public ConvertStringToTime(ScalarWriter baseWriter) {
+    super(baseWriter);
+    dateTimeFormatter = baseWriter.schema().dateTimeFormatter();
+  }
+
+  @Override
+  public void setString(String value) {
+    if (value == null) {
+      baseWriter.setNull();
+    } else {
+      try {
+        final LocalTime dt = LocalTime.parse(value, dateTimeFormatter);
+        baseWriter.setInt(DateUtilities.toTime(dt));
+      }
+      catch (final DateTimeParseException e) {
+        throw InvalidConversionError.writeError(schema(), value, e);
+      }
+    }
+  }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToTimeStamp.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToTimeStamp.java
new file mode 100644
index 0000000..0329190
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/ConvertStringToTimeStamp.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.convert;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+
+import org.apache.drill.exec.vector.accessor.InvalidConversionError;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+/**
+ * Convert a VARCHAR column to a TIMESTAMP column following the Java rules
+ * for parsing a date time, optionally using the formatter provided in
+ * the column schema.
+ */
+public class ConvertStringToTimeStamp extends AbstractWriteConverter {
+
+  private final DateTimeFormatter dateTimeFormatter;
+
+  public ConvertStringToTimeStamp(ScalarWriter baseWriter) {
+    super(baseWriter);
+    dateTimeFormatter = baseWriter.schema().dateTimeFormatter();
+  }
+
+  @Override
+  public void setString(String value) {
+    if (value == null) {
+      baseWriter.setNull();
+    } else {
+      try {
+        final ZonedDateTime dt = ZonedDateTime.parse(value, dateTimeFormatter);
+        baseWriter.setLong(dt.toInstant().toEpochMilli());
+      }
+      catch (final DateTimeParseException e) {
+        throw InvalidConversionError.writeError(schema(), value, e);
+      }
+    }
+  }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/StandardConversions.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/StandardConversions.java
new file mode 100644
index 0000000..ef07e2c
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/StandardConversions.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.convert;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+/**
+ * Standard type conversion tools. Provides a mapping from input to output
+ * (vector) type. Handles implicit conversions (those done by the column
+ * writer itself) and explicit conversions for the Java string notation
+ * for various types. Readers must provide custom conversions for specialized
+ * formats.
+ * <p>
+ * Type-narrowing operations are supported by the column writers using
+ * "Java semantics". Long-to-int overflow is caught, double-to-long conversion
+ * sets the maximum or minimum long values, Double-to-int will overflow (on
+ * the int conversion.) Messy, but the goal is not to handle invalid data,
+ * rather it is to provide convenience for valid data.
+ * <p>
+ * The semantics of invalid conversions can be refined (set to null?
+ * different exceptions) without affecting the behavior of queries with
+ * valid data.
+ */
+public class StandardConversions {
+
+  /**
+   * Indicates the type of conversion needed.
+   */
+  public enum ConversionType {
+
+    /**
+     * No conversion needed. Readers generally don't provide converters
+     * in this case unless the meaning of a column must change while keeping
+     * the type, such as when converting between units.
+     */
+    NONE,
+    /**
+     * Conversion is done by the column writers. Again, no converter
+     * is needed except for semantic reasons.
+     */
+    IMPLICIT,
+    /**
+     * Conversion is done by the column writers. No converter is needed.
+     * However, the value is subject to overflow as this is a narrowing
+     * operation. Depending on the implementation, the operation may
+     * either raise an error, or produce a value limited by the range
+     * of the target type.
+     */
+    IMPLICIT_UNSAFE,
+    /**
+     * Conversion is needed because there is no "natural",
+     * precision-preserving conversion. Conversions must be done on
+     * an ad-hoc basis.
+     */
+    EXPLICIT
+  }
+
+  /**
+   * Definition of a conversion including conversion type and the standard
+   * conversion class (if available.)
+   */
+  public static class ConversionDefn {
+    public final ConversionType type;
+    public final Class<? extends AbstractWriteConverter> conversionClass;
+
+    public ConversionDefn(ConversionType type) {
+      this.type = type;
+      conversionClass = null;
+    }
+
+    public ConversionDefn(Class<? extends AbstractWriteConverter> conversionClass) {
+      this.type = ConversionType.EXPLICIT;
+      this.conversionClass = conversionClass;
+    }
+  }
+
+  /**
+   * Column conversion factory for the case where a conversion class is provided.
+   */
+  public static class SimpleWriterConverterFactory implements ColumnConversionFactory {
+    private final Class<? extends AbstractWriteConverter> conversionClass;
+
+    SimpleWriterConverterFactory(Class<? extends AbstractWriteConverter> conversionClass) {
+      this.conversionClass = conversionClass;
+    }
+
+    @Override
+    public AbstractWriteConverter newWriter(ScalarWriter baseWriter) {
+      return newInstance(conversionClass, baseWriter);
+    }
+  }
+
+  public static ColumnConversionFactory factory(Class<? extends AbstractWriteConverter> converterClass) {
+    return new SimpleWriterConverterFactory(converterClass);
+  }
+
+  public static AbstractWriteConverter newInstance(
+      Class<? extends AbstractWriteConverter> conversionClass, ScalarWriter baseWriter) {
+    try {
+      final Constructor<? extends AbstractWriteConverter> ctor = conversionClass.getDeclaredConstructor(ScalarWriter.class);
+      return ctor.newInstance(baseWriter);
+    } catch (final ReflectiveOperationException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * Create converters for standard cases.
+   * <p>
+   * Does not support any of the "legacy" decimal types.
+   *
+   * @param inputSchema the column schema for the input column which the
+   * client code (e.g. the reader) wants to produce
+   * @param outputSchema the column schema for the output vector to be produced
+   * by this operator
+   * @return a description of the conversion needed (if any), along with the
+   * standard conversion class, if available
+   */
+  public static ConversionDefn analyze(ColumnMetadata inputSchema, ColumnMetadata outputSchema) {
+    if (inputSchema.type().equals(outputSchema.type())) {
+      return new ConversionDefn(ConversionType.NONE);
+    }
+
+    switch (inputSchema.type()) {
+    case VARCHAR:
+      return new ConversionDefn(convertFromVarchar(outputSchema));
+    case TINYINT:
+      switch (outputSchema.type()) {
+      case SMALLINT:
+      case INT:
+      case BIGINT:
+      case FLOAT4:
+      case FLOAT8:
+      case VARDECIMAL:
+        return new ConversionDefn(ConversionType.IMPLICIT);
+      default:
+        break;
+      }
+      break;
+    case SMALLINT:
+      switch (outputSchema.type()) {
+      case TINYINT:
+        return new ConversionDefn(ConversionType.IMPLICIT_UNSAFE);
+      case INT:
+      case BIGINT:
+      case FLOAT4:
+      case FLOAT8:
+      case VARDECIMAL:
+        return new ConversionDefn(ConversionType.IMPLICIT);
+      default:
+        break;
+      }
+      break;
+    case INT:
+      switch (outputSchema.type()) {
+      case TINYINT:
+      case SMALLINT:
+        return new ConversionDefn(ConversionType.IMPLICIT_UNSAFE);
+      case BIGINT:
+      case FLOAT4:
+      case FLOAT8:
+      case VARDECIMAL:
+        return new ConversionDefn(ConversionType.IMPLICIT);
+      default:
+        break;
+      }
+      break;
+    case BIGINT:
+      switch (outputSchema.type()) {
+      case TINYINT:
+      case SMALLINT:
+      case INT:
+        return new ConversionDefn(ConversionType.IMPLICIT_UNSAFE);
+      case FLOAT4:
+      case FLOAT8:
+      case VARDECIMAL:
+        return new ConversionDefn(ConversionType.IMPLICIT);
+      default:
+        break;
+      }
+      break;
+    case FLOAT4:
+      switch (outputSchema.type()) {
+      case TINYINT:
+      case SMALLINT:
+      case INT:
+      case BIGINT:
+        return new ConversionDefn(ConversionType.IMPLICIT_UNSAFE);
+      case FLOAT8:
+      case VARDECIMAL:
+        return new ConversionDefn(ConversionType.IMPLICIT);
+      default:
+        break;
+      }
+      break;
+    case FLOAT8:
+      switch (outputSchema.type()) {
+      case TINYINT:
+      case SMALLINT:
+      case INT:
+      case BIGINT:
+      case FLOAT4:
+        return new ConversionDefn(ConversionType.IMPLICIT_UNSAFE);
+      case VARDECIMAL:
+        return new ConversionDefn(ConversionType.IMPLICIT);
+      default:
+        break;
+      }
+      break;
+    default:
+      break;
+    }
+    return new ConversionDefn(ConversionType.EXPLICIT);
+  }
+
+  public static Class<? extends AbstractWriteConverter> convertFromVarchar(
+      ColumnMetadata outputDefn) {
+    switch (outputDefn.type()) {
+    case TINYINT:
+    case SMALLINT:
+    case INT:
+    case UINT1:
+    case UINT2:
+      return ConvertStringToInt.class;
+    case BIGINT:
+      return ConvertStringToLong.class;
+    case FLOAT4:
+    case FLOAT8:
+      return ConvertStringToDouble.class;
+    case DATE:
+      return ConvertStringToDate.class;
+    case TIME:
+      return ConvertStringToTime.class;
+    case TIMESTAMP:
+      return ConvertStringToTimeStamp.class;
+    case INTERVALYEAR:
+    case INTERVALDAY:
+    case INTERVAL:
+      return ConvertStringToInterval.class;
+    default:
+      return null;
+    }
+  }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/package-info.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/package-info.java
new file mode 100644
index 0000000..6f07bbc
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Defines the type conversion mechanism along with several basic
+ * conversions.
+ */
+package org.apache.drill.exec.vector.accessor.convert;
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
index 527a69c..96710d7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
@@ -25,6 +25,8 @@ import org.apache.drill.exec.vector.accessor.ObjectWriter;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
 import org.apache.drill.exec.vector.accessor.VariantWriter;
+import org.apache.drill.exec.vector.accessor.convert.AbstractWriteConverter;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
 
 /**
@@ -58,6 +60,7 @@ public abstract class AbstractObjectWriter implements ObjectWriter {
   }
 
   public abstract ColumnWriter writer();
+
   @Override
   public abstract WriterEvents events();
 
@@ -77,4 +80,17 @@ public abstract class AbstractObjectWriter implements ObjectWriter {
   public void setObject(Object value) { writer().setObject(value); }
 
   public abstract void dump(HierarchicalFormatter format);
+
+  protected static ScalarWriter convertWriter(
+      ColumnConversionFactory conversionFactory,
+      ScalarWriter baseWriter) {
+    if (conversionFactory == null) {
+      return baseWriter;
+    }
+    final AbstractWriteConverter shim = conversionFactory.newWriter(baseWriter);
+    if (shim == null) {
+      return baseWriter;
+    }
+    return shim;
+  }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java
index a596e14..5d1eea6 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java
@@ -22,6 +22,9 @@ import java.math.BigDecimal;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
 import org.apache.drill.exec.vector.accessor.writer.WriterEvents.ColumnWriterListener;
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.joda.time.Period;
 
 /**
@@ -41,10 +44,20 @@ public abstract class AbstractScalarWriter implements ScalarWriter {
       setLong((Long) value);
     } else if (value instanceof String) {
       setString((String) value);
+    } else if (value instanceof Double) {
+      setDouble((Double) value);
+    } else if (value instanceof Float) {
+      setDouble((Float) value);
     } else if (value instanceof BigDecimal) {
       setDecimal((BigDecimal) value);
     } else if (value instanceof Period) {
       setPeriod((Period) value);
+    } else if (value instanceof LocalTime) {
+      setTime((LocalTime) value);
+    } else if (value instanceof LocalDate) {
+      setDate((LocalDate) value);
+    } else if (value instanceof Instant) {
+      setTimestamp((Instant) value);
     } else if (value instanceof byte[]) {
       final byte[] bytes = (byte[]) value;
       setBytes(bytes, bytes.length);
@@ -52,10 +65,6 @@ public abstract class AbstractScalarWriter implements ScalarWriter {
       setInt((Byte) value);
     } else if (value instanceof Short) {
       setInt((Short) value);
-    } else if (value instanceof Double) {
-      setDouble((Double) value);
-    } else if (value instanceof Float) {
-      setDouble((Float) value);
     } else {
       throw conversionError(value.getClass().getSimpleName());
     }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriterImpl.java
index c1306b6..135e46a 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriterImpl.java
@@ -19,11 +19,11 @@ package org.apache.drill.exec.vector.accessor.writer;
 
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.accessor.ColumnConversionFactory;
 import org.apache.drill.exec.vector.accessor.ColumnWriter;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
 
 /**
@@ -49,17 +49,12 @@ public abstract class AbstractScalarWriterImpl extends AbstractScalarWriter impl
   public static class ScalarObjectWriter extends AbstractObjectWriter {
 
     private final WriterEvents writerEvents;
-    private ScalarWriter scalarWriter;
-
-    public ScalarObjectWriter(AbstractScalarWriterImpl scalarWriter) {
-      final ColumnMetadata metadata = scalarWriter.schema();
-      final ColumnConversionFactory factory = metadata.typeConverter();
-      writerEvents = scalarWriter;
-      if (factory == null) {
-        this.scalarWriter = scalarWriter;
-      } else {
-        this.scalarWriter = factory.newWriter(metadata, scalarWriter);
-      }
+    private final ScalarWriter scalarWriter;
+
+    public ScalarObjectWriter(AbstractScalarWriterImpl baseWriter,
+        ColumnConversionFactory conversionFactory) {
+      writerEvents = baseWriter;
+      scalarWriter = convertWriter(conversionFactory, baseWriter);
     }
 
     @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
index e2e63d2..5bc38bb 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
@@ -21,6 +21,9 @@ import java.math.BigDecimal;
 
 import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.joda.time.Period;
 
 import io.netty.buffer.DrillBuf;
@@ -246,6 +249,21 @@ public abstract class BaseScalarWriter extends AbstractScalarWriterImpl {
   }
 
   @Override
+  public void setDate(LocalDate value) {
+    throw conversionError("LocalDate");
+  }
+
+  @Override
+  public void setTime(LocalTime value) {
+    throw conversionError("LocalTime");
+  }
+
+  @Override
+  public void setTimestamp(Instant value) {
+    throw conversionError("Instant");
+  }
+
+  @Override
   public void dump(HierarchicalFormatter format) {
     format.extend();
     super.dump(format);
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
index 206a0a6..557336f 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
@@ -19,12 +19,14 @@ package org.apache.drill.exec.vector.accessor.writer;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.NullableVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.ColumnAccessorUtils;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
 import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriterImpl.ScalarObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.dummy.DummyArrayWriter;
@@ -53,7 +55,8 @@ public class ColumnWriterFactory {
     ColumnAccessorUtils.defineRequiredWriters(requiredWriters);
   }
 
-  public static AbstractObjectWriter buildColumnWriter(ColumnMetadata schema, ValueVector vector) {
+  public static AbstractObjectWriter buildColumnWriter(ColumnMetadata schema,
+      ColumnConversionFactory conversionFactory, ValueVector vector) {
     if (vector == null) {
       return buildDummyColumnWriter(schema);
     }
@@ -74,11 +77,11 @@ public class ColumnWriterFactory {
     default:
       switch (schema.mode()) {
       case OPTIONAL:
-        return nullableScalarWriter(schema, (NullableVector) vector);
+        return nullableScalarWriter(schema, conversionFactory, (NullableVector) vector);
       case REQUIRED:
-        return requiredScalarWriter(schema, vector);
+        return requiredScalarWriter(schema, conversionFactory, vector);
       case REPEATED:
-        return repeatedScalarWriter(schema, (RepeatedValueVector) vector);
+        return repeatedScalarWriter(schema, conversionFactory, (RepeatedValueVector) vector);
       default:
         throw new UnsupportedOperationException(schema.mode().toString());
       }
@@ -86,24 +89,30 @@ public class ColumnWriterFactory {
   }
 
   private static ScalarObjectWriter requiredScalarWriter(
-      ColumnMetadata schema, ValueVector vector) {
-    BaseScalarWriter baseWriter = newWriter(vector);
+      ColumnMetadata schema,
+      ColumnConversionFactory conversionFactory,
+      ValueVector vector) {
+    final BaseScalarWriter baseWriter = newWriter(vector);
     baseWriter.bindSchema(schema);
-    return new ScalarObjectWriter(baseWriter);
+    return new ScalarObjectWriter(baseWriter, conversionFactory);
   }
 
   private static ScalarObjectWriter nullableScalarWriter(
-      ColumnMetadata schema, NullableVector vector) {
-    BaseScalarWriter baseWriter = newWriter(vector.getValuesVector());
+      ColumnMetadata schema,
+      ColumnConversionFactory conversionFactory,
+      NullableVector vector) {
+    final BaseScalarWriter baseWriter = newWriter(vector.getValuesVector());
     baseWriter.bindSchema(schema);
-    return NullableScalarWriter.build(schema, vector, baseWriter);
+    return NullableScalarWriter.build(schema, vector, baseWriter, conversionFactory);
   }
 
   private static AbstractObjectWriter repeatedScalarWriter(
-      ColumnMetadata schema, RepeatedValueVector vector) {
-    BaseScalarWriter baseWriter = newWriter(vector.getDataVector());
+      ColumnMetadata schema,
+      ColumnConversionFactory conversionFactory,
+      RepeatedValueVector vector) {
+    final BaseScalarWriter baseWriter = newWriter(vector.getDataVector());
     baseWriter.bindSchema(schema);
-    return ScalarArrayWriter.build(schema, vector, baseWriter);
+    return ScalarArrayWriter.build(schema, vector, baseWriter, conversionFactory);
   }
 
   /**
@@ -121,8 +130,8 @@ public class ColumnWriterFactory {
     case UNION:
       throw new UnsupportedOperationException(schema.type().toString());
     default:
-      ScalarObjectWriter scalarWriter = new ScalarObjectWriter(
-          new DummyScalarWriter(schema));
+      final ScalarObjectWriter scalarWriter = new ScalarObjectWriter(
+          new DummyScalarWriter(schema), null);
       switch (schema.mode()) {
       case OPTIONAL:
       case REQUIRED:
@@ -138,14 +147,14 @@ public class ColumnWriterFactory {
   }
 
   public static BaseScalarWriter newWriter(ValueVector vector) {
-    MajorType major = vector.getField().getType();
-    MinorType type = major.getMinorType();
+    final MajorType major = vector.getField().getType();
+    final MinorType type = major.getMinorType();
     try {
-      Class<? extends BaseScalarWriter> accessorClass = requiredWriters[type.ordinal()];
+      final Class<? extends BaseScalarWriter> accessorClass = requiredWriters[type.ordinal()];
       if (accessorClass == null) {
         throw new UnsupportedOperationException(type.toString());
       }
-      Constructor<? extends BaseScalarWriter> ctor = accessorClass.getConstructor(ValueVector.class);
+      final Constructor<? extends BaseScalarWriter> ctor = accessorClass.getConstructor(ValueVector.class);
       return ctor.newInstance(vector);
     } catch (InstantiationException | IllegalAccessException | NoSuchMethodException |
              SecurityException | IllegalArgumentException | InvocationTargetException e) {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
index e9fe11a..f555277 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
@@ -25,7 +25,11 @@ import org.apache.drill.exec.vector.NullableVector;
 import org.apache.drill.exec.vector.accessor.ColumnAccessors.UInt1ColumnWriter;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.joda.time.Period;
 
 public class NullableScalarWriter extends AbstractScalarWriterImpl {
@@ -77,9 +81,11 @@ public class NullableScalarWriter extends AbstractScalarWriterImpl {
   }
 
   public static ScalarObjectWriter build(ColumnMetadata schema,
-      NullableVector nullableVector, BaseScalarWriter baseWriter) {
+      NullableVector nullableVector, BaseScalarWriter baseWriter,
+      ColumnConversionFactory conversionFactory) {
     return new ScalarObjectWriter(
-        new NullableScalarWriter(schema, nullableVector, baseWriter));
+        new NullableScalarWriter(schema, nullableVector, baseWriter),
+        conversionFactory);
   }
 
   public BaseScalarWriter bitsWriter() { return isSetWriter; }
@@ -179,6 +185,27 @@ public class NullableScalarWriter extends AbstractScalarWriterImpl {
   }
 
   @Override
+  public void setDate(LocalDate value) {
+    baseWriter.setDate(value);
+    isSetWriter.setInt(1);
+    writerIndex.nextElement();
+  }
+
+  @Override
+  public void setTime(LocalTime value) {
+    baseWriter.setTime(value);
+    isSetWriter.setInt(1);
+    writerIndex.nextElement();
+  }
+
+  @Override
+  public void setTimestamp(Instant value) {
+    baseWriter.setTimestamp(value);
+    isSetWriter.setInt(1);
+    writerIndex.nextElement();
+  }
+
+  @Override
   public void preRollover() {
     isSetWriter.preRollover();
     baseWriter.preRollover();
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
index c761a95..16d827f 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
@@ -23,6 +23,7 @@ import java.math.BigDecimal;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
 import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.BaseArrayWriter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriterImpl.ScalarObjectWriter;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
@@ -64,9 +65,10 @@ public class ScalarArrayWriter extends BaseArrayWriter {
   private final ScalarWriter elementWriter;
 
   public ScalarArrayWriter(ColumnMetadata schema,
-      RepeatedValueVector vector, BaseScalarWriter elementWriter) {
+      RepeatedValueVector vector, BaseScalarWriter baseElementWriter,
+      ColumnConversionFactory conversionFactory) {
     super(schema, vector.getOffsetVector(),
-        new ScalarObjectWriter(elementWriter));
+        new ScalarObjectWriter(baseElementWriter, conversionFactory));
 
     // Save the writer from the scalar object writer created above
     // which may have wrapped the element writer in a type converter.
@@ -75,9 +77,10 @@ public class ScalarArrayWriter extends BaseArrayWriter {
   }
 
   public static ArrayObjectWriter build(ColumnMetadata schema,
-      RepeatedValueVector repeatedVector, BaseScalarWriter elementWriter) {
+      RepeatedValueVector repeatedVector, BaseScalarWriter baseElementWriter,
+      ColumnConversionFactory conversionFactory) {
     return new ArrayObjectWriter(
-        new ScalarArrayWriter(schema, repeatedVector, elementWriter));
+        new ScalarArrayWriter(schema, repeatedVector, baseElementWriter, conversionFactory));
   }
 
   @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java
index d18b382..bec5fc7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java
@@ -59,7 +59,7 @@ public class UnionVectorShim implements UnionShim {
 
       final ValueVector memberVector = shim.vector.getMember(type);
       final ColumnMetadata memberSchema = shim.writer.variantSchema().addType(type);
-      return ColumnWriterFactory.buildColumnWriter(memberSchema, memberVector);
+      return ColumnWriterFactory.buildColumnWriter(memberSchema, null, memberVector);
     }
 
     @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
index 919f689..a1385d4 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
@@ -24,6 +24,9 @@ import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.ValueType;
 import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriterImpl;
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.joda.time.Period;
 
 /**
@@ -94,4 +97,13 @@ public class DummyScalarWriter extends AbstractScalarWriterImpl {
 
   @Override
   public int rowStartIndex() { return 0; }
+
+  @Override
+  public void setDate(LocalDate value) { }
+
+  @Override
+  public void setTime(LocalTime value) { }
+
+  @Override
+  public void setTimestamp(Instant value) { }
 }