You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by bu...@apache.org on 2017/08/18 14:57:28 UTC
[4/7] avro git commit: AVRO-1931: Reader is now compatible if able to
read all branches of union
AVRO-1931: Reader is now compatible if able to read all branches of union
Closes #199
Signed-off-by: Sriharsha Chintalapani <sr...@apache.org>
Signed-off-by: Nandor Kollar <nk...@cloudera.com>
Signed-off-by: Sean Busbey <bu...@apache.org>
(cherry picked from commit 15651fc95e058d1b3cc165a70c367d1dc2bad8b7)
Project: http://git-wip-us.apache.org/repos/asf/avro/repo
Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/83cdd2bd
Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/83cdd2bd
Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/83cdd2bd
Branch: refs/heads/branch-1.8
Commit: 83cdd2bd70a4f5f16a52177fc46aa6dec412548c
Parents: ea47217
Author: Anders Sundelin <an...@ericsson.com>
Authored: Thu Mar 2 11:08:23 2017 +0100
Committer: Sean Busbey <bu...@apache.org>
Committed: Fri Aug 18 09:55:58 2017 -0500
----------------------------------------------------------------------
.../org/apache/avro/SchemaCompatibility.java | 17 +-
.../TestReadingWritingDataInEvolvedSchemas.java | 284 +++++++++++++++++++
.../apache/avro/TestSchemaCompatibility.java | 34 +++
3 files changed, 329 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/avro/blob/83cdd2bd/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java
index 9ac6dc8..c713c32 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java
@@ -350,10 +350,15 @@ public class SchemaCompatibility {
} else {
// Reader and writer have different schema types:
- // Handle the corner case where writer is a union of a singleton branch: { X } === X
- if ((writer.getType() == Schema.Type.UNION)
- && writer.getTypes().size() == 1) {
- return getCompatibility(reader, writer.getTypes().get(0));
+ // A reader that is compatible with every branch of a writer union is compatible with that union
+ if (writer.getType() == Schema.Type.UNION) {
+ for (Schema s : writer.getTypes()) {
+ SchemaCompatibilityType compatibility = getCompatibility(reader, s);
+ if (compatibility == SchemaCompatibilityType.INCOMPATIBLE) {
+ return SchemaCompatibilityType.INCOMPATIBLE;
+ }
+ }
+ return SchemaCompatibilityType.COMPATIBLE;
}
switch (reader.getType()) {
@@ -380,12 +385,12 @@ public class SchemaCompatibility {
: SchemaCompatibilityType.INCOMPATIBLE;
}
case BYTES: {
- return (writer.getType() == Type.STRING)
+ return (writer.getType() == Type.STRING)
? SchemaCompatibilityType.COMPATIBLE
: SchemaCompatibilityType.INCOMPATIBLE;
}
case STRING: {
- return (writer.getType() == Type.BYTES)
+ return (writer.getType() == Type.BYTES)
? SchemaCompatibilityType.COMPATIBLE
: SchemaCompatibilityType.INCOMPATIBLE;
}
http://git-wip-us.apache.org/repos/asf/avro/blob/83cdd2bd/lang/java/avro/src/test/java/org/apache/avro/TestReadingWritingDataInEvolvedSchemas.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestReadingWritingDataInEvolvedSchemas.java b/lang/java/avro/src/test/java/org/apache/avro/TestReadingWritingDataInEvolvedSchemas.java
new file mode 100644
index 0000000..0012876
--- /dev/null
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestReadingWritingDataInEvolvedSchemas.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.EnumSymbol;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Round-trip tests for reading data with a schema that differs from the one it
+ * was written with. Each test encodes a generic record with a writer schema,
+ * decodes the bytes with a reader schema, and asserts either the decoded value
+ * or the {@link AvroTypeException} message the test expects.
+ */
+public class TestReadingWritingDataInEvolvedSchemas {
+
+  private static final String RECORD_A = "RecordA";
+  private static final String FIELD_A = "fieldA";
+  private static final char LATIN_SMALL_LETTER_O_WITH_DIARESIS = '\u00F6';
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  // Single-field records named RECORD_A that differ only in the type of FIELD_A.
+  private static final Schema DOUBLE_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().doubleType().noDefault() //
+      .endRecord();
+  private static final Schema FLOAT_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().floatType().noDefault() //
+      .endRecord();
+  private static final Schema LONG_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().longType().noDefault() //
+      .endRecord();
+  private static final Schema INT_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().intType().noDefault() //
+      .endRecord();
+  // Note: the branches are declared as double, float, long, int, i.e. in the
+  // reverse of the order suggested by the constant's name.
+  private static final Schema UNION_INT_LONG_FLOAT_DOUBLE_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().unionOf().doubleType().and().floatType().and().longType().and().intType().endUnion()
+      .noDefault() //
+      .endRecord();
+  private static final Schema STRING_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().stringType().noDefault() //
+      .endRecord();
+  private static final Schema BYTES_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().bytesType().noDefault() //
+      .endRecord();
+  private static final Schema UNION_STRING_BYTES_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().unionOf().stringType().and().bytesType().endUnion()
+      .noDefault() //
+      .endRecord();
+  private static final Schema ENUM_AB_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().enumeration("Enum1").symbols("A", "B").noDefault() //
+      .endRecord();
+  private static final Schema ENUM_ABC_RECORD = SchemaBuilder.record(RECORD_A) //
+      .fields() //
+      .name(FIELD_A).type().enumeration("Enum1").symbols("A", "B", "C").noDefault() //
+      .endRecord();
+
+  /** A double written through the numeric union is readable by a plain double reader. */
+  @Test
+  public void doubleWrittenWithUnionSchemaIsConvertedToDoubleSchema() throws Exception {
+    Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, 42.0);
+    byte[] encoded = encodeGenericBlob(record);
+    Record decoded = decodeGenericBlob(DOUBLE_RECORD, writer, encoded);
+    assertEquals(42.0, decoded.get(FIELD_A));
+  }
+
+  /** A written double is not narrowed to a float reader; an AvroTypeException is expected. */
+  @Test
+  public void doubleWrittenWithUnionSchemaIsNotConvertedToFloatSchema() throws Exception {
+    expectedException.expect(AvroTypeException.class);
+    expectedException.expectMessage("Found double, expecting float");
+    Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, 42.0);
+    byte[] encoded = encodeGenericBlob(record);
+    decodeGenericBlob(FLOAT_RECORD, writer, encoded);
+  }
+
+  /** A written float is not converted to a long reader; an AvroTypeException is expected. */
+  @Test
+  public void floatWrittenWithUnionSchemaIsNotConvertedToLongSchema() throws Exception {
+    expectedException.expect(AvroTypeException.class);
+    expectedException.expectMessage("Found float, expecting long");
+    Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, 42.0f);
+    byte[] encoded = encodeGenericBlob(record);
+    decodeGenericBlob(LONG_RECORD, writer, encoded);
+  }
+
+  /** A written long is not narrowed to an int reader; an AvroTypeException is expected. */
+  @Test
+  public void longWrittenWithUnionSchemaIsNotConvertedToIntSchema() throws Exception {
+    expectedException.expect(AvroTypeException.class);
+    expectedException.expectMessage("Found long, expecting int");
+    Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, 42L);
+    byte[] encoded = encodeGenericBlob(record);
+    decodeGenericBlob(INT_RECORD, writer, encoded);
+  }
+
+  /** An int written through the numeric union is readable as double, float, long and int. */
+  @Test
+  public void intWrittenWithUnionSchemaIsConvertedToAllNumberSchemas() throws Exception {
+    Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, 42);
+    byte[] encoded = encodeGenericBlob(record);
+    assertEquals(42.0, decodeGenericBlob(DOUBLE_RECORD, writer, encoded).get(FIELD_A));
+    assertEquals(42.0f, decodeGenericBlob(FLOAT_RECORD, writer, encoded).get(FIELD_A));
+    assertEquals(42L, decodeGenericBlob(LONG_RECORD, writer, encoded).get(FIELD_A));
+    assertEquals(42, decodeGenericBlob(INT_RECORD, writer, encoded).get(FIELD_A));
+  }
+
+  /** An ASCII string written through the string/bytes union is readable as bytes. */
+  @Test
+  public void asciiStringWrittenWithUnionSchemaIsConvertedToBytesSchema() throws Exception {
+    Schema writer = UNION_STRING_BYTES_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, "42");
+    byte[] encoded = encodeGenericBlob(record);
+    ByteBuffer actual = (ByteBuffer) decodeGenericBlob(BYTES_RECORD, writer, encoded).get(FIELD_A);
+    // Compare the readable region only; array() would expose the whole backing array.
+    assertArrayEquals("42".getBytes("UTF-8"), readableBytes(actual));
+  }
+
+  /** A non-ASCII string written through the string/bytes union is readable as its UTF-8 bytes. */
+  @Test
+  public void utf8StringWrittenWithUnionSchemaIsConvertedToBytesSchema() throws Exception {
+    String goeran = String.format("G%sran", LATIN_SMALL_LETTER_O_WITH_DIARESIS);
+    Schema writer = UNION_STRING_BYTES_RECORD;
+    Record record = defaultRecordWithSchema(writer, FIELD_A, goeran);
+    byte[] encoded = encodeGenericBlob(record);
+    ByteBuffer actual = (ByteBuffer) decodeGenericBlob(BYTES_RECORD, writer, encoded).get(FIELD_A);
+    // Compare the readable region only; array() would expose the whole backing array.
+    assertArrayEquals(goeran.getBytes("UTF-8"), readableBytes(actual));
+  }
+
+  /** ASCII bytes written through the string/bytes union are readable as a string. */
+  @Test
+  public void asciiBytesWrittenWithUnionSchemaIsConvertedToStringSchema() throws Exception {
+    Schema writer = UNION_STRING_BYTES_RECORD;
+    ByteBuffer buf = ByteBuffer.wrap("42".getBytes("UTF-8"));
+    Record record = defaultRecordWithSchema(writer, FIELD_A, buf);
+    byte[] encoded = encodeGenericBlob(record);
+    CharSequence read = (CharSequence) decodeGenericBlob(STRING_RECORD, writer, encoded).get(FIELD_A);
+    assertEquals("42", read.toString());
+  }
+
+  /** Non-ASCII UTF-8 bytes written through the string/bytes union are readable as a string. */
+  @Test
+  public void utf8BytesWrittenWithUnionSchemaIsConvertedToStringSchema() throws Exception {
+    String goeran = String.format("G%sran", LATIN_SMALL_LETTER_O_WITH_DIARESIS);
+    Schema writer = UNION_STRING_BYTES_RECORD;
+    // Write a ByteBuffer so the datum is stored as bytes; passing the String
+    // itself would only repeat the string-to-string case.
+    ByteBuffer buf = ByteBuffer.wrap(goeran.getBytes("UTF-8"));
+    Record record = defaultRecordWithSchema(writer, FIELD_A, buf);
+    byte[] encoded = encodeGenericBlob(record);
+    CharSequence read = (CharSequence) decodeGenericBlob(STRING_RECORD, writer, encoded).get(FIELD_A);
+    assertEquals(goeran, read.toString());
+  }
+
+  /** Data written with enum {A, B} is readable with the extended enum {A, B, C}. */
+  @Test
+  public void enumRecordCanBeReadWithExtendedEnumSchema() throws Exception {
+    Schema writer = ENUM_AB_RECORD;
+    // The symbol carries the enum schema of the field, not the enclosing record schema.
+    EnumSymbol symbol = new EnumSymbol(writer.getField(FIELD_A).schema(), "A");
+    Record record = defaultRecordWithSchema(writer, FIELD_A, symbol);
+    byte[] encoded = encodeGenericBlob(record);
+    Record decoded = decodeGenericBlob(ENUM_ABC_RECORD, writer, encoded);
+    assertEquals("A", decoded.get(FIELD_A).toString());
+  }
+
+  /** Data written with enum {A, B, C} is readable with {A, B} when the value is one the reader knows. */
+  @Test
+  public void enumRecordWithExtendedSchemaCanBeReadWithOriginalEnumSchemaIfOnlyOldValues() throws Exception {
+    Schema writer = ENUM_ABC_RECORD;
+    // The symbol carries the enum schema of the field, not the enclosing record schema.
+    EnumSymbol symbol = new EnumSymbol(writer.getField(FIELD_A).schema(), "A");
+    Record record = defaultRecordWithSchema(writer, FIELD_A, symbol);
+    byte[] encoded = encodeGenericBlob(record);
+    Record decoded = decodeGenericBlob(ENUM_AB_RECORD, writer, encoded);
+    assertEquals("A", decoded.get(FIELD_A).toString());
+  }
+
+  /** A symbol missing from the reader's enum is expected to fail with "No match for C". */
+  @Test
+  public void enumRecordWithExtendedSchemaCanNotBeReadIfNewValuesAreUsed() throws Exception {
+    expectedException.expect(AvroTypeException.class);
+    expectedException.expectMessage("No match for C");
+    Schema writer = ENUM_ABC_RECORD;
+    // The symbol carries the enum schema of the field, not the enclosing record schema.
+    EnumSymbol symbol = new EnumSymbol(writer.getField(FIELD_A).schema(), "C");
+    Record record = defaultRecordWithSchema(writer, FIELD_A, symbol);
+    byte[] encoded = encodeGenericBlob(record);
+    decodeGenericBlob(ENUM_AB_RECORD, writer, encoded);
+  }
+
+  /** A field known only to the writer is dropped when reading with the original schema. */
+  @Test
+  public void recordWrittenWithExtendedSchemaCanBeReadWithOriginalSchemaButLossOfData() throws Exception {
+    Schema writer = SchemaBuilder.record(RECORD_A) //
+        .fields() //
+        .name("newTopField").type().stringType().noDefault() //
+        .name(FIELD_A).type().intType().noDefault() //
+        .endRecord();
+    Record record = defaultRecordWithSchema(writer, FIELD_A, 42);
+    record.put("newTopField", "not decoded");
+    byte[] encoded = encodeGenericBlob(record);
+    Record decoded = decodeGenericBlob(INT_RECORD, writer, encoded);
+    assertEquals(42, decoded.get(FIELD_A));
+    assertNull(decoded.get("newTopField"));
+  }
+
+  /** A reader field without a default that the writer never wrote is expected to fail. */
+  @Test
+  public void readerWithoutDefaultValueThrowsException() throws Exception {
+    expectedException.expect(AvroTypeException.class);
+    expectedException.expectMessage("missing required field newField");
+    Schema reader = SchemaBuilder.record(RECORD_A) //
+        .fields() //
+        .name("newField").type().intType().noDefault() //
+        .name(FIELD_A).type().intType().noDefault() //
+        .endRecord();
+    Record record = defaultRecordWithSchema(INT_RECORD, FIELD_A, 42);
+    byte[] encoded = encodeGenericBlob(record);
+    decodeGenericBlob(reader, INT_RECORD, encoded);
+  }
+
+  /** A reader field with a default that the writer never wrote is filled with that default. */
+  @Test
+  public void readerWithDefaultValueIsApplied() throws Exception {
+    Schema reader = SchemaBuilder.record(RECORD_A) //
+        .fields() //
+        .name("newFieldWithDefault").type().intType().intDefault(314) //
+        .name(FIELD_A).type().intType().noDefault() //
+        .endRecord();
+    Record record = defaultRecordWithSchema(INT_RECORD, FIELD_A, 42);
+    byte[] encoded = encodeGenericBlob(record);
+    Record decoded = decodeGenericBlob(reader, INT_RECORD, encoded);
+    assertEquals(42, decoded.get(FIELD_A));
+    assertEquals(314, decoded.get("newFieldWithDefault"));
+  }
+
+  /**
+   * Creates a generic record for {@code schema} with a single field set.
+   *
+   * @param schema record schema of the new instance
+   * @param key name of the field to populate
+   * @param value value stored under {@code key}
+   * @return the new record; all other fields are left unset
+   */
+  private static Record defaultRecordWithSchema(Schema schema, String key, Object value) {
+    Record data = new GenericData.Record(schema);
+    data.put(key, value);
+    return data;
+  }
+
+  /**
+   * Copies the readable region (position to limit) of {@code buffer} into a new
+   * array. Works on a duplicate so the caller's position is left untouched.
+   */
+  private static byte[] readableBytes(ByteBuffer buffer) {
+    byte[] bytes = new byte[buffer.remaining()];
+    buffer.duplicate().get(bytes);
+    return bytes;
+  }
+
+  /**
+   * Serializes {@code data} with the binary encoder, using the record's own
+   * schema as the writer schema.
+   *
+   * @param data record to encode
+   * @return the encoded bytes
+   * @throws IOException if writing or flushing the encoder fails
+   */
+  private static byte[] encodeGenericBlob(GenericRecord data)
+      throws IOException {
+    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(data.getSchema());
+    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(outStream, null);
+    writer.write(data, encoder);
+    // The encoder must be flushed before the stream contents are read.
+    encoder.flush();
+    outStream.close();
+    return outStream.toByteArray();
+  }
+
+  /**
+   * Decodes a binary blob into a generic record.
+   *
+   * @param expectedSchema schema set as the reader's expected schema
+   * @param schemaOfBlob schema set as the schema the blob was written with
+   * @param blob encoded bytes, may be {@code null}
+   * @return the decoded record, or {@code null} when {@code blob} is {@code null}
+   * @throws IOException if reading from the decoder fails
+   */
+  private static Record decodeGenericBlob(Schema expectedSchema, Schema schemaOfBlob, byte[] blob) throws IOException {
+    if (blob == null) {
+      return null;
+    }
+    GenericDatumReader<Record> reader = new GenericDatumReader<Record>();
+    reader.setExpected(expectedSchema);
+    reader.setSchema(schemaOfBlob);
+    Decoder decoder = DecoderFactory.get().binaryDecoder(blob, null);
+    return reader.read(null, decoder);
+  }
+}
http://git-wip-us.apache.org/repos/asf/avro/blob/83cdd2bd/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java
index 5ff7225..e09d211 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java
@@ -82,12 +82,24 @@ public class TestSchemaCompatibility {
Schema.createUnion(list(INT_SCHEMA));
private static final Schema LONG_UNION_SCHEMA =
Schema.createUnion(list(LONG_SCHEMA));
+ private static final Schema FLOAT_UNION_SCHEMA =
+ Schema.createUnion(list(FLOAT_SCHEMA));
+ private static final Schema DOUBLE_UNION_SCHEMA =
+ Schema.createUnion(list(DOUBLE_SCHEMA));
private static final Schema STRING_UNION_SCHEMA =
Schema.createUnion(list(STRING_SCHEMA));
+ private static final Schema BYTES_UNION_SCHEMA =
+ Schema.createUnion(list(BYTES_SCHEMA));
private static final Schema INT_STRING_UNION_SCHEMA =
Schema.createUnion(list(INT_SCHEMA, STRING_SCHEMA));
private static final Schema STRING_INT_UNION_SCHEMA =
Schema.createUnion(list(STRING_SCHEMA, INT_SCHEMA));
+ private static final Schema INT_FLOAT_UNION_SCHEMA =
+ Schema.createUnion(list(INT_SCHEMA, FLOAT_SCHEMA));
+ private static final Schema INT_LONG_UNION_SCHEMA =
+ Schema.createUnion(list(INT_SCHEMA, LONG_SCHEMA));
+ private static final Schema INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA =
+ Schema.createUnion(list(INT_SCHEMA, LONG_SCHEMA, FLOAT_SCHEMA, DOUBLE_SCHEMA));
// Non recursive records:
private static final Schema EMPTY_RECORD1 =
@@ -363,8 +375,27 @@ public class TestSchemaCompatibility {
new ReaderWriter(INT_STRING_UNION_SCHEMA, STRING_INT_UNION_SCHEMA),
new ReaderWriter(INT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
new ReaderWriter(LONG_UNION_SCHEMA, INT_UNION_SCHEMA),
+ new ReaderWriter(FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA),
+ new ReaderWriter(DOUBLE_UNION_SCHEMA, INT_UNION_SCHEMA),
+ new ReaderWriter(LONG_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+ new ReaderWriter(FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA),
+ new ReaderWriter(DOUBLE_UNION_SCHEMA, LONG_UNION_SCHEMA),
+ new ReaderWriter(FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+ new ReaderWriter(DOUBLE_UNION_SCHEMA, FLOAT_UNION_SCHEMA),
+ new ReaderWriter(STRING_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+ new ReaderWriter(STRING_UNION_SCHEMA, BYTES_UNION_SCHEMA),
+ new ReaderWriter(BYTES_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+ new ReaderWriter(BYTES_UNION_SCHEMA, STRING_UNION_SCHEMA),
+ new ReaderWriter(DOUBLE_UNION_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+
+ // Readers capable of reading all branches of a union are compatible
+ new ReaderWriter(FLOAT_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+ new ReaderWriter(LONG_SCHEMA, INT_LONG_UNION_SCHEMA),
+ new ReaderWriter(DOUBLE_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+ new ReaderWriter(DOUBLE_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA),
// Special case of singleton unions:
+ new ReaderWriter(FLOAT_SCHEMA, FLOAT_UNION_SCHEMA),
new ReaderWriter(INT_UNION_SCHEMA, INT_SCHEMA),
new ReaderWriter(INT_SCHEMA, INT_UNION_SCHEMA),
@@ -435,6 +466,9 @@ public class TestSchemaCompatibility {
// Tests involving unions:
new ReaderWriter(INT_UNION_SCHEMA, INT_STRING_UNION_SCHEMA),
new ReaderWriter(STRING_UNION_SCHEMA, INT_STRING_UNION_SCHEMA),
+ new ReaderWriter(FLOAT_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA),
+ new ReaderWriter(LONG_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+ new ReaderWriter(INT_SCHEMA, INT_FLOAT_UNION_SCHEMA),
new ReaderWriter(EMPTY_RECORD2, EMPTY_RECORD1),
new ReaderWriter(A_INT_RECORD1, EMPTY_RECORD1),