You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2020/07/20 18:17:48 UTC

[samza] branch master updated: SAMZA-2560: Generate SamzaSQLRelRecord using avro schema of input event instead of cached-schema. (#1401)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d8de7a  SAMZA-2560: Generate SamzaSQLRelRecord using avro schema of input event instead of cached-schema. (#1401)
0d8de7a is described below

commit 0d8de7ab34de9c851d8bc32c57933d2550e26ecd
Author: shanthoosh <sv...@linkedin.com>
AuthorDate: Mon Jul 20 11:17:38 2020 -0700

    SAMZA-2560: Generate SamzaSQLRelRecord using avro schema of input event instead of cached-schema. (#1401)
    
    * Generate SamzaSQLRelRecord using schema of input avro-record rather than the cached-schema.
    
    * Fix checkstyle violations.
    
    * Address review comments.
    
    * Address review comments
    
    Add unit tests for non-nullable union conversion with nested records.
    
    * Fix rat failures.
---
 .../apache/samza/sql/avro/AvroRelConverter.java    |  13 +-
 .../apache/samza/sql/interfaces/SqlIOConfig.java   |   6 +-
 .../samza/sql/avro/TestAvroRelConversion.java      |  45 ++++++
 .../samza/sql/avro/schemas/ComplexUnion.avsc       |  32 +++++
 .../samza/sql/avro/schemas/ComplexUnion.java       | 156 +++++++++++++++++++++
 5 files changed, 247 insertions(+), 5 deletions(-)

diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index 9afad9c..b7cee00 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -125,11 +125,18 @@ public class AvroRelConverter implements SamzaRelConverter {
         .collect(Collectors.toList()));
   }
 
-  private static SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord, Schema schema) {
+  private static SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) {
     List<Object> fieldValues = new ArrayList<>();
     List<String> fieldNames = new ArrayList<>();
     if (avroRecord != null) {
-      fetchFieldNamesAndValuesFromIndexedRecord(avroRecord, fieldNames, fieldValues, schema);
+      // Walk the schema carried by the input record itself (rather than a cached schema) so
+      // that field names and values always line up with the record being converted. Each
+      // Schema.Field already exposes its position and schema, so no lookup by name is needed.
+      for (Schema.Field field : avroRecord.getSchema().getFields()) {
+        fieldNames.add(field.name());
+        fieldValues.add(
+            convertToJavaObject(avroRecord.get(field.pos()), getNonNullUnionSchema(field.schema())));
+      }
     } else {
       String msg = "Avro Record is null";
       LOG.error(msg);
@@ -224,7 +231,7 @@ public class AvroRelConverter implements SamzaRelConverter {
     }
     switch (schema.getType()) {
       case RECORD:
-        return convertToRelRecord((IndexedRecord) avroObj, schema);
+        return convertToRelRecord((IndexedRecord) avroObj);
       case ARRAY: {
         ArrayList<Object> retVal = new ArrayList<>();
         List<Object> avroArray;
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 9d8201c..9dc4c1e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -81,11 +81,13 @@ public class SqlIOConfig {
     this.streamId = String.format("%s-%s", systemName, streamName);
 
     samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
-    Validate.notEmpty(samzaRelConverterName, String.format("System %s is not supported. Please check if the system name is provided correctly.", systemName));
+    Validate.notEmpty(samzaRelConverterName, String.format("System %s is not supported. "
+        + "Please check if the system name is provided correctly.", systemName));
 
     if (isRemoteTable()) {
       samzaRelTableKeyConverterName = streamConfigs.get(CFG_SAMZA_REL_TABLE_KEY_CONVERTER);
-      Validate.notEmpty(samzaRelTableKeyConverterName, String.format("System %s is not supported. Please check if the system name is provided correctly.", systemName));
+      Validate.notEmpty(samzaRelTableKeyConverterName, String.format("System %s is not supported. "
+          + "Please check if the system name is provided correctly.", systemName));
     } else {
       samzaRelTableKeyConverterName = "";
     }
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
index ec519f2..dccdd7d 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -50,6 +50,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.sql.avro.schemas.AddressRecord;
 import org.apache.samza.sql.avro.schemas.ComplexRecord;
+import org.apache.samza.sql.avro.schemas.ComplexUnion;
 import org.apache.samza.sql.avro.schemas.Kind;
 import org.apache.samza.sql.avro.schemas.MyFixed;
 import org.apache.samza.sql.avro.schemas.PhoneNumber;
@@ -76,9 +77,11 @@ public class TestAvroRelConversion {
   private final AvroRelConverter simpleRecordAvroRelConverter;
   private final AvroRelConverter complexRecordAvroRelConverter;
   private final AvroRelConverter nestedRecordAvroRelConverter;
+  private final AvroRelConverter complexUnionAvroRelConverter;
   private final AvroRelSchemaProvider simpleRecordSchemaProvider;
   private final AvroRelSchemaProvider complexRecordSchemaProvider;
   private final AvroRelSchemaProvider nestedRecordSchemaProvider;
+  private final AvroRelSchemaProvider complexUnionSchemaProvider;
 
   private int id = 1;
   private boolean boolValue = true;
@@ -102,6 +105,7 @@ public class TestAvroRelConversion {
     SystemStream ss1 = new SystemStream("test", "complexRecord");
     SystemStream ss2 = new SystemStream("test", "simpleRecord");
     SystemStream ss3 = new SystemStream("test", "nestedRecord");
+    SystemStream ss4 = new SystemStream("test", "complexUnion");
     props.put(
         String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss1.getSystem(), ss1.getStream()),
         ComplexRecord.SCHEMA$.toString());
@@ -111,15 +115,20 @@ public class TestAvroRelConversion {
     props.put(
         String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss3.getSystem(), ss3.getStream()),
         Profile.SCHEMA$.toString());
+    props.put(
+        String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss4.getSystem(), ss4.getStream()),
+        ComplexUnion.SCHEMA$.toString());
 
     ConfigBasedAvroRelSchemaProviderFactory factory = new ConfigBasedAvroRelSchemaProviderFactory();
 
     complexRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss1, new MapConfig(props));
     simpleRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss2, new MapConfig(props));
     nestedRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss3, new MapConfig(props));
+    complexUnionSchemaProvider = (AvroRelSchemaProvider) factory.create(ss4, new MapConfig(props));
     complexRecordAvroRelConverter = new AvroRelConverter(ss1, complexRecordSchemaProvider, new MapConfig());
     simpleRecordAvroRelConverter = new AvroRelConverter(ss2, simpleRecordSchemaProvider, new MapConfig());
     nestedRecordAvroRelConverter = new AvroRelConverter(ss3, nestedRecordSchemaProvider, new MapConfig());
+    complexUnionAvroRelConverter = new AvroRelConverter(ss4, complexUnionSchemaProvider, new MapConfig());
 
     fixedBytes.bytes(DEFAULT_TRACKING_ID_BYTES);
   }
@@ -232,6 +241,42 @@ public class TestAvroRelConversion {
   }
 
   @Test
+  public void testComplexUnionConversionShouldWorkWithBothStringAndIntTypes() throws Exception {
+    // ComplexUnion is a nested avro non-nullable union-type with both String and Integer type
+    // Test the complex-union conversion for String type.
+    GenericData.Record record = new GenericData.Record(ComplexUnion.SCHEMA$);
+    record.put("non_nullable_union_value", testStrValue);
+
+    ComplexUnion complexUnion = new ComplexUnion();
+    complexUnion.non_nullable_union_value = testStrValue;
+
+    byte[] serializedData = bytesFromGenericRecord(record);
+    GenericRecord genericRecord = genericRecordFromBytes(serializedData, ComplexUnion.SCHEMA$);
+    SamzaSqlRelMessage message = complexUnionAvroRelConverter.convertToRelMessage(new KV<>("key", genericRecord));
+
+    Assert.assertEquals(testStrValue, message.getSamzaSqlRelRecord().getField("non_nullable_union_value").get().toString());
+
+    serializedData = encodeAvroSpecificRecord(ComplexUnion.class, complexUnion);
+    genericRecord = genericRecordFromBytes(serializedData, ComplexUnion.SCHEMA$);
+    Assert.assertEquals(testStrValue, genericRecord.get("non_nullable_union_value").toString());
+
+    // Testing the complex-union conversion for Integer type
+    record.put("non_nullable_union_value", Integer.valueOf(123));
+
+    complexUnion.non_nullable_union_value = Integer.valueOf(123);
+
+    serializedData = bytesFromGenericRecord(record);
+    genericRecord = genericRecordFromBytes(serializedData, ComplexUnion.SCHEMA$);
+    message = complexUnionAvroRelConverter.convertToRelMessage(new KV<>("key", genericRecord));
+    Assert.assertEquals(Integer.valueOf(123), message.getSamzaSqlRelRecord().getField("non_nullable_union_value").get());
+
+    serializedData = encodeAvroSpecificRecord(ComplexUnion.class, complexUnion);
+    genericRecord = genericRecordFromBytes(serializedData, ComplexUnion.SCHEMA$);
+    Assert.assertEquals(Integer.valueOf(123), genericRecord.get("non_nullable_union_value"));
+
+  }
+
+  @Test
   public void testNestedRecordConversion() throws IOException {
     GenericData.Record record = new GenericData.Record(Profile.SCHEMA$);
     record.put("id", 1);
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexUnion.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexUnion.avsc
new file mode 100644
index 0000000..d94417b
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexUnion.avsc
@@ -0,0 +1,32 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+    "name": "ComplexUnion",
+    "version" : 1,
+    "namespace": "org.apache.samza.sql.avro.schemas",
+    "type": "record",
+    "fields": [
+        {
+           "name": "non_nullable_union_value",
+           "doc": "union Value.",
+           "type": ["int", "string"]
+        }
+    ]
+}
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexUnion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexUnion.java
new file mode 100644
index 0000000..dd746e1
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexUnion.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.avro.schemas;
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class ComplexUnion extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"ComplexUnion\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"non_nullable_union_value\",\"type\":[\"int\",\"string\"],\"doc\":\"union Value.\"}],\"version\":1}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+  /** union Value. */
+  @Deprecated public java.lang.Object non_nullable_union_value;
+
+  /**
+   * Default constructor.  Note that this does not initialize fields
+   * to their default values from the schema.  If that is desired then
+   * one should use <code>newBuilder()</code>.
+   */
+  public ComplexUnion() {}
+
+  /**
+   * All-args constructor.
+   */
+  public ComplexUnion(java.lang.Object non_nullable_union_value) {
+    this.non_nullable_union_value = non_nullable_union_value;
+  }
+
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return non_nullable_union_value;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: non_nullable_union_value = (java.lang.Object)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'non_nullable_union_value' field.
+   * union Value.   */
+  public java.lang.Object getNonNullableUnionValue() {
+    return non_nullable_union_value;
+  }
+
+  /**
+   * Sets the value of the 'non_nullable_union_value' field (schema doc: "union Value.").
+   * @param value the value to set.
+   */
+  public void setNonNullableUnionValue(java.lang.Object value) {
+    this.non_nullable_union_value = value;
+  }
+
+  /** Creates a new ComplexUnion RecordBuilder */
+  public static org.apache.samza.sql.avro.schemas.ComplexUnion.Builder newBuilder() {
+    return new org.apache.samza.sql.avro.schemas.ComplexUnion.Builder();
+  }
+
+  /** Creates a new ComplexUnion RecordBuilder by copying an existing Builder */
+  public static org.apache.samza.sql.avro.schemas.ComplexUnion.Builder newBuilder(org.apache.samza.sql.avro.schemas.ComplexUnion.Builder other) {
+    return new org.apache.samza.sql.avro.schemas.ComplexUnion.Builder(other);
+  }
+
+  /** Creates a new ComplexUnion RecordBuilder by copying an existing ComplexUnion instance */
+  public static org.apache.samza.sql.avro.schemas.ComplexUnion.Builder newBuilder(org.apache.samza.sql.avro.schemas.ComplexUnion other) {
+    return new org.apache.samza.sql.avro.schemas.ComplexUnion.Builder(other);
+  }
+
+  /**
+   * RecordBuilder for ComplexUnion instances.
+   */
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<ComplexUnion>
+    implements org.apache.avro.data.RecordBuilder<ComplexUnion> {
+
+    private java.lang.Object non_nullable_union_value;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(org.apache.samza.sql.avro.schemas.ComplexUnion.SCHEMA$);
+    }
+
+    /** Creates a Builder by copying an existing Builder */
+    private Builder(org.apache.samza.sql.avro.schemas.ComplexUnion.Builder other) {
+      super(other);
+      if (isValidValue(fields()[0], other.non_nullable_union_value)) {
+        this.non_nullable_union_value = data().deepCopy(fields()[0].schema(), other.non_nullable_union_value);
+        fieldSetFlags()[0] = true;
+      }
+    }
+
+    /** Creates a Builder by copying an existing ComplexUnion instance */
+    private Builder(org.apache.samza.sql.avro.schemas.ComplexUnion other) {
+            super(org.apache.samza.sql.avro.schemas.ComplexUnion.SCHEMA$);
+      if (isValidValue(fields()[0], other.non_nullable_union_value)) {
+        this.non_nullable_union_value = data().deepCopy(fields()[0].schema(), other.non_nullable_union_value);
+        fieldSetFlags()[0] = true;
+      }
+    }
+
+    /** Gets the value of the 'non_nullable_union_value' field */
+    public java.lang.Object getNonNullableUnionValue() {
+      return non_nullable_union_value;
+    }
+
+    /** Sets the value of the 'non_nullable_union_value' field */
+    public org.apache.samza.sql.avro.schemas.ComplexUnion.Builder setNonNullableUnionValue(java.lang.Object value) {
+      validate(fields()[0], value);
+      this.non_nullable_union_value = value;
+      fieldSetFlags()[0] = true;
+      return this;
+    }
+
+    /** Checks whether the 'non_nullable_union_value' field has been set */
+    public boolean hasNonNullableUnionValue() {
+      return fieldSetFlags()[0];
+    }
+
+    /** Clears the value of the 'non_nullable_union_value' field */
+    public org.apache.samza.sql.avro.schemas.ComplexUnion.Builder clearNonNullableUnionValue() {
+      non_nullable_union_value = null;
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    @Override
+    public ComplexUnion build() {
+      try {
+        ComplexUnion record = new ComplexUnion();
+        record.non_nullable_union_value = fieldSetFlags()[0] ? this.non_nullable_union_value : (java.lang.Object) defaultValue(fields()[0]);
+        return record;
+      } catch (Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+}