Posted to commits@drill.apache.org by ar...@apache.org on 2019/03/11 11:11:23 UTC

[drill] 02/02: DRILL-7073: CREATE SCHEMA command / TupleSchema / ColumnMetadata improvements

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 78dc86843fb9ef2683156708bc545a6b1950cb87
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Tue Mar 5 19:29:18 2019 +0200

    DRILL-7073: CREATE SCHEMA command / TupleSchema / ColumnMetadata improvements
    
    1. Added format, default and column properties logic.
    2. Changed the schema JSON serialization format.
    3. Added appropriate unit tests.
    
    closes #1684
---
 .../record/metadata/schema/parser/SchemaLexer.g4   |  10 ++
 .../record/metadata/schema/parser/SchemaParser.g4  |  16 ++-
 .../record/metadata/AbstractColumnMetadata.java    |  94 +++++++++++++++
 .../exec/record/metadata/MapColumnMetadata.java    |   8 +-
 .../record/metadata/PrimitiveColumnMetadata.java   | 115 ++++++++++++++++++
 .../drill/exec/record/metadata/TupleSchema.java    |  63 ++++++++--
 .../record/metadata/schema/SchemaContainer.java    |  50 +++-----
 .../metadata/schema/parser/SchemaExprParser.java   |  16 +++
 .../metadata/schema/parser/SchemaVisitor.java      |  66 +++++++++-
 .../java/org/apache/drill/TestSchemaCommands.java  |  77 ++++++++++--
 .../record/metadata/schema/TestSchemaProvider.java | 133 ++++++++++++++-------
 .../schema/parser/TestParserErrorHandling.java     |   4 -
 .../metadata/schema/parser/TestSchemaParser.java   | 119 +++++++++++-------
 .../drill/exec/record/metadata/ColumnMetadata.java |  30 +++++
 .../drill/exec/record/metadata/TupleMetadata.java  |  11 +-
 15 files changed, 657 insertions(+), 155 deletions(-)
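
A minimal sketch of the new column clauses, mirroring the unit tests below (the class name is illustrative; the APIs are the ones this commit adds):

    import org.apache.drill.exec.record.metadata.ColumnMetadata;
    import org.apache.drill.exec.record.metadata.TupleMetadata;
    import org.apache.drill.exec.record.metadata.schema.parser.SchemaExprParser;

    public class SchemaClausesSketch {
      public static void main(String[] args) {
        // FORMAT, DEFAULT and PROPERTIES are the clauses added by this commit
        TupleMetadata schema = SchemaExprParser.parseSchema(
            "`a` INT NOT NULL DEFAULT '10', "
          + "`b` DATE FORMAT 'yyyy-MM-dd' DEFAULT '2017-01-31', "
          + "`c` VARCHAR PROPERTIES { 'k1' = 'v1', 'k2' = 'v2' }");

        ColumnMetadata b = schema.metadata("b");
        System.out.println(b.formatValue());                    // yyyy-MM-dd
        System.out.println(b.defaultValue());                   // 2017-01-31 (a java.time.LocalDate)
        System.out.println(schema.metadata("c").properties());  // {k1=v1, k2=v2}
      }
    }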

diff --git a/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaLexer.g4 b/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaLexer.g4
index 99426d9..bc508d0 100644
--- a/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaLexer.g4
+++ b/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaLexer.g4
@@ -64,10 +64,18 @@ LEFT_PAREN: '(';
 RIGHT_PAREN: ')';
 LEFT_ANGLE_BRACKET: '<';
 RIGHT_ANGLE_BRACKET: '>';
+SINGLE_QUOTE: '\'';
+DOUBLE_QUOTE: '"';
+LEFT_BRACE: '{';
+RIGHT_BRACE: '}';
+EQUALS_SIGN: '=';
 
 NOT: 'NOT';
 NULL: 'NULL';
 AS: 'AS';
+FORMAT: 'FORMAT';
+DEFAULT: 'DEFAULT';
+PROPERTIES: 'PROPERTIES';
 
 NUMBER: [1-9] DIGIT* | '0';
 fragment DIGIT: [0-9];
@@ -83,6 +91,8 @@ ID: ([A-Z$_]) ([A-Z$_] | DIGIT)*;
 // if contains backtick, it should be escaped with backslash (`a\\`b` -> a`b)
 // if contains backslash, it should be escaped as well (`a\\\\b` -> a\b)
 QUOTED_ID: REVERSE_QUOTE (~[`\\] | '\\' [`\\])* REVERSE_QUOTE;
+SINGLE_QUOTED_STRING: SINGLE_QUOTE (~['\\] | '\\' ['\\])* SINGLE_QUOTE;
+DOUBLE_QUOTED_STRING: DOUBLE_QUOTE (~["\\] | '\\' ["\\])* DOUBLE_QUOTE;
 
 // skip
 LINE_COMMENT:  '//' ~[\r\n]* -> skip;
diff --git a/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaParser.g4 b/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaParser.g4
index 321e99f..5d578fe 100644
--- a/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaParser.g4
+++ b/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaParser.g4
@@ -27,11 +27,13 @@ options {
 
 schema: (columns | LEFT_PAREN columns RIGHT_PAREN) EOF;
 
-columns: column (COMMA column)*;
+columns: column_def (COMMA column_def)*;
+
+column_def: column property_values?;
 
 column: (primitive_column | map_column | simple_array_column | complex_array_column);
 
-primitive_column: column_id simple_type nullability?;
+primitive_column: column_id simple_type nullability? format_value? default_value?;
 
 simple_array_column: column_id simple_array_type nullability?;
 
@@ -70,3 +72,13 @@ complex_array_type: ARRAY LEFT_ANGLE_BRACKET complex_type RIGHT_ANGLE_BRACKET;
 map_type: MAP LEFT_ANGLE_BRACKET columns RIGHT_ANGLE_BRACKET;
 
 nullability: NOT NULL;
+
+format_value: FORMAT string_value;
+
+default_value: DEFAULT string_value;
+
+property_values: PROPERTIES LEFT_BRACE property_pair (COMMA property_pair)* RIGHT_BRACE;
+
+property_pair: string_value EQUALS_SIGN string_value;
+
+string_value: (QUOTED_ID | SINGLE_QUOTED_STRING | DOUBLE_QUOTED_STRING);
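
Taken together, the new rules let one column carry all three clauses, and any of the three quote styles is valid for a string value. A sketch (assuming the SchemaExprParser entry point shown later in this commit):

    import org.apache.drill.exec.record.metadata.TupleMetadata;
    import org.apache.drill.exec.record.metadata.schema.parser.SchemaExprParser;

    class GrammarSketch {
      public static void main(String[] args) {
        // every new rule exercised once; backtick, single and double quotes
        // are all accepted for string values
        TupleMetadata schema = SchemaExprParser.parseSchema(
            "`a` DATE NOT NULL FORMAT 'yyyy-MM-dd' DEFAULT \"2018-12-31\" "
          + "PROPERTIES { `k1` = 'v1' }");
        System.out.println(schema.metadata("a").columnString());
        // `a` DATE NOT NULL FORMAT 'yyyy-MM-dd' DEFAULT '2018-12-31' PROPERTIES { 'k1' = 'v1' }
      }
    }
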
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
index 1f833de..521a787 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
@@ -17,13 +17,23 @@
  */
 package org.apache.drill.exec.record.metadata;
 
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.schema.parser.SchemaExprParser;
 import org.apache.drill.exec.vector.accessor.ColumnConversionFactory;
 import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
 
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
  * Abstract definition of column metadata. Allows applications to create
  * specialized forms of a column metadata object by extending from this
@@ -36,6 +46,13 @@ import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
  * since maps (and the row itself) will, by definition, differ between
  * the two views.
  */
+@JsonAutoDetect(
+  fieldVisibility = JsonAutoDetect.Visibility.NONE,
+  getterVisibility = JsonAutoDetect.Visibility.NONE,
+  isGetterVisibility = JsonAutoDetect.Visibility.NONE,
+  setterVisibility = JsonAutoDetect.Visibility.NONE)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+@JsonPropertyOrder({"name", "type", "mode", "format", "default", "properties"})
 public abstract class AbstractColumnMetadata implements ColumnMetadata {
 
   // Capture the key schema information. We cannot use the MaterializedField
@@ -55,6 +72,21 @@ public abstract class AbstractColumnMetadata implements ColumnMetadata {
    */
 
   protected int expectedElementCount = 1;
+  protected final Map<String, String> properties = new LinkedHashMap<>();
+
+  @JsonCreator
+  public static AbstractColumnMetadata createColumnMetadata(@JsonProperty("name") String name,
+                                                            @JsonProperty("type") String type,
+                                                            @JsonProperty("mode") DataMode mode,
+                                                            @JsonProperty("format") String formatValue,
+                                                            @JsonProperty("default") String defaultValue,
+                                                            @JsonProperty("properties") Map<String, String> properties) {
+    ColumnMetadata columnMetadata = SchemaExprParser.parseColumn(name, type, mode);
+    columnMetadata.setFormatValue(formatValue);
+    columnMetadata.setDefaultFromString(defaultValue);
+    columnMetadata.setProperties(properties);
+    return (AbstractColumnMetadata) columnMetadata;
+  }
 
   public AbstractColumnMetadata(MaterializedField schema) {
     name = schema.getName();
@@ -91,6 +123,7 @@ public abstract class AbstractColumnMetadata implements ColumnMetadata {
   @Override
   public void bind(TupleMetadata parentTuple) { }
 
+  @JsonProperty("name")
   @Override
   public String name() { return name; }
 
@@ -105,6 +138,7 @@ public abstract class AbstractColumnMetadata implements ColumnMetadata {
         .build();
   }
 
+  @JsonProperty("mode")
   @Override
   public DataMode mode() { return mode; }
 
@@ -186,12 +220,28 @@ public abstract class AbstractColumnMetadata implements ColumnMetadata {
   public boolean isProjected() { return projected; }
 
   @Override
+  public void setFormatValue(String value) { }
+
+  @JsonProperty("format")
+  @Override
+  public String formatValue() { return null; }
+
+  @Override
   public void setDefaultValue(Object value) { }
 
   @Override
   public Object defaultValue() { return null; }
 
   @Override
+  public void setDefaultFromString(String value) { }
+
+  @JsonProperty("default")
+  @Override
+  public String defaultStringValue() {
+    return null;
+  }
+
+  @Override
   public void setTypeConverter(ColumnConversionFactory factory) {
     throw new UnsupportedConversionError("Type conversion not supported for non-scalar writers");
   }
@@ -200,6 +250,20 @@ public abstract class AbstractColumnMetadata implements ColumnMetadata {
   public ColumnConversionFactory typeConverter() { return null; }
 
   @Override
+  public void setProperties(Map<String, String> properties) {
+    if (properties == null) {
+      return;
+    }
+    this.properties.putAll(properties);
+  }
+
+  @JsonProperty("properties")
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
+  @Override
   public String toString() {
     final StringBuilder buf = new StringBuilder()
         .append("[")
@@ -221,11 +285,24 @@ public abstract class AbstractColumnMetadata implements ColumnMetadata {
       buf.append(", schema: ")
          .append(mapSchema().toString());
     }
+    if (formatValue() != null) {
+      buf.append(", format: ")
+        .append(formatValue());
+    }
+    if (defaultValue() != null) {
+      buf.append(", default: ")
+        .append(defaultStringValue());
+    }
+    if (!properties().isEmpty()) {
+      buf.append(", properties: ")
+        .append(properties());
+    }
     return buf
         .append("]")
         .toString();
   }
 
+  @JsonProperty("type")
   @Override
   public String typeString() {
     return majorType().toString();
@@ -243,6 +320,23 @@ public abstract class AbstractColumnMetadata implements ColumnMetadata {
       builder.append(" NOT NULL");
     }
 
+    if (formatValue() != null) {
+      builder.append(" FORMAT '").append(formatValue()).append("'");
+    }
+
+    if (defaultValue() != null) {
+      builder.append(" DEFAULT '").append(defaultStringValue()).append("'");
+    }
+
+    if (!properties().isEmpty()) {
+      builder.append(" PROPERTIES { ");
+      builder.append(properties().entrySet()
+        .stream()
+        .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
+        .collect(Collectors.joining(", ")));
+      builder.append(" }");
+    }
+
     return builder.toString();
   }
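
With the annotations above, a column round-trips through JSON. A hedged sketch (a plain ObjectMapper is assumed here; Drill configures its own mapper in the schema provider):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.drill.exec.record.metadata.AbstractColumnMetadata;
    import org.apache.drill.exec.record.metadata.ColumnMetadata;
    import org.apache.drill.exec.record.metadata.schema.parser.SchemaExprParser;

    class ColumnJsonSketch {
      public static void main(String[] args) throws Exception {
        ColumnMetadata column = SchemaExprParser.parseColumn("`i` INT NOT NULL DEFAULT '10'");
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(column);
        // roughly: {"name":"i","type":"INT","mode":"REQUIRED","default":"10"}
        // the @JsonCreator factory above rebuilds the column from that JSON
        AbstractColumnMetadata restored = mapper.readValue(json, AbstractColumnMetadata.class);
        System.out.println(restored.columnString()); // `i` INT NOT NULL DEFAULT '10'
      }
    }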
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MapColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MapColumnMetadata.java
index 8d295e6..3afc4d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MapColumnMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MapColumnMetadata.java
@@ -22,6 +22,8 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.MaterializedField;
 
+import java.util.stream.Collectors;
+
 /**
  * Describes a map and repeated map. Both are tuples that have a tuple
  * schema as part of the column definition.
@@ -125,7 +127,11 @@ public class MapColumnMetadata extends AbstractColumnMetadata {
     if (isArray()) {
       builder.append("ARRAY<");
     }
-    builder.append("MAP<").append(mapSchema.schemaString()).append(">");
+    builder.append("MAP<");
+    builder.append(mapSchema().toMetadataList().stream()
+      .map(ColumnMetadata::columnString)
+      .collect(Collectors.joining(", ")));
+    builder.append(">");
     if (isArray()) {
       builder.append(">");
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java
index 9781e1c..21ac093 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java
@@ -24,6 +24,15 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.accessor.ColumnConversionFactory;
+import org.joda.time.Period;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
 
 /**
  * Primitive (non-map) column. Describes non-nullable, nullable and array types
@@ -42,12 +51,16 @@ import org.apache.drill.exec.vector.accessor.ColumnConversionFactory;
 
 public class PrimitiveColumnMetadata extends AbstractColumnMetadata {
 
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PrimitiveColumnMetadata.class);
+
   /**
    * Expected (average) width for variable-width columns.
    */
 
   private int expectedWidth;
 
+  private String formatValue;
+
   /**
    * Default value to use for filling a vector when no real data is
    * available, such as for columns added in new files but which does not
@@ -135,6 +148,16 @@ public class PrimitiveColumnMetadata extends AbstractColumnMetadata {
   }
 
   @Override
+  public void setFormatValue(String value) {
+    formatValue = value;
+  }
+
+  @Override
+  public String formatValue() {
+    return formatValue;
+  }
+
+  @Override
   public void setDefaultValue(Object value) {
     defaultValue = value;
   }
@@ -143,6 +166,16 @@ public class PrimitiveColumnMetadata extends AbstractColumnMetadata {
   public Object defaultValue() { return defaultValue; }
 
   @Override
+  public void setDefaultFromString(String value) {
+    this.defaultValue = valueFromString(value);
+  }
+
+  @Override
+  public String defaultStringValue() {
+    return valueToString(defaultValue);
+  }
+
+  @Override
   public void setTypeConverter(ColumnConversionFactory factory) {
     shimFactory = factory;
   }
@@ -226,4 +259,86 @@ public class PrimitiveColumnMetadata extends AbstractColumnMetadata {
     return builder.toString();
   }
 
+  /**
+   * Converts a value in string literal form into an Object instance based on the {@link MinorType} value.
+   * Returns null in case of a parsing error or an unsupported type.
+   *
+   * @param value value in string literal form
+   * @return Object instance
+   */
+  private Object valueFromString(String value) {
+    if (value == null) {
+      return null;
+    }
+    try {
+      switch (type) {
+        case INT:
+          return Integer.parseInt(value);
+        case BIGINT:
+          return Long.parseLong(value);
+        case FLOAT4:
+          return Float.parseFloat(value);
+        case FLOAT8:
+          return Double.parseDouble(value);
+        case VARDECIMAL:
+          return new BigDecimal(value);
+        case BIT:
+          return Boolean.parseBoolean(value);
+        case VARCHAR:
+        case VARBINARY:
+          return value;
+        case TIME:
+          DateTimeFormatter timeFormatter = formatValue == null
+            ? DateTimeFormatter.ISO_TIME.withZone(ZoneOffset.UTC) : DateTimeFormatter.ofPattern(formatValue);
+          return LocalTime.parse(value, timeFormatter);
+        case DATE:
+          DateTimeFormatter dateFormatter = formatValue == null
+            ? DateTimeFormatter.ISO_DATE.withZone(ZoneOffset.UTC) : DateTimeFormatter.ofPattern(formatValue);
+          return LocalDate.parse(value, dateFormatter);
+        case TIMESTAMP:
+          DateTimeFormatter dateTimeFormatter = formatValue == null
+            ? DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneOffset.UTC) : DateTimeFormatter.ofPattern(formatValue);
+          return ZonedDateTime.parse(value, dateTimeFormatter);
+        case INTERVAL:
+        case INTERVALDAY:
+        case INTERVALYEAR:
+          return Period.parse(value);
+        default:
+          logger.warn("Unsupported type {} for default value {}, ignore and return null", type, value);
+          return null;
+      }
+    } catch (IllegalArgumentException | DateTimeParseException e) {
+      logger.warn("Error while parsing type {} default value {}, ignore and return null", type, value, e);
+      return null;
+    }
+  }
+
+  /**
+   * Converts a given value instance into its String literal representation based on the column metadata type.
+   *
+   * @param value value instance
+   * @return value in string literal representation
+   */
+  private String valueToString(Object value) {
+    if (value == null) {
+      return null;
+    }
+    switch (type) {
+      case TIME:
+        DateTimeFormatter timeFormatter = formatValue == null
+          ? DateTimeFormatter.ISO_TIME.withZone(ZoneOffset.UTC) : DateTimeFormatter.ofPattern(formatValue);
+        return timeFormatter.format((LocalTime) value);
+      case DATE:
+        DateTimeFormatter dateFormatter = formatValue == null
+          ? DateTimeFormatter.ISO_DATE.withZone(ZoneOffset.UTC) : DateTimeFormatter.ofPattern(formatValue);
+        return dateFormatter.format((LocalDate) value);
+      case TIMESTAMP:
+        DateTimeFormatter dateTimeFormatter = formatValue == null
+          ? DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneOffset.UTC) : DateTimeFormatter.ofPattern(formatValue);
+        return dateTimeFormatter.format((ZonedDateTime) value);
+      default:
+        return value.toString();
+    }
+  }
+
 }
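
A sketch of the conversion pair above in use: a DATE column with a custom format parses its default through that pattern and prints it back the same way (names are illustrative):

    import org.apache.drill.exec.record.metadata.ColumnMetadata;
    import org.apache.drill.exec.record.metadata.schema.parser.SchemaExprParser;

    class DefaultValueSketch {
      public static void main(String[] args) {
        ColumnMetadata col = SchemaExprParser.parseColumn("`d` DATE NOT NULL");
        col.setFormatValue("yyyy-MM-dd");        // consulted by valueFromString/valueToString
        col.setDefaultFromString("2017-01-31");  // stored as a java.time.LocalDate
        System.out.println(col.defaultValue().getClass().getSimpleName()); // LocalDate
        System.out.println(col.defaultStringValue());                      // 2017-01-31
      }
    }
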
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
index 83dc91a..283ee64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
@@ -17,15 +17,22 @@
  */
 package org.apache.drill.exec.record.metadata;
 
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-
 /**
  * Defines the schema of a tuple: either the top-level row or a nested
  * "map" (really structure). A schema is a collection of columns (backed
@@ -33,11 +40,28 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
  * index. New columns may be added at any time; the new column takes the
  * next available index.
  */
-
+@JsonAutoDetect(
+  fieldVisibility = JsonAutoDetect.Visibility.NONE,
+  getterVisibility = JsonAutoDetect.Visibility.NONE,
+  isGetterVisibility = JsonAutoDetect.Visibility.NONE,
+  setterVisibility = JsonAutoDetect.Visibility.NONE)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+@JsonPropertyOrder({"columns", "properties"})
 public class TupleSchema implements TupleMetadata {
 
   private MapColumnMetadata parentMap;
   private final TupleNameSpace<ColumnMetadata> nameSpace = new TupleNameSpace<>();
+  private final Map<String, String> properties = new LinkedHashMap<>();
+
+  public TupleSchema() {
+  }
+
+  @JsonCreator
+  public TupleSchema(@JsonProperty("columns") List<AbstractColumnMetadata> columns,
+                     @JsonProperty("properties") Map<String, String> properties) {
+    columns.forEach(this::addColumn);
+    setProperties(properties);
+  }
 
   public void bind(MapColumnMetadata parentMap) {
     this.parentMap = parentMap;
@@ -145,6 +169,7 @@ public class TupleSchema implements TupleMetadata {
     return cols;
   }
 
+  @JsonProperty("columns")
   @Override
   public List<ColumnMetadata> toMetadataList() {
     return new ArrayList<>(nameSpace.entries());
@@ -183,13 +208,6 @@ public class TupleSchema implements TupleMetadata {
   public boolean isRoot() { return parentMap == null; }
 
   @Override
-  public String schemaString() {
-    return nameSpace.entries().stream()
-      .map(ColumnMetadata::columnString)
-      .collect(Collectors.joining(", "));
-  }
-
-  @Override
   public String toString() {
     StringBuilder builder = new StringBuilder()
         .append("[")
@@ -200,7 +218,28 @@ public class TupleSchema implements TupleMetadata {
       .map(ColumnMetadata::toString)
       .collect(Collectors.joining(", ")));
 
+    if (!properties.isEmpty()) {
+      if (!nameSpace.entries().isEmpty()) {
+        builder.append(", ");
+      }
+      builder.append("properties: ").append(properties);
+    }
+
     builder.append("]");
     return builder.toString();
   }
+
+  @Override
+  public void setProperties(Map<String, String> properties) {
+    if (properties == null) {
+      return;
+    }
+    this.properties.putAll(properties);
+  }
+
+  @JsonProperty("properties")
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
 }
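
A sketch of the schema-level properties added above (the key and value are illustrative, not properties Drill defines):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.drill.exec.record.metadata.TupleMetadata;
    import org.apache.drill.exec.record.metadata.schema.parser.SchemaExprParser;

    class SchemaPropertiesSketch {
      public static void main(String[] args) {
        TupleMetadata schema = SchemaExprParser.parseSchema("`i` INT");
        Map<String, String> props = new LinkedHashMap<>();
        props.put("source.format", "csv");  // illustrative key and value
        schema.setProperties(props);        // null-safe: a null map is ignored
        System.out.println(schema.properties());  // {source.format=csv}
      }
    }
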
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaContainer.java
index e705be2..8db8f8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaContainer.java
@@ -21,34 +21,29 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.record.metadata.schema.parser.SchemaExprParser;
 
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
- * Holder class that contains table name, schema definition
- * and properties passed in schema file or using table function.
+ * Holder class that contains table name, schema definition and current schema container version.
  */
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public class SchemaContainer {
 
   private final String table;
   private final TupleMetadata schema;
-  // preserve properties order
-  private final Map<String, String> properties = new LinkedHashMap<>();
   private final Version version;
 
   @JsonCreator
   public SchemaContainer(@JsonProperty("table") String table,
-                         @JsonProperty("schema") List<String> schema,
-                         @JsonProperty("properties") LinkedHashMap<String, String> properties,
+                         @JsonProperty("schema") TupleSchema schema,
                          @JsonProperty("version") Integer version) {
-    this(table, schema == null ? null : String.join(", ", schema), properties, version);
+    this.table = table;
+    this.schema = schema;
+    this.version = new Version(version);
   }
 
   public SchemaContainer(String table, String schema, Map<String, String> properties) {
@@ -57,10 +52,7 @@ public class SchemaContainer {
 
   public SchemaContainer(String table, String schema, Map<String, String> properties, Integer version) {
     this.table = table;
-    this.schema = schema == null ? null : convert(schema);
-    if (properties != null) {
-      this.properties.putAll(properties);
-    }
+    this.schema = schema == null ? null : convert(schema, properties);
     this.version = new Version(version);
   }
 
@@ -70,15 +62,8 @@ public class SchemaContainer {
   }
 
   @JsonProperty("schema")
-  public List<String> getSchemaList() {
-    return schema == null ? null : schema.toMetadataList().stream()
-      .map(ColumnMetadata::columnString)
-      .collect(Collectors.toList());
-  }
-
-  @JsonProperty("properties")
-  public Map<String, String> getProperties() {
-    return properties;
+  public TupleMetadata getSchema() {
+    return schema;
   }
 
   @JsonProperty("version")
@@ -87,23 +72,21 @@ public class SchemaContainer {
   }
 
   @JsonIgnore
-  public TupleMetadata getSchema() {
-    return schema;
-  }
-
-  @JsonIgnore
   public Version getVersion() {
     return version;
   }
 
-  private TupleMetadata convert(String schema) {
-    return SchemaExprParser.parseSchema(schema);
+  private TupleMetadata convert(String schemaString, Map<String, String> properties) {
+    TupleMetadata schema = SchemaExprParser.parseSchema(schemaString);
+    if (properties != null) {
+      schema.setProperties(properties);
+    }
+    return schema;
   }
 
   @Override
   public String toString() {
-    return "SchemaContainer{" + "table='" + table + '\'' + ", schema=" + schema +
-      ", properties=" + properties + ", version=" + version + '}';
+    return "SchemaContainer{" + "table='" + table + '\'' + ", schema=" + schema + ", version=" + version + '}';
   }
 
   /**
@@ -114,6 +97,7 @@ public class SchemaContainer {
   public static class Version {
 
     public static final int UNDEFINED_VERSION = -1;
+
     public static final int VERSION_1 = 1;
 
     // is used for testing
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaExprParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaExprParser.java
index 3cf3762..ea5071e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaExprParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaExprParser.java
@@ -23,6 +23,7 @@ import org.antlr.v4.runtime.CodePointCharStream;
 import org.antlr.v4.runtime.CommonTokenStream;
 import org.antlr.v4.runtime.RecognitionException;
 import org.antlr.v4.runtime.Recognizer;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 
@@ -41,6 +42,21 @@ public class SchemaExprParser {
   }
 
   /**
+   * Parses the given column name, type and mode into a {@link ColumnMetadata} instance.
+   *
+   * @param name column name
+   * @param type column type
+   * @param mode column mode
+   * @return column metadata
+   */
+  public static ColumnMetadata parseColumn(String name, String type, TypeProtos.DataMode mode) {
+    return parseColumn(String.format("`%s` %s %s",
+      name.replaceAll("(\\\\)|(`)", "\\\\$0"),
+      type,
+      TypeProtos.DataMode.REQUIRED == mode ? "not null" : ""));
+  }
+
+  /**
    * Parses string definition of the column and converts it
    * into {@link ColumnMetadata} instance.
    *
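
A note on the escaping in parseColumn(name, type, mode) above: backticks and backslashes inside the name are backslash-escaped before the name is wrapped in backticks, matching the QUOTED_ID lexer rule. A small sketch of just the regex:

    class EscapeSketch {
      public static void main(String[] args) {
        String name = "a`b";
        // prefix every backslash or backtick with a backslash
        String escaped = name.replaceAll("(\\\\)|(`)", "\\\\$0");
        System.out.println(escaped);                                   // a\`b
        System.out.println(String.format("`%s` INT %s", escaped, "not null"));
        // `a\`b` INT not null
      }
    }
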
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaVisitor.java
index 7c7663a..c49007b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaVisitor.java
@@ -27,8 +27,12 @@ import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.record.metadata.RepeatedListBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Visits schema and stores metadata about its columns into {@link TupleMetadata} class.
@@ -43,14 +47,42 @@ public class SchemaVisitor extends SchemaParserBaseVisitor<TupleMetadata> {
   @Override
   public TupleMetadata visitColumns(SchemaParser.ColumnsContext ctx) {
     TupleMetadata schema = new TupleSchema();
-    ColumnVisitor columnVisitor = new ColumnVisitor();
-    ctx.column().forEach(
-      c -> schema.addColumn(c.accept(columnVisitor))
+    ColumnDefVisitor columnDefVisitor = new ColumnDefVisitor();
+    ctx.column_def().forEach(
+      columnDef -> schema.addColumn(columnDef.accept(columnDefVisitor))
     );
     return schema;
   }
 
   /**
+   * Visits column definition, adds column properties to {@link ColumnMetadata} if present.
+   */
+  public static class ColumnDefVisitor extends SchemaParserBaseVisitor<ColumnMetadata> {
+
+    @Override
+    public ColumnMetadata visitColumn_def(SchemaParser.Column_defContext ctx) {
+      ColumnVisitor columnVisitor = new ColumnVisitor();
+      ColumnMetadata columnMetadata = ctx.column().accept(columnVisitor);
+      if (ctx.property_values() != null) {
+        StringValueVisitor stringValueVisitor = new StringValueVisitor();
+        Map<String, String> columnProperties = new LinkedHashMap<>();
+        ctx.property_values().property_pair().forEach(
+          pair -> {
+            List<String> pairValues = pair.string_value().stream()
+              .map(stringValueVisitor::visit)
+              .collect(Collectors.toList());
+            Preconditions.checkState(pairValues.size() == 2);
+            columnProperties.put(pairValues.get(0), pairValues.get(1));
+          }
+        );
+        columnMetadata.setProperties(columnProperties);
+      }
+      return columnMetadata;
+    }
+
+  }
+
+  /**
    * Visits various types of columns (primitive, map, array) and stores their metadata
    * into {@link ColumnMetadata} class.
    */
@@ -60,7 +92,15 @@ public class SchemaVisitor extends SchemaParserBaseVisitor<TupleMetadata> {
     public ColumnMetadata visitPrimitive_column(SchemaParser.Primitive_columnContext ctx) {
       String name = ctx.column_id().accept(new IdVisitor());
       TypeProtos.DataMode mode = ctx.nullability() == null ? TypeProtos.DataMode.OPTIONAL : TypeProtos.DataMode.REQUIRED;
-      return ctx.simple_type().accept(new TypeVisitor(name, mode));
+      ColumnMetadata columnMetadata = ctx.simple_type().accept(new TypeVisitor(name, mode));
+      StringValueVisitor stringValueVisitor = new StringValueVisitor();
+      if (ctx.format_value() != null) {
+        columnMetadata.setFormatValue(stringValueVisitor.visit(ctx.format_value().string_value()));
+      }
+      if (ctx.default_value() != null) {
+        columnMetadata.setDefaultFromString(stringValueVisitor.visit(ctx.default_value().string_value()));
+      }
+      return columnMetadata;
     }
 
     @Override
@@ -88,6 +128,20 @@ public class SchemaVisitor extends SchemaParserBaseVisitor<TupleMetadata> {
   }
 
   /**
+   * Visits quoted string, strips backticks, single quotes or double quotes and returns bare string value.
+   */
+  private static class StringValueVisitor extends SchemaParserBaseVisitor<String> {
+
+    @Override
+    public String visitString_value(SchemaParser.String_valueContext ctx) {
+      String text = ctx.getText();
+      // first strip the first and last characters (backtick, single quote or double quote)
+      // then find every char preceded by a backslash and remove the backslash
+      return text.substring(1, text.length() - 1).replaceAll("\\\\(.)", "$1");
+    }
+  }
+
+  /**
    * Visits ID and QUOTED_ID, returning their string representation.
    */
   private static class IdVisitor extends SchemaParserBaseVisitor<String> {
@@ -225,8 +279,8 @@ public class SchemaVisitor extends SchemaParserBaseVisitor<TupleMetadata> {
     @Override
     public ColumnMetadata visitMap_type(SchemaParser.Map_typeContext ctx) {
       MapBuilder builder = new MapBuilder(null, name, mode);
-      ColumnVisitor visitor = new ColumnVisitor();
-      ctx.columns().column().forEach(
+      ColumnDefVisitor visitor = new ColumnDefVisitor();
+      ctx.columns().column_def().forEach(
         c -> builder.addColumn(c.accept(visitor))
       );
       return builder.buildColumn();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java
index 4b277ae..f4b1e69 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java
@@ -39,6 +39,7 @@ import org.junit.rules.ExpectedException;
 import java.io.File;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.time.LocalDate;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -273,7 +274,7 @@ public class TestSchemaCommands extends ClusterTest {
   }
 
   @Test
-  public void testCreateWithProperties() throws Exception {
+  public void testCreateWithSchemaProperties() throws Exception {
     File tmpDir = dirTestWatcher.getTmpDir();
     File schemaFile = new File(tmpDir, "schema_for_create_with_properties.schema");
     assertFalse(schemaFile.exists());
@@ -292,16 +293,16 @@ public class TestSchemaCommands extends ClusterTest {
       SchemaContainer schemaContainer = schemaProvider.read();
 
       assertNull(schemaContainer.getTable());
-      assertNotNull(schemaContainer.getSchema());
-      assertNotNull(schemaContainer.getProperties());
+      TupleMetadata schema = schemaContainer.getSchema();
+      assertNotNull(schema);
 
       Map<String, String> properties = new LinkedHashMap<>();
       properties.put("k1", "v1");
       properties.put("k2", "v2");
       properties.put("k3", "v3");
 
-      assertEquals(properties.size(), schemaContainer.getProperties().size());
-      assertEquals(properties, schemaContainer.getProperties());
+      assertEquals(properties.size(), schema.properties().size());
+      assertEquals(properties, schema.properties());
 
     } finally {
       if (schemaFile.exists()) {
@@ -311,7 +312,7 @@ public class TestSchemaCommands extends ClusterTest {
   }
 
   @Test
-  public void testCreateWithoutProperties() throws Exception {
+  public void testCreateWithoutSchemaProperties() throws Exception {
     File tmpDir = dirTestWatcher.getTmpDir();
     File schemaFile = new File(tmpDir, "schema_for_create_without_properties.schema");
     assertFalse(schemaFile.exists());
@@ -329,9 +330,64 @@ public class TestSchemaCommands extends ClusterTest {
       SchemaContainer schemaContainer = schemaProvider.read();
 
       assertNull(schemaContainer.getTable());
-      assertNotNull(schemaContainer.getSchema());
-      assertNotNull(schemaContainer.getProperties());
-      assertEquals(0, schemaContainer.getProperties().size());
+      TupleMetadata schema = schemaContainer.getSchema();
+      assertNotNull(schema);
+      assertNotNull(schema.properties());
+      assertEquals(0, schema.properties().size());
+    } finally {
+      if (schemaFile.exists()) {
+        assertTrue(schemaFile.delete());
+      }
+    }
+  }
+
+  @Test
+  public void testCreateWithVariousColumnProperties() throws Exception {
+    File tmpDir = dirTestWatcher.getTmpDir();
+    File schemaFile = new File(tmpDir, "schema_for_create_with_various_column_properties.schema");
+    assertFalse(schemaFile.exists());
+    try {
+      testBuilder()
+        .sqlQuery("create schema ( " +
+            "a int not null default '10', " +
+            "b date format 'yyyy-MM-dd' default '2017-01-31', " +
+            "c varchar properties {'k1' = 'v1', 'k2' = 'v2'}) " +
+            "path '%s'",
+          schemaFile.getPath())
+        .unOrdered()
+        .baselineColumns("ok", "summary")
+        .baselineValues(true, String.format("Created schema for [%s]", schemaFile.getPath()))
+        .go();
+
+      SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath()));
+      assertTrue(schemaProvider.exists());
+
+      SchemaContainer schemaContainer = schemaProvider.read();
+
+      assertNull(schemaContainer.getTable());
+      TupleMetadata schema = schemaContainer.getSchema();
+      assertNotNull(schema);
+
+      assertEquals(3, schema.size());
+
+      ColumnMetadata a = schema.metadata("a");
+      assertTrue(a.defaultValue() instanceof Integer);
+      assertEquals(10, a.defaultValue());
+      assertEquals("10", a.defaultStringValue());
+
+      ColumnMetadata b = schema.metadata("b");
+      assertTrue(b.defaultValue() instanceof LocalDate);
+      assertEquals("yyyy-MM-dd", b.formatValue());
+      assertEquals(LocalDate.parse("2017-01-31"), b.defaultValue());
+      assertEquals("2017-01-31", b.defaultStringValue());
+
+      ColumnMetadata c = schema.metadata("c");
+      Map<String, String> properties = new LinkedHashMap<>();
+      properties.put("k1", "v1");
+      properties.put("k2", "v2");
+      assertEquals(properties, c.properties());
+
+      assertEquals(0, schema.properties().size());
     } finally {
       if (schemaFile.exists()) {
         assertTrue(schemaFile.delete());
@@ -382,8 +438,7 @@ public class TestSchemaCommands extends ClusterTest {
       assertEquals(TypeProtos.MinorType.INT, schema.metadata("i").type());
       assertEquals(TypeProtos.MinorType.VARCHAR, schema.metadata("v").type());
 
-      assertNotNull(schemaContainer.getProperties());
-      assertEquals(2, schemaContainer.getProperties().size());
+      assertEquals(2, schema.properties().size());
     } finally {
       if (rawSchema.exists()) {
         assertTrue(rawSchema.delete());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java
index 427754f..435ec0d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.record.metadata.schema;
 
 import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.StorageStrategy;
 import org.junit.Rule;
@@ -86,7 +87,7 @@ public class TestSchemaProvider {
     assertEquals(1, metadata.size());
     assertEquals(TypeProtos.MinorType.INT, metadata.metadata("i").type());
 
-    assertEquals(properties, schemaContainer.getProperties());
+    assertEquals(properties, metadata.properties());
 
     SchemaContainer.Version version = schemaContainer.getVersion();
     assertFalse(version.isUndefined());
@@ -134,15 +135,24 @@ public class TestSchemaProvider {
     provider.store("i int, v varchar(10)", properties, StorageStrategy.DEFAULT);
     assertTrue(provider.exists());
 
-    String expectedContent =
-        "{\n"
-      + "  \"schema\" : [\n"
-      + "    \"`i` INT\",\n"
-      + "    \"`v` VARCHAR(10)\"\n"
-      + "  ],\n"
-      + "  \"properties\" : {\n"
-      + "    \"k1\" : \"v1\",\n"
-      + "    \"k2\" : \"v2\"\n"
+    String expectedContent = "{\n"
+      + "  \"schema\" : {\n"
+      + "    \"columns\" : [\n"
+      + "      {\n"
+      + "        \"name\" : \"i\",\n"
+      + "        \"type\" : \"INT\",\n"
+      + "        \"mode\" : \"OPTIONAL\"\n"
+      + "      },\n"
+      + "      {\n"
+      + "        \"name\" : \"v\",\n"
+      + "        \"type\" : \"VARCHAR(10)\",\n"
+      + "        \"mode\" : \"OPTIONAL\"\n"
+      + "      }\n"
+      + "    ],\n"
+      + "    \"properties\" : {\n"
+      + "      \"k1\" : \"v1\",\n"
+      + "      \"k2\" : \"v2\"\n"
+      + "    }\n"
       + "  },\n"
       + "  \"version\" : 1\n"
       + "}";
@@ -166,19 +176,39 @@ public class TestSchemaProvider {
   @Test
   public void testPathProviderRead() throws Exception {
     Path schemaPath = folder.newFile("schema").toPath();
-    Files.write(schemaPath, Collections.singletonList(
-          "{  \n"
-        + "   \"table\":\"tbl\",\n"
-        + "   \"schema\":[  \n"
-        + "      \"`i` INT\",\n"
-        + "      \"`v` VARCHAR\"\n"
-        + "   ],\n"
-        + "   \"properties\" : {\n"
-        + "      \"k1\" : \"v1\",\n"
-        + "      \"k2\" : \"v2\"\n"
-        + "   }\n"
-        + "}\n"
-    ));
+    String schema = "{\n"
+      + "  \"table\" : \"tbl\",\n"
+      + "  \"schema\" : {\n"
+      + "    \"columns\" : [\n"
+      + "      {\n"
+      + "        \"name\" : \"i\",\n"
+      + "        \"type\" : \"INT\",\n"
+      + "        \"mode\" : \"REQUIRED\",\n"
+      + "        \"default\" : \"10\"\n"
+      + "      },\n"
+      + "      {\n"
+      + "        \"name\" : \"a\",\n"
+      + "        \"type\" : \"ARRAY<VARCHAR(10)>\",\n"
+      + "        \"mode\" : \"REPEATED\",\n"
+      + "        \"properties\" : {\n"
+      + "          \"ck1\" : \"cv1\",\n"
+      + "          \"ck2\" : \"cv2\"\n"
+      + "        }\n"
+      + "      },\n"
+      + "      {\n"
+      + "        \"name\" : \"t\",\n"
+      + "        \"type\" : \"DATE\",\n"
+      + "        \"mode\" : \"OPTIONAL\",\n"
+      + "        \"format\" : \"yyyy-mm-dd\"\n"
+      + "      }\n"
+      + "    ],\n"
+      + "    \"properties\" : {\n"
+      + "      \"sk1\" : \"sv1\",\n"
+      + "      \"sk2\" : \"sv2\"\n"
+      + "    }\n"
+      + "  }\n"
+      + "}";
+    Files.write(schemaPath, Collections.singletonList(schema));
     SchemaProvider provider = new PathSchemaProvider(new org.apache.hadoop.fs.Path(schemaPath.toUri().getPath()));
     assertTrue(provider.exists());
     SchemaContainer schemaContainer = provider.read();
@@ -187,14 +217,30 @@ public class TestSchemaProvider {
 
     TupleMetadata metadata = schemaContainer.getSchema();
     assertNotNull(metadata);
-    assertEquals(2, metadata.size());
-    assertEquals(TypeProtos.MinorType.INT, metadata.metadata("i").type());
-    assertEquals(TypeProtos.MinorType.VARCHAR, metadata.metadata("v").type());
-
-    Map<String, String> properties = new LinkedHashMap<>();
-    properties.put("k1", "v1");
-    properties.put("k2", "v2");
-    assertEquals(properties, schemaContainer.getProperties());
+    Map<String, String> schemaProperties = new LinkedHashMap<>();
+    schemaProperties.put("sk1", "sv1");
+    schemaProperties.put("sk2", "sv2");
+    assertEquals(schemaProperties, metadata.properties());
+
+    assertEquals(3, metadata.size());
+
+    ColumnMetadata i = metadata.metadata("i");
+    assertEquals(TypeProtos.MinorType.INT, i.type());
+    assertEquals(TypeProtos.DataMode.REQUIRED, i.mode());
+    assertEquals(10, i.defaultValue());
+
+    ColumnMetadata a = metadata.metadata("a");
+    assertEquals(TypeProtos.MinorType.VARCHAR, a.type());
+    assertEquals(TypeProtos.DataMode.REPEATED, a.mode());
+    Map<String, String> columnProperties = new LinkedHashMap<>();
+    columnProperties.put("ck1", "cv1");
+    columnProperties.put("ck2", "cv2");
+    assertEquals(columnProperties, a.properties());
+
+    ColumnMetadata t = metadata.metadata("t");
+    assertEquals(TypeProtos.MinorType.DATE, t.type());
+    assertEquals(TypeProtos.DataMode.OPTIONAL, t.mode());
+    assertEquals("yyyy-mm-dd", t.formatValue());
 
     assertTrue(schemaContainer.getVersion().isUndefined());
   }
@@ -213,16 +259,21 @@ public class TestSchemaProvider {
   @Test
   public void testPathProviderReadSchemaWithComments() throws Exception {
     Path schemaPath = folder.newFile("schema").toPath();
-    Files.write(schemaPath, Collections.singletonList(
-          "// my schema file start\n"
-        + "{  \n"
-        + "   \"schema\":[  // start columns list\n"
-        + "      \"`i` INT\"\n"
-        + "   ]\n"
-        + "}\n"
-        + "// schema file end\n"
-        + "/* multiline comment */"
-    ));
+    String schema =  "// my schema file start\n" +
+      "{\n"
+      + "  \"schema\" : {\n"
+      + "    \"columns\" : [ // start columns list\n"
+      + "      {\n"
+      + "        \"name\" : \"i\",\n"
+      + "        \"type\" : \"INT\",\n"
+      + "        \"mode\" : \"OPTIONAL\"\n"
+      + "      }\n"
+      + "    ]\n"
+      + "  }\n"
+      + "}"
+      + "// schema file end\n"
+      + "/* multiline comment */";
+    Files.write(schemaPath, Collections.singletonList(schema));
 
     SchemaProvider provider = new PathSchemaProvider(new org.apache.hadoop.fs.Path(schemaPath.toUri().getPath()));
     assertTrue(provider.exists());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestParserErrorHandling.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestParserErrorHandling.java
index 58c979b..110efeb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestParserErrorHandling.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestParserErrorHandling.java
@@ -30,7 +30,6 @@ public class TestParserErrorHandling {
   public void testUnsupportedType() {
     String schema = "col unk_type";
     thrown.expect(SchemaParsingException.class);
-    thrown.expectMessage("offending symbol [@1,4:11='unk_type',<38>,1:4]: no viable alternative at input");
     SchemaExprParser.parseSchema(schema);
   }
 
@@ -54,7 +53,6 @@ public class TestParserErrorHandling {
   public void testUnquotedId() {
     String schema = "id with space varchar";
     thrown.expect(SchemaParsingException.class);
-    thrown.expectMessage("offending symbol [@1,3:6='with',<38>,1:3]: no viable alternative at input");
     SchemaExprParser.parseSchema(schema);
   }
 
@@ -62,7 +60,6 @@ public class TestParserErrorHandling {
   public void testUnescapedBackTick() {
     String schema = "`c`o`l` varchar";
     thrown.expect(SchemaParsingException.class);
-    thrown.expectMessage("offending symbol [@1,3:3='o',<38>,1:3]: no viable alternative at input");
     SchemaExprParser.parseSchema(schema);
   }
 
@@ -78,7 +75,6 @@ public class TestParserErrorHandling {
   public void testMissingType() {
     String schema = "col not null";
     thrown.expect(SchemaParsingException.class);
-    thrown.expectMessage("offending symbol [@1,4:6='not',<34>,1:4]: no viable alternative at input");
     SchemaExprParser.parseSchema(schema);
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java
index 1b9c06f..eaae0a5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java
@@ -23,8 +23,10 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.junit.Test;
 
+import java.time.LocalDate;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -35,18 +37,21 @@ import static org.junit.Assert.assertTrue;
 public class TestSchemaParser {
 
   @Test
-  public void checkQuotedId() {
+  public void checkQuotedIdWithEscapes() {
     String schemaWithEscapes = "`a\\\\b\\`c` INT";
-    assertEquals(schemaWithEscapes, SchemaExprParser.parseSchema(schemaWithEscapes).schemaString());
+    assertEquals(schemaWithEscapes, SchemaExprParser.parseSchema(schemaWithEscapes).metadata(0).columnString());
 
     String schemaWithKeywords = "`INTEGER` INT";
-    assertEquals(schemaWithKeywords, SchemaExprParser.parseSchema(schemaWithKeywords).schemaString());
+    assertEquals(schemaWithKeywords, SchemaExprParser.parseSchema(schemaWithKeywords).metadata(0).columnString());
   }
 
   @Test
   public void testSchemaWithParen() {
-    String schema = "`a` INT NOT NULL, `b` VARCHAR(10)";
-    assertEquals(schema, SchemaExprParser.parseSchema(String.format("(%s)", schema)).schemaString());
+    String schemaWithParen = "(`a` INT NOT NULL, `b` VARCHAR(10))";
+    TupleMetadata schema = SchemaExprParser.parseSchema(schemaWithParen);
+    assertEquals(2, schema.size());
+    assertEquals("`a` INT NOT NULL", schema.metadata("a").columnString());
+    assertEquals("`b` VARCHAR(10)", schema.metadata("b").columnString());
   }
 
   @Test
@@ -54,13 +59,14 @@ public class TestSchemaParser {
     String schemaString = "id\n/*comment*/int\r,//comment\r\nname\nvarchar\t\t\t";
     TupleMetadata schema = SchemaExprParser.parseSchema(schemaString);
     assertEquals(2, schema.size());
-    assertEquals("`id` INT, `name` VARCHAR", schema.schemaString());
+    assertEquals("`id` INT", schema.metadata("id").columnString());
+    assertEquals("`name` VARCHAR", schema.metadata("name").columnString());
   }
 
   @Test
   public void testCaseInsensitivity() {
     String schema = "`Id` InTeGeR NoT NuLl";
-    assertEquals("`Id` INT NOT NULL", SchemaExprParser.parseSchema(schema).schemaString());
+    assertEquals("`Id` INT NOT NULL", SchemaExprParser.parseSchema(schema).metadata(0).columnString());
   }
 
   @Test
@@ -80,10 +86,7 @@ public class TestSchemaParser {
       .buildSchema();
 
     checkSchema("int_col int, integer_col integer not null, bigint_col bigint, " +
-        "float_col float not null, double_col double",
-      schema,
-      "`int_col` INT, `integer_col` INT NOT NULL, `bigint_col` BIGINT, " +
-        "`float_col` FLOAT NOT NULL, `double_col` DOUBLE");
+        "float_col float not null, double_col double", schema);
   }
 
   @Test
@@ -100,10 +103,8 @@ public class TestSchemaParser {
       "col numeric, col_p numeric(5) not null, col_ps numeric(10, 2)"
     );
 
-    String expectedSchema = "`col` DECIMAL, `col_p` DECIMAL(5) NOT NULL, `col_ps` DECIMAL(10, 2)";
-
     schemas.forEach(
-      s -> checkSchema(s, schema, expectedSchema)
+      s -> checkSchema(s, schema)
     );
   }
 
@@ -113,13 +114,12 @@ public class TestSchemaParser {
       .addNullable("col", TypeProtos.MinorType.BIT)
       .buildSchema();
 
-    checkSchema("col boolean", schema, "`col` BOOLEAN");
+    checkSchema("col boolean", schema);
   }
 
   @Test
   public void testCharacterTypes() {
     String schemaPattern = "col %1$s, col_p %1$s(50) not null";
-    String expectedSchema = "`col` %1$s, `col_p` %1$s(50) NOT NULL";
 
     Map<String, TypeProtos.MinorType> properties = new HashMap<>();
     properties.put("char", TypeProtos.MinorType.VARCHAR);
@@ -136,7 +136,7 @@ public class TestSchemaParser {
         .add("col_p", value, 50)
         .buildSchema();
 
-      checkSchema(String.format(schemaPattern, key), schema, String.format(expectedSchema, value.name()));
+      checkSchema(String.format(schemaPattern, key), schema);
     });
   }
 
@@ -151,10 +151,7 @@ public class TestSchemaParser {
       .buildSchema();
 
     checkSchema("time_col time, time_prec_col time(3), date_col date not null, " +
-        "timestamp_col timestamp, timestamp_prec_col timestamp(3)",
-      schema,
-      "`time_col` TIME, `time_prec_col` TIME(3), `date_col` DATE NOT NULL, " +
-        "`timestamp_col` TIMESTAMP, `timestamp_prec_col` TIMESTAMP(3)");
+        "timestamp_col timestamp, timestamp_prec_col timestamp(3)", schema);
   }
 
   @Test
@@ -171,11 +168,7 @@ public class TestSchemaParser {
 
     checkSchema("interval_year_col interval year, interval_month_col interval month, " +
         "interval_day_col interval day, interval_hour_col interval hour, interval_minute_col interval minute, " +
-        "interval_second_col interval second, interval_col interval",
-      schema,
-      "`interval_year_col` INTERVAL YEAR, `interval_month_col` INTERVAL YEAR, " +
-        "`interval_day_col` INTERVAL DAY, `interval_hour_col` INTERVAL DAY, `interval_minute_col` INTERVAL DAY, " +
-        "`interval_second_col` INTERVAL DAY, `interval_col` INTERVAL");
+        "interval_second_col interval second, interval_col interval", schema);
   }
 
   @Test
@@ -201,12 +194,7 @@ public class TestSchemaParser {
         + ", nested_array array<array<int>>"
         + ", map_array array<map<m1 int, m2 varchar>>"
         + ", nested_array_map array<array<map<nm1 int, nm2 varchar>>>",
-      schema,
-      "`simple_array` ARRAY<INT>"
-        + ", `nested_array` ARRAY<ARRAY<INT>>"
-        + ", `map_array` ARRAY<MAP<`m1` INT, `m2` VARCHAR>>"
-        + ", `nested_array_map` ARRAY<ARRAY<MAP<`nm1` INT, `nm2` VARCHAR>>>"
-    );
+      schema);
 
   }
 
@@ -223,9 +211,7 @@ public class TestSchemaParser {
       .resumeSchema()
       .buildSchema();
 
-    checkSchema("map_col map<int_col int, array_col array<int>, nested_map map<m1 int, m2 varchar>>",
-      schema,
-      "`map_col` MAP<`int_col` INT, `array_col` ARRAY<INT>, `nested_map` MAP<`m1` INT, `m2` VARCHAR>>");
+    checkSchema("map_col map<int_col int, array_col array<int>, nested_map map<m1 int, m2 varchar>>", schema);
   }
 
   @Test
@@ -266,14 +252,65 @@ public class TestSchemaParser {
     assertTrue(mapSchema.metadata("m2").isNullable());
   }
 
-  private void checkSchema(String schemaString, TupleMetadata expectedSchema, String expectedSchemaString) {
+  @Test
+  public void testFormat() {
+    String value = "`a` DATE NOT NULL FORMAT 'yyyy-MM-dd'";
+    TupleMetadata schema = SchemaExprParser.parseSchema(value);
+    ColumnMetadata columnMetadata = schema.metadata("a");
+    assertEquals("yyyy-MM-dd", columnMetadata.formatValue());
+    assertEquals(value, columnMetadata.columnString());
+  }
+
+  @Test
+  public void testDefault() {
+    String value = "`a` INT NOT NULL DEFAULT '12'";
+    TupleMetadata schema = SchemaExprParser.parseSchema(value);
+    ColumnMetadata columnMetadata = schema.metadata("a");
+    assertTrue(columnMetadata.defaultValue() instanceof Integer);
+    assertEquals(12, columnMetadata.defaultValue());
+    assertEquals("12", columnMetadata.defaultStringValue());
+    assertEquals(value, columnMetadata.columnString());
+  }
+
+  @Test
+  public void testFormatAndDefault() {
+    String value = "`a` DATE NOT NULL FORMAT 'yyyy-MM-dd' DEFAULT '2018-12-31'";
+    TupleMetadata schema = SchemaExprParser.parseSchema(value);
+    ColumnMetadata columnMetadata = schema.metadata("a");
+    assertTrue(columnMetadata.defaultValue() instanceof LocalDate);
+    assertEquals(LocalDate.of(2018, 12, 31), columnMetadata.defaultValue());
+    assertEquals("2018-12-31", columnMetadata.defaultStringValue());
+    assertEquals(value, columnMetadata.columnString());
+  }
+
+  @Test
+  public void testColumnProperties() {
+    String value = "`a` INT NOT NULL PROPERTIES { 'k1' = 'v1', 'k2' = 'v2' }";
+    TupleMetadata schema = SchemaExprParser.parseSchema(value);
+
+    ColumnMetadata columnMetadata = schema.metadata("a");
+
+    Map<String, String> properties = new LinkedHashMap<>();
+    properties.put("k1", "v1");
+    properties.put("k2", "v2");
+
+    assertEquals(properties, columnMetadata.properties());
+    assertEquals(value, columnMetadata.columnString());
+  }
+
+  private void checkSchema(String schemaString, TupleMetadata expectedSchema) {
     TupleMetadata actualSchema = SchemaExprParser.parseSchema(schemaString);
-    assertEquals(expectedSchema.schemaString(), actualSchema.schemaString());
-    assertEquals(expectedSchemaString, actualSchema.schemaString());
 
-    TupleMetadata unparsedSchema = SchemaExprParser.parseSchema(actualSchema.schemaString());
-    assertEquals(unparsedSchema.schemaString(), expectedSchema.schemaString());
-    assertEquals(expectedSchemaString, unparsedSchema.schemaString());
+    assertEquals(expectedSchema.size(), actualSchema.size());
+    assertEquals(expectedSchema.properties(), actualSchema.properties());
+
+    expectedSchema.toMetadataList().forEach(
+      expectedMetadata -> {
+        ColumnMetadata actualMetadata = actualSchema.metadata(expectedMetadata.name());
+        assertEquals(expectedMetadata.columnString(), actualMetadata.columnString());
+      }
+    );
+
   }
 
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
index 5540cb2..1cdb927 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
@@ -23,6 +23,8 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.accessor.ColumnConversionFactory;
 
+import java.util.Map;
+
 /**
  * Metadata description of a column including names, types and structure
  * information.
@@ -182,6 +184,10 @@ public interface ColumnMetadata {
 
   int expectedElementCount();
 
+  void setFormatValue(String value);
+
+  String formatValue();
+
   /**
    * Set the default value to use for filling a vector when no real data is
    * available, such as for columns added in new files but which does not
@@ -201,6 +207,21 @@ public interface ColumnMetadata {
   Object defaultValue();
 
   /**
+   * Parses the default value from its String literal representation into an Object instance based on the {@link MinorType} value.
+   * Sets default value to use for filling a vector when no real data is available.
+   *
+   * @param value the default value in String representation
+   */
+  void setDefaultFromString(String value);
+
+  /**
+   * Returns the default value for this column in String literal representation.
+   *
+   * @return the default value in String literal representation, or null if no default value has been set
+   */
+  String defaultStringValue();
+
+  /**
    * Set the factory for an optional shim writer that translates from the type of
    * data available to the code that creates the vectors on the one hand,
    * and the actual type of the column on the other. For example, a shim
@@ -232,6 +253,15 @@ public interface ColumnMetadata {
   ColumnMetadata cloneEmpty();
 
   /**
+   * Sets column properties if not null.
+   *
+   * @param properties column properties
+   */
+  void setProperties(Map<String, String> properties);
+
+  Map<String, String> properties();
+
+  /**
    * Reports whether, in this context, the column is projected outside
    * of the context. (That is, whether the column is backed by an actual
    * value vector.)
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleMetadata.java
index 990faad..6bfe138 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleMetadata.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.record.metadata;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.drill.exec.record.MaterializedField;
 
@@ -95,10 +96,12 @@ public interface TupleMetadata extends Iterable<ColumnMetadata> {
   String fullName(int index);
 
   /**
-   * Converts schema metadata into string representation
-   * accepted by the table schema parser.
+   * Sets schema properties if not null.
    *
-   * @return schema metadata string representation
+   * @param properties schema properties
    */
-  String schemaString();
+  void setProperties(Map<String, String> properties);
+
+  Map<String, String> properties();
+
 }