You are viewing a plain-text version of this content. The canonical link for it ("here") was not preserved in this plain-text rendering.
Posted to commits@samza.apache.org by at...@apache.org on 2019/10/04 03:59:07 UTC
[samza] branch master updated: SAMZA-2320: Samza-sql: Refactor validation to cover more cases and make it more extensible. (#1153)
This is an automated email from the ASF dual-hosted git repository.
atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 6c90ce6 SAMZA-2320: Samza-sql: Refactor validation to cover more cases and make it more extensible. (#1153)
6c90ce6 is described below
commit 6c90ce6ccb03edd91f51b0ee124c357e1a7c377f
Author: Aditya Toomula <at...@linkedin.com>
AuthorDate: Thu Oct 3 20:59:02 2019 -0700
SAMZA-2320: Samza-sql: Refactor validation to cover more cases and make it more extensible. (#1153)
* SAMZA-2320: Samza-sql: Refactor validation to cover more cases and make it more extensible.
---
.../apache/samza/sql/avro/AvroTypeFactoryImpl.java | 71 ++++----
.../apache/samza/sql/interfaces/SqlIOConfig.java | 8 +-
.../samza/sql/planner/SamzaSqlValidator.java | 185 +++++++++++++--------
.../samza/sql/runner/SamzaSqlApplication.java | 1 -
.../samza/sql/translator/FilterTranslator.java | 19 ++-
.../samza/sql/translator/ProjectTranslator.java | 10 +-
.../samza/sql/avro/TestAvroRelConversion.java | 6 +-
.../samza/sql/avro/schemas/ComplexRecord.avsc | 2 +-
.../samza/sql/avro/schemas/ComplexRecord.java | 8 +-
.../samza/sql/planner/TestSamzaSqlValidator.java | 58 ++++++-
.../samza/sql/system/TestAvroSystemFactory.java | 2 +-
.../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 8 +-
12 files changed, 247 insertions(+), 131 deletions(-)
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
index 6bf0c3c..153d96a 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
@@ -1,21 +1,21 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.samza.sql.avro;
@@ -31,6 +31,7 @@ import org.apache.samza.sql.schema.SqlSchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Factory that creates {@link SqlSchema} from the Avro Schema. This is used by the
* {@link AvroRelConverter} to convert Avro schema to Samza Sql schema.
@@ -44,37 +45,47 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
}
public SqlSchema createType(Schema schema) {
+ validateTopLevelAvroType(schema);
+ return convertSchema(schema.getFields(), true);
+ }
+
+ /**
+ * Given a schema field, determine if it is an optional field. There could be cases where a field
+ * is considered as optional even if it is marked as required in the schema. The producer could be filling in this
+ * field and hence need not be specified in the query and hence is optional. Typically, such fields are
+ * the top level fields in the schema.
+ * @param field schema field
+ * @param isTopLevelField if it is top level field in the schema
+ * @return if the field is optional
+ */
+ protected boolean isOptional(Schema.Field field, boolean isTopLevelField) {
+ return field.defaultValue() != null;
+ }
+
+ private void validateTopLevelAvroType(Schema schema) {
Schema.Type type = schema.getType();
if (type != Schema.Type.RECORD) {
String msg =
- String.format("System supports only RECORD as top level avro type, But the Schema's type is %s", type);
+ String.format("Samza Sql supports only RECORD as top level avro type, But the Schema's type is %s", type);
LOG.error(msg);
throw new SamzaException(msg);
}
-
- return convertSchema(schema.getFields());
}
- private SqlSchema convertSchema(List<Schema.Field> fields) {
-
+ private SqlSchema convertSchema(List<Schema.Field> fields, boolean isTopLevelField) {
SqlSchemaBuilder schemaBuilder = SqlSchemaBuilder.builder();
for (Schema.Field field : fields) {
- boolean isOptional = (field.defaultValue() != null);
- SqlFieldSchema fieldSchema = convertField(field.schema(), false, isOptional);
+ SqlFieldSchema fieldSchema = convertField(field.schema(), false, isOptional(field, isTopLevelField));
schemaBuilder.addField(field.name(), fieldSchema);
}
return schemaBuilder.build();
}
- private SqlFieldSchema convertField(Schema fieldSchema) {
- return convertField(fieldSchema, false, false);
- }
-
private SqlFieldSchema convertField(Schema fieldSchema, boolean isNullable, boolean isOptional) {
switch (fieldSchema.getType()) {
case ARRAY:
- SqlFieldSchema elementSchema = convertField(fieldSchema.getElementType());
+ SqlFieldSchema elementSchema = convertField(fieldSchema.getElementType(), false, false);
return SqlFieldSchema.createArraySchema(elementSchema, isNullable, isOptional);
case BOOLEAN:
return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN, isNullable, isOptional);
@@ -98,7 +109,7 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
case LONG:
return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.INT64, isNullable, isOptional);
case RECORD:
- SqlSchema rowSchema = convertSchema(fieldSchema.getFields());
+ SqlSchema rowSchema = convertSchema(fieldSchema.getFields(), false);
return SqlFieldSchema.createRowFieldSchema(rowSchema, isNullable, isOptional);
case MAP:
// Can the value type be nullable and have default values ? Guess not!
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 0761f47..69d301d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -81,13 +81,13 @@ public class SqlIOConfig {
this.streamId = String.format("%s-%s", systemName, streamName);
samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
- Validate.notEmpty(samzaRelConverterName,
- String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
+ Validate.notEmpty(samzaRelConverterName, String.format("System %s is unknown. %s is not set or empty for this"
+ + " system", systemName, CFG_SAMZA_REL_CONVERTER));
if (isRemoteTable()) {
samzaRelTableKeyConverterName = streamConfigs.get(CFG_SAMZA_REL_TABLE_KEY_CONVERTER);
- Validate.notEmpty(samzaRelTableKeyConverterName,
- String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
+ Validate.notEmpty(samzaRelTableKeyConverterName, String.format("System %s is unknown. %s is not set or empty for"
+ + " this system", systemName, CFG_SAMZA_REL_CONVERTER));
} else {
samzaRelTableKeyConverterName = "";
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
index 97b5de9..c64ec7d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
@@ -1,21 +1,21 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.samza.sql.planner;
@@ -33,11 +33,14 @@ import org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.schema.SqlFieldSchema;
@@ -78,18 +81,42 @@ public class SamzaSqlValidator {
try {
relRoot = planner.plan(qinfo.getSelectQuery());
} catch (SamzaException e) {
- throw new SamzaSqlValidatorException("Calcite planning for sql failed.", e);
+ throw new SamzaSqlValidatorException(String.format("Validation failed for sql stmt:\n%s\n", sql), e);
}
// Now that we have logical plan, validate different aspects.
- validate(relRoot, qinfo, sqlConfig);
+ String sink = qinfo.getSink();
+ validate(relRoot, sink, sqlConfig.getRelSchemaProviders().get(sink), sqlConfig.getSamzaRelConverters().get(sink));
}
}
- protected void validate(RelRoot relRoot, SamzaSqlQueryParser.QueryInfo qinfo, SamzaSqlApplicationConfig sqlConfig)
- throws SamzaSqlValidatorException {
- // Validate select fields (including Udf return types) with output schema
- validateOutput(relRoot, sqlConfig.getRelSchemaProviders().get(qinfo.getSink()));
+ /**
+ * Determine if validation needs to be done on Calcite plan based on the schema provider and schema converter.
+ * @param relRoot
+ * @param sink
+ * @param outputSchemaProvider
+ * @param ouputRelSchemaConverter
+ * @return if the validation needs to be skipped
+ */
+ protected boolean skipOutputValidation(RelRoot relRoot, String sink, RelSchemaProvider outputSchemaProvider,
+ SamzaRelConverter ouputRelSchemaConverter) {
+ return false;
+ }
+
+ // TODO: Remove this API. This API is introduced to take care of cases where RelSchemaProviders have a complex
+ // mechanism to determine if a given output field is optional. We will need system specific validators to take
+ // care of such cases and once that is introduced, we can get rid of the below API.
+ protected boolean isOptional(RelSchemaProvider outputRelSchemaProvider, String outputFieldName,
+ RelRecordType projectRecord) {
+ return false;
+ }
+
+ private void validate(RelRoot relRoot, String sink, RelSchemaProvider outputSchemaProvider,
+ SamzaRelConverter outputRelSchemaConverter) throws SamzaSqlValidatorException {
+ if (!skipOutputValidation(relRoot, sink, outputSchemaProvider, outputRelSchemaConverter)) {
+ // Validate select fields (including Udf return types) with output schema
+ validateOutput(relRoot, outputSchemaProvider);
+ }
// TODO:
// 1. SAMZA-2314: Validate Udf arguments.
@@ -97,22 +124,25 @@ public class SamzaSqlValidator {
// Eg: LogicalAggregate with sum function is not supported by Samza Sql.
}
- protected void validateOutput(RelRoot relRoot, RelSchemaProvider relSchemaProvider) throws SamzaSqlValidatorException {
- RelRecordType outputRecord = (RelRecordType) QueryPlanner.getSourceRelSchema(relSchemaProvider,
+ private void validateOutput(RelRoot relRoot, RelSchemaProvider outputRelSchemaProvider)
+ throws SamzaSqlValidatorException {
+ LogicalProject project = (LogicalProject) relRoot.rel;
+
+ RelRecordType projetRecord = (RelRecordType) project.getRowType();
+ RelRecordType outputRecord = (RelRecordType) QueryPlanner.getSourceRelSchema(outputRelSchemaProvider,
new RelSchemaConverter());
+
// Get Samza Sql schema along with Calcite schema. The reason is that the Calcite schema does not have a way
- // to represent optional fields while Samza Sql schema can represent optional fields. This is the only reason that
+ // to represent optional fields while Samza Sql schema can represent optional fields. This is the reason that
// we use SqlSchema in validating output.
- SqlSchema outputSqlSchema = QueryPlanner.getSourceSqlSchema(relSchemaProvider);
+ SqlSchema outputSqlSchema = QueryPlanner.getSourceSqlSchema(outputRelSchemaProvider);
- LogicalProject project = (LogicalProject) relRoot.rel;
- RelRecordType projetRecord = (RelRecordType) project.getRowType();
-
- validateOutputRecords(outputRecord, outputSqlSchema, projetRecord);
+ validateOutputRecords(outputSqlSchema, outputRecord, projetRecord, outputRelSchemaProvider);
+ LOG.info("Samza Sql Validation finished successfully.");
}
- protected void validateOutputRecords(RelRecordType outputRecord, SqlSchema outputSqlSchema,
- RelRecordType projectRecord)
+ private void validateOutputRecords(SqlSchema outputSqlSchema, RelRecordType outputRecord,
+ RelRecordType projectRecord, RelSchemaProvider outputRelSchemaProvider)
throws SamzaSqlValidatorException {
Map<String, RelDataType> outputRecordMap = outputRecord.getFieldList().stream().collect(
Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
@@ -121,6 +151,50 @@ public class SamzaSqlValidator {
Map<String, RelDataType> projectRecordMap = projectRecord.getFieldList().stream().collect(
Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
+ // Ensure that all fields from sql statement exist in the output schema and are of the same type.
+ for (Map.Entry<String, RelDataType> entry : projectRecordMap.entrySet()) {
+ String projectedFieldName = entry.getKey();
+ RelDataType outputFieldType = outputRecordMap.get(projectedFieldName);
+ SqlFieldSchema outputSqlFieldSchema = outputFieldSchemaMap.get(projectedFieldName);
+
+ if (outputFieldType == null) {
+ // If the field names are specified more than once in the select query, calcite appends 'n' as suffix to the
+ // dup fields based on the order they are specified, where 'n' starts from 0 for the first dup field.
+ // Take the following example: SELECT id as str, secondaryId as str, tertiaryId as str FROM store.myTable
+ // Calcite renames the projected fieldNames in select query as str, str0, str1 respectively.
+ // Samza Sql allows a field name to be specified up to 2 times. Do the validation accordingly.
+
+ // This type of pattern is typically followed when users want to just modify one field in the input table while
+ // keeping rest of the fields the same. Eg: SELECT myUdf(id) as id, * from store.myTable
+ if (projectedFieldName.endsWith("0")) {
+ projectedFieldName = StringUtils.chop(projectedFieldName);
+ outputFieldType = outputRecordMap.get(projectedFieldName);
+ outputSqlFieldSchema = outputFieldSchemaMap.get(projectedFieldName);
+ }
+
+ if (outputFieldType == null) {
+ // If a field in sql query is not found in the output schema, ignore if it is a Samza Sql special op.
+ // Otherwise, throw an error.
+ if (entry.getKey().equals(SamzaSqlRelMessage.OP_NAME)) {
+ continue;
+ }
+ String errMsg = String.format("Field '%s' in select query does not match any field in output schema.", entry.getKey());
+ LOG.error(errMsg);
+ throw new SamzaSqlValidatorException(errMsg);
+ }
+ }
+
+ Validate.notNull(outputFieldType);
+ Validate.notNull(outputSqlFieldSchema);
+
+ if (!compareFieldTypes(outputFieldType, outputSqlFieldSchema, entry.getValue(), outputRelSchemaProvider)) {
+ String errMsg = String.format("Field '%s' with type '%s' in select query does not match the field type '%s' in"
+ + " output schema.", entry.getKey(), entry.getValue(), outputFieldType);
+ LOG.error(errMsg);
+ throw new SamzaSqlValidatorException(errMsg);
+ }
+ }
+
// Ensure that all non-optional fields in output schema are set in the sql query and are of the
// same type.
for (Map.Entry<String, RelDataType> entry : outputRecordMap.entrySet()) {
@@ -130,47 +204,24 @@ public class SamzaSqlValidator {
if (projectFieldType == null) {
// If an output schema field is not found in the sql query, ignore it if the field is optional.
// Otherwise, throw an error.
- if (outputSqlFieldSchema.isOptional()) {
+ if (outputSqlFieldSchema.isOptional() || isOptional(outputRelSchemaProvider, entry.getKey(), projectRecord)) {
continue;
}
- String errMsg = String.format("Field '%s' in output schema does not match any projected fields.",
- entry.getKey());
+ String errMsg = String.format("Non-optional field '%s' in output schema is missing in projected fields of "
+ + "select query.", entry.getKey());
LOG.error(errMsg);
throw new SamzaSqlValidatorException(errMsg);
- } else if (!compareFieldTypes(entry.getValue(), outputSqlFieldSchema, projectFieldType)) {
+ } else if (!compareFieldTypes(entry.getValue(), outputSqlFieldSchema, projectFieldType, outputRelSchemaProvider)) {
String errMsg = String.format("Field '%s' with type '%s' in output schema does not match the field type '%s' in"
+ " projected fields.", entry.getKey(), entry.getValue(), projectFieldType);
LOG.error(errMsg);
throw new SamzaSqlValidatorException(errMsg);
}
}
-
- // Ensure that all fields from sql statement exist in the output schema and are of the same type.
- for (Map.Entry<String, RelDataType> entry : projectRecordMap.entrySet()) {
- RelDataType outputFieldType = outputRecordMap.get(entry.getKey());
- SqlFieldSchema outputSqlFieldSchema = outputFieldSchemaMap.get(entry.getKey());
-
- if (outputFieldType == null) {
- // If a field in sql query is not found in the output schema, ignore if it is a Samza Sql special op.
- // Otherwise, throw an error.
- if (entry.getKey().equals(SamzaSqlRelMessage.OP_NAME)) {
- continue;
- }
- String errMsg = String.format("Field '%s' in select query does not match any field in output schema.",
- entry.getKey());
- LOG.error(errMsg);
- throw new SamzaSqlValidatorException(errMsg);
- } else if (!compareFieldTypes(outputFieldType, outputSqlFieldSchema, entry.getValue())) {
- String errMsg = String.format("Field '%s' with type '%s' in select query does not match the field type '%s' in"
- + " output schema.", entry.getKey(), entry.getValue(), outputFieldType);
- LOG.error(errMsg);
- throw new SamzaSqlValidatorException(errMsg);
- }
- }
}
- protected boolean compareFieldTypes(RelDataType outputFieldType, SqlFieldSchema sqlFieldSchema,
- RelDataType selectQueryFieldType) {
+ private boolean compareFieldTypes(RelDataType outputFieldType, SqlFieldSchema sqlFieldSchema,
+ RelDataType selectQueryFieldType, RelSchemaProvider outputRelSchemaProvider) {
RelDataType projectFieldType;
// JavaTypes are relevant for Udf argument and return types
@@ -206,15 +257,15 @@ public class SamzaSqlValidator {
return projectSqlType == SqlTypeName.FLOAT;
case ROW:
try {
- validateOutputRecords((RelRecordType) outputFieldType, sqlFieldSchema.getRowSchema(),
- (RelRecordType) projectFieldType);
+ validateOutputRecords(sqlFieldSchema.getRowSchema(), (RelRecordType) outputFieldType,
+ (RelRecordType) projectFieldType, outputRelSchemaProvider);
} catch (SamzaSqlValidatorException e) {
LOG.error("A field in select query does not match with the output schema.", e);
return false;
}
return true;
default:
- return false;
+ return false;
}
}
@@ -300,7 +351,7 @@ public class SamzaSqlValidator {
// Ignore any formatting errors.
LOG.error("Formatting error (Not the actual error. Look for the logs for actual error)", ex);
return String.format("Failed with formatting exception (not the actual error) for the following sql"
- + " statement:\n\"%s\"\n\n%s", query, e.getMessage());
+ + " statement:\n\"%s\"\n\n%s", query, e.getMessage());
}
}
@@ -315,4 +366,4 @@ public class SamzaSqlValidator {
Arrays.fill(chars, ch);
return new String(chars);
}
-}
+}
\ No newline at end of file
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
index 4304b65..d13529f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
@@ -28,7 +28,6 @@ import org.apache.calcite.rel.RelRoot;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.context.ApplicationContainerContext;
-import org.apache.samza.context.ApplicationTaskContext;
import org.apache.samza.context.ApplicationTaskContextFactory;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.ExternalContext;
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
index 345ac85..6515dc2 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
@@ -22,6 +22,7 @@ package org.apache.samza.sql.translator;
import java.util.Arrays;
import java.util.Collections;
import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.samza.SamzaException;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.Context;
import org.apache.samza.metrics.Counter;
@@ -42,7 +43,7 @@ import org.slf4j.LoggerFactory;
*/
class FilterTranslator {
- private static final Logger log = LoggerFactory.getLogger(FilterTranslator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(FilterTranslator.class);
private final int queryId;
FilterTranslator(int queryId) {
@@ -95,17 +96,23 @@ class FilterTranslator {
public boolean apply(SamzaSqlRelMessage message) {
long startProcessing = System.nanoTime();
Object[] result = new Object[1];
- expr.execute(translatorContext.getExecutionContext(), context, translatorContext.getDataContext(),
- message.getSamzaSqlRelRecord().getFieldValues().toArray(), result);
- if (result.length > 0 && result[0] instanceof Boolean) {
+ try {
+ expr.execute(translatorContext.getExecutionContext(), context, translatorContext.getDataContext(),
+ message.getSamzaSqlRelRecord().getFieldValues().toArray(), result);
+ } catch (Exception e) {
+ String errMsg = String.format("Handling the following rel message ran into an error. %s", message);
+ LOG.error(errMsg, e);
+ throw new SamzaException(errMsg, e);
+ }
+ if (result[0] instanceof Boolean) {
boolean retVal = (Boolean) result[0];
- log.debug(
+ LOG.debug(
String.format("return value for input %s is %s",
Arrays.asList(message.getSamzaSqlRelRecord().getFieldValues()).toString(), retVal));
updateMetrics(startProcessing, retVal, System.nanoTime());
return retVal;
} else {
- log.error("return value is not boolean");
+ LOG.error("return value is not boolean for rel message: {}", message);
return false;
}
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 6269d55..bf44815 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -114,8 +114,14 @@ class ProjectTranslator {
long arrivalTime = System.nanoTime();
RelDataType type = project.getRowType();
Object[] output = new Object[type.getFieldCount()];
- expr.execute(translatorContext.getExecutionContext(), context, translatorContext.getDataContext(),
- message.getSamzaSqlRelRecord().getFieldValues().toArray(), output);
+ try {
+ expr.execute(translatorContext.getExecutionContext(), context, translatorContext.getDataContext(),
+ message.getSamzaSqlRelRecord().getFieldValues().toArray(), output);
+ } catch (Exception e) {
+ String errMsg = String.format("Handling the following rel message ran into an error. %s", message);
+ LOG.error(errMsg, e);
+ throw new SamzaException(errMsg, e);
+ }
List<String> names = new ArrayList<>();
for (int index = 0; index < output.length; index++) {
names.add(index, project.getNamedProjects().get(index).getValue());
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
index b7b3dc3..102ad52 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -199,7 +199,7 @@ public class TestAvroRelConversion {
record.put("id", id);
record.put("bool_value", boolValue);
record.put("double_value", doubleValue);
- record.put("float_value", floatValue);
+ record.put("float_value0", floatValue);
record.put("string_value", testStrValue);
record.put("bytes_value", testBytes);
record.put("fixed_value", fixedBytes);
@@ -212,7 +212,7 @@ public class TestAvroRelConversion {
complexRecord.id = id;
complexRecord.bool_value = boolValue;
complexRecord.double_value = doubleValue;
- complexRecord.float_value = floatValue;
+ complexRecord.float_value0 = floatValue;
complexRecord.string_value = testStrValue;
complexRecord.bytes_value = testBytes;
complexRecord.fixed_value = fixedBytes;
@@ -352,7 +352,7 @@ public class TestAvroRelConversion {
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("bool_value").get(), boolValue);
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("double_value").get(), doubleValue);
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("string_value").get(), new Utf8(testStrValue));
- Assert.assertEquals(message.getSamzaSqlRelRecord().getField("float_value").get(), floatValue);
+ Assert.assertEquals(message.getSamzaSqlRelRecord().getField("float_value0").get(), floatValue);
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("long_value").get(), longValue);
if (unionValue instanceof String) {
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("union_value").get(), new Utf8((String) unionValue));
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
index c307b10..e2e67e2 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
@@ -40,7 +40,7 @@
"default":null
},
{
- "name": "float_value",
+ "name": "float_value0",
"doc": "float Value.",
"type": ["null", "float"],
"default":null
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
index 91a447f..d21fc0f 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
@@ -26,7 +26,7 @@ package org.apache.samza.sql.avro.schemas;
@SuppressWarnings("all")
public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
- public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":\"int\",\"doc\":\"Record id.\"},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\"},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"name\":\"float_value\",\"type\":[\"null\",\"flo [...]
+ public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":\"int\",\"doc\":\"Record id.\"},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\"},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"name\":\"float_value0\",\"type\":[\"null\",\"fl [...]
/** Record id. */
public java.lang.Integer id;
/** Boolean Value. */
@@ -34,7 +34,7 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
/** double Value. */
public java.lang.Double double_value;
/** float Value. */
- public java.lang.Float float_value;
+ public java.lang.Float float_value0;
/** string Value. */
public java.lang.CharSequence string_value;
/** bytes Value. */
@@ -61,7 +61,7 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
case 0: return id;
case 1: return bool_value;
case 2: return double_value;
- case 3: return float_value;
+ case 3: return float_value0;
case 4: return string_value;
case 5: return bytes_value;
case 6: return long_value;
@@ -82,7 +82,7 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
case 0: id = (java.lang.Integer)value$; break;
case 1: bool_value = (java.lang.Boolean)value$; break;
case 2: double_value = (java.lang.Double)value$; break;
- case 3: float_value = (java.lang.Float)value$; break;
+ case 3: float_value0 = (java.lang.Float)value$; break;
case 4: string_value = (java.lang.CharSequence)value$; break;
case 5: bytes_value = (java.nio.ByteBuffer)value$; break;
case 6: long_value = (java.lang.Long)value$; break;
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
index 4c9522e..38a18a1 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
@@ -59,11 +59,38 @@ public class TestSamzaSqlValidator {
new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
}
+ // Samza Sql allows users to replace a field in the input stream. For eg: To always set bool_value to false
+ // while keeping the values of other fields the same, it could be written the below way.
+ // SELECT false AS bool_value, c.* FROM testavro.COMPLEX1 AS c
+ @Test
+ public void testRepeatedTwiceFieldsValidation() throws SamzaSqlValidatorException {
+ Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+ "Insert into testavro.outputTopic select false as bool_value, c.* from testavro.COMPLEX1 as c");
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+ }
+
+ // Samza Sql allows a field to be replaced only once and validation will fail if the field is replaced more than
+ // once. We disallow it to keep things simple.
@Test (expected = SamzaSqlValidatorException.class)
- public void testNonExistingOutputField() throws SamzaSqlValidatorException {
+ public void testRepeatedThriceFieldsValidation() throws SamzaSqlValidatorException {
Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
- "Insert into testavro.outputTopic(id) select id, name as strings_value"
+ "Insert into testavro.outputTopic select id, bool_value, true as bool_value, c.* from testavro.COMPLEX1 as c");
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+ }
+
+ @Test (expected = SamzaSqlValidatorException.class)
+ public void testIllegitFieldEndingInZeroValidation() throws SamzaSqlValidatorException {
+ Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+ "Insert into testavro.outputTopic select id, true as bool_value, false as non_existing_name0"
+ " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
@@ -71,13 +98,28 @@ public class TestSamzaSqlValidator {
new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
}
- @Test(expected = SamzaException.class)
- public void testNonExistingSelectField() throws SamzaSqlValidatorException {
+ @Test
+ public void testLegitFieldEndingInZeroValidation() throws SamzaSqlValidatorException {
Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
- "Insert into testavro.outputTopic(id) select non_existing_field, name as string_value"
+ "Insert into testavro.outputTopic"
+ + " select id, bool_value, float_value0 from testavro.COMPLEX1");
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+ }
+
+ @Test (expected = SamzaSqlValidatorException.class)
+ public void testNonExistingOutputField() throws SamzaSqlValidatorException {
+ Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+ "Insert into testavro.outputTopic(id) select id, name as strings_value"
+ " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
- SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
}
@Test(expected = SamzaSqlValidatorException.class)
@@ -160,7 +202,7 @@ public class TestSamzaSqlValidator {
try {
new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
} catch (SamzaSqlValidatorException e) {
- Assert.assertTrue(e.getMessage().contains("Field 'bool_value' in output schema does not match any projected fields."));
+ Assert.assertTrue(e.getMessage().contains("Non-optional field 'bool_value' in output schema is missing"));
return;
}
@@ -181,7 +223,7 @@ public class TestSamzaSqlValidator {
try {
new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
} catch (SamzaSqlValidatorException e) {
- Assert.assertTrue(e.getMessage().contains("Field 'id' in output schema does not match"));
+ Assert.assertTrue(e.getMessage().contains("Non-optional field 'id' in output schema is missing"));
return;
}
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 7d62b8f..65e0ad0 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -325,7 +325,7 @@ public class TestAvroSystemFactory implements SystemFactory {
record.put("id", index);
record.put("string_value", "Name" + index);
record.put("bytes_value", ByteBuffer.wrap(("sample bytes").getBytes()));
- record.put("float_value", index + 0.123456f);
+ record.put("float_value0", index + 0.123456f);
record.put("double_value", index + 0.0123456789);
MyFixed myFixedVar = new MyFixed();
myFixedVar.bytes(DEFAULT_TRACKING_ID_BYTES);
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 20820f6..581fbda 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -427,8 +427,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
String sql1 =
- "Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value) "
- + " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value "
+ "Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value0) "
+ + " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value0 "
+ " from testavro.COMPLEX1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -458,7 +458,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
String sql1 =
"Insert into testavro.outputTopic"
+ " select bool_value, map_values['key0'] as string_value, union_value, array_values, map_values, id, bytes_value,"
- + " fixed_value, float_value from testavro.COMPLEX1";
+ + " fixed_value, float_value0 from testavro.COMPLEX1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -479,7 +479,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.outputTopic"
- + " select 'urn:li:member:' || cast(cast(float_value as int) as varchar) as string_value, id, float_value, "
+ + " select 'urn:li:member:' || cast(cast(float_value0 as int) as varchar) as string_value, id, float_value0, "
+ " double_value, true as bool_value from testavro.COMPLEX1";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));