You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2019/02/07 17:29:50 UTC

[samza] branch master updated: Support for empty records

This is an automated email from the ASF dual-hosted git repository.

srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cea143  Support for empty records
     new 2619f64  Merge pull request #909 from srinipunuru/emptyrecords.1
5cea143 is described below

commit 5cea143e7e2bf742de2bf632cd07fdec1c6acb70
Author: Srinivasulu Punuru <sp...@linkedin.com>
AuthorDate: Wed Feb 6 16:21:12 2019 -0800

    Support for empty records
---
 .../org/apache/samza/sql/schema/SqlSchema.java     |  3 +-
 .../apache/samza/sql/avro/AvroTypeFactoryImpl.java |  1 -
 .../samza/sql/avro/TestAvroRelConversion.java      | 72 +++++++++++++---------
 .../samza/sql/avro/schemas/ComplexRecord.avsc      | 23 ++++++-
 .../samza/sql/avro/schemas/ComplexRecord.java      | 59 +++++++++---------
 .../samza/sql/avro/schemas/emptySubRecord.java     | 39 ++++++++++++
 6 files changed, 133 insertions(+), 64 deletions(-)

diff --git a/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java b/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java
index 75283f4..c542379 100644
--- a/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java
+++ b/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java
@@ -75,8 +75,7 @@ public class SqlSchema {
   private List<SqlField> fields;
 
   public SqlSchema(List<String> colNames, List<SqlFieldSchema> colTypes) {
-    if (colNames == null || colNames.size() == 0 || colTypes == null || colTypes.size() == 0
-        || colNames.size() != colTypes.size()) {
+    if (colNames == null || colTypes == null || colNames.size() != colTypes.size()) {
       throw new IllegalArgumentException();
     }
 
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
index 63cdd55..68116b6 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
@@ -57,7 +57,6 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
   }
 
   private SqlSchema convertSchema(List<Schema.Field> fields) {
-    Validate.notEmpty(fields, "Fields cannot be empty");
 
     SqlSchemaBuilder schemaBuilder = SqlSchemaBuilder.builder();
     for (Schema.Field field : fields) {
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
index 295f6cd..47f76e8 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -1,21 +1,21 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 
 package org.apache.samza.sql.avro;
 
@@ -57,8 +57,8 @@ import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.SimpleRecord;
 import org.apache.samza.sql.avro.schemas.StreetNumRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.sql.schema.SqlSchema;
 import org.apache.samza.sql.planner.RelSchemaConverter;
+import org.apache.samza.sql.schema.SqlSchema;
 import org.apache.samza.system.SystemStream;
 import org.junit.Assert;
 import org.junit.Test;
@@ -204,6 +204,7 @@ public class TestAvroRelConversion {
     record.put("long_value", longValue);
     record.put("array_values", arrayValue);
     record.put("map_values", mapValue);
+    record.put("union_value", id);
 
     ComplexRecord complexRecord = new ComplexRecord();
     complexRecord.id = id;
@@ -218,12 +219,22 @@ public class TestAvroRelConversion {
     complexRecord.array_values.addAll(arrayValue);
     complexRecord.map_values = new HashMap<>();
     complexRecord.map_values.putAll(mapValue);
+    complexRecord.union_value = id;
 
     byte[] serializedData = bytesFromGenericRecord(record);
-    validateAvroSerializedData(serializedData);
+    validateAvroSerializedData(serializedData, id);
+
+    serializedData = encodeAvroSpecificRecord(ComplexRecord.class, complexRecord);
+    validateAvroSerializedData(serializedData, id);
+
+    record.put("union_value", testStrValue);
+    complexRecord.union_value = testStrValue;
+
+    serializedData = bytesFromGenericRecord(record);
+    validateAvroSerializedData(serializedData, testStrValue);
 
     serializedData = encodeAvroSpecificRecord(ComplexRecord.class, complexRecord);
-    validateAvroSerializedData(serializedData);
+    validateAvroSerializedData(serializedData, testStrValue);
   }
 
   @Test
@@ -240,7 +251,6 @@ public class TestAvroRelConversion {
     record.put("address", addressRecord);
     record.put("selfEmployed", "True");
 
-
     GenericData.Record phoneNumberRecordH = new GenericData.Record(PhoneNumber.SCHEMA$);
     phoneNumberRecordH.put("kind", Kind.Home);
     phoneNumberRecordH.put("number", "111-111-1111");
@@ -299,7 +309,6 @@ public class TestAvroRelConversion {
     record.put("address", addressRecord);
     record.put("selfEmployed", "True");
 
-
     List<GenericData.Record> phoneNumbers = null;
     record.put("phoneNumbers", phoneNumbers);
 
@@ -336,11 +345,12 @@ public class TestAvroRelConversion {
     return outputStream.toByteArray();
   }
 
-  private void validateAvroSerializedData(byte[] serializedData) throws IOException {
+  private void validateAvroSerializedData(byte[] serializedData, Object unionValue) throws IOException {
     GenericRecord complexRecordValue = genericRecordFromBytes(serializedData, ComplexRecord.SCHEMA$);
 
     SamzaSqlRelMessage message = complexRecordAvroRelConverter.convertToRelMessage(new KV<>("key", complexRecordValue));
-    Assert.assertEquals(message.getSamzaSqlRelRecord().getFieldNames().size(), ComplexRecord.SCHEMA$.getFields().size() + 1);
+    Assert.assertEquals(message.getSamzaSqlRelRecord().getFieldNames().size(),
+        ComplexRecord.SCHEMA$.getFields().size() + 1);
 
     Assert.assertEquals(message.getSamzaSqlRelRecord().getField("id").get(), id);
     Assert.assertEquals(message.getSamzaSqlRelRecord().getField("bool_value").get(), boolValue);
@@ -348,11 +358,15 @@ public class TestAvroRelConversion {
     Assert.assertEquals(message.getSamzaSqlRelRecord().getField("string_value").get(), new Utf8(testStrValue));
     Assert.assertEquals(message.getSamzaSqlRelRecord().getField("float_value").get(), doubleValue);
     Assert.assertEquals(message.getSamzaSqlRelRecord().getField("long_value").get(), longValue);
-    Assert.assertTrue(
-        arrayValue.stream()
-            .map(Utf8::new)
-            .collect(Collectors.toList())
-            .equals(message.getSamzaSqlRelRecord().getField("array_values").get()));
+    if (unionValue instanceof String) {
+      Assert.assertEquals(message.getSamzaSqlRelRecord().getField("union_value").get(), new Utf8((String) unionValue));
+    } else {
+      Assert.assertEquals(message.getSamzaSqlRelRecord().getField("union_value").get(), unionValue);
+    }
+    Assert.assertTrue(arrayValue.stream()
+        .map(Utf8::new)
+        .collect(Collectors.toList())
+        .equals(message.getSamzaSqlRelRecord().getField("array_values").get()));
     Assert.assertTrue(mapValue.entrySet()
         .stream()
         .collect(Collectors.toMap(x -> new Utf8(x.getKey()), y -> new Utf8(y.getValue())))
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
index 30794fb..5106bc2 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
@@ -20,7 +20,7 @@
 {
     "name": "ComplexRecord",
     "version" : 1,
-    "namespace": "org.apache.samza.sql.system.avro",
+    "namespace": "org.apache.samza.sql.avro.schemas",
     "type": "record",
     "fields": [
         {
@@ -104,7 +104,7 @@
             "type": [ "null",
             {
                "name": "TestEnumType",
-               "namespace": "org.apache.samza.sql.system.avro",
+               "namespace": "org.apache.samza.sql.avro.schemas",
                "type": "enum",
                "doc": "My sample enum type",
                "symbols": ["foo", "bar"]
@@ -112,6 +112,23 @@
           ]
         },
         {
+          "name": "union_value",
+          "doc": "union Value.",
+          "type": ["null", "int", "string"],
+          "default":null
+        },
+        {
+          "name" : "empty_record",
+          "type" : [ "null", {
+            "type" : "record",
+            "name" : "emptySubRecord",
+            "namespace" : "org.apache.samza.sql.avro.schemas",
+            "doc" : "",
+            "fields" : [ ]
+            }
+          ]
+        },
+        {
             "name": "array_records",
             "doc" : "array of records.",
             "default": [],
@@ -119,7 +136,7 @@
               {
                 "type": "record",
                 "name": "SubRecord",
-                "namespace": "org.apache.samza.sql.system.avro",
+                "namespace": "org.apache.samza.sql.avro.schemas",
                 "doc": "Sub record",
                 "fields": [
                   {
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
index f46918a..ccc7929 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
@@ -1,32 +1,26 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-/**
- * Autogenerated by Avro
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * DO NOT EDIT DIRECTLY
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.samza.sql.avro.schemas;
 
 @SuppressWarnings("all")
 public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"na [...]
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"na [...]
   /** Record id. */
   public java.lang.Integer id;
   /** Boolean Value. */
@@ -42,15 +36,18 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
   /** long Value. */
   public java.lang.Long long_value;
   /** fixed Value. */
-  public MyFixed fixed_value;
+  public org.apache.samza.sql.avro.schemas.MyFixed fixed_value;
   /** array values in the record. */
   public java.util.List<java.lang.CharSequence> array_values;
   /** map values in the record. */
   public java.util.Map<java.lang.CharSequence,java.lang.CharSequence> map_values;
   /** enum value. */
-  public TestEnumType enum_value;
+  public org.apache.samza.sql.avro.schemas.TestEnumType enum_value;
+  /** union Value. */
+  public java.lang.Object union_value;
+  public org.apache.samza.sql.avro.schemas.emptySubRecord empty_record;
   /** array of records. */
-  public SubRecord array_records;
+  public org.apache.samza.sql.avro.schemas.SubRecord array_records;
   public org.apache.avro.Schema getSchema() { return SCHEMA$; }
   // Used by DatumWriter.  Applications should not call.
   public java.lang.Object get(int field$) {
@@ -66,7 +63,9 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
     case 8: return array_values;
     case 9: return map_values;
     case 10: return enum_value;
-    case 11: return array_records;
+    case 11: return union_value;
+    case 12: return empty_record;
+    case 13: return array_records;
     default: throw new org.apache.avro.AvroRuntimeException("Bad index");
     }
   }
@@ -81,11 +80,13 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
     case 4: string_value = (java.lang.CharSequence)value$; break;
     case 5: bytes_value = (java.nio.ByteBuffer)value$; break;
     case 6: long_value = (java.lang.Long)value$; break;
-    case 7: fixed_value = (MyFixed)value$; break;
+    case 7: fixed_value = (org.apache.samza.sql.avro.schemas.MyFixed)value$; break;
     case 8: array_values = (java.util.List<java.lang.CharSequence>)value$; break;
     case 9: map_values = (java.util.Map<java.lang.CharSequence,java.lang.CharSequence>)value$; break;
-    case 10: enum_value = (TestEnumType)value$; break;
-    case 11: array_records = (SubRecord)value$; break;
+    case 10: enum_value = (org.apache.samza.sql.avro.schemas.TestEnumType)value$; break;
+    case 11: union_value = (java.lang.Object)value$; break;
+    case 12: empty_record = (org.apache.samza.sql.avro.schemas.emptySubRecord)value$; break;
+    case 13: array_records = (org.apache.samza.sql.avro.schemas.SubRecord)value$; break;
     default: throw new org.apache.avro.AvroRuntimeException("Bad index");
     }
   }
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/emptySubRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/emptySubRecord.java
new file mode 100644
index 0000000..b7161e5
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/emptySubRecord.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+/** Avro specific record whose schema declares no fields (an empty record). */
+public class emptySubRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"emptySubRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[]}");
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.  The schema has no fields, so every index is rejected.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.  The schema has no fields, so every index is rejected.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+}