You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2019/10/04 03:59:07 UTC

[samza] branch master updated: SAMZA-2320: Samza-sql: Refactor validation to cover more cases and make it more extensible. (#1153)

This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c90ce6  SAMZA-2320: Samza-sql: Refactor validation to cover more cases and make it more extensible. (#1153)
6c90ce6 is described below

commit 6c90ce6ccb03edd91f51b0ee124c357e1a7c377f
Author: Aditya Toomula <at...@linkedin.com>
AuthorDate: Thu Oct 3 20:59:02 2019 -0700

    SAMZA-2320: Samza-sql: Refactor validation to cover more cases and make it more extensible. (#1153)
    
    * SAMZA-2320: Samza-sql: Refactor validation to cover more cases and make it more extensible.
---
 .../apache/samza/sql/avro/AvroTypeFactoryImpl.java |  71 ++++----
 .../apache/samza/sql/interfaces/SqlIOConfig.java   |   8 +-
 .../samza/sql/planner/SamzaSqlValidator.java       | 185 +++++++++++++--------
 .../samza/sql/runner/SamzaSqlApplication.java      |   1 -
 .../samza/sql/translator/FilterTranslator.java     |  19 ++-
 .../samza/sql/translator/ProjectTranslator.java    |  10 +-
 .../samza/sql/avro/TestAvroRelConversion.java      |   6 +-
 .../samza/sql/avro/schemas/ComplexRecord.avsc      |   2 +-
 .../samza/sql/avro/schemas/ComplexRecord.java      |   8 +-
 .../samza/sql/planner/TestSamzaSqlValidator.java   |  58 ++++++-
 .../samza/sql/system/TestAvroSystemFactory.java    |   2 +-
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java  |   8 +-
 12 files changed, 247 insertions(+), 131 deletions(-)

diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
index 6bf0c3c..153d96a 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
@@ -1,21 +1,21 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 
 package org.apache.samza.sql.avro;
 
@@ -31,6 +31,7 @@ import org.apache.samza.sql.schema.SqlSchemaBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Factory that creates {@link SqlSchema} from the Avro Schema. This is used by the
  * {@link AvroRelConverter} to convert Avro schema to Samza Sql schema.
@@ -44,37 +45,47 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
   }
 
   public SqlSchema createType(Schema schema) {
+    validateTopLevelAvroType(schema);
+    return convertSchema(schema.getFields(), true);
+  }
+
+  /**
+   * Given a schema field, determine whether it is optional. A field may be considered optional even if
+   * it is marked as required in the schema: when the producer fills in the field itself, the query need
+   * not specify it, so the field is effectively optional. Typically, such fields are the top level
+   * fields in the schema.
+   * @param field schema field
+   * @param isTopLevelField if it is top level field in the schema
+   * @return if the field is optional
+   */
+  protected boolean isOptional(Schema.Field field, boolean isTopLevelField) {
+    return field.defaultValue() != null;
+  }
+
+  private void validateTopLevelAvroType(Schema schema) {
     Schema.Type type = schema.getType();
     if (type != Schema.Type.RECORD) {
       String msg =
-          String.format("System supports only RECORD as top level avro type, But the Schema's type is %s", type);
+          String.format("Samza Sql supports only RECORD as top level avro type, But the Schema's type is %s", type);
       LOG.error(msg);
       throw new SamzaException(msg);
     }
-
-    return convertSchema(schema.getFields());
   }
 
-  private SqlSchema convertSchema(List<Schema.Field> fields) {
-
+  private SqlSchema convertSchema(List<Schema.Field> fields, boolean isTopLevelField) {
     SqlSchemaBuilder schemaBuilder = SqlSchemaBuilder.builder();
     for (Schema.Field field : fields) {
-      boolean isOptional = (field.defaultValue() != null);
-      SqlFieldSchema fieldSchema = convertField(field.schema(), false, isOptional);
+      SqlFieldSchema fieldSchema = convertField(field.schema(), false, isOptional(field, isTopLevelField));
       schemaBuilder.addField(field.name(), fieldSchema);
     }
 
     return schemaBuilder.build();
   }
 
-  private SqlFieldSchema convertField(Schema fieldSchema) {
-    return convertField(fieldSchema, false, false);
-  }
-
   private SqlFieldSchema convertField(Schema fieldSchema, boolean isNullable, boolean isOptional) {
     switch (fieldSchema.getType()) {
       case ARRAY:
-        SqlFieldSchema elementSchema = convertField(fieldSchema.getElementType());
+        SqlFieldSchema elementSchema = convertField(fieldSchema.getElementType(), false, false);
         return SqlFieldSchema.createArraySchema(elementSchema, isNullable, isOptional);
       case BOOLEAN:
         return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN, isNullable, isOptional);
@@ -98,7 +109,7 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
       case LONG:
         return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.INT64, isNullable, isOptional);
       case RECORD:
-        SqlSchema rowSchema = convertSchema(fieldSchema.getFields());
+        SqlSchema rowSchema = convertSchema(fieldSchema.getFields(), false);
         return SqlFieldSchema.createRowFieldSchema(rowSchema, isNullable, isOptional);
       case MAP:
         // Can the value type be nullable and have default values ? Guess not!
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 0761f47..69d301d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -81,13 +81,13 @@ public class SqlIOConfig {
     this.streamId = String.format("%s-%s", systemName, streamName);
 
     samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
-    Validate.notEmpty(samzaRelConverterName,
-        String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
+    Validate.notEmpty(samzaRelConverterName, String.format("System %s is unknown. %s is not set or empty for this"
+        + " system", systemName, CFG_SAMZA_REL_CONVERTER));
 
     if (isRemoteTable()) {
       samzaRelTableKeyConverterName = streamConfigs.get(CFG_SAMZA_REL_TABLE_KEY_CONVERTER);
-      Validate.notEmpty(samzaRelTableKeyConverterName,
-          String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
+      Validate.notEmpty(samzaRelTableKeyConverterName, String.format("System %s is unknown. %s is not set or empty for"
+          + " this system", systemName, CFG_SAMZA_REL_CONVERTER));
     } else {
       samzaRelTableKeyConverterName = "";
     }
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
index 97b5de9..c64ec7d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
@@ -1,21 +1,21 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 
 package org.apache.samza.sql.planner;
 
@@ -33,11 +33,14 @@ import org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelRecordType;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
 import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.schema.SqlFieldSchema;
@@ -78,18 +81,42 @@ public class SamzaSqlValidator {
       try {
         relRoot = planner.plan(qinfo.getSelectQuery());
       } catch (SamzaException e) {
-        throw new SamzaSqlValidatorException("Calcite planning for sql failed.", e);
+        throw new SamzaSqlValidatorException(String.format("Validation failed for sql stmt:\n%s\n", sql), e);
       }
 
       // Now that we have logical plan, validate different aspects.
-      validate(relRoot, qinfo, sqlConfig);
+      String sink = qinfo.getSink();
+      validate(relRoot, sink, sqlConfig.getRelSchemaProviders().get(sink), sqlConfig.getSamzaRelConverters().get(sink));
     }
   }
 
-  protected void validate(RelRoot relRoot, SamzaSqlQueryParser.QueryInfo qinfo, SamzaSqlApplicationConfig sqlConfig)
-      throws SamzaSqlValidatorException {
-    // Validate select fields (including Udf return types) with output schema
-    validateOutput(relRoot, sqlConfig.getRelSchemaProviders().get(qinfo.getSink()));
+  /**
+   * Determine if output validation of the Calcite plan can be skipped based on the schema provider and converter.
+   * @param relRoot
+   * @param sink
+   * @param outputSchemaProvider
+   * @param ouputRelSchemaConverter
+   * @return if the validation needs to be skipped
+   */
+  protected boolean skipOutputValidation(RelRoot relRoot, String sink, RelSchemaProvider outputSchemaProvider,
+      SamzaRelConverter ouputRelSchemaConverter) {
+    return false;
+  }
+
+  // TODO: Remove this API. This API is introduced to take care of cases where RelSchemaProviders have a complex
+  // mechanism to determine if a given output field is optional. We will need system specific validators to take
+  // care of such cases and once that is introduced, we can get rid of the below API.
+  protected boolean isOptional(RelSchemaProvider outputRelSchemaProvider, String outputFieldName,
+      RelRecordType projectRecord) {
+    return false;
+  }
+
+  private void validate(RelRoot relRoot, String sink, RelSchemaProvider outputSchemaProvider,
+      SamzaRelConverter outputRelSchemaConverter) throws SamzaSqlValidatorException {
+    if (!skipOutputValidation(relRoot, sink, outputSchemaProvider, outputRelSchemaConverter)) {
+      // Validate select fields (including Udf return types) with output schema
+      validateOutput(relRoot, outputSchemaProvider);
+    }
 
     // TODO:
     //  1. SAMZA-2314: Validate Udf arguments.
@@ -97,22 +124,25 @@ public class SamzaSqlValidator {
     //     Eg: LogicalAggregate with sum function is not supported by Samza Sql.
   }
 
-  protected void validateOutput(RelRoot relRoot, RelSchemaProvider relSchemaProvider) throws SamzaSqlValidatorException {
-    RelRecordType outputRecord = (RelRecordType) QueryPlanner.getSourceRelSchema(relSchemaProvider,
+  private void validateOutput(RelRoot relRoot, RelSchemaProvider outputRelSchemaProvider)
+      throws SamzaSqlValidatorException {
+    LogicalProject project = (LogicalProject) relRoot.rel;
+
+    RelRecordType projetRecord = (RelRecordType) project.getRowType();
+    RelRecordType outputRecord = (RelRecordType) QueryPlanner.getSourceRelSchema(outputRelSchemaProvider,
         new RelSchemaConverter());
+
     // Get Samza Sql schema along with Calcite schema. The reason is that the Calcite schema does not have a way
-    // to represent optional fields while Samza Sql schema can represent optional fields. This is the only reason that
+    // to represent optional fields while Samza Sql schema can represent optional fields. This is the reason that
     // we use SqlSchema in validating output.
-    SqlSchema outputSqlSchema = QueryPlanner.getSourceSqlSchema(relSchemaProvider);
+    SqlSchema outputSqlSchema = QueryPlanner.getSourceSqlSchema(outputRelSchemaProvider);
 
-    LogicalProject project = (LogicalProject) relRoot.rel;
-    RelRecordType projetRecord = (RelRecordType) project.getRowType();
-
-    validateOutputRecords(outputRecord, outputSqlSchema, projetRecord);
+    validateOutputRecords(outputSqlSchema, outputRecord, projetRecord, outputRelSchemaProvider);
+    LOG.info("Samza Sql Validation finished successfully.");
   }
 
-  protected void validateOutputRecords(RelRecordType outputRecord, SqlSchema outputSqlSchema,
-      RelRecordType projectRecord)
+  private void validateOutputRecords(SqlSchema outputSqlSchema, RelRecordType outputRecord,
+      RelRecordType projectRecord, RelSchemaProvider outputRelSchemaProvider)
       throws SamzaSqlValidatorException {
     Map<String, RelDataType> outputRecordMap = outputRecord.getFieldList().stream().collect(
         Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
@@ -121,6 +151,50 @@ public class SamzaSqlValidator {
     Map<String, RelDataType> projectRecordMap = projectRecord.getFieldList().stream().collect(
         Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
 
+    // Ensure that all fields from sql statement exist in the output schema and are of the same type.
+    for (Map.Entry<String, RelDataType> entry : projectRecordMap.entrySet()) {
+      String projectedFieldName = entry.getKey();
+      RelDataType outputFieldType = outputRecordMap.get(projectedFieldName);
+      SqlFieldSchema outputSqlFieldSchema = outputFieldSchemaMap.get(projectedFieldName);
+
+      if (outputFieldType == null) {
+        // If the field names are specified more than once in the select query, calcite appends 'n' as suffix to the
+        // dup fields based on the order they are specified, where 'n' starts from 0 for the first dup field.
+        // Take the following example: SELECT id as str, secondaryId as str, tertiaryId as str FROM store.myTable
+        //   Calcite renames the projected fieldNames in select query as str, str0, str1 respectively.
+        // Samza Sql allows a field name to be specified up to 2 times. Do the validation accordingly.
+
+        // This type of pattern is typically followed when users want to just modify one field in the input table while
+        // keeping rest of the fields the same. Eg: SELECT myUdf(id) as id, * from store.myTable
+        if (projectedFieldName.endsWith("0")) {
+          projectedFieldName = StringUtils.chop(projectedFieldName);
+          outputFieldType = outputRecordMap.get(projectedFieldName);
+          outputSqlFieldSchema = outputFieldSchemaMap.get(projectedFieldName);
+        }
+
+        if (outputFieldType == null) {
+          // If a field in sql query is not found in the output schema, ignore if it is a Samza Sql special op.
+          // Otherwise, throw an error.
+          if (entry.getKey().equals(SamzaSqlRelMessage.OP_NAME)) {
+            continue;
+          }
+          String errMsg = String.format("Field '%s' in select query does not match any field in output schema.", entry.getKey());
+          LOG.error(errMsg);
+          throw new SamzaSqlValidatorException(errMsg);
+        }
+      }
+
+      Validate.notNull(outputFieldType);
+      Validate.notNull(outputSqlFieldSchema);
+
+      if (!compareFieldTypes(outputFieldType, outputSqlFieldSchema, entry.getValue(), outputRelSchemaProvider)) {
+        String errMsg = String.format("Field '%s' with type '%s' in select query does not match the field type '%s' in"
+            + " output schema.", entry.getKey(), entry.getValue(), outputFieldType);
+        LOG.error(errMsg);
+        throw new SamzaSqlValidatorException(errMsg);
+      }
+    }
+
     // Ensure that all non-optional fields in output schema are set in the sql query and are of the
     // same type.
     for (Map.Entry<String, RelDataType> entry : outputRecordMap.entrySet()) {
@@ -130,47 +204,24 @@ public class SamzaSqlValidator {
       if (projectFieldType == null) {
         // If an output schema field is not found in the sql query, ignore it if the field is optional.
         // Otherwise, throw an error.
-        if (outputSqlFieldSchema.isOptional()) {
+        if (outputSqlFieldSchema.isOptional() || isOptional(outputRelSchemaProvider, entry.getKey(), projectRecord)) {
           continue;
         }
-        String errMsg = String.format("Field '%s' in output schema does not match any projected fields.",
-            entry.getKey());
+        String errMsg = String.format("Non-optional field '%s' in output schema is missing in projected fields of "
+            + "select query.", entry.getKey());
         LOG.error(errMsg);
         throw new SamzaSqlValidatorException(errMsg);
-      } else if (!compareFieldTypes(entry.getValue(), outputSqlFieldSchema, projectFieldType)) {
+      } else if (!compareFieldTypes(entry.getValue(), outputSqlFieldSchema, projectFieldType, outputRelSchemaProvider)) {
         String errMsg = String.format("Field '%s' with type '%s' in output schema does not match the field type '%s' in"
             + " projected fields.", entry.getKey(), entry.getValue(), projectFieldType);
         LOG.error(errMsg);
         throw new SamzaSqlValidatorException(errMsg);
       }
     }
-
-    // Ensure that all fields from sql statement exist in the output schema and are of the same type.
-    for (Map.Entry<String, RelDataType> entry : projectRecordMap.entrySet()) {
-      RelDataType outputFieldType = outputRecordMap.get(entry.getKey());
-      SqlFieldSchema outputSqlFieldSchema = outputFieldSchemaMap.get(entry.getKey());
-
-      if (outputFieldType == null) {
-        // If a field in sql query is not found in the output schema, ignore if it is a Samza Sql special op.
-        // Otherwise, throw an error.
-        if (entry.getKey().equals(SamzaSqlRelMessage.OP_NAME)) {
-          continue;
-        }
-        String errMsg = String.format("Field '%s' in select query does not match any field in output schema.",
-            entry.getKey());
-        LOG.error(errMsg);
-        throw new SamzaSqlValidatorException(errMsg);
-      } else if (!compareFieldTypes(outputFieldType, outputSqlFieldSchema, entry.getValue())) {
-        String errMsg = String.format("Field '%s' with type '%s' in select query does not match the field type '%s' in"
-            + " output schema.", entry.getKey(), entry.getValue(), outputFieldType);
-        LOG.error(errMsg);
-        throw new SamzaSqlValidatorException(errMsg);
-      }
-    }
   }
 
-  protected boolean compareFieldTypes(RelDataType outputFieldType, SqlFieldSchema sqlFieldSchema,
-      RelDataType selectQueryFieldType) {
+  private boolean compareFieldTypes(RelDataType outputFieldType, SqlFieldSchema sqlFieldSchema,
+      RelDataType selectQueryFieldType, RelSchemaProvider outputRelSchemaProvider) {
     RelDataType projectFieldType;
 
     // JavaTypes are relevant for Udf argument and return types
@@ -206,15 +257,15 @@ public class SamzaSqlValidator {
         return projectSqlType == SqlTypeName.FLOAT;
       case ROW:
         try {
-          validateOutputRecords((RelRecordType) outputFieldType, sqlFieldSchema.getRowSchema(),
-              (RelRecordType) projectFieldType);
+          validateOutputRecords(sqlFieldSchema.getRowSchema(), (RelRecordType) outputFieldType,
+              (RelRecordType) projectFieldType, outputRelSchemaProvider);
         } catch (SamzaSqlValidatorException e) {
           LOG.error("A field in select query does not match with the output schema.", e);
           return false;
         }
         return true;
       default:
-          return false;
+        return false;
     }
   }
 
@@ -300,7 +351,7 @@ public class SamzaSqlValidator {
       // Ignore any formatting errors.
       LOG.error("Formatting error (Not the actual error. Look for the logs for actual error)", ex);
       return String.format("Failed with formatting exception (not the actual error) for the following sql"
-              + " statement:\n\"%s\"\n\n%s", query, e.getMessage());
+          + " statement:\n\"%s\"\n\n%s", query, e.getMessage());
     }
   }
 
@@ -315,4 +366,4 @@ public class SamzaSqlValidator {
     Arrays.fill(chars, ch);
     return new String(chars);
   }
-}
+}
\ No newline at end of file
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
index 4304b65..d13529f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
@@ -28,7 +28,6 @@ import org.apache.calcite.rel.RelRoot;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.context.ApplicationContainerContext;
-import org.apache.samza.context.ApplicationTaskContext;
 import org.apache.samza.context.ApplicationTaskContextFactory;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.ExternalContext;
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
index 345ac85..6515dc2 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
@@ -22,6 +22,7 @@ package org.apache.samza.sql.translator;
 import java.util.Arrays;
 import java.util.Collections;
 import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.samza.SamzaException;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
 import org.apache.samza.metrics.Counter;
@@ -42,7 +43,7 @@ import org.slf4j.LoggerFactory;
  */
 class FilterTranslator {
 
-  private static final Logger log = LoggerFactory.getLogger(FilterTranslator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FilterTranslator.class);
   private final int queryId;
 
   FilterTranslator(int queryId) {
@@ -95,17 +96,23 @@ class FilterTranslator {
     public boolean apply(SamzaSqlRelMessage message) {
       long startProcessing = System.nanoTime();
       Object[] result = new Object[1];
-      expr.execute(translatorContext.getExecutionContext(), context, translatorContext.getDataContext(),
-          message.getSamzaSqlRelRecord().getFieldValues().toArray(), result);
-      if (result.length > 0 && result[0] instanceof Boolean) {
+      try {
+        expr.execute(translatorContext.getExecutionContext(), context, translatorContext.getDataContext(),
+            message.getSamzaSqlRelRecord().getFieldValues().toArray(), result);
+      } catch (Exception e) {
+        String errMsg = String.format("Handling the following rel message ran into an error. %s", message);
+        LOG.error(errMsg, e);
+        throw new SamzaException(errMsg, e);
+      }
+      if (result[0] instanceof Boolean) {
         boolean retVal = (Boolean) result[0];
-        log.debug(
+        LOG.debug(
             String.format("return value for input %s is %s",
                 Arrays.asList(message.getSamzaSqlRelRecord().getFieldValues()).toString(), retVal));
         updateMetrics(startProcessing, retVal, System.nanoTime());
         return retVal;
       } else {
-        log.error("return value is not boolean");
+        LOG.error("return value is not boolean for rel message: {}", message);
         return false;
       }
     }
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 6269d55..bf44815 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -114,8 +114,14 @@ class ProjectTranslator {
       long arrivalTime = System.nanoTime();
       RelDataType type = project.getRowType();
       Object[] output = new Object[type.getFieldCount()];
-      expr.execute(translatorContext.getExecutionContext(), context, translatorContext.getDataContext(),
-          message.getSamzaSqlRelRecord().getFieldValues().toArray(), output);
+      try {
+        expr.execute(translatorContext.getExecutionContext(), context, translatorContext.getDataContext(),
+            message.getSamzaSqlRelRecord().getFieldValues().toArray(), output);
+      } catch (Exception e) {
+        String errMsg = String.format("Handling the following rel message ran into an error. %s", message);
+        LOG.error(errMsg, e);
+        throw new SamzaException(errMsg, e);
+      }
       List<String> names = new ArrayList<>();
       for (int index = 0; index < output.length; index++) {
         names.add(index, project.getNamedProjects().get(index).getValue());
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
index b7b3dc3..102ad52 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -199,7 +199,7 @@ public class TestAvroRelConversion {
     record.put("id", id);
     record.put("bool_value", boolValue);
     record.put("double_value", doubleValue);
-    record.put("float_value", floatValue);
+    record.put("float_value0", floatValue);
     record.put("string_value", testStrValue);
     record.put("bytes_value", testBytes);
     record.put("fixed_value", fixedBytes);
@@ -212,7 +212,7 @@ public class TestAvroRelConversion {
     complexRecord.id = id;
     complexRecord.bool_value = boolValue;
     complexRecord.double_value = doubleValue;
-    complexRecord.float_value = floatValue;
+    complexRecord.float_value0 = floatValue;
     complexRecord.string_value = testStrValue;
     complexRecord.bytes_value = testBytes;
     complexRecord.fixed_value = fixedBytes;
@@ -352,7 +352,7 @@ public class TestAvroRelConversion {
     Assert.assertEquals(message.getSamzaSqlRelRecord().getField("bool_value").get(), boolValue);
     Assert.assertEquals(message.getSamzaSqlRelRecord().getField("double_value").get(), doubleValue);
     Assert.assertEquals(message.getSamzaSqlRelRecord().getField("string_value").get(), new Utf8(testStrValue));
-    Assert.assertEquals(message.getSamzaSqlRelRecord().getField("float_value").get(), floatValue);
+    Assert.assertEquals(message.getSamzaSqlRelRecord().getField("float_value0").get(), floatValue);
     Assert.assertEquals(message.getSamzaSqlRelRecord().getField("long_value").get(), longValue);
     if (unionValue instanceof String) {
       Assert.assertEquals(message.getSamzaSqlRelRecord().getField("union_value").get(), new Utf8((String) unionValue));
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
index c307b10..e2e67e2 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
@@ -40,7 +40,7 @@
             "default":null
         },
         {
-            "name": "float_value",
+            "name": "float_value0",
             "doc": "float Value.",
             "type": ["null", "float"],
             "default":null
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
index 91a447f..d21fc0f 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
@@ -26,7 +26,7 @@ package org.apache.samza.sql.avro.schemas;
 
 @SuppressWarnings("all")
 public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":\"int\",\"doc\":\"Record id.\"},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\"},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"name\":\"float_value\",\"type\":[\"null\",\"flo [...]
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":\"int\",\"doc\":\"Record id.\"},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\"},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"name\":\"float_value0\",\"type\":[\"null\",\"fl [...]
   /** Record id. */
   public java.lang.Integer id;
   /** Boolean Value. */
@@ -34,7 +34,7 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
   /** double Value. */
   public java.lang.Double double_value;
   /** float Value. */
-  public java.lang.Float float_value;
+  public java.lang.Float float_value0;
   /** string Value. */
   public java.lang.CharSequence string_value;
   /** bytes Value. */
@@ -61,7 +61,7 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
     case 0: return id;
     case 1: return bool_value;
     case 2: return double_value;
-    case 3: return float_value;
+    case 3: return float_value0;
     case 4: return string_value;
     case 5: return bytes_value;
     case 6: return long_value;
@@ -82,7 +82,7 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
     case 0: id = (java.lang.Integer)value$; break;
     case 1: bool_value = (java.lang.Boolean)value$; break;
     case 2: double_value = (java.lang.Double)value$; break;
-    case 3: float_value = (java.lang.Float)value$; break;
+    case 3: float_value0 = (java.lang.Float)value$; break;
     case 4: string_value = (java.lang.CharSequence)value$; break;
     case 5: bytes_value = (java.nio.ByteBuffer)value$; break;
     case 6: long_value = (java.lang.Long)value$; break;
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
index 4c9522e..38a18a1 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
@@ -59,11 +59,38 @@ public class TestSamzaSqlValidator {
     new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
   }
 
+  // Samza Sql allows users to replace a field in the input stream. For example, to always set bool_value to false
+  // while keeping the values of other fields the same, the query could be written as follows:
+  // SELECT false AS bool_value, c.* FROM testavro.COMPLEX1 AS c
+  @Test
+  public void testRepeatedTwiceFieldsValidation() throws SamzaSqlValidatorException {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.outputTopic select false as bool_value, c.* from testavro.COMPLEX1 as c");
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+  }
+
+  // Samza Sql allows a field to be replaced only once and validation will fail if the field is replaced more than
+  // once. We disallow it to keep things simple.
   @Test (expected = SamzaSqlValidatorException.class)
-  public void testNonExistingOutputField() throws SamzaSqlValidatorException {
+  public void testRepeatedThriceFieldsValidation() throws SamzaSqlValidatorException {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-        "Insert into testavro.outputTopic(id) select id, name as strings_value"
+        "Insert into testavro.outputTopic select id, bool_value, true as bool_value, c.* from testavro.COMPLEX1 as c");
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+  }
+
+  @Test (expected = SamzaSqlValidatorException.class)
+  public void testIllegitFieldEndingInZeroValidation() throws SamzaSqlValidatorException {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.outputTopic select id, true as bool_value, false as non_existing_name0"
             + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
 
@@ -71,13 +98,28 @@ public class TestSamzaSqlValidator {
     new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
   }
 
-  @Test(expected = SamzaException.class)
-  public void testNonExistingSelectField() throws SamzaSqlValidatorException {
+  @Test
+  public void testLegitFieldEndingInZeroValidation() throws SamzaSqlValidatorException {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-        "Insert into testavro.outputTopic(id) select non_existing_field, name as string_value"
+        "Insert into testavro.outputTopic"
+            + " select id, bool_value, float_value0 from testavro.COMPLEX1");
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+  }
+
+  @Test (expected = SamzaSqlValidatorException.class)
+  public void testNonExistingOutputField() throws SamzaSqlValidatorException {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.outputTopic(id) select id, name as strings_value"
             + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
-    SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
   }
 
   @Test(expected = SamzaSqlValidatorException.class)
@@ -160,7 +202,7 @@ public class TestSamzaSqlValidator {
     try {
       new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
     } catch (SamzaSqlValidatorException e) {
-      Assert.assertTrue(e.getMessage().contains("Field 'bool_value' in output schema does not match any projected fields."));
+      Assert.assertTrue(e.getMessage().contains("Non-optional field 'bool_value' in output schema is missing"));
       return;
     }
 
@@ -181,7 +223,7 @@ public class TestSamzaSqlValidator {
     try {
       new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
     } catch (SamzaSqlValidatorException e) {
-      Assert.assertTrue(e.getMessage().contains("Field 'id' in output schema does not match"));
+      Assert.assertTrue(e.getMessage().contains("Non-optional field 'id' in output schema is missing"));
       return;
     }
 
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 7d62b8f..65e0ad0 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -325,7 +325,7 @@ public class TestAvroSystemFactory implements SystemFactory {
       record.put("id", index);
       record.put("string_value", "Name" + index);
       record.put("bytes_value", ByteBuffer.wrap(("sample bytes").getBytes()));
-      record.put("float_value", index + 0.123456f);
+      record.put("float_value0", index + 0.123456f);
       record.put("double_value", index + 0.0123456789);
       MyFixed myFixedVar = new MyFixed();
       myFixedVar.bytes(DEFAULT_TRACKING_ID_BYTES);
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 20820f6..581fbda 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -427,8 +427,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
 
     LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
     String sql1 =
-        "Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value) "
-            + " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value "
+        "Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value0) "
+            + " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value0 "
             + " from testavro.COMPLEX1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -458,7 +458,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
     String sql1 =
         "Insert into testavro.outputTopic"
             + " select bool_value, map_values['key0'] as string_value, union_value, array_values, map_values, id, bytes_value,"
-            + " fixed_value, float_value from testavro.COMPLEX1";
+            + " fixed_value, float_value0 from testavro.COMPLEX1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
 
@@ -479,7 +479,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.outputTopic"
-        + " select 'urn:li:member:' || cast(cast(float_value as int) as varchar) as string_value, id, float_value, "
+        + " select 'urn:li:member:' || cast(cast(float_value0 as int) as varchar) as string_value, id, float_value0, "
         + " double_value, true as bool_value from testavro.COMPLEX1";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));