You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2019/09/06 04:47:51 UTC

[samza] branch master updated: SAMZA-2316: Validate that all non-default value fields in output schema are set in the projected fields. (#1149)

This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 713a8bf  SAMZA-2316: Validate that all non-default value fields in output schema are set in the projected fields. (#1149)
713a8bf is described below

commit 713a8bf68c2f4f81f7c575f66b1cf091ad81113c
Author: Aditya Toomula <at...@linkedin.com>
AuthorDate: Thu Sep 5 21:47:46 2019 -0700

    SAMZA-2316: Validate that all non-default value fields in output schema are set in the projected fields. (#1149)
    
    * Validate that all non-default value fields in output schema are set in the projected fields.
    * Added comments and renamed sqlFieldSchema member variables.
---
 .../apache/samza/sql/schema/SqlFieldSchema.java    | 57 ++++++++++++++++------
 .../samza/sql/client/impl/SamzaExecutor.java       |  6 +--
 .../apache/samza/sql/avro/AvroTypeFactoryImpl.java | 49 ++++++++++---------
 .../org/apache/samza/sql/planner/QueryPlanner.java | 16 +++---
 .../samza/sql/planner/RelSchemaConverter.java      | 22 ++++-----
 .../samza/sql/planner/SamzaSqlValidator.java       | 55 ++++++++++++++++++---
 .../samza/sql/avro/schemas/ComplexRecord.avsc      | 13 +++--
 .../samza/sql/avro/schemas/ComplexRecord.java      |  8 ++-
 .../samza/sql/planner/TestSamzaSqlValidator.java   | 45 +++++++++++++++--
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java  | 36 +++++++-------
 10 files changed, 213 insertions(+), 94 deletions(-)

diff --git a/samza-api/src/main/java/org/apache/samza/sql/schema/SqlFieldSchema.java b/samza-api/src/main/java/org/apache/samza/sql/schema/SqlFieldSchema.java
index b944011..d3cec05 100644
--- a/samza-api/src/main/java/org/apache/samza/sql/schema/SqlFieldSchema.java
+++ b/samza-api/src/main/java/org/apache/samza/sql/schema/SqlFieldSchema.java
@@ -24,44 +24,59 @@ package org.apache.samza.sql.schema;
  */
 public class SqlFieldSchema {
 
-  private SamzaSqlFieldType fieldType;
-  private SqlFieldSchema elementType;
-  private SqlFieldSchema valueType;
-  private SqlSchema rowSchema;
+  private final SamzaSqlFieldType fieldType;
+  private final SqlFieldSchema elementType;
+  private final SqlFieldSchema valueType;
+  private final SqlSchema rowSchema;
+  // A field is considered nullable when it can hold a null value. Note that a nullable field still
+  // needs to be explicitly set while writing and is expected to be set while reading. A non-nullable field
+  // cannot have a null value.
+  private final Boolean isNullable;
+  // A field is considered optional when it has a default value. Such a field need not be set while writing
+  // but is expected to be set while reading.
+  // Note that a nullable field is also an optional field if it has a default value, whereas the value of a
+  // nullable, non-optional field needs to be explicitly set.
+  private final Boolean isOptional;
 
-  private SqlFieldSchema(SamzaSqlFieldType fieldType, SqlFieldSchema elementType, SqlFieldSchema valueType, SqlSchema rowSchema) {
+  private SqlFieldSchema(SamzaSqlFieldType fieldType, SqlFieldSchema elementType, SqlFieldSchema valueType,
+      SqlSchema rowSchema, boolean isNullable, boolean isOptional) {
     this.fieldType = fieldType;
     this.elementType = elementType;
     this.valueType = valueType;
     this.rowSchema = rowSchema;
+    this.isNullable = isNullable;
+    this.isOptional = isOptional;
   }
 
   /**
-   * Create a primitive fi
+   * Create a primitive field schema.
    * @param typeName
    * @return
    */
-  public static SqlFieldSchema createPrimitiveSchema(SamzaSqlFieldType typeName) {
-    return new SqlFieldSchema(typeName, null, null, null);
+  public static SqlFieldSchema createPrimitiveSchema(SamzaSqlFieldType typeName, boolean isNullable,
+      boolean isOptional) {
+    return new SqlFieldSchema(typeName, null, null, null, isNullable, isOptional);
   }
 
-  public static SqlFieldSchema createArraySchema(SqlFieldSchema elementType) {
-    return new SqlFieldSchema(SamzaSqlFieldType.ARRAY, elementType, null, null);
+  public static SqlFieldSchema createArraySchema(SqlFieldSchema elementType, boolean isNullable,
+      boolean isOptional) {
+    return new SqlFieldSchema(SamzaSqlFieldType.ARRAY, elementType, null, null, isNullable, isOptional);
   }
 
-  public static SqlFieldSchema createMapSchema(SqlFieldSchema valueType) {
-    return new SqlFieldSchema(SamzaSqlFieldType.MAP, null, valueType, null);
+  public static SqlFieldSchema createMapSchema(SqlFieldSchema valueType, boolean isNullable, boolean isOptional) {
+    return new SqlFieldSchema(SamzaSqlFieldType.MAP, null, valueType, null, isNullable, isOptional);
   }
 
-  public static SqlFieldSchema createRowFieldSchema(SqlSchema rowSchema) {
-    return new SqlFieldSchema(SamzaSqlFieldType.ROW, null, null, rowSchema);
+  public static SqlFieldSchema createRowFieldSchema(SqlSchema rowSchema, boolean isNullable, boolean isOptional) {
+    return new SqlFieldSchema(SamzaSqlFieldType.ROW, null, null, rowSchema, isNullable, isOptional);
   }
 
   /**
    * @return whether the field is a primitive field type or not.
    */
   public boolean isPrimitiveField() {
-    return fieldType != SamzaSqlFieldType.ARRAY && fieldType != SamzaSqlFieldType.MAP && fieldType != SamzaSqlFieldType.ROW;
+    return fieldType != SamzaSqlFieldType.ARRAY && fieldType != SamzaSqlFieldType.MAP &&
+        fieldType != SamzaSqlFieldType.ROW;
   }
 
   /**
@@ -93,5 +108,17 @@ public class SqlFieldSchema {
     return rowSchema;
   }
 
+  /**
+   * Get if the field type is nullable.
+   */
+  public boolean isNullable() {
+    return isNullable;
+  }
 
+  /**
+   * Get if the field type is optional.
+   */
+  public boolean isOptional() {
+    return isOptional;
+  }
 }
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
index db97516..5939adb 100755
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -293,9 +293,9 @@ public class SamzaExecutor implements SqlExecutor {
      */
     List<SqlFunction> udfs = new ArrayList<>();
     udfs.add(new SamzaSqlUdfDisplayInfo("RegexMatch", "Matches the string to the regex",
-            Arrays.asList(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING),
-                SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING)),
-        SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN)));
+            Arrays.asList(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING, false, false),
+                SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING, false, false)),
+        SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN, false, false)));
 
     return udfs;
   }
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
index 68116b6..4c06938 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.avro.Schema;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
-import org.apache.commons.lang3.Validate;
 import org.apache.samza.SamzaException;
 import org.apache.samza.sql.schema.SamzaSqlFieldType;
 import org.apache.samza.sql.schema.SqlFieldSchema;
