You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2019/02/13 19:18:52 UTC
[samza] branch master updated: End to end union test case (#916)
This is an automated email from the ASF dual-hosted git repository.
srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 63abbd2 End to end union test case (#916)
63abbd2 is described below
commit 63abbd21ce500499ebd56bdb00a9e7ed48ded14d
Author: Srinivasulu Punuru <sr...@users.noreply.github.com>
AuthorDate: Wed Feb 13 11:18:48 2019 -0800
End to end union test case (#916)
---
.../apache/samza/sql/avro/AvroRelConverter.java | 87 ++++++++++++++++++----
.../samza/sql/avro/TestAvroRelConversion.java | 16 ++--
.../samza/sql/avro/schemas/ComplexRecord.avsc | 12 +--
.../samza/sql/avro/schemas/ComplexRecord.java | 18 ++---
.../samza/sql/system/TestAvroSystemFactory.java | 7 ++
.../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 2 +-
6 files changed, 100 insertions(+), 42 deletions(-)
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index 0d452dd..ba79e48 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -207,7 +207,11 @@ public class AvroRelConverter implements SamzaRelConverter {
.collect(Collectors.toMap(Map.Entry::getKey, e -> convertToAvroObject(e.getValue(),
getNonNullUnionSchema(schema).getValueType())));
case UNION:
- return convertToAvroObject(relObj, getNonNullUnionSchema(schema));
+ for (Schema unionSchema : schema.getTypes()) {
+ if (isSchemaCompatibleWithRelObj(relObj, unionSchema)) {
+ return convertToAvroObject(relObj, unionSchema);
+ }
+ }
case ENUM:
return new GenericData.EnumSymbol(schema, (String) relObj);
case FIXED:
@@ -219,19 +223,6 @@ public class AvroRelConverter implements SamzaRelConverter {
}
}
- // Two non-nullable types in a union is not yet supported.
- static public Schema getNonNullUnionSchema(Schema schema) {
- if (schema.getType().equals(Schema.Type.UNION)) {
- if (schema.getTypes().get(0).getType() != Schema.Type.NULL) {
- return schema.getTypes().get(0);
- }
- if (schema.getTypes().get(1).getType() != Schema.Type.NULL) {
- return schema.getTypes().get(1);
- }
- }
- return schema;
- }
-
// Not doing any validations of data types with Avro schema considering the resource cost per message.
// Casting would fail if the data types are not in sync with the schema.
public Object convertToJavaObject(Object avroObj, Schema schema) {
@@ -267,7 +258,11 @@ public class AvroRelConverter implements SamzaRelConverter {
return retVal;
}
case UNION:
- return convertToJavaObject(avroObj, getNonNullUnionSchema(schema));
+ for (Schema unionSchema : schema.getTypes()) {
+ if (isSchemaCompatible(avroObj, unionSchema)) {
+ return convertToJavaObject(avroObj, unionSchema);
+ }
+ }
case ENUM:
return avroObj.toString();
case FIXED:
@@ -283,4 +278,66 @@ public class AvroRelConverter implements SamzaRelConverter {
return avroObj;
}
}
+
+ private boolean isSchemaCompatible(Object avroObj, Schema unionSchema) {
+ if (unionSchema.getType() == Schema.Type.NULL && avroObj == null) {
+ return true;
+ }
+ switch (unionSchema.getType()) {
+ case RECORD:
+ return avroObj instanceof IndexedRecord;
+ case ARRAY:
+ return avroObj instanceof GenericData.Array || avroObj instanceof List;
+ case MAP:
+ return avroObj instanceof Map;
+ case FIXED:
+ return avroObj instanceof GenericData.Fixed;
+ case BYTES:
+ return avroObj instanceof ByteBuffer;
+ case FLOAT:
+ return avroObj instanceof Float;
+ default:
+ return true;
+ }
+ }
+
+ private static boolean isSchemaCompatibleWithRelObj(Object relObj, Schema unionSchema) {
+ if (unionSchema.getType() == Schema.Type.NULL && relObj == null) {
+ return true;
+ }
+ switch (unionSchema.getType()) {
+ case RECORD:
+ return relObj instanceof SamzaSqlRelRecord;
+ case ARRAY:
+ return relObj instanceof List;
+ case MAP:
+ return relObj instanceof Map;
+ case FIXED:
+ return relObj instanceof ByteString;
+ case BYTES:
+ return relObj instanceof ByteString;
+ case FLOAT:
+ return relObj instanceof Float || relObj instanceof Double;
+ default:
+ return true;
+ }
+ }
+
+ // Only a two-branch nullable union (null plus one other type) is unwrapped; any other schema is returned unchanged.
+ public static Schema getNonNullUnionSchema(Schema schema) {
+ if (schema.getType().equals(Schema.Type.UNION)) {
+ List<Schema> types = schema.getTypes();
+ // A nullable field's schema is typically declared as a union of null and one other type.
+ // Check whether this union has that two-branch nullable shape.
+ if (types.size() == 2) {
+ if (types.get(0).getType() == Schema.Type.NULL) {
+ return types.get(1);
+ } else if ((types.get(1).getType() == Schema.Type.NULL)) {
+ return types.get(0);
+ }
+ }
+ }
+
+ return schema;
+ }
}
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
index 47f76e8..c3f1caf 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -56,6 +56,7 @@ import org.apache.samza.sql.avro.schemas.PhoneNumber;
import org.apache.samza.sql.avro.schemas.Profile;
import org.apache.samza.sql.avro.schemas.SimpleRecord;
import org.apache.samza.sql.avro.schemas.StreetNumRecord;
+import org.apache.samza.sql.avro.schemas.SubRecord;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.planner.RelSchemaConverter;
import org.apache.samza.sql.schema.SqlSchema;
@@ -194,6 +195,7 @@ public class TestAvroRelConversion {
@Test
public void testComplexRecordConversion() throws IOException {
GenericData.Record record = new GenericData.Record(ComplexRecord.SCHEMA$);
+
record.put("id", id);
record.put("bool_value", boolValue);
record.put("double_value", doubleValue);
@@ -204,7 +206,7 @@ public class TestAvroRelConversion {
record.put("long_value", longValue);
record.put("array_values", arrayValue);
record.put("map_values", mapValue);
- record.put("union_value", id);
+ record.put("union_value", testStrValue);
ComplexRecord complexRecord = new ComplexRecord();
complexRecord.id = id;
@@ -219,18 +221,10 @@ public class TestAvroRelConversion {
complexRecord.array_values.addAll(arrayValue);
complexRecord.map_values = new HashMap<>();
complexRecord.map_values.putAll(mapValue);
- complexRecord.union_value = id;
-
- byte[] serializedData = bytesFromGenericRecord(record);
- validateAvroSerializedData(serializedData, id);
-
- serializedData = encodeAvroSpecificRecord(ComplexRecord.class, complexRecord);
- validateAvroSerializedData(serializedData, id);
-
- record.put("union_value", testStrValue);
complexRecord.union_value = testStrValue;
- serializedData = bytesFromGenericRecord(record);
+
+ byte[] serializedData = bytesFromGenericRecord(record);
validateAvroSerializedData(serializedData, testStrValue);
serializedData = encodeAvroSpecificRecord(ComplexRecord.class, complexRecord);
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
index 5106bc2..5e78bd9 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
@@ -112,12 +112,6 @@
]
},
{
- "name": "union_value",
- "doc": "union Value.",
- "type": ["null", "int", "string"],
- "default":null
- },
- {
"name" : "empty_record",
"type" : [ "null", {
"type" : "record",
@@ -155,6 +149,12 @@
]
}
]
+ },
+ {
+ "name": "union_value",
+ "doc": "union Value.",
+ "type": ["null", "SubRecord", "string"],
+ "default":null
}
]
}
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
index ccc7929..7796004 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
@@ -20,7 +20,7 @@ package org.apache.samza.sql.avro.schemas;
@SuppressWarnings("all")
public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
- public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"na [...]
+ public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"na [...]
/** Record id. */
public java.lang.Integer id;
/** Boolean Value. */
@@ -43,11 +43,11 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
public java.util.Map<java.lang.CharSequence,java.lang.CharSequence> map_values;
/** enum value. */
public org.apache.samza.sql.avro.schemas.TestEnumType enum_value;
- /** union Value. */
- public java.lang.Object union_value;
public org.apache.samza.sql.avro.schemas.emptySubRecord empty_record;
/** array of records. */
public org.apache.samza.sql.avro.schemas.SubRecord array_records;
+ /** union Value. */
+ public java.lang.Object union_value;
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
// Used by DatumWriter. Applications should not call.
public java.lang.Object get(int field$) {
@@ -63,9 +63,9 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
case 8: return array_values;
case 9: return map_values;
case 10: return enum_value;
- case 11: return union_value;
- case 12: return empty_record;
- case 13: return array_records;
+ case 11: return empty_record;
+ case 12: return array_records;
+ case 13: return union_value;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
@@ -84,9 +84,9 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
case 8: array_values = (java.util.List<java.lang.CharSequence>)value$; break;
case 9: map_values = (java.util.Map<java.lang.CharSequence,java.lang.CharSequence>)value$; break;
case 10: enum_value = (org.apache.samza.sql.avro.schemas.TestEnumType)value$; break;
- case 11: union_value = (java.lang.Object)value$; break;
- case 12: empty_record = (org.apache.samza.sql.avro.schemas.emptySubRecord)value$; break;
- case 13: array_records = (org.apache.samza.sql.avro.schemas.SubRecord)value$; break;
+ case 11: empty_record = (org.apache.samza.sql.avro.schemas.emptySubRecord)value$; break;
+ case 12: array_records = (org.apache.samza.sql.avro.schemas.SubRecord)value$; break;
+ case 13: union_value = (java.lang.Object)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index ec25c9d..8304598 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -42,6 +42,7 @@ import org.apache.samza.sql.avro.schemas.PhoneNumber;
import org.apache.samza.sql.avro.schemas.Profile;
import org.apache.samza.sql.avro.schemas.SimpleRecord;
import org.apache.samza.sql.avro.schemas.StreetNumRecord;
+import org.apache.samza.sql.avro.schemas.SubRecord;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
@@ -319,6 +320,7 @@ public class TestAvroSystemFactory implements SystemFactory {
}
private Object createComplexRecord(int index) {
+
GenericRecord record = new GenericData.Record(ComplexRecord.SCHEMA$);
record.put("id", index);
record.put("string_value", "Name" + index);
@@ -330,6 +332,11 @@ public class TestAvroSystemFactory implements SystemFactory {
new GenericData.Array<>(index, ComplexRecord.SCHEMA$.getField("array_values").schema().getTypes().get(1));
arrayValues.addAll(IntStream.range(0, index).mapToObj(String::valueOf).collect(Collectors.toList()));
record.put("array_values", arrayValues);
+// record.put("union_value", "unionStrValue");
+ GenericRecord subRecord = new GenericData.Record(SubRecord.SCHEMA$);
+ subRecord.put("id", index);
+ subRecord.put("sub_values", arrayValues);
+ record.put("union_value", subRecord);
Map<String, String> mapValues = new HashMap<>();
mapValues.put("key0", "value0");
record.put("map_values", mapValues);
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 9deb561..35c83e9 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -292,7 +292,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
String sql1 =
"Insert into testavro.outputTopic"
- + " select map_values['key0'] as string_value, array_values[0] as string_value, map_values, id, bytes_value, fixed_value, float_value "
+ + " select map_values['key0'] as string_value, union_value, array_values[0] as string_value, map_values, id, bytes_value, fixed_value, float_value "
+ " from testavro.COMPLEX1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));