You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2019/02/07 17:29:50 UTC
[samza] branch master updated: Support for empty records
This is an automated email from the ASF dual-hosted git repository.
srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 5cea143 Support for empty records
new 2619f64 Merge pull request #909 from srinipunuru/emptyrecords.1
5cea143 is described below
commit 5cea143e7e2bf742de2bf632cd07fdec1c6acb70
Author: Srinivasulu Punuru <sp...@linkedin.com>
AuthorDate: Wed Feb 6 16:21:12 2019 -0800
Support for empty records
---
.../org/apache/samza/sql/schema/SqlSchema.java | 3 +-
.../apache/samza/sql/avro/AvroTypeFactoryImpl.java | 1 -
.../samza/sql/avro/TestAvroRelConversion.java | 72 +++++++++++++---------
.../samza/sql/avro/schemas/ComplexRecord.avsc | 23 ++++++-
.../samza/sql/avro/schemas/ComplexRecord.java | 59 +++++++++---------
.../samza/sql/avro/schemas/emptySubRecord.java | 39 ++++++++++++
6 files changed, 133 insertions(+), 64 deletions(-)
diff --git a/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java b/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java
index 75283f4..c542379 100644
--- a/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java
+++ b/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java
@@ -75,8 +75,7 @@ public class SqlSchema {
private List<SqlField> fields;
public SqlSchema(List<String> colNames, List<SqlFieldSchema> colTypes) {
- if (colNames == null || colNames.size() == 0 || colTypes == null || colTypes.size() == 0
- || colNames.size() != colTypes.size()) {
+ if (colNames == null || colTypes == null || colNames.size() != colTypes.size()) {
throw new IllegalArgumentException();
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
index 63cdd55..68116b6 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
@@ -57,7 +57,6 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
}
private SqlSchema convertSchema(List<Schema.Field> fields) {
- Validate.notEmpty(fields, "Fields cannot be empty");
SqlSchemaBuilder schemaBuilder = SqlSchemaBuilder.builder();
for (Schema.Field field : fields) {
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
index 295f6cd..47f76e8 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -1,21 +1,21 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.samza.sql.avro;
@@ -57,8 +57,8 @@ import org.apache.samza.sql.avro.schemas.Profile;
import org.apache.samza.sql.avro.schemas.SimpleRecord;
import org.apache.samza.sql.avro.schemas.StreetNumRecord;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.sql.schema.SqlSchema;
import org.apache.samza.sql.planner.RelSchemaConverter;
+import org.apache.samza.sql.schema.SqlSchema;
import org.apache.samza.system.SystemStream;
import org.junit.Assert;
import org.junit.Test;
@@ -204,6 +204,7 @@ public class TestAvroRelConversion {
record.put("long_value", longValue);
record.put("array_values", arrayValue);
record.put("map_values", mapValue);
+ record.put("union_value", id);
ComplexRecord complexRecord = new ComplexRecord();
complexRecord.id = id;
@@ -218,12 +219,22 @@ public class TestAvroRelConversion {
complexRecord.array_values.addAll(arrayValue);
complexRecord.map_values = new HashMap<>();
complexRecord.map_values.putAll(mapValue);
+ complexRecord.union_value = id;
byte[] serializedData = bytesFromGenericRecord(record);
- validateAvroSerializedData(serializedData);
+ validateAvroSerializedData(serializedData, id);
+
+ serializedData = encodeAvroSpecificRecord(ComplexRecord.class, complexRecord);
+ validateAvroSerializedData(serializedData, id);
+
+ record.put("union_value", testStrValue);
+ complexRecord.union_value = testStrValue;
+
+ serializedData = bytesFromGenericRecord(record);
+ validateAvroSerializedData(serializedData, testStrValue);
serializedData = encodeAvroSpecificRecord(ComplexRecord.class, complexRecord);
- validateAvroSerializedData(serializedData);
+ validateAvroSerializedData(serializedData, testStrValue);
}
@Test
@@ -240,7 +251,6 @@ public class TestAvroRelConversion {
record.put("address", addressRecord);
record.put("selfEmployed", "True");
-
GenericData.Record phoneNumberRecordH = new GenericData.Record(PhoneNumber.SCHEMA$);
phoneNumberRecordH.put("kind", Kind.Home);
phoneNumberRecordH.put("number", "111-111-1111");
@@ -299,7 +309,6 @@ public class TestAvroRelConversion {
record.put("address", addressRecord);
record.put("selfEmployed", "True");
-
List<GenericData.Record> phoneNumbers = null;
record.put("phoneNumbers", phoneNumbers);
@@ -336,11 +345,12 @@ public class TestAvroRelConversion {
return outputStream.toByteArray();
}
- private void validateAvroSerializedData(byte[] serializedData) throws IOException {
+ private void validateAvroSerializedData(byte[] serializedData, Object unionValue) throws IOException {
GenericRecord complexRecordValue = genericRecordFromBytes(serializedData, ComplexRecord.SCHEMA$);
SamzaSqlRelMessage message = complexRecordAvroRelConverter.convertToRelMessage(new KV<>("key", complexRecordValue));
- Assert.assertEquals(message.getSamzaSqlRelRecord().getFieldNames().size(), ComplexRecord.SCHEMA$.getFields().size() + 1);
+ Assert.assertEquals(message.getSamzaSqlRelRecord().getFieldNames().size(),
+ ComplexRecord.SCHEMA$.getFields().size() + 1);
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("id").get(), id);
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("bool_value").get(), boolValue);
@@ -348,11 +358,15 @@ public class TestAvroRelConversion {
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("string_value").get(), new Utf8(testStrValue));
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("float_value").get(), doubleValue);
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("long_value").get(), longValue);
- Assert.assertTrue(
- arrayValue.stream()
- .map(Utf8::new)
- .collect(Collectors.toList())
- .equals(message.getSamzaSqlRelRecord().getField("array_values").get()));
+ if (unionValue instanceof String) {
+ Assert.assertEquals(message.getSamzaSqlRelRecord().getField("union_value").get(), new Utf8((String) unionValue));
+ } else {
+ Assert.assertEquals(message.getSamzaSqlRelRecord().getField("union_value").get(), unionValue);
+ }
+ Assert.assertTrue(arrayValue.stream()
+ .map(Utf8::new)
+ .collect(Collectors.toList())
+ .equals(message.getSamzaSqlRelRecord().getField("array_values").get()));
Assert.assertTrue(mapValue.entrySet()
.stream()
.collect(Collectors.toMap(x -> new Utf8(x.getKey()), y -> new Utf8(y.getValue())))
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
index 30794fb..5106bc2 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
@@ -20,7 +20,7 @@
{
"name": "ComplexRecord",
"version" : 1,
- "namespace": "org.apache.samza.sql.system.avro",
+ "namespace": "org.apache.samza.sql.avro.schemas",
"type": "record",
"fields": [
{
@@ -104,7 +104,7 @@
"type": [ "null",
{
"name": "TestEnumType",
- "namespace": "org.apache.samza.sql.system.avro",
+ "namespace": "org.apache.samza.sql.avro.schemas",
"type": "enum",
"doc": "My sample enum type",
"symbols": ["foo", "bar"]
@@ -112,6 +112,23 @@
]
},
{
+ "name": "union_value",
+ "doc": "union Value.",
+ "type": ["null", "int", "string"],
+ "default":null
+ },
+ {
+ "name" : "empty_record",
+ "type" : [ "null", {
+ "type" : "record",
+ "name" : "emptySubRecord",
+ "namespace" : "org.apache.samza.sql.avro.schemas",
+ "doc" : "",
+ "fields" : [ ]
+ }
+ ]
+ },
+ {
"name": "array_records",
"doc" : "array of records.",
"default": [],
@@ -119,7 +136,7 @@
{
"type": "record",
"name": "SubRecord",
- "namespace": "org.apache.samza.sql.system.avro",
+ "namespace": "org.apache.samza.sql.avro.schemas",
"doc": "Sub record",
"fields": [
{
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
index f46918a..ccc7929 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
@@ -1,32 +1,26 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-/**
- * Autogenerated by Avro
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * DO NOT EDIT DIRECTLY
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.samza.sql.avro.schemas;
@SuppressWarnings("all")
public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
- public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"na [...]
+ public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"na [...]
/** Record id. */
public java.lang.Integer id;
/** Boolean Value. */
@@ -42,15 +36,18 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
/** long Value. */
public java.lang.Long long_value;
/** fixed Value. */
- public MyFixed fixed_value;
+ public org.apache.samza.sql.avro.schemas.MyFixed fixed_value;
/** array values in the record. */
public java.util.List<java.lang.CharSequence> array_values;
/** map values in the record. */
public java.util.Map<java.lang.CharSequence,java.lang.CharSequence> map_values;
/** enum value. */
- public TestEnumType enum_value;
+ public org.apache.samza.sql.avro.schemas.TestEnumType enum_value;
+ /** union Value. */
+ public java.lang.Object union_value;
+ public org.apache.samza.sql.avro.schemas.emptySubRecord empty_record;
/** array of records. */
- public SubRecord array_records;
+ public org.apache.samza.sql.avro.schemas.SubRecord array_records;
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
// Used by DatumWriter. Applications should not call.
public java.lang.Object get(int field$) {
@@ -66,7 +63,9 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
case 8: return array_values;
case 9: return map_values;
case 10: return enum_value;
- case 11: return array_records;
+ case 11: return union_value;
+ case 12: return empty_record;
+ case 13: return array_records;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
@@ -81,11 +80,13 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
case 4: string_value = (java.lang.CharSequence)value$; break;
case 5: bytes_value = (java.nio.ByteBuffer)value$; break;
case 6: long_value = (java.lang.Long)value$; break;
- case 7: fixed_value = (MyFixed)value$; break;
+ case 7: fixed_value = (org.apache.samza.sql.avro.schemas.MyFixed)value$; break;
case 8: array_values = (java.util.List<java.lang.CharSequence>)value$; break;
case 9: map_values = (java.util.Map<java.lang.CharSequence,java.lang.CharSequence>)value$; break;
- case 10: enum_value = (TestEnumType)value$; break;
- case 11: array_records = (SubRecord)value$; break;
+ case 10: enum_value = (org.apache.samza.sql.avro.schemas.TestEnumType)value$; break;
+ case 11: union_value = (java.lang.Object)value$; break;
+ case 12: empty_record = (org.apache.samza.sql.avro.schemas.emptySubRecord)value$; break;
+ case 13: array_records = (org.apache.samza.sql.avro.schemas.SubRecord)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/emptySubRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/emptySubRecord.java
new file mode 100644
index 0000000..b7161e5
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/emptySubRecord.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+/** Avro specific record for the {@code emptySubRecord} schema, whose {@code fields} list is empty; {@code get} and {@code put} therefore reject every index. */
+public class emptySubRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+ public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"emptySubRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[]}");
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call. The schema has no fields, so every index throws AvroRuntimeException("Bad index").
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+ // Used by DatumReader. Applications should not call. The schema has no fields, so every index throws AvroRuntimeException("Bad index") and value$ is never stored.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+}