@@ -33,8 +32,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Factory that creates the Calcite relational types from the Avro Schema. This is used by the
- * AvroRelConverter to convert the Avro schema to calcite relational schema.
+ * Factory that creates {@link SqlSchema} from the Avro Schema. This is used by the
+ * {@link AvroRelConverter} to convert Avro schema to Samza Sql schema.
  */
 public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
 
@@ -60,7 +59,8 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
 
     SqlSchemaBuilder schemaBuilder = SqlSchemaBuilder.builder();
     for (Schema.Field field : fields) {
-      SqlFieldSchema fieldSchema = convertField(field.schema());
+      boolean isOptional = (field.defaultValue() != null);
+      SqlFieldSchema fieldSchema = convertField(field.schema(), false, isOptional);
       schemaBuilder.addField(field.name(), fieldSchema);
     }
 
@@ -68,36 +68,41 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
   }
 
   private SqlFieldSchema convertField(Schema fieldSchema) {
+    return convertField(fieldSchema, false, false);
+  }
+
+  private SqlFieldSchema convertField(Schema fieldSchema, boolean isNullable, boolean isOptional) {
     switch (fieldSchema.getType()) {
       case ARRAY:
         SqlFieldSchema elementSchema = convertField(fieldSchema.getElementType());
-        return SqlFieldSchema.createArraySchema(elementSchema);
+        return SqlFieldSchema.createArraySchema(elementSchema, isNullable, isOptional);
       case BOOLEAN:
-        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN);
+        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN, isNullable, isOptional);
       case DOUBLE:
-        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.DOUBLE);
+        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.DOUBLE, isNullable, isOptional);
       case FLOAT:
-        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.FLOAT);
+        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.FLOAT, isNullable, isOptional);
       case ENUM:
-        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING);
+        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING, isNullable, isOptional);
       case UNION:
-        return getSqlTypeFromUnionTypes(fieldSchema.getTypes());
+        return getSqlTypeFromUnionTypes(fieldSchema.getTypes(), isNullable, isOptional);
       case FIXED:
-        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BYTES);
+        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BYTES, isNullable, isOptional);
       case STRING:
-        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING);
+        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING, isNullable, isOptional);
       case BYTES:
-        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BYTES);
+        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BYTES, isNullable, isOptional);
       case INT:
-        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.INT32);
+        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.INT32, isNullable, isOptional);
       case LONG:
-        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.INT64);
+        return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.INT64, isNullable, isOptional);
       case RECORD:
         SqlSchema rowSchema = convertSchema(fieldSchema.getFields());
-        return SqlFieldSchema.createRowFieldSchema(rowSchema);
+        return SqlFieldSchema.createRowFieldSchema(rowSchema, isNullable, isOptional);
       case MAP:
-        SqlFieldSchema valueType = convertField(fieldSchema.getValueType());
-        return SqlFieldSchema.createMapSchema(valueType);
+        // Assumption: a map's value type is treated as neither nullable nor optional (no default value).
+        SqlFieldSchema valueType = convertField(fieldSchema.getValueType(), false, false);
+        return SqlFieldSchema.createMapSchema(valueType, isNullable, isOptional);
       default:
         String msg = String.format("Field Type %s is not supported", fieldSchema.getType());
         LOG.error(msg);
@@ -105,17 +110,17 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
     }
   }
 
-  private SqlFieldSchema getSqlTypeFromUnionTypes(List<Schema> types) {
+  private SqlFieldSchema getSqlTypeFromUnionTypes(List<Schema> types, boolean isNullable, boolean isOptional) {
     // Typically a nullable field's schema is configured as an union of Null and a Type.
     // This is to check whether the Union is a Nullable field
     if (types.size() == 2) {
       if (types.get(0).getType() == Schema.Type.NULL) {
-        return convertField(types.get(1));
+        return convertField(types.get(1), true, isOptional);
       } else if ((types.get(1).getType() == Schema.Type.NULL)) {
-        return convertField(types.get(0));
+        return convertField(types.get(0), true, isOptional);
       }
     }
 
-    return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY);
+    return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY, isNullable, isOptional);
   }
 }
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index bdf03f7..83ccea0 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -152,16 +152,15 @@ public class QueryPlanner {
     }
   }
 
