You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by bu...@apache.org on 2017/08/18 14:57:30 UTC

[6/7] avro git commit: AVRO-1931: Reader is now compatible if able to read all branches of union

AVRO-1931: Reader is now compatible if able to read all branches of union

Closes #199

Signed-off-by: Sriharsha Chintalapani <sr...@apache.org>
Signed-off-by: Nandor Kollar <nk...@cloudera.com>
Signed-off-by: Sean Busbey <bu...@apache.org>
(cherry picked from commit 15651fc95e058d1b3cc165a70c367d1dc2bad8b7)
(cherry picked from commit 83cdd2bd70a4f5f16a52177fc46aa6dec412548c)


Project: http://git-wip-us.apache.org/repos/asf/avro/repo
Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/522b59b0
Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/522b59b0
Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/522b59b0

Branch: refs/heads/branch-1.7
Commit: 522b59b04f3cf1300908bb91b30b89af1089621e
Parents: 492ab98
Author: Anders Sundelin <an...@ericsson.com>
Authored: Thu Mar 2 11:08:23 2017 +0100
Committer: Sean Busbey <bu...@apache.org>
Committed: Fri Aug 18 09:56:53 2017 -0500

----------------------------------------------------------------------
 .../org/apache/avro/SchemaCompatibility.java    |  17 +-
 .../TestReadingWritingDataInEvolvedSchemas.java | 284 +++++++++++++++++++
 .../apache/avro/TestSchemaCompatibility.java    |  34 +++
 3 files changed, 329 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/avro/blob/522b59b0/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java
