You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2019/02/13 19:18:52 UTC

[samza] branch master updated: End to end union test case (#916)

This is an automated email from the ASF dual-hosted git repository.

srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 63abbd2  End to end union test case (#916)
63abbd2 is described below

commit 63abbd21ce500499ebd56bdb00a9e7ed48ded14d
Author: Srinivasulu Punuru <sr...@users.noreply.github.com>
AuthorDate: Wed Feb 13 11:18:48 2019 -0800

    End to end union test case (#916)
---
 .../apache/samza/sql/avro/AvroRelConverter.java    | 87 ++++++++++++++++++----
 .../samza/sql/avro/TestAvroRelConversion.java      | 16 ++--
 .../samza/sql/avro/schemas/ComplexRecord.avsc      | 12 +--
 .../samza/sql/avro/schemas/ComplexRecord.java      | 18 ++---
 .../samza/sql/system/TestAvroSystemFactory.java    |  7 ++
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java  |  2 +-
 6 files changed, 100 insertions(+), 42 deletions(-)

diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index 0d452dd..ba79e48 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -207,7 +207,11 @@ public class AvroRelConverter implements SamzaRelConverter {
             .collect(Collectors.toMap(Map.Entry::getKey, e -> convertToAvroObject(e.getValue(),
                 getNonNullUnionSchema(schema).getValueType())));
       case UNION:
-        return convertToAvroObject(relObj, getNonNullUnionSchema(schema));
+        for (Schema unionSchema : schema.getTypes()) {
+          if (isSchemaCompatibleWithRelObj(relObj, unionSchema)) {
+            return convertToAvroObject(relObj, unionSchema);
+          }
+        }
       case ENUM:
         return new GenericData.EnumSymbol(schema, (String) relObj);
       case FIXED:
@@ -219,19 +223,6 @@ public class AvroRelConverter implements SamzaRelConverter {
     }
   }
 
-  // Two non-nullable types in a union is not yet supported.
-  static public Schema getNonNullUnionSchema(Schema schema) {
-    if (schema.getType().equals(Schema.Type.UNION)) {
-      if (schema.getTypes().get(0).getType() != Schema.Type.NULL) {
-        return schema.getTypes().get(0);
-      }
-      if (schema.getTypes().get(1).getType() != Schema.Type.NULL) {
-        return schema.getTypes().get(1);
-      }
-    }
-    return schema;
-  }
-
   // Not doing any validations of data types with Avro schema considering the resource cost per message.
   // Casting would fail if the data types are not in sync with the schema.
   public Object convertToJavaObject(Object avroObj, Schema schema) {
@@ -267,7 +258,11 @@ public class AvroRelConverter implements SamzaRelConverter {
         return retVal;
       }
       case UNION:
-        return convertToJavaObject(avroObj, getNonNullUnionSchema(schema));
+        for (Schema unionSchema : schema.getTypes()) {
+          if (isSchemaCompatible(avroObj, unionSchema)) {
+            return convertToJavaObject(avroObj, unionSchema);
+          }
+        }
       case ENUM:
         return avroObj.toString();
       case FIXED:
@@ -283,4 +278,66 @@ public class AvroRelConverter implements SamzaRelConverter {
         return avroObj;
     }
   }
+
+  // Checks whether avroObj's runtime type is acceptable for one branch (unionSchema) of an Avro union.
+  private boolean isSchemaCompatible(Object avroObj, Schema unionSchema) {
+    switch (unionSchema.getType()) {
+      case NULL: // the null branch matches only a null value; it must not claim non-null objects
+        return avroObj == null;
+      case RECORD:
+        return avroObj instanceof IndexedRecord;
+      case ARRAY:
+        return avroObj instanceof GenericData.Array || avroObj instanceof List;
+      case MAP:
+        return avroObj instanceof Map;
+      case FIXED:
+        return avroObj instanceof GenericData.Fixed;
+      case BYTES:
+        return avroObj instanceof ByteBuffer;
+      case FLOAT:
+        return avroObj instanceof Float;
+      default: // every other branch type is accepted without inspecting avroObj
+        return true;
+    }
+  }
+
+  // Checks whether relObj's runtime type is acceptable for one branch (unionSchema) of an Avro union.
+  // Used to pick the union branch when converting a relational value to Avro.
+  private static boolean isSchemaCompatibleWithRelObj(Object relObj, Schema unionSchema) {
+    switch (unionSchema.getType()) {
+      case NULL: // the null branch matches only a null value; it must not claim non-null objects
+        return relObj == null;
+      case RECORD:
+        return relObj instanceof SamzaSqlRelRecord;
+      case ARRAY:
+        return relObj instanceof List;
+      case MAP:
+        return relObj instanceof Map;
+      case FIXED:
+      case BYTES: // both byte-oriented branch types take the same ByteString check
+        return relObj instanceof ByteString;
+      case FLOAT:
+        return relObj instanceof Float || relObj instanceof Double;
+      default: // every other branch type is accepted without inspecting relObj
+        return true;
+    }
+  }
+
+  // Unwraps a nullable union ([null, T] or [T, null]) to T; any other schema, including wider unions, is returned as-is.
+  public static Schema getNonNullUnionSchema(Schema schema) {
+    if (schema.getType().equals(Schema.Type.UNION)) {
+      List<Schema> types = schema.getTypes();
+      // A nullable field's schema is typically configured as a union of null and one other type.
+      // Only that two-branch shape is unwrapped here; the branch that is not null is returned.
+      if (types.size() == 2) {
+        if (types.get(0).getType() == Schema.Type.NULL) {
+          return types.get(1);
+        } else if ((types.get(1).getType() == Schema.Type.NULL)) {
+          return types.get(0);
+        }
+      }
+    }
+
+    return schema;
+  }
 }
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
index 47f76e8..c3f1caf 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -56,6 +56,7 @@ import org.apache.samza.sql.avro.schemas.PhoneNumber;
 import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.SimpleRecord;
 import org.apache.samza.sql.avro.schemas.StreetNumRecord;
+import org.apache.samza.sql.avro.schemas.SubRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.planner.RelSchemaConverter;
 import org.apache.samza.sql.schema.SqlSchema;
@@ -194,6 +195,7 @@ public class TestAvroRelConversion {
   @Test
   public void testComplexRecordConversion() throws IOException {
     GenericData.Record record = new GenericData.Record(ComplexRecord.SCHEMA$);
+
     record.put("id", id);
     record.put("bool_value", boolValue);
     record.put("double_value", doubleValue);
@@ -204,7 +206,7 @@ public class TestAvroRelConversion {
     record.put("long_value", longValue);
     record.put("array_values", arrayValue);
     record.put("map_values", mapValue);
-    record.put("union_value", id);
+    record.put("union_value", testStrValue);
 
     ComplexRecord complexRecord = new ComplexRecord();
     complexRecord.id = id;
@@ -219,18 +221,10 @@ public class TestAvroRelConversion {
     complexRecord.array_values.addAll(arrayValue);
     complexRecord.map_values = new HashMap<>();
     complexRecord.map_values.putAll(mapValue);
-    complexRecord.union_value = id;
-
-    byte[] serializedData = bytesFromGenericRecord(record);
-    validateAvroSerializedData(serializedData, id);
-
-    serializedData = encodeAvroSpecificRecord(ComplexRecord.class, complexRecord);
-    validateAvroSerializedData(serializedData, id);
-
-    record.put("union_value", testStrValue);
     complexRecord.union_value = testStrValue;
 
-    serializedData = bytesFromGenericRecord(record);
+
+    byte[] serializedData = bytesFromGenericRecord(record);
     validateAvroSerializedData(serializedData, testStrValue);
 
     serializedData = encodeAvroSpecificRecord(ComplexRecord.class, complexRecord);
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
index 5106bc2..5e78bd9 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
@@ -112,12 +112,6 @@
           ]
         },
         {
-          "name": "union_value",
-          "doc": "union Value.",
-          "type": ["null", "int", "string"],
-          "default":null
-        },
-        {
           "name" : "empty_record",
           "type" : [ "null", {
             "type" : "record",
@@ -155,6 +149,12 @@
                 ]
               }
             ]
+        },
+        {
+          "name": "union_value",
+          "doc": "union Value.",
+          "type": ["null", "SubRecord", "string"],
+          "default":null
         }
     ]
 }
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
index ccc7929..7796004 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
@@ -20,7 +20,7 @@ package org.apache.samza.sql.avro.schemas;
 
 @SuppressWarnings("all")
 public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"na [...]
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"na [...]
   /** Record id. */
   public java.lang.Integer id;
   /** Boolean Value. */
@@ -43,11 +43,11 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
   public java.util.Map<java.lang.CharSequence,java.lang.CharSequence> map_values;
   /** enum value. */
   public org.apache.samza.sql.avro.schemas.TestEnumType enum_value;
-  /** union Value. */
-  public java.lang.Object union_value;
   public org.apache.samza.sql.avro.schemas.emptySubRecord empty_record;
   /** array of records. */
   public org.apache.samza.sql.avro.schemas.SubRecord array_records;
+  /** union Value. */
+  public java.lang.Object union_value;
   public org.apache.avro.Schema getSchema() { return SCHEMA$; }
   // Used by DatumWriter.  Applications should not call.
   public java.lang.Object get(int field$) {
@@ -63,9 +63,9 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
     case 8: return array_values;
     case 9: return map_values;
     case 10: return enum_value;
-    case 11: return union_value;
-    case 12: return empty_record;
-    case 13: return array_records;
+    case 11: return empty_record;
+    case 12: return array_records;
+    case 13: return union_value;
     default: throw new org.apache.avro.AvroRuntimeException("Bad index");
     }
   }