-  public static RelDataType getSourceRelSchema(RelSchemaProvider relSchemaProvider,
-      RelSchemaConverter relSchemaConverter) {
-    // If the source part is the last one, then fetch the schema corresponding to the stream and register.
+  public static SqlSchema getSourceSqlSchema(RelSchemaProvider relSchemaProvider) {
     SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();
 
     List<String> fieldNames = new ArrayList<>();
     List<SqlFieldSchema> fieldTypes = new ArrayList<>();
     if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
       fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
-      fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
+      // Key is a nullable and optional field. It is defaulted to null in SamzaSqlRelMessage.
+      fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY, true, true));
     }
 
     fieldNames.addAll(
@@ -169,8 +168,13 @@ public class QueryPlanner {
     fieldTypes.addAll(
         sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));
 
-    SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
-    return relSchemaConverter.convertToRelSchema(newSchema);
+    return new SqlSchema(fieldNames, fieldTypes);
+  }
+
+  public static RelDataType getSourceRelSchema(RelSchemaProvider relSchemaProvider,
+      RelSchemaConverter relSchemaConverter) {
+    // If the source part is the last one, then fetch the schema corresponding to the stream and register.
+    return relSchemaConverter.convertToRelSchema(getSourceSqlSchema(relSchemaProvider));
   }
 
   private static Table createTableFromRelSchema(RelDataType relationalSchema) {
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
index 6634f5a..dbd74b6 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rel.type.RelRecordType;
 import org.apache.calcite.sql.type.ArraySqlType;
+import org.apache.calcite.sql.type.MapSqlType;
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.samza.SamzaException;
@@ -73,30 +74,29 @@ public class RelSchemaConverter extends SqlTypeFactoryImpl {
     switch (fieldSchema.getFieldType()) {
       case ARRAY:
         RelDataType elementType = getRelDataType(fieldSchema.getElementSchema());
-        return new ArraySqlType(elementType, true);
+        return new ArraySqlType(elementType, fieldSchema.isNullable());
       case BOOLEAN:
-        return createTypeWithNullability(createSqlType(SqlTypeName.BOOLEAN), true);
+        return createTypeWithNullability(createSqlType(SqlTypeName.BOOLEAN), fieldSchema.isNullable());
       case DOUBLE:
-        return createTypeWithNullability(createSqlType(SqlTypeName.DOUBLE), true);
+        return createTypeWithNullability(createSqlType(SqlTypeName.DOUBLE), fieldSchema.isNullable());
       case FLOAT:
-        return createTypeWithNullability(createSqlType(SqlTypeName.FLOAT), true);
+        return createTypeWithNullability(createSqlType(SqlTypeName.FLOAT), fieldSchema.isNullable());
       case STRING:
-        return createTypeWithNullability(createSqlType(SqlTypeName.VARCHAR), true);
+        return createTypeWithNullability(createSqlType(SqlTypeName.VARCHAR), fieldSchema.isNullable());
       case BYTES:
-        return createTypeWithNullability(createSqlType(SqlTypeName.VARBINARY), true);
+        return createTypeWithNullability(createSqlType(SqlTypeName.VARBINARY), fieldSchema.isNullable());
       case INT16:
       case INT32:
-        return createTypeWithNullability(createSqlType(SqlTypeName.INTEGER), true);
+        return createTypeWithNullability(createSqlType(SqlTypeName.INTEGER), fieldSchema.isNullable());
       case INT64:
-        return createTypeWithNullability(createSqlType(SqlTypeName.BIGINT), true);
+        return createTypeWithNullability(createSqlType(SqlTypeName.BIGINT), fieldSchema.isNullable());
       case ROW:
       case ANY:
         // TODO Calcite execution engine doesn't support record type yet.
-        return createTypeWithNullability(createSqlType(SqlTypeName.ANY), true);
+        return createTypeWithNullability(createSqlType(SqlTypeName.ANY), fieldSchema.isNullable());
       case MAP:
         RelDataType valueType = getRelDataType(fieldSchema.getValueScehma());
-        return super.createMapType(createTypeWithNullability(createSqlType(SqlTypeName.VARCHAR), true),
-            createTypeWithNullability(valueType, true));
+        return new MapSqlType(createSqlType(SqlTypeName.VARCHAR), valueType, fieldSchema.isNullable());
       default:
         String msg = String.format("Field Type %s is not supported", fieldSchema.getFieldType());
         LOG.error(msg);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
index 08d4497..9482a75 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
@@ -40,6 +40,8 @@ import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
 import org.apache.samza.sql.interfaces.RelSchemaProvider;
 import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.schema.SqlFieldSchema;
+import org.apache.samza.sql.schema.SqlSchema;
 import org.apache.samza.sql.util.SamzaSqlQueryParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,24 +100,59 @@ public class SamzaSqlValidator {
   protected void validateOutput(RelRoot relRoot, RelSchemaProvider relSchemaProvider) throws SamzaSqlValidatorException {
     RelRecordType outputRecord = (RelRecordType) QueryPlanner.getSourceRelSchema(relSchemaProvider,
         new RelSchemaConverter());
+    // Get the Samza Sql schema along with the Calcite schema. The Calcite schema has no way to represent
+    // optional fields, whereas the Samza Sql schema does. This is the only reason that we use SqlSchema
+    // in validating output.
+    SqlSchema outputSqlSchema = QueryPlanner.getSourceSqlSchema(relSchemaProvider);
+
     LogicalProject project = (LogicalProject) relRoot.rel;
     RelRecordType projetRecord = (RelRecordType) project.getRowType();
-    validateOutputRecords(outputRecord, projetRecord);
+
+    validateOutputRecords(outputRecord, outputSqlSchema, projetRecord);
   }
 
-  protected void validateOutputRecords(RelRecordType outputRecord, RelRecordType projectRecord)
+  protected void validateOutputRecords(RelRecordType outputRecord, SqlSchema outputSqlSchema,
+      RelRecordType projectRecord)
       throws SamzaSqlValidatorException {
     Map<String, RelDataType> outputRecordMap = outputRecord.getFieldList().stream().collect(
         Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
+    Map<String, SqlFieldSchema> outputFieldSchemaMap = outputSqlSchema.getFields().stream().collect(
+        Collectors.toMap(SqlSchema.SqlField::getFieldName, SqlSchema.SqlField::getFieldSchema));
     Map<String, RelDataType> projectRecordMap = projectRecord.getFieldList().stream().collect(
         Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
 
-    // There could be default values for the output schema and hence fields in project schema could be a subset of
-    // fields in output schema.
-    // TODO: SAMZA-2316: Validate that all non-default value fields in output schema are set in the projected fields.
+    // Ensure that all non-optional fields in output schema are set in the sql query and are of the
+    // same type.
+    for (Map.Entry<String, RelDataType> entry : outputRecordMap.entrySet()) {
+      RelDataType projectFieldType = projectRecordMap.get(entry.getKey());
+      SqlFieldSchema outputSqlFieldSchema = outputFieldSchemaMap.get(entry.getKey());
+
+      if (projectFieldType == null) {
+        // If an output schema field is not found in the sql query, ignore it if the field is optional.
+        // Otherwise, throw an error.
+        if (outputSqlFieldSchema.isOptional()) {
+          continue;
+        }
+        String errMsg = String.format("Field '%s' in output schema does not match any projected fields.",
+            entry.getKey());
+        LOG.error(errMsg);
+        throw new SamzaSqlValidatorException(errMsg);
+      } else if (!compareFieldTypes(entry.getValue(), outputSqlFieldSchema, projectFieldType)) {
+        String errMsg = String.format("Field '%s' with type '%s' in output schema does not match the field type '%s' in"
+            + " projected fields.", entry.getKey(), entry.getValue(), projectFieldType);
+        LOG.error(errMsg);
+        throw new SamzaSqlValidatorException(errMsg);
+      }
+    }
+
+    // Ensure that all fields from sql statement exist in the output schema and are of the same type.
     for (Map.Entry<String, RelDataType> entry : projectRecordMap.entrySet()) {
       RelDataType outputFieldType = outputRecordMap.get(entry.getKey());
+      SqlFieldSchema outputSqlFieldSchema = outputFieldSchemaMap.get(entry.getKey());
+
       if (outputFieldType == null) {
+        // If a field in sql query is not found in the output schema, ignore if it is a Samza Sql special op.
+        // Otherwise, throw an error.
         if (entry.getKey().equals(SamzaSqlRelMessage.OP_NAME)) {
           continue;
         }
@@ -123,7 +160,7 @@ public class SamzaSqlValidator {
             entry.getKey());
         LOG.error(errMsg);
         throw new SamzaSqlValidatorException(errMsg);
-      } else if (!compareFieldTypes(outputFieldType, entry.getValue())) {
+      } else if (!compareFieldTypes(outputFieldType, outputSqlFieldSchema, entry.getValue())) {
         String errMsg = String.format("Field '%s' with type '%s' in select query does not match the field type '%s' in"
             + " output schema.", entry.getKey(), entry.getValue(), outputFieldType);
         LOG.error(errMsg);
@@ -132,7 +169,8 @@ public class SamzaSqlValidator {
     }
   }
 
-  protected boolean compareFieldTypes(RelDataType outputFieldType, RelDataType selectQueryFieldType) {
+  protected boolean compareFieldTypes(RelDataType outputFieldType, SqlFieldSchema sqlFieldSchema,
+      RelDataType selectQueryFieldType) {
     RelDataType projectFieldType;
 
     // JavaTypes are relevant for Udf argument and return types
@@ -168,7 +206,8 @@ public class SamzaSqlValidator {
         return projectSqlType == SqlTypeName.FLOAT;
       case ROW:
         try {
-          validateOutputRecords((RelRecordType) outputFieldType, (RelRecordType) projectFieldType);
+          validateOutputRecords((RelRecordType) outputFieldType, sqlFieldSchema.getRowSchema(),
+              (RelRecordType) projectFieldType);
         } catch (SamzaSqlValidatorException e) {
           LOG.error("A field in select query does not match with the output schema.", e);
           return false;
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
index 5e78bd9..c307b10 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
@@ -26,14 +26,12 @@
         {
             "name": "id",
             "doc": "Record id.",
-            "type": ["null", "int"],
-            "default":null
+            "type": "int"
         },
         {
             "name": "bool_value",
             "doc": "Boolean Value.",
-            "type": ["null", "boolean"],
-            "default":null
+            "type": ["null", "boolean"]
         },
         {
             "name": "double_value",
@@ -72,8 +70,8 @@
                "name": "MyFixed",
                "type":"fixed",
                "size":16
-            }
-          ]
+            }],
+            "default":null
         },
         {
             "name": "array_values",
@@ -120,7 +118,8 @@
             "doc" : "",
             "fields" : [ ]
             }
-          ]
+          ],
+          "default":null
         },
         {
             "name": "array_records",
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
index 7796004..91a447f 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
@@ -16,11 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
 package org.apache.samza.sql.avro.schemas;
 
 @SuppressWarnings("all")
 public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"na [...]
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":\"int\",\"doc\":\"Record id.\"},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\"},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"name\":\"float_value\",\"type\":[\"null\",\"flo [...]
   /** Record id. */
   public java.lang.Integer id;
   /** Boolean Value. */
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
index b2ce6f6..4c9522e 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
@@ -22,14 +22,11 @@ package org.apache.samza.sql.planner;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.calcite.rel.RelRoot;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
-import org.apache.samza.sql.util.SamzaSqlQueryParser;
 import org.apache.samza.sql.util.SamzaSqlTestConfig;
 import org.junit.Assert;
 import org.junit.Before;
@@ -54,7 +51,7 @@ public class TestSamzaSqlValidator {
   public void testBasicValidation() throws SamzaSqlValidatorException {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-        "Insert into testavro.outputTopic(id) select id, name as string_value"
+        "Insert into testavro.outputTopic select id, true as bool_value, name as string_value"
             + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
 
@@ -152,6 +149,46 @@ public class TestSamzaSqlValidator {
   }
 
   @Test
+  public void testNonDefaultButNullableField() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+    // bool_value is missing
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.outputTopic(id) select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)");
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    try {
+      new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+    } catch (SamzaSqlValidatorException e) {
+      Assert.assertTrue(e.getMessage().contains("Field 'bool_value' in output schema does not match any projected fields."));
+      return;
+    }
+
+    Assert.fail("Validation test has failed.");
+  }
+
+  @Test
+  public void testNonDefaultOutputField() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    // id is non-default field.
+    String sql = "Insert into testavro.outputTopic "
+        + " select NOT(id = 5) as bool_value, CASE WHEN id IN (5, 6, 7) THEN CAST('foo' AS VARCHAR) WHEN id < 5 THEN CAST('bars' AS VARCHAR) ELSE NULL END as string_value from testavro.SIMPLE1";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+
+    try {
+      new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+    } catch (SamzaSqlValidatorException e) {
+      Assert.assertTrue(e.getMessage().contains("Field 'id' in output schema does not match"));
+      return;
+    }
+
+    Assert.fail("Validation test has failed.");
+  }
+
+  @Test
   public void testFormatErrorString() {
     String sql =
         "select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`\n"
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index d81cb3c..dec886e 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -289,8 +289,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
 
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    String sql1 = "Insert into testavro.outputTopic(id, long_value) "
-        + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP(), LOCALTIMESTAMP()) + MONTH(CURRENT_DATE()) as long_value from testavro.SIMPLE1";
+    String sql1 = "Insert into testavro.outputTopic(id, bool_value, long_value) "
+        + " select id, NOT(id = 5) as bool_value, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP(), LOCALTIMESTAMP()) + MONTH(CURRENT_DATE()) as long_value from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
 
@@ -401,8 +401,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
 
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    String sql1 = "Insert into testavro.outputTopic(id, long_value) "
-        + " select id, name as string_value from testavro.SIMPLE1 where name like 'Name%'";
+    String sql1 = "Insert into testavro.outputTopic(id, bool_value, string_value) "
+        + " select id, NOT(id = 5) as bool_value, name as string_value from testavro.SIMPLE1 where name like 'Name%'";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
 
@@ -427,8 +427,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
 
     LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
     String sql1 =
-        "Insert into testavro.outputTopic(string_value, id, bytes_value, fixed_value, float_value) "
-            + " select Flatten(array_values) as string_value, id, bytes_value, fixed_value, float_value "
+        "Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value) "
+            + " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value "
             + " from testavro.COMPLEX1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -457,7 +457,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
 
     String sql1 =
         "Insert into testavro.outputTopic"
-            + " select map_values['key0'] as string_value, union_value, array_values, map_values, id, bytes_value,"
+            + " select bool_value, map_values['key0'] as string_value, union_value, array_values, map_values, id, bytes_value,"
             + " fixed_value, float_value from testavro.COMPLEX1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -502,7 +502,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 =
-        "Insert into testavro.outputTopic(id) select Flatten(MyTestArray(id)) as id from testavro.SIMPLE1";
+        "Insert into testavro.outputTopic(id, bool_value) select Flatten(MyTestArray(id)) as id, NOT(id = 5) as bool_value"
+            + " from testavro.SIMPLE1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
 
@@ -527,7 +528,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 =
-        "Insert into testavro.outputTopic(id) select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)";
+        "Insert into testavro.outputTopic(id, bool_value) select Flatten(a) as id, true as bool_value"
+            + " from (select MyTestArray(id) a from testavro.SIMPLE1)";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
 
@@ -551,8 +553,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    String sql1 = "Insert into testavro.outputTopic(id, long_value) "
-        + "select id, MyTest(MyTestObj(id)) as long_value from testavro.SIMPLE1";
+    String sql1 = "Insert into testavro.outputTopic(id, bool_value, long_value) "
+        + "select id, NOT(id = 5) as bool_value, MyTest(MyTestObj(id)) as long_value from testavro.SIMPLE1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
 
@@ -575,8 +577,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    String sql1 = "Insert into testavro.outputTopic(id, long_value) "
-        + "select id, MYTest(id) as long_value from testavro.SIMPLE1";
+    String sql1 = "Insert into testavro.outputTopic(id, bool_value, long_value) "
+        + "select id, NOT(id = 5) as bool_value, MYTest(id) as long_value from testavro.SIMPLE1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
 
@@ -623,8 +625,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    String sql1 = "Insert into testavro.outputTopic(id, long_value) "
-        + "select MyTestPoly(id) as long_value, MyTestPoly(name) as id from testavro.SIMPLE1";
+    String sql1 = "Insert into testavro.outputTopic(id, bool_value, long_value) "
+        + "select MyTestPoly(id) as long_value, NOT(id = 5) as bool_value, MyTestPoly(name) as id from testavro.SIMPLE1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
 
@@ -652,8 +654,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 =
-        "Insert into testavro.outputTopic(id) "
-            + "select id "
+        "Insert into testavro.outputTopic(id, bool_value) "
+            + "select id, NOT(id = 5) as bool_value "
             + "from testavro.SIMPLE1 "
             + "where RegexMatch('.*4', name)";
     List<String> sqlStmts = Collections.singletonList(sql1);