index 9ac6dc8..c713c32 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java
@@ -350,10 +350,15 @@ public class SchemaCompatibility {
       } else {
         // Reader and writer have different schema types:
 
-        // Handle the corner case where writer is a union of a singleton branch: { X } === X
-        if ((writer.getType() == Schema.Type.UNION)
-            && writer.getTypes().size() == 1) {
-          return getCompatibility(reader, writer.getTypes().get(0));
+        // Reader compatible with all branches of a writer union is compatible
+        if (writer.getType() == Schema.Type.UNION) {
+          for (Schema s : writer.getTypes()) {
+            SchemaCompatibilityType compatibility = getCompatibility(reader, s);
+            if (compatibility == SchemaCompatibilityType.INCOMPATIBLE) {
+              return SchemaCompatibilityType.INCOMPATIBLE;
+            }
+          }
+          return SchemaCompatibilityType.COMPATIBLE;
         }
 
         switch (reader.getType()) {
@@ -380,12 +385,12 @@ public class SchemaCompatibility {
                 : SchemaCompatibilityType.INCOMPATIBLE;
           }
           case BYTES: {
-              return (writer.getType() == Type.STRING)
+            return (writer.getType() == Type.STRING)
                       ? SchemaCompatibilityType.COMPATIBLE
                       : SchemaCompatibilityType.INCOMPATIBLE;
                 }
           case STRING: {
-              return (writer.getType() == Type.BYTES)
+            return (writer.getType() == Type.BYTES)
                   ? SchemaCompatibilityType.COMPATIBLE
                   : SchemaCompatibilityType.INCOMPATIBLE;
             }

http://git-wip-us.apache.org/repos/asf/avro/blob/522b59b0/lang/java/avro/src/test/java/org/apache/avro/TestReadingWritingDataInEvolvedSchemas.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestReadingWritingDataInEvolvedSchemas.java b/lang/java/avro/src/test/java/org/apache/avro/TestReadingWritingDataInEvolvedSchemas.java
new file mode 100644
index 0000000..0012876
--- /dev/null
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestReadingWritingDataInEvolvedSchemas.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.EnumSymbol;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/** Round-trips generic records through the binary encoding using different writer and reader schemas. */
+public class TestReadingWritingDataInEvolvedSchemas {
+
+  private static final String RECORD_A = "RecordA";
+  private static final String FIELD_A = "fieldA";
+  private static final char LATIN_SMALL_LETTER_O_WITH_DIARESIS = '\u00F6';
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private static final Schema DOUBLE_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().doubleType().noDefault() //
+      .endRecord();
+  private static final Schema FLOAT_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().floatType().noDefault() //
+      .endRecord();
+  private static final Schema LONG_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().longType().noDefault() //
+      .endRecord();
+  private static final Schema INT_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().intType().noDefault() //
+      .endRecord();
+  private static final Schema UNION_INT_LONG_FLOAT_DOUBLE_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().unionOf().doubleType().and().floatType().and().longType().and().intType().endUnion()
+      .noDefault() //
+      .endRecord();
+  private static final Schema STRING_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().stringType().noDefault() //
+      .endRecord();
+  private static final Schema BYTES_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().bytesType().noDefault() //
+      .endRecord();
+  private static final Schema UNION_STRING_BYTES_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().unionOf().stringType().and().bytesType().endUnion().noDefault() //
+      .endRecord();
+  private static final Schema ENUM_AB_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().enumeration("Enum1").symbols("A", "B").noDefault() //
+      .endRecord();
+  private static final Schema ENUM_ABC_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().enumeration("Enum1").symbols("A", "B", "C").noDefault() //
+      .endRecord();
+
+  @Test
+  public void doubleWrittenWithUnionSchemaIsConvertedToDoubleSchema() throws Exception {
+    Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, 42.0);
+    byte[] encoded = encodeGenericBlob(record);
+    Record decoded = decodeGenericBlob(DOUBLE_RECORD, writer, encoded);
+    assertEquals(42.0, decoded.get(FIELD_A));
+  }
+
+  @Test
+  public void doubleWrittenWithUnionSchemaIsNotConvertedToFloatSchema() throws Exception {
+    expectedException.expect(AvroTypeException.class);
+    expectedException.expectMessage("Found double, expecting float");
+    Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, 42.0);
+    byte[] encoded = encodeGenericBlob(record);
+    decodeGenericBlob(FLOAT_RECORD, writer, encoded);
+  }
+
+  @Test
+  public void floatWrittenWithUnionSchemaIsNotConvertedToLongSchema() throws Exception {
+    expectedException.expect(AvroTypeException.class);
+    expectedException.expectMessage("Found float, expecting long");
+    Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, 42.0f);
+    byte[] encoded = encodeGenericBlob(record);
+    decodeGenericBlob(LONG_RECORD, writer, encoded);
+  }
+
+  @Test
+  public void longWrittenWithUnionSchemaIsNotConvertedToIntSchema() throws Exception {
+    expectedException.expect(AvroTypeException.class);
+    expectedException.expectMessage("Found long, expecting int");
+    Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, 42L);
+    byte[] encoded = encodeGenericBlob(record);
+    decodeGenericBlob(INT_RECORD, writer, encoded);
+  }
+
+  @Test
+  public void intWrittenWithUnionSchemaIsConvertedToAllNumberSchemas() throws Exception {
+    Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, 42);
+    byte[] encoded = encodeGenericBlob(record);
+    assertEquals(42.0, decodeGenericBlob(DOUBLE_RECORD, writer, encoded).get(FIELD_A));
+    assertEquals(42.0f, decodeGenericBlob(FLOAT_RECORD, writer, encoded).get(FIELD_A));
+    assertEquals(42L, decodeGenericBlob(LONG_RECORD, writer, encoded).get(FIELD_A));
+    assertEquals(42, decodeGenericBlob(INT_RECORD, writer, encoded).get(FIELD_A));
+  }
+
+  @Test
+  public void asciiStringWrittenWithUnionSchemaIsConvertedToBytesSchema() throws Exception {
+    Schema writer = UNION_STRING_BYTES_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, "42");
+    byte[] encoded = encodeGenericBlob(record);
+    ByteBuffer actual = (ByteBuffer) decodeGenericBlob(BYTES_RECORD, writer, encoded).get(FIELD_A);
+    assertArrayEquals("42".getBytes("UTF-8"), actual.array());
+  }
+
+  @Test
+  public void utf8StringWrittenWithUnionSchemaIsConvertedToBytesSchema() throws Exception {
+    String goeran = String.format("G%sran", LATIN_SMALL_LETTER_O_WITH_DIARESIS);
+    Schema writer = UNION_STRING_BYTES_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, goeran);
+    byte[] encoded = encodeGenericBlob(record);
+    ByteBuffer actual = (ByteBuffer) decodeGenericBlob(BYTES_RECORD, writer, encoded).get(FIELD_A);
+    assertArrayEquals(goeran.getBytes("UTF-8"), actual.array());
+  }
+
+  @Test
+  public void asciiBytesWrittenWithUnionSchemaIsConvertedToStringSchema() throws Exception {
+    Schema writer = UNION_STRING_BYTES_RECORD;
+    ByteBuffer buf = ByteBuffer.wrap("42".getBytes("UTF-8"));
+    Record record = defaultRecordWithSchema(writer, FIELD_A, buf);
+    byte[] encoded = encodeGenericBlob(record);
+    CharSequence read = (CharSequence) decodeGenericBlob(STRING_RECORD, writer, encoded).get(FIELD_A);
+    assertEquals("42", read.toString());
+  }
+
+  @Test
+  public void utf8BytesWrittenWithUnionSchemaIsConvertedToStringSchema() throws Exception {
+    String goeran = String.format("G%sran", LATIN_SMALL_LETTER_O_WITH_DIARESIS);
+    Schema writer = UNION_STRING_BYTES_RECORD;
+    ByteBuffer buf = ByteBuffer.wrap(goeran.getBytes("UTF-8")); // write bytes, not a String, as the test name states
+    Record record = defaultRecordWithSchema(writer, FIELD_A, buf);
+    byte[] encoded = encodeGenericBlob(record);
+    CharSequence read = (CharSequence) decodeGenericBlob(STRING_RECORD, writer, encoded).get(FIELD_A);
+    assertEquals(goeran, read.toString());
+  }
+
+  @Test
+  public void enumRecordCanBeReadWithExtendedEnumSchema() throws Exception {
+    Schema writer = ENUM_AB_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, new EnumSymbol(writer, "A"));
+    byte[] encoded = encodeGenericBlob(record);
+    Record decoded = decodeGenericBlob(ENUM_ABC_RECORD, writer, encoded);
+    assertEquals("A", decoded.get(FIELD_A).toString());
+  }
+
+  @Test
+  public void enumRecordWithExtendedSchemaCanBeReadWithOriginalEnumSchemaIfOnlyOldValues() throws Exception {
+    Schema writer = ENUM_ABC_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, new EnumSymbol(writer, "A"));
+    byte[] encoded = encodeGenericBlob(record);
+    Record decoded = decodeGenericBlob(ENUM_AB_RECORD, writer, encoded);
+    assertEquals("A", decoded.get(FIELD_A).toString());
+  }
+
+  @Test
+  public void enumRecordWithExtendedSchemaCanNotBeReadIfNewValuesAreUsed() throws Exception {
+    expectedException.expect(AvroTypeException.class);
+    expectedException.expectMessage("No match for C");
+    Schema writer = ENUM_ABC_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, new EnumSymbol(writer, "C"));
+    byte[] encoded = encodeGenericBlob(record);
+    decodeGenericBlob(ENUM_AB_RECORD, writer, encoded);
+  }
+
+  @Test
+  public void recordWrittenWithExtendedSchemaCanBeReadWithOriginalSchemaButLossOfData() throws Exception {
+    Schema writer = SchemaBuilder.record(RECORD_A) //
+        .fields() //
+        .name("newTopField").type().stringType().noDefault() //
+        .name(FIELD_A).type().intType().noDefault() //
+        .endRecord();
+    Record record = defaultRecordWithSchema(writer, FIELD_A, 42);
+    record.put("newTopField", "not decoded");
+    byte[] encoded = encodeGenericBlob(record);
+    Record decoded = decodeGenericBlob(INT_RECORD, writer, encoded);
+    assertEquals(42, decoded.get(FIELD_A));
+    assertNull(decoded.get("newTopField"));
+  }
+
+  @Test
+  public void readerWithoutDefaultValueThrowsException() throws Exception {
+    expectedException.expect(AvroTypeException.class);
+    expectedException.expectMessage("missing required field newField");
+    Schema reader = SchemaBuilder.record(RECORD_A) //
+        .fields() //
+        .name("newField").type().intType().noDefault() //
+        .name(FIELD_A).type().intType().noDefault() //
+        .endRecord();
+    Record record = defaultRecordWithSchema(INT_RECORD, FIELD_A, 42);
+    byte[] encoded = encodeGenericBlob(record);
+    decodeGenericBlob(reader, INT_RECORD, encoded);
+  }
+
+  @Test
+  public void readerWithDefaultValueIsApplied() throws Exception {
+    Schema reader = SchemaBuilder.record(RECORD_A) //
+        .fields() //
+        .name("newFieldWithDefault").type().intType().intDefault(314) //
+        .name(FIELD_A).type().intType().noDefault() //
+        .endRecord();
+    Record record = defaultRecordWithSchema(INT_RECORD, FIELD_A, 42);
+    byte[] encoded = encodeGenericBlob(record);
+    Record decoded = decodeGenericBlob(reader, INT_RECORD, encoded);
+    assertEquals(42, decoded.get(FIELD_A));
+    assertEquals(314, decoded.get("newFieldWithDefault"));
+  }
+
+  /** Creates a record of {@code schema} with the single field {@code key} set to {@code value}. */
+  private <T> Record defaultRecordWithSchema(Schema schema, String key, T value) {
+    Record data = new GenericData.Record(schema);
+    data.put(key, value);
+    return data;
+  }
+
+  /** Serializes {@code data} with its own schema through a binary encoder and returns the resulting bytes. */
+  private static byte[] encodeGenericBlob(GenericRecord data) throws IOException {
+    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(data.getSchema());
+    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(outStream, null);
+    writer.write(data, encoder);
+    encoder.flush(); // push buffered bytes into outStream before reading it back
+    return outStream.toByteArray();
+  }
+
+  /** Decodes {@code blob}, written with {@code schemaOfBlob}, as {@code expectedSchema}; a null blob yields null. */
+  private static Record decodeGenericBlob(Schema expectedSchema, Schema schemaOfBlob, byte[] blob) throws IOException {
+    if (blob == null) {
+      return null;
+    }
+    GenericDatumReader<Record> reader = new GenericDatumReader<Record>();
+    reader.setExpected(expectedSchema); // reader (target) schema
+    reader.setSchema(schemaOfBlob); // writer schema the blob was encoded with
+    Decoder decoder = DecoderFactory.get().binaryDecoder(blob, null);
+    return reader.read(null, decoder);
+  }
+}

