You are viewing a plain-text version of this content. The canonical link to it ("here") was not preserved in this plain-text copy.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2019/09/05 17:34:51 UTC

[GitHub] [samza] srinipunuru commented on a change in pull request #1149: SAMZA-2316: Validate that all non-default value fields in output schema are set in the projected fields.

srinipunuru commented on a change in pull request #1149: SAMZA-2316: Validate that all non-default value fields in output schema are set in the projected fields.
URL: https://github.com/apache/samza/pull/1149#discussion_r321390035
 
 

 ##########
 File path: samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
 ##########
 @@ -98,32 +101,63 @@ protected void validate(RelRoot relRoot, SamzaSqlQueryParser.QueryInfo qinfo, Sa
   protected void validateOutput(RelRoot relRoot, RelSchemaProvider relSchemaProvider) throws SamzaSqlValidatorException {
     RelRecordType outputRecord = (RelRecordType) QueryPlanner.getSourceRelSchema(relSchemaProvider,
         new RelSchemaConverter());
+    // Get Samza Sql schema along with Calcite schema. The reason is that the Calcite schema does not have a way
+    // to represent fields with default values while Samza Sql schema can represent default value fields. This is
+    // the only reason that we use SqlSchema in validating output.
+    SqlSchema outputSqlSchema = QueryPlanner.getSourceSqlSchema(relSchemaProvider);
+
     LogicalProject project = (LogicalProject) relRoot.rel;
     RelRecordType projetRecord = (RelRecordType) project.getRowType();
-    validateOutputRecords(outputRecord, projetRecord);
+
+    validateOutputRecords(outputRecord, outputSqlSchema, projetRecord);
   }
 
-  protected void validateOutputRecords(RelRecordType outputRecord, RelRecordType projectRecord)
+  protected void validateOutputRecords(RelRecordType outputRecord, SqlSchema outputSqlSchema,
+      RelRecordType projectRecord)
       throws SamzaSqlValidatorException {
     Map<String, RelDataType> outputRecordMap = outputRecord.getFieldList().stream().collect(
         Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
+    Map<String, SqlFieldSchema> outputFieldSchemaMap = outputSqlSchema.getFields().stream().collect(
+        Collectors.toMap(SqlSchema.SqlField::getFieldName, SqlSchema.SqlField::getFieldSchema));
     Map<String, RelDataType> projectRecordMap = projectRecord.getFieldList().stream().collect(
         Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
 
-    // There could be default values for the output schema and hence fields in project schema could be a subset of
-    // fields in output schema.
-    // TODO: SAMZA-2316: Validate that all non-default value fields in output schema are set in the projected fields.
+    // Ensure that all non-default value fields in output schema are set in the projected fields and are of the
+    // same type.
+    for (Map.Entry<String, RelDataType> entry : outputRecordMap.entrySet()) {
+      RelDataType projectFieldType = projectRecordMap.get(entry.getKey());
+      SqlFieldSchema outputSqlFieldSchema = outputFieldSchemaMap.get(entry.getKey());
+
+      if (projectFieldType == null) {
+        if (entry.getKey().equals(SamzaSqlRelMessage.KEY_NAME) || outputSqlFieldSchema.hasDefaultValue()) {
 
 Review comment:
   Can you add a comment here explaining why we are special-casing the KEY_NAME field?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services