@@ -84,9 +84,9 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
     case 8: array_values = (java.util.List<java.lang.CharSequence>)value$; break;
     case 9: map_values = (java.util.Map<java.lang.CharSequence,java.lang.CharSequence>)value$; break;
     case 10: enum_value = (org.apache.samza.sql.avro.schemas.TestEnumType)value$; break;
-    case 11: union_value = (java.lang.Object)value$; break;
-    case 12: empty_record = (org.apache.samza.sql.avro.schemas.emptySubRecord)value$; break;
-    case 13: array_records = (org.apache.samza.sql.avro.schemas.SubRecord)value$; break;
+    case 11: empty_record = (org.apache.samza.sql.avro.schemas.emptySubRecord)value$; break;
+    case 12: array_records = (org.apache.samza.sql.avro.schemas.SubRecord)value$; break;
+    case 13: union_value = (java.lang.Object)value$; break;
     default: throw new org.apache.avro.AvroRuntimeException("Bad index");
     }
   }
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index ec25c9d..8304598 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -42,6 +42,7 @@ import org.apache.samza.sql.avro.schemas.PhoneNumber;
 import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.SimpleRecord;
 import org.apache.samza.sql.avro.schemas.StreetNumRecord;
+import org.apache.samza.sql.avro.schemas.SubRecord;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemAdmin;
@@ -319,6 +320,7 @@ public class TestAvroSystemFactory implements SystemFactory {
     }
 
     private Object createComplexRecord(int index) {
+
       GenericRecord record = new GenericData.Record(ComplexRecord.SCHEMA$);
       record.put("id", index);
       record.put("string_value", "Name" + index);
@@ -330,6 +332,11 @@ public class TestAvroSystemFactory implements SystemFactory {
           new GenericData.Array<>(index, ComplexRecord.SCHEMA$.getField("array_values").schema().getTypes().get(1));
       arrayValues.addAll(IntStream.range(0, index).mapToObj(String::valueOf).collect(Collectors.toList()));
       record.put("array_values", arrayValues);
+//      record.put("union_value", "unionStrValue");
+      GenericRecord subRecord = new GenericData.Record(SubRecord.SCHEMA$);
+      subRecord.put("id", index);
+      subRecord.put("sub_values", arrayValues);
+      record.put("union_value", subRecord);
       Map<String, String> mapValues = new HashMap<>();
       mapValues.put("key0", "value0");
       record.put("map_values", mapValues);
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 9deb561..35c83e9 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -292,7 +292,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
 
     String sql1 =
         "Insert into testavro.outputTopic"
-            + " select map_values['key0'] as string_value, array_values[0] as string_value, map_values, id, bytes_value, fixed_value, float_value "
+            + " select map_values['key0'] as string_value, union_value, array_values[0] as string_value, map_values, id, bytes_value, fixed_value, float_value "
             + " from testavro.COMPLEX1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));