You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by bl...@apache.org on 2016/07/24 22:22:27 UTC
[1/2] avro git commit: AVRO-1704: Java: Add toByteArray and
fromByteArray to specific.
Repository: avro
Updated Branches:
refs/heads/master 5259f26ec -> f6c044e56
AVRO-1704: Java: Add toByteArray and fromByteArray to specific.
Project: http://git-wip-us.apache.org/repos/asf/avro/repo
Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/f6c044e5
Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/f6c044e5
Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/f6c044e5
Branch: refs/heads/master
Commit: f6c044e56f9de7d3e34eeb1702c11fa4add04d84
Parents: e3c5da6
Author: Ryan Blue <bl...@apache.org>
Authored: Mon Jun 27 20:41:40 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Sun Jul 24 15:21:00 2016 -0700
----------------------------------------------------------------------
.../specific/TestRecordWithLogicalTypes.java | 27 ++++-
.../specific/TestRecordWithoutLogicalTypes.java | 22 ++++
.../specific/TestSpecificToFromByteArray.java | 102 +++++++++++++++++++
.../specific/templates/java/classic/record.vm | 28 ++++-
.../compiler/specific/TestSpecificCompiler.java | 4 +
.../avro/examples/baseball/Player.java | 27 ++++-
.../tools/src/test/compiler/output/Player.java | 27 ++++-
7 files changed, 228 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/avro/blob/f6c044e5/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithLogicalTypes.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithLogicalTypes.java b/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithLogicalTypes.java
index c1b98c7..da3f878 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithLogicalTypes.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithLogicalTypes.java
@@ -5,7 +5,8 @@
*/
package org.apache.avro.specific;
-import org.apache.avro.Conversions;
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.BinaryMessageEncoder;
import java.math.BigDecimal;
@@ -15,6 +16,26 @@ public class TestRecordWithLogicalTypes extends org.apache.avro.specific.Specifi
private static final long serialVersionUID = -4211233492739285532L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"TestRecordWithLogicalTypes\",\"namespace\":\"org.apache.avro.specific\",\"fields\":[{\"name\":\"b\",\"type\":\"boolean\"},{\"name\":\"i32\",\"type\":\"int\"},{\"name\":\"i64\",\"type\":\"long\"},{\"name\":\"f32\",\"type\":\"float\"},{\"name\":\"f64\",\"type\":\"double\"},{\"name\":\"s\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"d\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},{\"name\":\"t\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"ts\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"dec\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":9,\"scale\":2}}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+
+ private static SpecificData MODEL$ = new SpecificData();
+
+ private static final BinaryMessageEncoder<TestRecordWithLogicalTypes> ENCODER =
+ new BinaryMessageEncoder<TestRecordWithLogicalTypes>(MODEL$, SCHEMA$);
+
+ private static final BinaryMessageDecoder<TestRecordWithLogicalTypes> DECODER =
+ new BinaryMessageDecoder<TestRecordWithLogicalTypes>(MODEL$, SCHEMA$);
+
+ /** Serializes this ${schema.getName()} to a ByteBuffer. */
+ public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
+ return ENCODER.encode(this);
+ }
+
+ /** Deserializes a ${schema.getName()} from a ByteBuffer. */
+ public static TestRecordWithLogicalTypes fromByteBuffer(
+ java.nio.ByteBuffer b) throws java.io.IOException {
+ return DECODER.decode(b);
+ }
+
@Deprecated public boolean b;
@Deprecated public int i32;
@Deprecated public long i64;
@@ -757,7 +778,7 @@ public class TestRecordWithLogicalTypes extends org.apache.avro.specific.Specifi
}
private static final org.apache.avro.io.DatumWriter
- WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$);
+ WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$);
@Override public void writeExternal(java.io.ObjectOutput out)
throws java.io.IOException {
@@ -765,7 +786,7 @@ public class TestRecordWithLogicalTypes extends org.apache.avro.specific.Specifi
}
private static final org.apache.avro.io.DatumReader
- READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$);
+ READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$);
@Override public void readExternal(java.io.ObjectInput in)
throws java.io.IOException {
http://git-wip-us.apache.org/repos/asf/avro/blob/f6c044e5/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithoutLogicalTypes.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithoutLogicalTypes.java b/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithoutLogicalTypes.java
index 86fb8f0..92a5e60 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithoutLogicalTypes.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithoutLogicalTypes.java
@@ -5,6 +5,8 @@
*/
package org.apache.avro.specific;
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.BinaryMessageEncoder;
import java.nio.ByteBuffer;
@SuppressWarnings("all")
@@ -12,6 +14,26 @@ import java.nio.ByteBuffer;
public class TestRecordWithoutLogicalTypes extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"TestRecordWithoutLogicalTypes\",\"namespace\":\"org.apache.avro.specific\",\"fields\":[{\"name\":\"b\",\"type\":\"boolean\"},{\"name\":\"i32\",\"type\":\"int\"},{\"name\":\"i64\",\"type\":\"long\"},{\"name\":\"f32\",\"type\":\"float\"},{\"name\":\"f64\",\"type\":\"double\"},{\"name\":\"s\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"d\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},{\"name\":\"t\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"ts\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"dec\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":9,\"scale\":2}}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+
+ private static SpecificData MODEL$ = new SpecificData();
+
+ private static final BinaryMessageEncoder<TestRecordWithoutLogicalTypes> ENCODER =
+ new BinaryMessageEncoder<TestRecordWithoutLogicalTypes>(MODEL$, SCHEMA$);
+
+ private static final BinaryMessageDecoder<TestRecordWithoutLogicalTypes> DECODER =
+ new BinaryMessageDecoder<TestRecordWithoutLogicalTypes>(MODEL$, SCHEMA$);
+
+ /** Serializes this ${schema.getName()} to a ByteBuffer. */
+ public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
+ return ENCODER.encode(this);
+ }
+
+ /** Deserializes a ${schema.getName()} from a ByteBuffer. */
+ public static TestRecordWithoutLogicalTypes fromByteBuffer(
+ java.nio.ByteBuffer b) throws java.io.IOException {
+ return DECODER.decode(b);
+ }
+
private boolean b;
private int i32;
private long i64;
http://git-wip-us.apache.org/repos/asf/avro/blob/f6c044e5/lang/java/avro/src/test/java/org/apache/avro/specific/TestSpecificToFromByteArray.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/specific/TestSpecificToFromByteArray.java b/lang/java/avro/src/test/java/org/apache/avro/specific/TestSpecificToFromByteArray.java
new file mode 100644
index 0000000..b17cad2
--- /dev/null
+++ b/lang/java/avro/src/test/java/org/apache/avro/specific/TestSpecificToFromByteArray.java
@@ -0,0 +1,102 @@
+package org.apache.avro.specific;
+
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.message.MissingSchemaException;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+import org.junit.Test;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSpecificToFromByteArray {
+ @Test
+ public void testSpecificToFromByteBufferWithLogicalTypes() throws IOException {
+ TestRecordWithLogicalTypes record = new TestRecordWithLogicalTypes(
+ true,
+ 34,
+ 35L,
+ 3.14F,
+ 3019.34,
+ null,
+ LocalDate.now(),
+ LocalTime.now(),
+ DateTime.now().withZone(DateTimeZone.UTC),
+ new BigDecimal("123.45")
+ );
+
+ ByteBuffer b = record.toByteBuffer();
+ TestRecordWithLogicalTypes copy = TestRecordWithLogicalTypes.fromByteBuffer(b);
+
+ assertEquals(record, copy);
+ }
+
+ @Test
+ public void testSpecificToFromByteBufferWithoutLogicalTypes() throws IOException {
+ TestRecordWithoutLogicalTypes record = new TestRecordWithoutLogicalTypes(
+ true,
+ 34,
+ 35L,
+ 3.14F,
+ 3019.34,
+ null,
+ new TimeConversions.DateConversion().toInt(LocalDate.now(), null, null),
+ new TimeConversions.TimeConversion().toInt(LocalTime.now(), null, null),
+ new TimeConversions.TimestampConversion().toLong(
+ DateTime.now().withZone(DateTimeZone.UTC), null, null),
+ new Conversions.DecimalConversion().toBytes(
+ new BigDecimal("123.45"), null, LogicalTypes.decimal(9, 2))
+ );
+
+ ByteBuffer b = record.toByteBuffer();
+ TestRecordWithoutLogicalTypes copy = TestRecordWithoutLogicalTypes.fromByteBuffer(b);
+
+ assertEquals(record, copy);
+ }
+
+ @Test(expected = MissingSchemaException.class)
+ public void testSpecificByteArrayIncompatibleWithLogicalTypes() throws IOException {
+ TestRecordWithoutLogicalTypes withoutLogicalTypes = new TestRecordWithoutLogicalTypes(
+ true,
+ 34,
+ 35L,
+ 3.14F,
+ 3019.34,
+ null,
+ new TimeConversions.DateConversion().toInt(LocalDate.now(), null, null),
+ new TimeConversions.TimeConversion().toInt(LocalTime.now(), null, null),
+ new TimeConversions.TimestampConversion().toLong(
+ DateTime.now().withZone(DateTimeZone.UTC), null, null),
+ new Conversions.DecimalConversion().toBytes(
+ new BigDecimal("123.45"), null, LogicalTypes.decimal(9, 2))
+ );
+
+ ByteBuffer b = withoutLogicalTypes.toByteBuffer();
+ TestRecordWithLogicalTypes.fromByteBuffer(b);
+ }
+
+ @Test(expected = MissingSchemaException.class)
+ public void testSpecificByteArrayIncompatibleWithoutLogicalTypes() throws IOException {
+ TestRecordWithLogicalTypes withLogicalTypes = new TestRecordWithLogicalTypes(
+ true,
+ 34,
+ 35L,
+ 3.14F,
+ 3019.34,
+ null,
+ LocalDate.now(),
+ LocalTime.now(),
+ DateTime.now().withZone(DateTimeZone.UTC),
+ new BigDecimal("123.45")
+ );
+
+ ByteBuffer b = withLogicalTypes.toByteBuffer();
+ TestRecordWithoutLogicalTypes.fromByteBuffer(b);
+ }
+}
http://git-wip-us.apache.org/repos/asf/avro/blob/f6c044e5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm
----------------------------------------------------------------------
diff --git a/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm b/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm
index dc81aba..1f99f38 100644
--- a/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm
+++ b/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm
@@ -20,6 +20,8 @@ package $schema.getNamespace();
#end
import org.apache.avro.specific.SpecificData;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.BinaryMessageDecoder;
@SuppressWarnings("all")
#if ($schema.getDoc())
@@ -33,6 +35,28 @@ public class ${this.mangle($schema.getName())}#if ($schema.isError()) extends or
private static final long serialVersionUID = ${this.fingerprint64($schema)}L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse(${this.javaSplit($schema.toString())});
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+
+ private static SpecificData MODEL$ = new SpecificData();
+
+#if (!$schema.isError())
+ private static final BinaryMessageEncoder<${this.mangle($schema.getName())}> ENCODER =
+ new BinaryMessageEncoder<${this.mangle($schema.getName())}>(MODEL$, SCHEMA$);
+
+ private static final BinaryMessageDecoder<${this.mangle($schema.getName())}> DECODER =
+ new BinaryMessageDecoder<${this.mangle($schema.getName())}>(MODEL$, SCHEMA$);
+
+ /** Serializes this ${schema.getName()} to a ByteBuffer. */
+ public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
+ return ENCODER.encode(this);
+ }
+
+ /** Deserializes a ${schema.getName()} from a ByteBuffer. */
+ public static ${this.mangle($schema.getName())} fromByteBuffer(
+ java.nio.ByteBuffer b) throws java.io.IOException {
+ return DECODER.decode(b);
+ }
+#end
+
#foreach ($field in $schema.getFields())
#if ($field.doc())
/** $field.doc() */
@@ -405,7 +429,7 @@ public class ${this.mangle($schema.getName())}#if ($schema.isError()) extends or
}
private static final org.apache.avro.io.DatumWriter
- WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$);
+ WRITER$ = MODEL$.createDatumWriter(SCHEMA$);
@Override public void writeExternal(java.io.ObjectOutput out)
throws java.io.IOException {
@@ -413,7 +437,7 @@ public class ${this.mangle($schema.getName())}#if ($schema.isError()) extends or
}
private static final org.apache.avro.io.DatumReader
- READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$);
+ READER$ = MODEL$.createDatumReader(SCHEMA$);
@Override public void readExternal(java.io.ObjectInput in)
throws java.io.IOException {
http://git-wip-us.apache.org/repos/asf/avro/blob/f6c044e5/lang/java/compiler/src/test/java/org/apache/avro/compiler/specific/TestSpecificCompiler.java
----------------------------------------------------------------------
diff --git a/lang/java/compiler/src/test/java/org/apache/avro/compiler/specific/TestSpecificCompiler.java b/lang/java/compiler/src/test/java/org/apache/avro/compiler/specific/TestSpecificCompiler.java
index 25ad97d..49174a8 100644
--- a/lang/java/compiler/src/test/java/org/apache/avro/compiler/specific/TestSpecificCompiler.java
+++ b/lang/java/compiler/src/test/java/org/apache/avro/compiler/specific/TestSpecificCompiler.java
@@ -516,4 +516,8 @@ public class TestSpecificCompiler {
Assert.assertEquals("Should use null for decimal if the flag is off",
"null", compiler.conversionInstance(uuidSchema));
}
+
+ public void testToFromByteBuffer() {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/avro/blob/f6c044e5/lang/java/tools/src/test/compiler/output-string/avro/examples/baseball/Player.java
----------------------------------------------------------------------
diff --git a/lang/java/tools/src/test/compiler/output-string/avro/examples/baseball/Player.java b/lang/java/tools/src/test/compiler/output-string/avro/examples/baseball/Player.java
index 07a3483..89b4af8 100644
--- a/lang/java/tools/src/test/compiler/output-string/avro/examples/baseball/Player.java
+++ b/lang/java/tools/src/test/compiler/output-string/avro/examples/baseball/Player.java
@@ -6,6 +6,8 @@
package avro.examples.baseball;
import org.apache.avro.specific.SpecificData;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.BinaryMessageDecoder;
@SuppressWarnings("all")
/** \u9078\u624b is Japanese for player. */
@@ -14,6 +16,26 @@ public class Player extends org.apache.avro.specific.SpecificRecordBase implemen
private static final long serialVersionUID = 3865593031278745715L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Player\",\"namespace\":\"avro.examples.baseball\",\"doc\":\"\u9078\u624b is Japanese for player.\",\"fields\":[{\"name\":\"number\",\"type\":\"int\",\"doc\":\"The number of the player\"},{\"name\":\"first_name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"last_name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"position\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"enum\",\"name\":\"Position\",\"symbols\":[\"P\",\"C\",\"B1\",\"B2\",\"B3\",\"SS\",\"LF\",\"CF\",\"RF\",\"DH\"]}}}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+
+ private static SpecificData MODEL$ = new SpecificData();
+
+ private static final BinaryMessageEncoder<Player> ENCODER =
+ new BinaryMessageEncoder<Player>(MODEL$, SCHEMA$);
+
+ private static final BinaryMessageDecoder<Player> DECODER =
+ new BinaryMessageDecoder<Player>(MODEL$, SCHEMA$);
+
+ /** Serializes this Player to a ByteBuffer. */
+ public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
+ return ENCODER.encode(this);
+ }
+
+ /** Deserializes a Player from a ByteBuffer. */
+ public static Player fromByteBuffer(
+ java.nio.ByteBuffer b) throws java.io.IOException {
+ return DECODER.decode(b);
+ }
+
/** The number of the player */
@Deprecated public int number;
@Deprecated public java.lang.String first_name;
@@ -52,6 +74,7 @@ public class Player extends org.apache.avro.specific.SpecificRecordBase implemen
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
+
// Used by DatumReader. Applications should not call.
@SuppressWarnings(value="unchecked")
public void put(int field$, java.lang.Object value$) {
@@ -395,7 +418,7 @@ public class Player extends org.apache.avro.specific.SpecificRecordBase implemen
}
private static final org.apache.avro.io.DatumWriter
- WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$);
+ WRITER$ = MODEL$.createDatumWriter(SCHEMA$);
@Override public void writeExternal(java.io.ObjectOutput out)
throws java.io.IOException {
@@ -403,7 +426,7 @@ public class Player extends org.apache.avro.specific.SpecificRecordBase implemen
}
private static final org.apache.avro.io.DatumReader
- READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$);
+ READER$ = MODEL$.createDatumReader(SCHEMA$);
@Override public void readExternal(java.io.ObjectInput in)
throws java.io.IOException {
http://git-wip-us.apache.org/repos/asf/avro/blob/f6c044e5/lang/java/tools/src/test/compiler/output/Player.java
----------------------------------------------------------------------
diff --git a/lang/java/tools/src/test/compiler/output/Player.java b/lang/java/tools/src/test/compiler/output/Player.java
index 252eaab..bfd3d07 100644
--- a/lang/java/tools/src/test/compiler/output/Player.java
+++ b/lang/java/tools/src/test/compiler/output/Player.java
@@ -6,6 +6,8 @@
package avro.examples.baseball;
import org.apache.avro.specific.SpecificData;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.BinaryMessageDecoder;
@SuppressWarnings("all")
/** \u9078\u624b is Japanese for player. */
@@ -14,6 +16,26 @@ public class Player extends org.apache.avro.specific.SpecificRecordBase implemen
private static final long serialVersionUID = 3865593031278745715L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Player\",\"namespace\":\"avro.examples.baseball\",\"doc\":\"\u9078\u624b is Japanese for player.\",\"fields\":[{\"name\":\"number\",\"type\":\"int\",\"doc\":\"The number of the player\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"position\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"enum\",\"name\":\"Position\",\"symbols\":[\"P\",\"C\",\"B1\",\"B2\",\"B3\",\"SS\",\"LF\",\"CF\",\"RF\",\"DH\"]}}}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+
+ private static SpecificData MODEL$ = new SpecificData();
+
+ private static final BinaryMessageEncoder<Player> ENCODER =
+ new BinaryMessageEncoder<Player>(MODEL$, SCHEMA$);
+
+ private static final BinaryMessageDecoder<Player> DECODER =
+ new BinaryMessageDecoder<Player>(MODEL$, SCHEMA$);
+
+ /** Serializes this Player to a ByteBuffer. */
+ public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
+ return ENCODER.encode(this);
+ }
+
+ /** Deserializes a Player from a ByteBuffer. */
+ public static Player fromByteBuffer(
+ java.nio.ByteBuffer b) throws java.io.IOException {
+ return DECODER.decode(b);
+ }
+
/** The number of the player */
@Deprecated public int number;
@Deprecated public java.lang.CharSequence first_name;
@@ -52,6 +74,7 @@ public class Player extends org.apache.avro.specific.SpecificRecordBase implemen
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
+
// Used by DatumReader. Applications should not call.
@SuppressWarnings(value="unchecked")
public void put(int field$, java.lang.Object value$) {
@@ -395,7 +418,7 @@ public class Player extends org.apache.avro.specific.SpecificRecordBase implemen
}
private static final org.apache.avro.io.DatumWriter
- WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$);
+ WRITER$ = MODEL$.createDatumWriter(SCHEMA$);
@Override public void writeExternal(java.io.ObjectOutput out)
throws java.io.IOException {
@@ -403,7 +426,7 @@ public class Player extends org.apache.avro.specific.SpecificRecordBase implemen
}
private static final org.apache.avro.io.DatumReader
- READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$);
+ READER$ = MODEL$.createDatumReader(SCHEMA$);
@Override public void readExternal(java.io.ObjectInput in)
throws java.io.IOException {
[2/2] avro git commit: AVRO-1704: Java: Add support for
single-message encoding.
Posted by bl...@apache.org.
AVRO-1704: Java: Add support for single-message encoding.
Project: http://git-wip-us.apache.org/repos/asf/avro/repo
Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/e3c5da69
Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/e3c5da69
Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/e3c5da69
Branch: refs/heads/master
Commit: e3c5da696d4762b287bfec0d133efec20ddb8b14
Parents: 5259f26
Author: Ryan Blue <bl...@apache.org>
Authored: Sun Jul 24 15:20:37 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Sun Jul 24 15:21:00 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/avro/message/BadHeaderException.java | 34 +++
.../avro/message/BinaryMessageDecoder.java | 190 +++++++++++++++
.../avro/message/BinaryMessageEncoder.java | 122 ++++++++++
.../org/apache/avro/message/MessageDecoder.java | 159 ++++++++++++
.../org/apache/avro/message/MessageEncoder.java | 50 ++++
.../avro/message/MissingSchemaException.java | 35 +++
.../apache/avro/message/RawMessageDecoder.java | 101 ++++++++
.../apache/avro/message/RawMessageEncoder.java | 129 ++++++++++
.../org/apache/avro/message/SchemaStore.java | 63 +++++
.../avro/util/ReusableByteArrayInputStream.java | 35 +++
.../util/ReusableByteBufferInputStream.java | 89 +++++++
.../avro/message/TestBinaryMessageEncoding.java | 241 +++++++++++++++++++
.../main/java/org/apache/avro/GuavaClasses.java | 2 +
14 files changed, 1252 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/avro/blob/e3c5da69/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0dfbdfc..a94ed17 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Trunk (not yet released)
NEW FEATURES
+ AVRO-1704: Java: Add support for single-message encoding. (blue)
+
OPTIMIZATIONS
IMPROVEMENTS
http://git-wip-us.apache.org/repos/asf/avro/blob/e3c5da69/lang/java/avro/src/main/java/org/apache/avro/message/BadHeaderException.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/BadHeaderException.java b/lang/java/avro/src/main/java/org/apache/avro/message/BadHeaderException.java
new file mode 100644
index 0000000..38c0001
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/BadHeaderException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import org.apache.avro.AvroRuntimeException;
+
+/**
+ * Exception thrown by a {@link MessageDecoder} when a message header is not
+ * recognized.
+ * <p>
+ * This usually indicates that the encoded bytes were not an Avro message.
+ */
+public class BadHeaderException extends AvroRuntimeException {
+ public BadHeaderException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/avro/blob/e3c5da69/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java
new file mode 100644
index 0000000..11a7336
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import com.google.common.collect.MapMaker;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericData;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Map;
+
+/**
+ * A {@link MessageDecoder} that reads a binary-encoded datum. This checks for
+ * the datum header and decodes the payload with the schema that corresponds to
+ * the 8-byte schema fingerprint.
+ * <p>
+ * Instances can decode message payloads for known {@link Schema schemas}, which
+ * are schemas added using {@link #addSchema(Schema)}, schemas resolved by the
+ * {@link SchemaStore} passed to the constructor, or the expected schema passed
+ * to the constructor. Messages encoded using an unknown schema will cause
+ * instances to throw a {@link MissingSchemaException}.
+ * <p>
+ * It is safe to continue using instances of this class after {@link #decode}
+ * throws {@link BadHeaderException} or {@link MissingSchemaException}.
+ * <p>
+ * This class is thread-safe.
+ */
+public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
+
+ private static final ThreadLocal<byte[]> HEADER_BUFFER =
+ new ThreadLocal<byte[]>() {
+ @Override
+ protected byte[] initialValue() {
+ return new byte[10];
+ }
+ };
+
+ private static final ThreadLocal<ByteBuffer> FP_BUFFER =
+ new ThreadLocal<ByteBuffer>() {
+ @Override
+ protected ByteBuffer initialValue() {
+ byte[] header = HEADER_BUFFER.get();
+ return ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN);
+ }
+ };
+
+ private final GenericData model;
+ private final Schema readSchema;
+ private final SchemaStore resolver;
+
+ private final Map<Long, RawMessageDecoder<D>> codecByFingerprint =
+ new MapMaker().makeMap();
+
+ /**
+ * Creates a new {@link BinaryMessageEncoder} that uses the given
+ * {@link GenericData data model} to construct datum instances described by
+ * the {@link Schema schema}.
+ * <p>
+ * The {@code readSchema} is as used the expected schema (read schema). Datum
+ * instances created by this class will are described by the expected schema.
+ * <p>
+ * The schema used to decode incoming buffers is determined by the schema
+ * fingerprint encoded in the message header. This class can decode messages
+ * that were encoded using the {@code readSchema} and other schemas that are
+ * added using {@link #addSchema(Schema)}.
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param readSchema the {@link Schema} used to construct datum instances
+ */
+ public BinaryMessageDecoder(GenericData model, Schema readSchema) {
+ this(model, readSchema, null);
+ }
+
+ /**
+ * Creates a new {@link BinaryMessageEncoder} that uses the given
+ * {@link GenericData data model} to construct datum instances described by
+ * the {@link Schema schema}.
+ * <p>
+ * The {@code readSchema} is used as the expected schema (read schema). Datum
+ * instances created by this class will are described by the expected schema.
+ * <p>
+ * The schema used to decode incoming buffers is determined by the schema
+ * fingerprint encoded in the message header. This class can decode messages
+ * that were encoded using the {@code readSchema}, other schemas that are
+ * added using {@link #addSchema(Schema)}, or schemas returned by the
+ * {@code resolver}.
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param readSchema the {@link Schema} used to construct datum instances
+ * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
+ */
+ public BinaryMessageDecoder(GenericData model, Schema readSchema,
+ SchemaStore resolver) {
+ this.model = model;
+ this.readSchema = readSchema;
+ this.resolver = resolver;
+ addSchema(readSchema);
+ }
+
+ /**
+ * Adds a {@link Schema} that can be used to decode buffers.
+ *
+ * @param writeSchema a {@link Schema} to use when decoding buffers
+ */
+ public void addSchema(Schema writeSchema) {
+ long fp = SchemaNormalization.parsingFingerprint64(writeSchema);
+ codecByFingerprint.put(fp,
+ new RawMessageDecoder<D>(model, writeSchema, readSchema));
+ }
+
+ private RawMessageDecoder<D> getDecoder(long fp) {
+ RawMessageDecoder<D> decoder = codecByFingerprint.get(fp);
+ if (decoder != null) {
+ return decoder;
+ }
+
+ if (resolver != null) {
+ Schema writeSchema = resolver.findByFingerprint(fp);
+ if (writeSchema != null) {
+ addSchema(writeSchema);
+ return codecByFingerprint.get(fp);
+ }
+ }
+
+ throw new MissingSchemaException(
+ "Cannot resolve schema for fingerprint: " + fp);
+ }
+
+ @Override
+ public D decode(InputStream stream, D reuse) throws IOException {
+ byte[] header = HEADER_BUFFER.get();
+ try {
+ if (!readFully(stream, header)) {
+ throw new BadHeaderException("Not enough header bytes");
+ }
+ } catch (IOException e) {
+ throw new IOException("Failed to read header and fingerprint bytes", e);
+ }
+
+ if (! (BinaryMessageEncoder.V1_HEADER[0] == header[0])
+ && BinaryMessageEncoder.V1_HEADER[1] == header[1]) {
+ throw new BadHeaderException(String.format(
+ "Unrecognized header bytes: 0x%h%h",
+ header[0], header[1]));
+ }
+
+ RawMessageDecoder<D> decoder = getDecoder(FP_BUFFER.get().getLong(2));
+
+ return decoder.decode(stream, reuse);
+ }
+
+ /**
+ * Reads a buffer from a stream, making multiple read calls if necessary.
+ *
+ * @param stream an InputStream to read from
+ * @param bytes a buffer
+ * @return true if the buffer is complete, false otherwise (stream ended)
+ * @throws IOException
+ */
+ private boolean readFully(InputStream stream, byte[] bytes)
+ throws IOException {
+ int pos = 0;
+ int bytesRead;
+ while ((bytes.length - pos) > 0 &&
+ (bytesRead = stream.read(bytes, pos, bytes.length - pos)) > 0) {
+ pos += bytesRead;
+ }
+ return (pos == bytes.length);
+ }
+}
http://git-wip-us.apache.org/repos/asf/avro/blob/e3c5da69/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageEncoder.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageEncoder.java
new file mode 100644
index 0000000..3cf3d0c
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageEncoder.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import com.google.common.primitives.Bytes;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericData;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * A {@link MessageEncoder} that adds a header and 8-byte schema fingerprint to
+ * each datum encoded as binary.
+ * <p>
+ * This class is thread-safe.
+ */
+public class BinaryMessageEncoder<D> implements MessageEncoder<D> {
+
+ static final byte[] V1_HEADER = new byte[] {(byte) 0xC3, (byte) 0x01};
+
+ private final RawMessageEncoder<D> writeCodec;
+
+ /**
+ * Creates a new {@link BinaryMessageEncoder} that uses the given
+ * {@link GenericData data model} to deconstruct datum instances described by
+ * the {@link Schema schema}.
+ * <p>
+ * Buffers returned by {@link #encode(D)} are copied and will not be modified
+ * by future calls to {@code encode}.
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param schema the {@link Schema} for datum instances
+ */
+ public BinaryMessageEncoder(GenericData model, Schema schema) {
+ this(model, schema, true);
+ }
+
+ /**
+ * Creates a new {@link BinaryMessageEncoder} that uses the given
+ * {@link GenericData data model} to deconstruct datum instances described by
+ * the {@link Schema schema}.
+ * <p>
+ * If {@code shouldCopy} is true, then buffers returned by {@link #encode(D)}
+ * are copied and will not be modified by future calls to {@code encode}.
+ * <p>
+ * If {@code shouldCopy} is false, then buffers returned by {@code encode}
+ * wrap a thread-local buffer that can be reused by future calls to
+ * {@code encode}, but may not be. Callers should only set {@code shouldCopy}
+ * to false if the buffer will be copied before the current thread's next call
+ * to {@code encode}.
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param schema the {@link Schema} for datum instances
+ * @param shouldCopy whether to copy buffers before returning encoded results
+ */
+ public BinaryMessageEncoder(GenericData model, Schema schema,
+ boolean shouldCopy) {
+ this.writeCodec = new V1MessageEncoder<D>(model, schema, shouldCopy);
+ }
+
+ @Override
+ public ByteBuffer encode(D datum) throws IOException {
+ return writeCodec.encode(datum);
+ }
+
+ @Override
+ public void encode(D datum, OutputStream stream) throws IOException {
+ writeCodec.encode(datum, stream);
+ }
+
+ /**
+ * This is a RawDatumEncoder that adds the V1 header to the outgoing buffer.
+ * BinaryDatumEncoder wraps this class to avoid confusion over what it does.
+ * It should not have an "is a" relationship with RawDatumEncoder because it
+ * adds the extra bytes.
+ */
+ private static class V1MessageEncoder<D> extends RawMessageEncoder<D> {
+ private final byte[] headerBytes;
+
+ V1MessageEncoder(GenericData model, Schema schema, boolean shouldCopy) {
+ super(model, schema, shouldCopy);
+ this.headerBytes = getWriteHeader(schema);
+ }
+
+ @Override
+ public void encode(D datum, OutputStream stream) throws IOException {
+ stream.write(headerBytes);
+ super.encode(datum, stream);
+ }
+
+ private static byte[] getWriteHeader(Schema schema) {
+ try {
+ byte[] fp = SchemaNormalization
+ .parsingFingerprint("CRC-64-AVRO", schema);
+ return Bytes.concat(V1_HEADER, fp);
+ } catch (NoSuchAlgorithmException e) {
+ throw new AvroRuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/avro/blob/e3c5da69/lang/java/avro/src/main/java/org/apache/avro/message/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/MessageDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/MessageDecoder.java
new file mode 100644
index 0000000..bc86d12
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/MessageDecoder.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import org.apache.avro.util.ReusableByteArrayInputStream;
+import org.apache.avro.util.ReusableByteBufferInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Deserializes a single datum from a ByteBuffer, byte array, or InputStream.
+ * @param <D> a datum class
+ */
+public interface MessageDecoder<D> {
+
+ /**
+ * Deserialize a single datum from an InputStream.
+ *
+ * @param stream stream to read from
+ * @return a datum read from the stream
+ * @throws BadHeaderException If the payload's header is not recognized.
+ * @throws MissingSchemaException If the payload's schema cannot be found.
+ * @throws IOException
+ */
+ D decode(InputStream stream) throws IOException;
+
+ /**
+ * Deserialize a single datum from an InputStream.
+ *
+ * @param stream stream to read from
+ * @param reuse a datum instance to reuse, avoiding instantiation if possible
+ * @return a datum read from the stream
+ * @throws BadHeaderException If the payload's header is not recognized.
+ * @throws MissingSchemaException If the payload's schema cannot be found.
+ * @throws IOException
+ */
+ D decode(InputStream stream, D reuse) throws IOException;
+
+ /**
+ * Deserialize a single datum from a ByteBuffer.
+ *
+ * @param encoded a ByteBuffer containing an encoded datum
+ * @return a datum read from the stream
+ * @throws BadHeaderException If the payload's header is not recognized.
+ * @throws MissingSchemaException If the payload's schema cannot be found.
+ * @throws IOException
+ */
+ D decode(ByteBuffer encoded) throws IOException;
+
+ /**
+ * Deserialize a single datum from a ByteBuffer.
+ *
+ * @param encoded a ByteBuffer containing an encoded datum
+ * @param reuse a datum instance to reuse, avoiding instantiation if possible
+ * @return a datum read from the stream
+ * @throws BadHeaderException If the payload's header is not recognized.
+ * @throws MissingSchemaException If the payload's schema cannot be found.
+ * @throws IOException
+ */
+ D decode(ByteBuffer encoded, D reuse) throws IOException;
+
+ /**
+ * Deserialize a single datum from a byte array.
+ *
+ * @param encoded a byte array containing an encoded datum
+ * @return a datum read from the stream
+ * @throws BadHeaderException If the payload's header is not recognized.
+ * @throws MissingSchemaException If the payload's schema cannot be found.
+ * @throws IOException
+ */
+ D decode(byte[] encoded) throws IOException;
+
+ /**
+ * Deserialize a single datum from a byte array.
+ *
+ * @param encoded a byte array containing an encoded datum
+ * @param reuse a datum instance to reuse, avoiding instantiation if possible
+ * @return a datum read from the stream
+ * @throws BadHeaderException If the payload's header is not recognized.
+ * @throws MissingSchemaException If the payload's schema cannot be found.
+ * @throws IOException
+ */
+ D decode(byte[] encoded, D reuse) throws IOException;
+
+ /**
+ * Base class for {@link MessageEncoder} implementations that provides default
+ * implementations for most of the {@code DatumEncoder} API.
+ * <p>
+ * Implementations provided by this base class are thread-safe.
+ *
+ * @param <D> a datum class
+ */
+ abstract class BaseDecoder<D> implements MessageDecoder<D> {
+
+ private static final ThreadLocal<ReusableByteArrayInputStream>
+ BYTE_ARRAY_IN = new ThreadLocal<ReusableByteArrayInputStream>() {
+ @Override
+ protected ReusableByteArrayInputStream initialValue() {
+ return new ReusableByteArrayInputStream();
+ }
+ };
+
+ private static final ThreadLocal<ReusableByteBufferInputStream>
+ BYTE_BUFFER_IN = new ThreadLocal<ReusableByteBufferInputStream>() {
+ @Override
+ protected ReusableByteBufferInputStream initialValue() {
+ return new ReusableByteBufferInputStream();
+ }
+ };
+
+ @Override
+ public D decode(InputStream stream) throws IOException {
+ return decode(stream, null);
+ }
+
+ @Override
+ public D decode(ByteBuffer encoded) throws IOException {
+ return decode(encoded, null);
+ }
+
+ @Override
+ public D decode(byte[] encoded) throws IOException {
+ return decode(encoded, null);
+ }
+
+ @Override
+ public D decode(ByteBuffer encoded, D reuse) throws IOException {
+ ReusableByteBufferInputStream in = BYTE_BUFFER_IN.get();
+ in.setByteBuffer(encoded);
+ return decode(in, reuse);
+ }
+
+ @Override
+ public D decode(byte[] encoded, D reuse) throws IOException {
+ ReusableByteArrayInputStream in = BYTE_ARRAY_IN.get();
+ in.setByteArray(encoded, 0, encoded.length);
+ return decode(in, reuse);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/avro/blob/e3c5da69/lang/java/avro/src/main/java/org/apache/avro/message/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/MessageEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/MessageEncoder.java
new file mode 100644
index 0000000..60bfb79
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/MessageEncoder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializes an individual datum as a ByteBuffer or to an OutputStream.
+ * @param <D> a datum class
+ */
+public interface MessageEncoder<D> {
+
+ /**
+ * Serialize a single datum to a ByteBuffer.
+ *
+ * @param datum a datum
+ * @return a ByteBuffer containing the serialized datum
+ * @throws IOException
+ */
+ ByteBuffer encode(D datum) throws IOException;
+
+ /**
+ * Serialize a single datum to an OutputStream.
+ *
+ * @param datum a datum
+ * @param stream an OutputStream to serialize the datum to
+ * @throws IOException
+ */
+ void encode(D datum, OutputStream stream) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/avro/blob/e3c5da69/lang/java/avro/src/main/java/org/apache/avro/message/MissingSchemaException.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/MissingSchemaException.java b/lang/java/avro/src/main/java/org/apache/avro/message/MissingSchemaException.java
new file mode 100644
index 0000000..a3b89fd
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/MissingSchemaException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import org.apache.avro.AvroRuntimeException;
+
+/**
+ * Exception thrown by a {@link MessageDecoder} when the message is encoded
+ * using an unknown {@link org.apache.avro.Schema}.
+ * <p>
+ * Using a {@link SchemaStore} to provide schemas to the decoder can avoid this
+ * problem.
+ */
+public class MissingSchemaException extends AvroRuntimeException {
+ public MissingSchemaException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/avro/blob/e3c5da69/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java
new file mode 100644
index 0000000..52a7c2e
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link MessageDecoder} that deserializes from raw datum bytes.
+ * <p>
+ * This class uses the schema passed to its constructor when decoding buffers.
+ * To decode buffers that have different schemas, use
+ * {@link BinaryMessageEncoder} and {@link BinaryMessageDecoder}.
+ * <p>
+ * This will not throw {@link BadHeaderException} because it expects no header,
+ * and will not throw {@link MissingSchemaException} because it always uses the
+ * read schema from its constructor.
+ * <p>
+ * This class is thread-safe.
+ */
+public class RawMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
+
+ private static final ThreadLocal<BinaryDecoder> DECODER =
+ new ThreadLocal<BinaryDecoder>();
+
+ private final Schema writeSchema;
+ private final Schema readSchema;
+ private final DatumReader<D> reader;
+
+ /**
+ * Creates a new {@link RawMessageDecoder} that uses the given
+ * {@link GenericData data model} to construct datum instances described by
+ * the {@link Schema schema}.
+ * <p>
+ * The {@code schema} is used as both the expected schema (read schema) and
+ * for the schema of payloads that are decoded (written schema).
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param schema the {@link Schema} used to construct datum instances and to
+ * decode buffers.
+ */
+ public RawMessageDecoder(GenericData model, Schema schema) {
+ this(model, schema, schema);
+ }
+
+ /**
+ * Creates a new {@link RawMessageDecoder} that uses the given
+ * {@link GenericData data model} to construct datum instances described by
+ * the {@link Schema readSchema}.
+ * <p>
+ * The {@code readSchema} is used for the expected schema and the
+ * {@code writeSchema} is the schema used to decode buffers. The
+ * {@code writeSchema} must be the schema that was used to encode all buffers
+ * decoded by this class.
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param readSchema the {@link Schema} used to construct datum instances
+ * @param writeSchema the {@link Schema} used to decode buffers
+ */
+ public RawMessageDecoder(GenericData model, Schema writeSchema,
+ Schema readSchema) {
+ this.writeSchema = writeSchema;
+ this.readSchema = readSchema;
+ this.reader = model.createDatumReader(this.writeSchema, this.readSchema);
+ }
+
+ @Override
+ public D decode(InputStream stream, D reuse) {
+ BinaryDecoder decoder = DecoderFactory.get()
+ .directBinaryDecoder(stream, DECODER.get());
+ DECODER.set(decoder);
+ try {
+ return reader.read(reuse, decoder);
+ } catch (IOException e) {
+ throw new AvroRuntimeException("Decoding datum failed", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/avro/blob/e3c5da69/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java
new file mode 100644
index 0000000..07ed861
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link MessageEncoder} that encodes only a datum's bytes, without additional
+ * information (such as a schema fingerprint).
+ * <p>
+ * This class is thread-safe.
+ */
+public class RawMessageEncoder<D> implements MessageEncoder<D> {
+
+ private static final ThreadLocal<BufferOutputStream> TEMP =
+ new ThreadLocal<BufferOutputStream>() {
+ @Override
+ protected BufferOutputStream initialValue() {
+ return new BufferOutputStream();
+ }
+ };
+
+ private static final ThreadLocal<BinaryEncoder> ENCODER =
+ new ThreadLocal<BinaryEncoder>();
+
+ private final Schema writeSchema;
+ private final boolean copyOutputBytes;
+ private final DatumWriter<D> writer;
+
+ /**
+ * Creates a new {@link RawMessageEncoder} that uses the given
+ * {@link GenericData data model} to deconstruct datum instances described by
+ * the {@link Schema schema}.
+ * <p>
+ * Buffers returned by {@link #encode(D)} are copied and will not be modified
+ * by future calls to {@code encode}.
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param schema the {@link Schema} for datum instances
+ */
+ public RawMessageEncoder(GenericData model, Schema schema) {
+ this(model, schema, true);
+ }
+
+ /**
+ * Creates a new {@link RawMessageEncoder} that uses the given
+ * {@link GenericData data model} to deconstruct datum instances described by
+ * the {@link Schema schema}.
+ * <p>
+ * If {@code shouldCopy} is true, then buffers returned by {@link #encode(D)}
+ * are copied and will not be modified by future calls to {@code encode}.
+ * <p>
+ * If {@code shouldCopy} is false, then buffers returned by {@code encode}
+ * wrap a thread-local buffer that can be reused by future calls to
+ * {@code encode}, but may not be. Callers should only set {@code shouldCopy}
+ * to false if the buffer will be copied before the current thread's next call
+ * to {@code encode}.
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param schema the {@link Schema} for datum instances
+ * @param shouldCopy whether to copy buffers before returning encoded results
+ */
+ public RawMessageEncoder(GenericData model, Schema schema, boolean shouldCopy) {
+ this.writeSchema = schema;
+ this.copyOutputBytes = shouldCopy;
+ this.writer = model.createDatumWriter(this.writeSchema);
+ }
+
+ @Override
+ public ByteBuffer encode(D datum) throws IOException {
+ BufferOutputStream temp = TEMP.get();
+ temp.reset();
+
+ encode(datum, temp);
+
+ if (copyOutputBytes) {
+ return temp.toBufferWithCopy();
+ } else {
+ return temp.toBufferWithoutCopy();
+ }
+ }
+
+ @Override
+ public void encode(D datum, OutputStream stream) throws IOException {
+ BinaryEncoder encoder = EncoderFactory.get()
+ .directBinaryEncoder(stream, ENCODER.get());
+ ENCODER.set(encoder);
+ writer.write(datum, encoder);
+ encoder.flush();
+ }
+
+ private static class BufferOutputStream extends ByteArrayOutputStream {
+ BufferOutputStream() {
+ }
+
+ ByteBuffer toBufferWithoutCopy() {
+ return ByteBuffer.wrap(buf, 0, count);
+ }
+
+ ByteBuffer toBufferWithCopy() {
+ return ByteBuffer.wrap(toByteArray());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/avro/blob/e3c5da69/lang/java/avro/src/main/java/org/apache/avro/message/SchemaStore.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/SchemaStore.java b/lang/java/avro/src/main/java/org/apache/avro/message/SchemaStore.java
new file mode 100644
index 0000000..6e89b52
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/SchemaStore.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import com.google.common.collect.MapMaker;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import java.util.Map;
+
+/**
+ * Interface for classes that can provide avro schemas by fingerprint.
+ */
+public interface SchemaStore {
+
+ /**
+ * Retrieves a fingerprint by its AVRO-CRC-64 fingerprint.
+ * @param fingerprint an AVRO-CRC-64 fingerprint long
+ * @return a Schema with the given fingerprint, or null
+ */
+ Schema findByFingerprint(long fingerprint);
+
+ /**
+ * A map-based cache of schemas by AVRO-CRC-64 fingerprint.
+ * <p>
+ * This class is thread-safe.
+ */
+ class Cache implements SchemaStore {
+ private final Map<Long, Schema> schemas = new MapMaker().makeMap();
+
+ /**
+ * Adds a schema to this cache that can be retrieved using its AVRO-CRC-64
+ * fingerprint.
+ *
+ * @param schema a {@link Schema}
+ */
+ public void addSchema(Schema schema) {
+ long fp = SchemaNormalization.parsingFingerprint64(schema);
+ schemas.put(fp, schema);
+ }
+
+ @Override
+ public Schema findByFingerprint(long fingerprint) {
+ return schemas.get(fingerprint);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/avro/blob/e3c5da69/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteArrayInputStream.java b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteArrayInputStream.java
new file mode 100644
index 0000000..6fd2ae4
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteArrayInputStream.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.util;
+
+import java.io.ByteArrayInputStream;
+
+public class ReusableByteArrayInputStream extends ByteArrayInputStream {
+ public ReusableByteArrayInputStream() {
+ super(new byte[0]);
+ }
+
+ public void setByteArray(byte[] buf, int offset, int length) {
+ this.buf = buf;
+ this.pos = offset;
+ this.count = Math.min(offset + length, buf.length);
+ this.mark = offset;
+ }
+}
http://git-wip-us.apache.org/repos/asf/avro/blob/e3c5da69/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteBufferInputStream.java b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteBufferInputStream.java
new file mode 100644
index 0000000..eff7fdc
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteBufferInputStream.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public class ReusableByteBufferInputStream extends InputStream {
+
+ private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
+ private ByteBuffer buffer = EMPTY_BUFFER;
+ private int mark = 0;
+
+ public void setByteBuffer(ByteBuffer buf) {
+ // do not modify the buffer that is passed in
+ this.buffer = buf.duplicate();
+ this.mark = buf.position();
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (buffer.hasRemaining()) {
+ return buffer.get() & 0xff;
+ } else {
+ return -1;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (buffer.remaining() <= 0) {
+ return -1;
+ }
+ // allow IndexOutOfBoundsException to be thrown by ByteBuffer#get
+ int bytesToRead = Math.min(len, buffer.remaining());
+ buffer.get(b, off, bytesToRead);
+ return bytesToRead;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (n <= 0) {
+ // n may be negative and results in skipping 0 bytes, according to javadoc
+ return 0;
+ }
+
+ // this catches n > Integer.MAX_VALUE
+ int bytesToSkip = n > buffer.remaining() ? buffer.remaining() : (int) n;
+ buffer.position(buffer.position() + bytesToSkip);
+ return bytesToSkip;
+ }
+
+ @Override
+ public synchronized void mark(int readLimit) {
+ // readLimit is ignored. there is no requirement to implement readLimit, it
+ // is a way for implementations to avoid buffering too much. since all data
+ // for this stream is held in memory, this has no need for such a limit.
+ this.mark = buffer.position();
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ buffer.position(mark);
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/avro/blob/e3c5da69/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java b/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java
new file mode 100644
index 0000000..47656b8
--- /dev/null
+++ b/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+public class TestBinaryMessageEncoding {
+ public static final Schema SCHEMA_V1 = SchemaBuilder.record("TestRecord")
+ .fields()
+ .requiredInt("id")
+ .optionalString("msg")
+ .endRecord();
+
+ public static final GenericRecordBuilder V1_BUILDER =
+ new GenericRecordBuilder(SCHEMA_V1);
+
+ public static final List<Record> V1_RECORDS = Arrays.asList(
+ V1_BUILDER.set("id", 1).set("msg", "m-1").build(),
+ V1_BUILDER.set("id", 2).set("msg", "m-2").build(),
+ V1_BUILDER.set("id", 4).set("msg", "m-4").build(),
+ V1_BUILDER.set("id", 6).set("msg", "m-6").build()
+ );
+
+ public static final Schema SCHEMA_V2 = SchemaBuilder.record("TestRecord")
+ .fields()
+ .requiredLong("id")
+ .name("message").aliases("msg").type().optional().stringType()
+ .optionalDouble("data")
+ .endRecord();
+
+ public static final GenericRecordBuilder V2_BUILDER =
+ new GenericRecordBuilder(SCHEMA_V2);
+
+ public static final List<Record> V2_RECORDS = Arrays.asList(
+ V2_BUILDER.set("id", 3L).set("message", "m-3").set("data", 12.3).build(),
+ V2_BUILDER.set("id", 5L).set("message", "m-5").set("data", 23.4).build(),
+ V2_BUILDER.set("id", 7L).set("message", "m-7").set("data", 34.5).build(),
+ V2_BUILDER.set("id", 8L).set("message", "m-8").set("data", 35.6).build()
+ );
+
+ @Test
+ public void testByteBufferRoundTrip() throws Exception {
+ MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+ MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+
+ Record copy = decoder.decode(encoder.encode(V2_RECORDS.get(0)));
+
+ Assert.assertTrue("Copy should not be the same object",
+ copy != V2_RECORDS.get(0));
+ Assert.assertEquals("Record should be identical after round-trip",
+ V2_RECORDS.get(0), copy);
+ }
+
+ @Test
+ public void testSchemaEvolution() throws Exception {
+ List<ByteBuffer> buffers = Lists.newArrayList();
+ List<Record> records = Ordering.usingToString().sortedCopy(
+ Iterables.concat(V1_RECORDS, V2_RECORDS));
+
+ MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V1);
+ MessageEncoder<Record> v2Encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+
+ for (Record record : records) {
+ if (record.getSchema() == SCHEMA_V1) {
+ buffers.add(v1Encoder.encode(record));
+ } else {
+ buffers.add(v2Encoder.encode(record));
+ }
+ }
+
+ Set<Record> allAsV2 = Sets.newHashSet(V2_RECORDS);
+ allAsV2.add(
+ V2_BUILDER.set("id", 1L).set("message", "m-1").clear("data").build());
+ allAsV2.add(
+ V2_BUILDER.set("id", 2L).set("message", "m-2").clear("data").build());
+ allAsV2.add(
+ V2_BUILDER.set("id", 4L).set("message", "m-4").clear("data").build());
+ allAsV2.add(
+ V2_BUILDER.set("id", 6L).set("message", "m-6").clear("data").build());
+
+ BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+ v2Decoder.addSchema(SCHEMA_V1);
+
+ Set<Record> decodedUsingV2 = Sets.newHashSet();
+ for (ByteBuffer buffer : buffers) {
+ decodedUsingV2.add(v2Decoder.decode(buffer));
+ }
+
+ Assert.assertEquals(allAsV2, decodedUsingV2);
+ }
+
+ @Test(expected = MissingSchemaException.class)
+ public void testCompatibleReadFailsWithoutSchema() throws Exception {
+ MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V1);
+ BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+
+ ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(3));
+
+ v2Decoder.decode(v1Buffer);
+ }
+
+ @Test
+ public void testCompatibleReadWithSchema() throws Exception {
+ MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V1);
+ BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+ v2Decoder.addSchema(SCHEMA_V1);
+
+ ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(3));
+
+ Record record = v2Decoder.decode(v1Buffer);
+
+ Assert.assertEquals(
+ V2_BUILDER.set("id", 6L).set("message", "m-6").clear("data").build(),
+ record);
+ }
+
+ @Test
+ public void testCompatibleReadWithSchemaFromLookup() throws Exception {
+ MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V1);
+
+ SchemaStore.Cache schemaCache = new SchemaStore.Cache();
+ schemaCache.addSchema(SCHEMA_V1);
+ BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V2, schemaCache);
+
+ ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(2));
+
+ Record record = v2Decoder.decode(v1Buffer);
+
+ Assert.assertEquals(
+ V2_BUILDER.set("id", 4L).set("message", "m-4").clear("data").build(),
+ record);
+ }
+
+ @Test
+ public void testBufferReuse() throws Exception {
+ // This test depends on the serialized version of record 1 being smaller or
+ // the same size as record 0 so that the reused ByteArrayOutputStream won't
+ // expand its internal buffer.
+ MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V1, false);
+
+ ByteBuffer b0 = encoder.encode(V1_RECORDS.get(0));
+ ByteBuffer b1 = encoder.encode(V1_RECORDS.get(1));
+
+ Assert.assertEquals(b0.array(), b1.array());
+
+ MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V1);
+ Assert.assertEquals("Buffer was reused, decode(b0) should be record 1",
+ V1_RECORDS.get(1), decoder.decode(b0));
+ }
+
+ @Test
+ public void testBufferCopy() throws Exception {
+ MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V1);
+
+ ByteBuffer b0 = encoder.encode(V1_RECORDS.get(0));
+ ByteBuffer b1 = encoder.encode(V1_RECORDS.get(1));
+
+ Assert.assertNotEquals(b0.array(), b1.array());
+
+ MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V1);
+ // bytes are not changed by reusing the encoder
+ Assert.assertEquals("Buffer was copied, decode(b0) should be record 0",
+ V1_RECORDS.get(0), decoder.decode(b0));
+ }
+
+ @Test(expected = AvroRuntimeException.class)
+ public void testByteBufferMissingPayload() throws Exception {
+ MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+ MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+
+ ByteBuffer buffer = encoder.encode(V2_RECORDS.get(0));
+
+ buffer.limit(12);
+
+ decoder.decode(buffer);
+ }
+
+ @Test(expected = BadHeaderException.class)
+ public void testByteBufferMissingFullHeader() throws Exception {
+ MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+ MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+
+ ByteBuffer buffer = encoder.encode(V2_RECORDS.get(0));
+
+ buffer.limit(8);
+
+ decoder.decode(buffer);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/avro/blob/e3c5da69/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java
----------------------------------------------------------------------
diff --git a/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java b/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java
index fd77f2d..25d918f 100644
--- a/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java
+++ b/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java
@@ -19,6 +19,7 @@
package org.apache.avro;
import com.google.common.collect.MapMaker;
+import com.google.common.primitives.Bytes;
class GuavaClasses {
/*
@@ -27,5 +28,6 @@ class GuavaClasses {
*/
static {
MapMaker.class.getName();
+ Bytes.class.getName();
}
}