You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2019/09/06 04:47:51 UTC
[samza] branch master updated: SAMZA-2316: Validate that all
non-default value fields in output schema are set in the projected fields.
(#1149)
This is an automated email from the ASF dual-hosted git repository.
atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 713a8bf SAMZA-2316: Validate that all non-default value fields in output schema are set in the projected fields. (#1149)
713a8bf is described below
commit 713a8bf68c2f4f81f7c575f66b1cf091ad81113c
Author: Aditya Toomula <at...@linkedin.com>
AuthorDate: Thu Sep 5 21:47:46 2019 -0700
SAMZA-2316: Validate that all non-default value fields in output schema are set in the projected fields. (#1149)
* Validate that all non-default value fields in output schema are set in the projected fields.
* Added comments and renamed sqlFieldSchema member variables.
---
.../apache/samza/sql/schema/SqlFieldSchema.java | 57 ++++++++++++++++------
.../samza/sql/client/impl/SamzaExecutor.java | 6 +--
.../apache/samza/sql/avro/AvroTypeFactoryImpl.java | 49 ++++++++++---------
.../org/apache/samza/sql/planner/QueryPlanner.java | 16 +++---
.../samza/sql/planner/RelSchemaConverter.java | 22 ++++-----
.../samza/sql/planner/SamzaSqlValidator.java | 55 ++++++++++++++++++---
.../samza/sql/avro/schemas/ComplexRecord.avsc | 13 +++--
.../samza/sql/avro/schemas/ComplexRecord.java | 8 ++-
.../samza/sql/planner/TestSamzaSqlValidator.java | 45 +++++++++++++++--
.../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 36 +++++++-------
10 files changed, 213 insertions(+), 94 deletions(-)
diff --git a/samza-api/src/main/java/org/apache/samza/sql/schema/SqlFieldSchema.java b/samza-api/src/main/java/org/apache/samza/sql/schema/SqlFieldSchema.java
index b944011..d3cec05 100644
--- a/samza-api/src/main/java/org/apache/samza/sql/schema/SqlFieldSchema.java
+++ b/samza-api/src/main/java/org/apache/samza/sql/schema/SqlFieldSchema.java
@@ -24,44 +24,59 @@ package org.apache.samza.sql.schema;
*/
public class SqlFieldSchema {
- private SamzaSqlFieldType fieldType;
- private SqlFieldSchema elementType;
- private SqlFieldSchema valueType;
- private SqlSchema rowSchema;
+ private final SamzaSqlFieldType fieldType;
+ private final SqlFieldSchema elementType;
+ private final SqlFieldSchema valueType;
+ private final SqlSchema rowSchema;
+ // A field is considered nullable when it can hold a null value. Please note that a nullable field
+ // needs to be explicitly set while writing and is expected to be set while reading. A non-nullable field
+ // cannot have a null value.
+ private final Boolean isNullable;
+ // A field is considered optional when the field has a default value. Such a field need not be set while writing
+ // but is expected to be set while reading.
+ // Please note that a nullable field is also an optional field if a default value is set, but the value for a
+ // nullable non-optional field needs to be explicitly set.
+ private final Boolean isOptional;
- private SqlFieldSchema(SamzaSqlFieldType fieldType, SqlFieldSchema elementType, SqlFieldSchema valueType, SqlSchema rowSchema) {
+ private SqlFieldSchema(SamzaSqlFieldType fieldType, SqlFieldSchema elementType, SqlFieldSchema valueType,
+ SqlSchema rowSchema, boolean isNullable, boolean isOptional) {
this.fieldType = fieldType;
this.elementType = elementType;
this.valueType = valueType;
this.rowSchema = rowSchema;
+ this.isNullable = isNullable;
+ this.isOptional = isOptional;
}
/**
- * Create a primitive fi
+ * Create a primitive field schema.
* @param typeName
* @return
*/
- public static SqlFieldSchema createPrimitiveSchema(SamzaSqlFieldType typeName) {
- return new SqlFieldSchema(typeName, null, null, null);
+ public static SqlFieldSchema createPrimitiveSchema(SamzaSqlFieldType typeName, boolean isNullable,
+ boolean isOptional) {
+ return new SqlFieldSchema(typeName, null, null, null, isNullable, isOptional);
}
- public static SqlFieldSchema createArraySchema(SqlFieldSchema elementType) {
- return new SqlFieldSchema(SamzaSqlFieldType.ARRAY, elementType, null, null);
+ public static SqlFieldSchema createArraySchema(SqlFieldSchema elementType, boolean isNullable,
+ boolean isOptional) {
+ return new SqlFieldSchema(SamzaSqlFieldType.ARRAY, elementType, null, null, isNullable, isOptional);
}
- public static SqlFieldSchema createMapSchema(SqlFieldSchema valueType) {
- return new SqlFieldSchema(SamzaSqlFieldType.MAP, null, valueType, null);
+ public static SqlFieldSchema createMapSchema(SqlFieldSchema valueType, boolean isNullable, boolean isOptional) {
+ return new SqlFieldSchema(SamzaSqlFieldType.MAP, null, valueType, null, isNullable, isOptional);
}
- public static SqlFieldSchema createRowFieldSchema(SqlSchema rowSchema) {
- return new SqlFieldSchema(SamzaSqlFieldType.ROW, null, null, rowSchema);
+ public static SqlFieldSchema createRowFieldSchema(SqlSchema rowSchema, boolean isNullable, boolean isOptional) {
+ return new SqlFieldSchema(SamzaSqlFieldType.ROW, null, null, rowSchema, isNullable, isOptional);
}
/**
* @return whether the field is a primitive field type or not.
*/
public boolean isPrimitiveField() {
- return fieldType != SamzaSqlFieldType.ARRAY && fieldType != SamzaSqlFieldType.MAP && fieldType != SamzaSqlFieldType.ROW;
+ return fieldType != SamzaSqlFieldType.ARRAY && fieldType != SamzaSqlFieldType.MAP &&
+ fieldType != SamzaSqlFieldType.ROW;
}
/**
@@ -93,5 +108,17 @@ public class SqlFieldSchema {
return rowSchema;
}
+ /**
+ * Get if the field type is nullable.
+ */
+ public boolean isNullable() {
+ return isNullable;
+ }
+ /**
+ * Get if the field type is optional.
+ */
+ public boolean isOptional() {
+ return isOptional;
+ }
}
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
index db97516..5939adb 100755
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -293,9 +293,9 @@ public class SamzaExecutor implements SqlExecutor {
*/
List<SqlFunction> udfs = new ArrayList<>();
udfs.add(new SamzaSqlUdfDisplayInfo("RegexMatch", "Matches the string to the regex",
- Arrays.asList(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING),
- SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING)),
- SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN)));
+ Arrays.asList(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING, false, false),
+ SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING, false, false)),
+ SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN, false, false)));
return udfs;
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
index 68116b6..4c06938 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
import org.apache.avro.Schema;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
-import org.apache.commons.lang3.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.sql.schema.SamzaSqlFieldType;
import org.apache.samza.sql.schema.SqlFieldSchema;
@@ -33,8 +32,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Factory that creates the Calcite relational types from the Avro Schema. This is used by the
- * AvroRelConverter to convert the Avro schema to calcite relational schema.
+ * Factory that creates {@link SqlSchema} from the Avro Schema. This is used by the
+ * {@link AvroRelConverter} to convert Avro schema to Samza Sql schema.
*/
public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
@@ -60,7 +59,8 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
SqlSchemaBuilder schemaBuilder = SqlSchemaBuilder.builder();
for (Schema.Field field : fields) {
- SqlFieldSchema fieldSchema = convertField(field.schema());
+ boolean isOptional = (field.defaultValue() != null);
+ SqlFieldSchema fieldSchema = convertField(field.schema(), false, isOptional);
schemaBuilder.addField(field.name(), fieldSchema);
}
@@ -68,36 +68,41 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
}
private SqlFieldSchema convertField(Schema fieldSchema) {
+ return convertField(fieldSchema, false, false);
+ }
+
+ private SqlFieldSchema convertField(Schema fieldSchema, boolean isNullable, boolean isOptional) {
switch (fieldSchema.getType()) {
case ARRAY:
SqlFieldSchema elementSchema = convertField(fieldSchema.getElementType());
- return SqlFieldSchema.createArraySchema(elementSchema);
+ return SqlFieldSchema.createArraySchema(elementSchema, isNullable, isOptional);
case BOOLEAN:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN, isNullable, isOptional);
case DOUBLE:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.DOUBLE);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.DOUBLE, isNullable, isOptional);
case FLOAT:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.FLOAT);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.FLOAT, isNullable, isOptional);
case ENUM:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING, isNullable, isOptional);
case UNION:
- return getSqlTypeFromUnionTypes(fieldSchema.getTypes());
+ return getSqlTypeFromUnionTypes(fieldSchema.getTypes(), isNullable, isOptional);
case FIXED:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BYTES);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BYTES, isNullable, isOptional);
case STRING:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING, isNullable, isOptional);
case BYTES:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BYTES);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BYTES, isNullable, isOptional);
case INT:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.INT32);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.INT32, isNullable, isOptional);
case LONG:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.INT64);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.INT64, isNullable, isOptional);
case RECORD:
SqlSchema rowSchema = convertSchema(fieldSchema.getFields());
- return SqlFieldSchema.createRowFieldSchema(rowSchema);
+ return SqlFieldSchema.createRowFieldSchema(rowSchema, isNullable, isOptional);
case MAP:
- SqlFieldSchema valueType = convertField(fieldSchema.getValueType());
- return SqlFieldSchema.createMapSchema(valueType);
+ // Can the value type be nullable and have default values ? Guess not!
+ SqlFieldSchema valueType = convertField(fieldSchema.getValueType(), false, false);
+ return SqlFieldSchema.createMapSchema(valueType, isNullable, isOptional);
default:
String msg = String.format("Field Type %s is not supported", fieldSchema.getType());
LOG.error(msg);
@@ -105,17 +110,17 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
}
}
- private SqlFieldSchema getSqlTypeFromUnionTypes(List<Schema> types) {
+ private SqlFieldSchema getSqlTypeFromUnionTypes(List<Schema> types, boolean isNullable, boolean isOptional) {
// Typically a nullable field's schema is configured as an union of Null and a Type.
// This is to check whether the Union is a Nullable field
if (types.size() == 2) {
if (types.get(0).getType() == Schema.Type.NULL) {
- return convertField(types.get(1));
+ return convertField(types.get(1), true, isOptional);
} else if ((types.get(1).getType() == Schema.Type.NULL)) {
- return convertField(types.get(0));
+ return convertField(types.get(0), true, isOptional);
}
}
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY, isNullable, isOptional);
}
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index bdf03f7..83ccea0 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -152,16 +152,15 @@ public class QueryPlanner {
}
}
- public static RelDataType getSourceRelSchema(RelSchemaProvider relSchemaProvider,
- RelSchemaConverter relSchemaConverter) {
- // If the source part is the last one, then fetch the schema corresponding to the stream and register.
+ public static SqlSchema getSourceSqlSchema(RelSchemaProvider relSchemaProvider) {
SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();
List<String> fieldNames = new ArrayList<>();
List<SqlFieldSchema> fieldTypes = new ArrayList<>();
if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
- fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
+ // Key is a nullable and optional field. It is defaulted to null in SamzaSqlRelMessage.
+ fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY, true, true));
}
fieldNames.addAll(
@@ -169,8 +168,13 @@ public class QueryPlanner {
fieldTypes.addAll(
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));
- SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
- return relSchemaConverter.convertToRelSchema(newSchema);
+ return new SqlSchema(fieldNames, fieldTypes);
+ }
+
+ public static RelDataType getSourceRelSchema(RelSchemaProvider relSchemaProvider,
+ RelSchemaConverter relSchemaConverter) {
+ // If the source part is the last one, then fetch the schema corresponding to the stream and register.
+ return relSchemaConverter.convertToRelSchema(getSourceSqlSchema(relSchemaProvider));
}
private static Table createTableFromRelSchema(RelDataType relationalSchema) {
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
index 6634f5a..dbd74b6 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.sql.type.ArraySqlType;
+import org.apache.calcite.sql.type.MapSqlType;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.samza.SamzaException;
@@ -73,30 +74,29 @@ public class RelSchemaConverter extends SqlTypeFactoryImpl {
switch (fieldSchema.getFieldType()) {
case ARRAY:
RelDataType elementType = getRelDataType(fieldSchema.getElementSchema());
- return new ArraySqlType(elementType, true);
+ return new ArraySqlType(elementType, fieldSchema.isNullable());
case BOOLEAN:
- return createTypeWithNullability(createSqlType(SqlTypeName.BOOLEAN), true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.BOOLEAN), fieldSchema.isNullable());
case DOUBLE:
- return createTypeWithNullability(createSqlType(SqlTypeName.DOUBLE), true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.DOUBLE), fieldSchema.isNullable());
case FLOAT:
- return createTypeWithNullability(createSqlType(SqlTypeName.FLOAT), true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.FLOAT), fieldSchema.isNullable());
case STRING:
- return createTypeWithNullability(createSqlType(SqlTypeName.VARCHAR), true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.VARCHAR), fieldSchema.isNullable());
case BYTES:
- return createTypeWithNullability(createSqlType(SqlTypeName.VARBINARY), true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.VARBINARY), fieldSchema.isNullable());
case INT16:
case INT32:
- return createTypeWithNullability(createSqlType(SqlTypeName.INTEGER), true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.INTEGER), fieldSchema.isNullable());
case INT64:
- return createTypeWithNullability(createSqlType(SqlTypeName.BIGINT), true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.BIGINT), fieldSchema.isNullable());
case ROW:
case ANY:
// TODO Calcite execution engine doesn't support record type yet.
- return createTypeWithNullability(createSqlType(SqlTypeName.ANY), true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.ANY), fieldSchema.isNullable());
case MAP:
RelDataType valueType = getRelDataType(fieldSchema.getValueScehma());
- return super.createMapType(createTypeWithNullability(createSqlType(SqlTypeName.VARCHAR), true),
- createTypeWithNullability(valueType, true));
+ return new MapSqlType(createSqlType(SqlTypeName.VARCHAR), valueType, fieldSchema.isNullable());
default:
String msg = String.format("Field Type %s is not supported", fieldSchema.getFieldType());
LOG.error(msg);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
index 08d4497..9482a75 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
@@ -40,6 +40,8 @@ import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
import org.apache.samza.sql.interfaces.RelSchemaProvider;
import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.schema.SqlFieldSchema;
+import org.apache.samza.sql.schema.SqlSchema;
import org.apache.samza.sql.util.SamzaSqlQueryParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,24 +100,59 @@ public class SamzaSqlValidator {
protected void validateOutput(RelRoot relRoot, RelSchemaProvider relSchemaProvider) throws SamzaSqlValidatorException {
RelRecordType outputRecord = (RelRecordType) QueryPlanner.getSourceRelSchema(relSchemaProvider,
new RelSchemaConverter());
+ // Get Samza Sql schema along with Calcite schema. The reason is that the Calcite schema does not have a way
+ // to represent optional fields while Samza Sql schema can represent optional fields. This is the only reason that
+ // we use SqlSchema in validating output.
+ SqlSchema outputSqlSchema = QueryPlanner.getSourceSqlSchema(relSchemaProvider);
+
LogicalProject project = (LogicalProject) relRoot.rel;
RelRecordType projetRecord = (RelRecordType) project.getRowType();
- validateOutputRecords(outputRecord, projetRecord);
+
+ validateOutputRecords(outputRecord, outputSqlSchema, projetRecord);
}
- protected void validateOutputRecords(RelRecordType outputRecord, RelRecordType projectRecord)
+ protected void validateOutputRecords(RelRecordType outputRecord, SqlSchema outputSqlSchema,
+ RelRecordType projectRecord)
throws SamzaSqlValidatorException {
Map<String, RelDataType> outputRecordMap = outputRecord.getFieldList().stream().collect(
Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
+ Map<String, SqlFieldSchema> outputFieldSchemaMap = outputSqlSchema.getFields().stream().collect(
+ Collectors.toMap(SqlSchema.SqlField::getFieldName, SqlSchema.SqlField::getFieldSchema));
Map<String, RelDataType> projectRecordMap = projectRecord.getFieldList().stream().collect(
Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
- // There could be default values for the output schema and hence fields in project schema could be a subset of
- // fields in output schema.
- // TODO: SAMZA-2316: Validate that all non-default value fields in output schema are set in the projected fields.
+ // Ensure that all non-optional fields in output schema are set in the sql query and are of the
+ // same type.
+ for (Map.Entry<String, RelDataType> entry : outputRecordMap.entrySet()) {
+ RelDataType projectFieldType = projectRecordMap.get(entry.getKey());
+ SqlFieldSchema outputSqlFieldSchema = outputFieldSchemaMap.get(entry.getKey());
+
+ if (projectFieldType == null) {
+ // If an output schema field is not found in the sql query, ignore it if the field is optional.
+ // Otherwise, throw an error.
+ if (outputSqlFieldSchema.isOptional()) {
+ continue;
+ }
+ String errMsg = String.format("Field '%s' in output schema does not match any projected fields.",
+ entry.getKey());
+ LOG.error(errMsg);
+ throw new SamzaSqlValidatorException(errMsg);
+ } else if (!compareFieldTypes(entry.getValue(), outputSqlFieldSchema, projectFieldType)) {
+ String errMsg = String.format("Field '%s' with type '%s' in output schema does not match the field type '%s' in"
+ + " projected fields.", entry.getKey(), entry.getValue(), projectFieldType);
+ LOG.error(errMsg);
+ throw new SamzaSqlValidatorException(errMsg);
+ }
+ }
+
+ // Ensure that all fields from sql statement exist in the output schema and are of the same type.
for (Map.Entry<String, RelDataType> entry : projectRecordMap.entrySet()) {
RelDataType outputFieldType = outputRecordMap.get(entry.getKey());
+ SqlFieldSchema outputSqlFieldSchema = outputFieldSchemaMap.get(entry.getKey());
+
if (outputFieldType == null) {
+ // If a field in sql query is not found in the output schema, ignore if it is a Samza Sql special op.
+ // Otherwise, throw an error.
if (entry.getKey().equals(SamzaSqlRelMessage.OP_NAME)) {
continue;
}
@@ -123,7 +160,7 @@ public class SamzaSqlValidator {
entry.getKey());
LOG.error(errMsg);
throw new SamzaSqlValidatorException(errMsg);
- } else if (!compareFieldTypes(outputFieldType, entry.getValue())) {
+ } else if (!compareFieldTypes(outputFieldType, outputSqlFieldSchema, entry.getValue())) {
String errMsg = String.format("Field '%s' with type '%s' in select query does not match the field type '%s' in"
+ " output schema.", entry.getKey(), entry.getValue(), outputFieldType);
LOG.error(errMsg);
@@ -132,7 +169,8 @@ public class SamzaSqlValidator {
}
}
- protected boolean compareFieldTypes(RelDataType outputFieldType, RelDataType selectQueryFieldType) {
+ protected boolean compareFieldTypes(RelDataType outputFieldType, SqlFieldSchema sqlFieldSchema,
+ RelDataType selectQueryFieldType) {
RelDataType projectFieldType;
// JavaTypes are relevant for Udf argument and return types
@@ -168,7 +206,8 @@ public class SamzaSqlValidator {
return projectSqlType == SqlTypeName.FLOAT;
case ROW:
try {
- validateOutputRecords((RelRecordType) outputFieldType, (RelRecordType) projectFieldType);
+ validateOutputRecords((RelRecordType) outputFieldType, sqlFieldSchema.getRowSchema(),
+ (RelRecordType) projectFieldType);
} catch (SamzaSqlValidatorException e) {
LOG.error("A field in select query does not match with the output schema.", e);
return false;
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
index 5e78bd9..c307b10 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
@@ -26,14 +26,12 @@
{
"name": "id",
"doc": "Record id.",
- "type": ["null", "int"],
- "default":null
+ "type": "int"
},
{
"name": "bool_value",
"doc": "Boolean Value.",
- "type": ["null", "boolean"],
- "default":null
+ "type": ["null", "boolean"]
},
{
"name": "double_value",
@@ -72,8 +70,8 @@
"name": "MyFixed",
"type":"fixed",
"size":16
- }
- ]
+ }],
+ "default":null
},
{
"name": "array_values",
@@ -120,7 +118,8 @@
"doc" : "",
"fields" : [ ]
}
- ]
+ ],
+ "default":null
},
{
"name": "array_records",
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
index 7796004..91a447f 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
@@ -16,11 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
package org.apache.samza.sql.avro.schemas;
@SuppressWarnings("all")
public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
- public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"na [...]
+ public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":\"int\",\"doc\":\"Record id.\"},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\"},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"name\":\"float_value\",\"type\":[\"null\",\"flo [...]
/** Record id. */
public java.lang.Integer id;
/** Boolean Value. */
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
index b2ce6f6..4c9522e 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
@@ -22,14 +22,11 @@ package org.apache.samza.sql.planner;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.calcite.rel.RelRoot;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
-import org.apache.samza.sql.util.SamzaSqlQueryParser;
import org.apache.samza.sql.util.SamzaSqlTestConfig;
import org.junit.Assert;
import org.junit.Before;
@@ -54,7 +51,7 @@ public class TestSamzaSqlValidator {
public void testBasicValidation() throws SamzaSqlValidatorException {
Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
- "Insert into testavro.outputTopic(id) select id, name as string_value"
+ "Insert into testavro.outputTopic select id, true as bool_value, name as string_value"
+ " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
@@ -152,6 +149,46 @@ public class TestSamzaSqlValidator {
}
@Test
+ public void testNonDefaultButNullableField() {
+ Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+ // bool_value is missing
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+ "Insert into testavro.outputTopic(id) select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)");
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ try {
+ new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+ } catch (SamzaSqlValidatorException e) {
+ Assert.assertTrue(e.getMessage().contains("Field 'bool_value' in output schema does not match any projected fields."));
+ return;
+ }
+
+ Assert.fail("Validation test has failed.");
+ }
+
+ @Test
+ public void testNonDefaultOutputField() {
+ Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+ // id is non-default field.
+ String sql = "Insert into testavro.outputTopic "
+ + " select NOT(id = 5) as bool_value, CASE WHEN id IN (5, 6, 7) THEN CAST('foo' AS VARCHAR) WHEN id < 5 THEN CAST('bars' AS VARCHAR) ELSE NULL END as string_value from testavro.SIMPLE1";
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+
+ try {
+ new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+ } catch (SamzaSqlValidatorException e) {
+ Assert.assertTrue(e.getMessage().contains("Field 'id' in output schema does not match"));
+ return;
+ }
+
+ Assert.fail("Validation test has failed.");
+ }
+
+ @Test
public void testFormatErrorString() {
String sql =
"select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`\n"
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index d81cb3c..dec886e 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -289,8 +289,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
- String sql1 = "Insert into testavro.outputTopic(id, long_value) "
- + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP(), LOCALTIMESTAMP()) + MONTH(CURRENT_DATE()) as long_value from testavro.SIMPLE1";
+ String sql1 = "Insert into testavro.outputTopic(id, bool_value, long_value) "
+ + " select id, NOT(id = 5) as bool_value, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP(), LOCALTIMESTAMP()) + MONTH(CURRENT_DATE()) as long_value from testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -401,8 +401,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
- String sql1 = "Insert into testavro.outputTopic(id, long_value) "
- + " select id, name as string_value from testavro.SIMPLE1 where name like 'Name%'";
+ String sql1 = "Insert into testavro.outputTopic(id, bool_value, string_value) "
+ + " select id, NOT(id = 5) as bool_value, name as string_value from testavro.SIMPLE1 where name like 'Name%'";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -427,8 +427,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
String sql1 =
- "Insert into testavro.outputTopic(string_value, id, bytes_value, fixed_value, float_value) "
- + " select Flatten(array_values) as string_value, id, bytes_value, fixed_value, float_value "
+ "Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value) "
+ + " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value "
+ " from testavro.COMPLEX1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -457,7 +457,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
String sql1 =
"Insert into testavro.outputTopic"
- + " select map_values['key0'] as string_value, union_value, array_values, map_values, id, bytes_value,"
+ + " select bool_value, map_values['key0'] as string_value, union_value, array_values, map_values, id, bytes_value,"
+ " fixed_value, float_value from testavro.COMPLEX1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -502,7 +502,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 =
- "Insert into testavro.outputTopic(id) select Flatten(MyTestArray(id)) as id from testavro.SIMPLE1";
+ "Insert into testavro.outputTopic(id, bool_value) select Flatten(MyTestArray(id)) as id, NOT(id = 5) as bool_value"
+ + " from testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -527,7 +528,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 =
- "Insert into testavro.outputTopic(id) select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)";
+ "Insert into testavro.outputTopic(id, bool_value) select Flatten(a) as id, true as bool_value"
+ + " from (select MyTestArray(id) a from testavro.SIMPLE1)";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -551,8 +553,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
- String sql1 = "Insert into testavro.outputTopic(id, long_value) "
- + "select id, MyTest(MyTestObj(id)) as long_value from testavro.SIMPLE1";
+ String sql1 = "Insert into testavro.outputTopic(id, bool_value, long_value) "
+ + "select id, NOT(id = 5) as bool_value, MyTest(MyTestObj(id)) as long_value from testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -575,8 +577,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
- String sql1 = "Insert into testavro.outputTopic(id, long_value) "
- + "select id, MYTest(id) as long_value from testavro.SIMPLE1";
+ String sql1 = "Insert into testavro.outputTopic(id, bool_value, long_value) "
+ + "select id, NOT(id = 5) as bool_value, MYTest(id) as long_value from testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -623,8 +625,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
- String sql1 = "Insert into testavro.outputTopic(id, long_value) "
- + "select MyTestPoly(id) as long_value, MyTestPoly(name) as id from testavro.SIMPLE1";
+ String sql1 = "Insert into testavro.outputTopic(id, bool_value, long_value) "
+ + "select MyTestPoly(id) as long_value, NOT(id = 5) as bool_value, MyTestPoly(name) as id from testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -652,8 +654,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 =
- "Insert into testavro.outputTopic(id) "
- + "select id "
+ "Insert into testavro.outputTopic(id, bool_value) "
+ + "select id, NOT(id = 5) as bool_value "
+ "from testavro.SIMPLE1 "
+ "where RegexMatch('.*4', name)";
List<String> sqlStmts = Collections.singletonList(sql1);