http://git-wip-us.apache.org/repos/asf/avro/blob/522b59b0/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java
index 25a74e1..ebe3a61 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java
@@ -82,12 +82,24 @@ public class TestSchemaCompatibility {
       Schema.createUnion(list(INT_SCHEMA));
   private static final Schema LONG_UNION_SCHEMA =
       Schema.createUnion(list(LONG_SCHEMA));
+  private static final Schema FLOAT_UNION_SCHEMA =
+      Schema.createUnion(list(FLOAT_SCHEMA));
+  private static final Schema DOUBLE_UNION_SCHEMA =
+      Schema.createUnion(list(DOUBLE_SCHEMA));
   private static final Schema STRING_UNION_SCHEMA =
       Schema.createUnion(list(STRING_SCHEMA));
+  private static final Schema BYTES_UNION_SCHEMA =
+      Schema.createUnion(list(BYTES_SCHEMA));
   private static final Schema INT_STRING_UNION_SCHEMA =
       Schema.createUnion(list(INT_SCHEMA, STRING_SCHEMA));
   private static final Schema STRING_INT_UNION_SCHEMA =
       Schema.createUnion(list(STRING_SCHEMA, INT_SCHEMA));
+  private static final Schema INT_FLOAT_UNION_SCHEMA =
+      Schema.createUnion(list(INT_SCHEMA, FLOAT_SCHEMA));
+  private static final Schema INT_LONG_UNION_SCHEMA =
+      Schema.createUnion(list(INT_SCHEMA, LONG_SCHEMA));
+  private static final Schema INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA =
+      Schema.createUnion(list(INT_SCHEMA, LONG_SCHEMA, FLOAT_SCHEMA, DOUBLE_SCHEMA));
 
   // Non recursive records:
   private static final Schema EMPTY_RECORD1 =
@@ -363,8 +375,27 @@ public class TestSchemaCompatibility {
       new ReaderWriter(INT_STRING_UNION_SCHEMA, STRING_INT_UNION_SCHEMA),
       new ReaderWriter(INT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
       new ReaderWriter(LONG_UNION_SCHEMA, INT_UNION_SCHEMA),
+      new ReaderWriter(FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA),
+      new ReaderWriter(DOUBLE_UNION_SCHEMA, INT_UNION_SCHEMA),
+      new ReaderWriter(LONG_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+      new ReaderWriter(FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA),
+      new ReaderWriter(DOUBLE_UNION_SCHEMA, LONG_UNION_SCHEMA),
+      new ReaderWriter(FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+      new ReaderWriter(DOUBLE_UNION_SCHEMA, FLOAT_UNION_SCHEMA),
+      new ReaderWriter(STRING_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+      new ReaderWriter(STRING_UNION_SCHEMA, BYTES_UNION_SCHEMA),
+      new ReaderWriter(BYTES_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+      new ReaderWriter(BYTES_UNION_SCHEMA, STRING_UNION_SCHEMA),
+      new ReaderWriter(DOUBLE_UNION_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+
+      // Readers capable of reading all branches of a union are compatible
+      new ReaderWriter(FLOAT_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+      new ReaderWriter(LONG_SCHEMA, INT_LONG_UNION_SCHEMA),
+      new ReaderWriter(DOUBLE_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+      new ReaderWriter(DOUBLE_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA),
 
       // Special case of singleton unions:
+      new ReaderWriter(FLOAT_SCHEMA, FLOAT_UNION_SCHEMA),
       new ReaderWriter(INT_UNION_SCHEMA, INT_SCHEMA),
       new ReaderWriter(INT_SCHEMA, INT_UNION_SCHEMA),
 
@@ -435,6 +466,9 @@ public class TestSchemaCompatibility {
       // Tests involving unions:
       new ReaderWriter(INT_UNION_SCHEMA, INT_STRING_UNION_SCHEMA),
       new ReaderWriter(STRING_UNION_SCHEMA, INT_STRING_UNION_SCHEMA),
+      new ReaderWriter(FLOAT_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA),
+      new ReaderWriter(LONG_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+      new ReaderWriter(INT_SCHEMA, INT_FLOAT_UNION_SCHEMA),
 
       new ReaderWriter(EMPTY_RECORD2, EMPTY_RECORD1),
       new ReaderWriter(A_INT_RECORD1, EMPTY_RECORD1),