You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/02/01 11:51:36 UTC
[avro] 21/30: AVRO-3302: Add interop tests for rust (#1456)
This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git
commit e8ce233387929144ae2a8ca78865956b443624b1
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Wed Jan 19 09:44:47 2022 +0200
AVRO-3302: Add interop tests for rust (#1456)
* AVRO-3302: Add interop tests for Rust module
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302: Implement test_interop_data for Rust
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302: Run JS interop tests to read .avro files created by Rust
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302 Fix the path to lang/js
Add debug for failing Maven build
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302: git checkout & install Rust stable
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302: Fix formatting
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302 Remove debug statement
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302 Revert removed debug statement
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302: Improve documentation
First check 'parsed_schemas', then 'resolving_schemas' and finally
'input_schemas'.
Enable a test case that is working now.
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302 Seems to work ?!
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302 Code formatting
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302 Properly encode/decode Schema::Ref
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302 Remove debug statements
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302 Print messages for successful reads
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302 Rename Codec::Zstd to Codec::Zstandard for consistency
This is the name used by the other Avro modules (e.g. Java & Perl)
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302 Fix formatting
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302 Collect all errors during interop test and panic at the end
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302 Format imports for +nightly
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302 Use Perl to read the interop .avro files created by Rust
Perl supports most of the optional codecs.
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302 Code cleanup
Revert changes which are not really needed.
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3302 Remove a FIXME that cannot be addressed
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
(cherry picked from commit 98f4f4a87bb61cacf91f50253f8154c8b36de690)
---
.github/workflows/test-lang-rust-ci.yml | 71 ++++++
lang/rust/build.sh | 35 ++-
lang/rust/examples/generate_interop_data.rs | 99 ++++++++
lang/rust/examples/test_interop_data.rs | 59 +++++
lang/rust/src/codec.rs | 16 +-
lang/rust/src/decode.rs | 338 +++++++++++++++-------------
lang/rust/src/encode.rs | 199 +++++++++-------
lang/rust/src/error.rs | 4 +
lang/rust/src/schema.rs | 164 +++++++++++++-
lang/rust/src/schema_compatibility.rs | 1 -
lang/rust/src/types.rs | 116 ++++++----
lang/rust/src/util.rs | 43 +++-
lang/rust/tests/schema.rs | 86 ++++++-
13 files changed, 916 insertions(+), 315 deletions(-)
diff --git a/.github/workflows/test-lang-rust-ci.yml b/.github/workflows/test-lang-rust-ci.yml
index dacf461..91364f9 100644
--- a/.github/workflows/test-lang-rust-ci.yml
+++ b/.github/workflows/test-lang-rust-ci.yml
@@ -77,3 +77,74 @@ jobs:
with:
command: test
args: --manifest-path lang/rust/Cargo.toml --doc
+
+ interop:
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v2
+
+ - name: Rust Toolchain
+ uses: actions-rs/toolchain@v1
+ with:
+ profile: minimal
+ toolchain: stable
+ override: true
+
+ - name: Cache Local Maven Repository
+ uses: actions/cache@v2
+ with:
+ path: ~/.m2/repository
+ key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
+ restore-keys: |
+ ${{ runner.os }}-maven-
+
+ - name: Install Java Avro for Interop Test
+ working-directory: .
+ run: mvn -B install -DskipTests
+
+ - name: Create Interop Data Directory
+ working-directory: .
+ run: mkdir -p build/interop/data
+
+ - name: Generate Interop Resources
+ working-directory: lang/java/avro
+ run: mvn -B -P interop-data-generate generate-resources
+
+ - name: Generate interop data
+ run: ./build.sh interop-data-generate
+
+ - name: Rust reads interop files created by Java and Rust
+ run: ./build.sh interop-data-test
+
+ - uses: shogo82148/actions-setup-perl@v1
+ with:
+ perl-version: 5.32
+
+ - name: Install Dependencies
+ run: |
+ sudo apt-get -qqy install --no-install-recommends libcompress-raw-zlib-perl \
+ libcpan-uploader-perl \
+ libencode-perl \
+ libio-string-perl \
+ libjansson-dev \
+ libjson-xs-perl \
+ libmodule-install-perl \
+ libmodule-install-readmefrompod-perl \
+ libobject-tiny-perl \
+ libsnappy-dev \
+ libtest-exception-perl \
+ libtest-pod-perl
+ cpanm --mirror https://www.cpan.org/ install Compress::Zstd \
+ Error::Simple \
+ Module::Install::Repository \
+ Object::Tiny \
+ Regexp::Common \
+ Try::Tiny \
+ inc::Module::Install
+
+
+ - name: Perl reads interop files created by Java and Rust
+ working-directory: lang/perl
+ run: ./build.sh interop-data-test
diff --git a/lang/rust/build.sh b/lang/rust/build.sh
index d9a2484..2f0a824 100755
--- a/lang/rust/build.sh
+++ b/lang/rust/build.sh
@@ -15,7 +15,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-set -e
+set -e # exit on error
+
+root_dir=$(pwd)
+build_dir="../../build/rust"
+dist_dir="../../dist/rust"
+
+# Delete $build_dir; permissions are reset first, presumably so rm -rf can remove read-only entries.
+function clean {
+  if [ -d "$build_dir" ]; then
+    chmod -R 755 "$build_dir"
+    rm -rf "$build_dir"
+  fi
+}
+
+# Start from an empty $build_dir (used by the interop-data-* targets below).
+function prepare_build {
+  clean
+  mkdir -p "$build_dir"
+}
cd `dirname "$0"`
@@ -35,10 +53,21 @@ do
cargo build --release --lib --all-features
cargo package
mkdir -p ../../dist/rust
- cp target/package/avro-rs-*.crate ../../dist/rust
+ cp target/package/avro-rs-*.crate $dist_dir
+ ;;
+ interop-data-generate)
+ prepare_build
+ export RUST_LOG=avro_rs=debug
+ export RUST_BACKTRACE=1
+ cargo run --all-features --example generate_interop_data
+ ;;
+
+ interop-data-test)
+ prepare_build
+ cargo run --all-features --example test_interop_data
;;
*)
- echo "Usage: $0 {lint|test|dist|clean}" >&2
+ echo "Usage: $0 {lint|test|dist|clean|interop-data-generate|interop-data-test}" >&2
exit 1
esac
done
diff --git a/lang/rust/examples/generate_interop_data.rs b/lang/rust/examples/generate_interop_data.rs
new file mode 100644
index 0000000..cb8efda
--- /dev/null
+++ b/lang/rust/examples/generate_interop_data.rs
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use avro_rs::{
+ schema::Schema,
+ types::{Record, Value},
+ Codec, Writer,
+};
+use std::collections::HashMap;
+use strum::IntoEnumIterator;
+
+fn create_datum(schema: &Schema) -> Record {
+ let mut datum = Record::new(schema).unwrap();
+ datum.put("intField", 12_i32);
+ datum.put("longField", 15234324_i64);
+ datum.put("stringField", "hey");
+ datum.put("boolField", true);
+ datum.put("floatField", 1234.0_f32);
+ datum.put("doubleField", -1234.0_f64);
+ datum.put("bytesField", b"12312adf".to_vec());
+ datum.put("nullField", Value::Null);
+ datum.put(
+ "arrayField",
+ Value::Array(vec![
+ Value::Double(5.0),
+ Value::Double(0.0),
+ Value::Double(12.0),
+ ]),
+ );
+ let mut map = HashMap::new();
+ map.insert(
+ "a".into(),
+ Value::Record(vec![("label".into(), Value::String("a".into()))]),
+ );
+ map.insert(
+ "bee".into(),
+ Value::Record(vec![("label".into(), Value::String("cee".into()))]),
+ );
+ datum.put("mapField", Value::Map(map));
+ datum.put("unionField", Value::Union(Box::new(Value::Double(12.0))));
+ datum.put("enumField", Value::Enum(2, "C".to_owned()));
+ datum.put("fixedField", Value::Fixed(16, b"1019181716151413".to_vec()));
+ datum.put(
+ "recordField",
+ Value::Record(vec![
+ ("label".into(), Value::String("outer".into())),
+ (
+ "children".into(),
+ Value::Array(vec![Value::Record(vec![
+ ("label".into(), Value::String("inner".into())),
+ ("children".into(), Value::Array(vec![])),
+ ])]),
+ ),
+ ]),
+ );
+
+ datum
+}
+
+fn main() -> anyhow::Result<()> {
+    let schema_str = std::fs::read_to_string("../../share/test/schemas/interop.avsc")
+        .expect("Unable to read the interop Avro schema");
+    let schema = Schema::parse_str(schema_str.as_str())?;
+    // One output file per codec variant compiled into this build (`Codec` derives `EnumIter`).
+    for codec in Codec::iter() {
+        let mut writer = Writer::with_codec(&schema, Vec::new(), codec);
+        let datum = create_datum(&schema);
+        writer.append(datum)?;
+        let bytes = writer.into_inner()?;
+        // Named `rust.avro` for the null codec and `rust_<codec name>.avro` for every other codec.
+        let codec_name = <&str>::from(codec);
+        let suffix = if codec_name == "null" {
+            "".to_owned()
+        } else {
+            format!("_{}", codec_name)
+        };
+        // Relative to the working directory (lang/rust via build.sh); the dir must already exist.
+        std::fs::write(
+            format!("../../build/interop/data/rust{}.avro", suffix),
+            bytes,
+        )?;
+    }
+
+    Ok(())
+}
diff --git a/lang/rust/examples/test_interop_data.rs b/lang/rust/examples/test_interop_data.rs
new file mode 100644
index 0000000..f86c6c4
--- /dev/null
+++ b/lang/rust/examples/test_interop_data.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use avro_rs::Reader;
+use std::ffi::OsStr;
+
+fn main() -> anyhow::Result<()> {
+    let data_dir = std::fs::read_dir("../../build/interop/data/")
+        .expect("Unable to list the interop data directory");
+    // Read every `.avro` file completely; collect all failures and panic once at the end.
+    let mut errors = Vec::new();
+
+    for entry in data_dir {
+        let path = entry
+            .expect("Unable to read the interop data directory's files")
+            .path();
+        // Skip directories and non-`.avro` files, including files that have no extension.
+        if !path.is_file() || path.extension().and_then(OsStr::to_str) != Some("avro") {
+            continue;
+        }
+
+        println!("Checking {:?}", &path);
+        let content = std::fs::File::open(&path)?;
+        // An unreadable header/schema is reported like a bad datum instead of aborting the run.
+        let results: Vec<_> = match Reader::new(&content) {
+            Ok(reader) => reader.collect(),
+            Err(e) => vec![Err(e)],
+        };
+        for e in results.into_iter().filter_map(Result::err) {
+            errors.push(format!(
+                "There is a problem with reading of '{:?}', \n {:?}\n",
+                &path, e
+            ));
+        }
+    }
+
+    if errors.is_empty() {
+        Ok(())
+    } else {
+        panic!(
+            "There were errors reading some .avro files:\n{}",
+            errors.join(", ")
+        );
+    }
+}
diff --git a/lang/rust/src/codec.rs b/lang/rust/src/codec.rs
index 0ba8abe..c9a584e 100644
--- a/lang/rust/src/codec.rs
+++ b/lang/rust/src/codec.rs
@@ -19,7 +19,7 @@
use crate::{types::Value, AvroResult, Error};
use libflate::deflate::{Decoder, Encoder};
use std::io::{Read, Write};
-use strum_macros::{EnumString, IntoStaticStr};
+use strum_macros::{EnumIter, EnumString, IntoStaticStr};
#[cfg(feature = "bzip")]
use bzip2::{
@@ -34,7 +34,7 @@ use crc32fast::Hasher;
use xz2::read::{XzDecoder, XzEncoder};
/// The compression codec used to compress blocks.
-#[derive(Clone, Copy, Debug, PartialEq, EnumString, IntoStaticStr)]
+#[derive(Clone, Copy, Debug, PartialEq, EnumIter, EnumString, IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
pub enum Codec {
/// The `Null` codec simply passes through data uncompressed.
@@ -49,7 +49,7 @@ pub enum Codec {
/// CRC32 checksum of the uncompressed data in the block.
Snappy,
#[cfg(feature = "zstandard")]
- Zstd,
+ Zstandard,
#[cfg(feature = "bzip")]
/// The `BZip2` codec uses [BZip2](https://sourceware.org/bzip2/)
/// compression library.
@@ -98,7 +98,7 @@ impl Codec {
*stream = encoded;
}
#[cfg(feature = "zstandard")]
- Codec::Zstd => {
+ Codec::Zstandard => {
let mut encoder = zstd::Encoder::new(Vec::new(), 0).unwrap();
encoder.write_all(stream).map_err(Error::ZstdCompress)?;
*stream = encoder.finish().unwrap();
@@ -157,7 +157,7 @@ impl Codec {
decoded
}
#[cfg(feature = "zstandard")]
- Codec::Zstd => {
+ Codec::Zstandard => {
let mut decoded = Vec::new();
let mut decoder = zstd::Decoder::new(&stream[..]).unwrap();
std::io::copy(&mut decoder, &mut decoded).map_err(Error::ZstdDecompress)?;
@@ -212,7 +212,7 @@ mod tests {
#[cfg(feature = "zstandard")]
#[test]
fn zstd_compress_and_decompress() {
- compress_and_decompress(Codec::Zstd);
+ compress_and_decompress(Codec::Zstandard);
}
#[cfg(feature = "bzip")]
@@ -245,7 +245,7 @@ mod tests {
assert_eq!(<&str>::from(Codec::Snappy), "snappy");
#[cfg(feature = "zstandard")]
- assert_eq!(<&str>::from(Codec::Zstd), "zstd");
+ assert_eq!(<&str>::from(Codec::Zstandard), "zstandard");
#[cfg(feature = "bzip")]
assert_eq!(<&str>::from(Codec::Bzip2), "bzip2");
@@ -265,7 +265,7 @@ mod tests {
assert_eq!(Codec::from_str("snappy").unwrap(), Codec::Snappy);
#[cfg(feature = "zstandard")]
- assert_eq!(Codec::from_str("zstd").unwrap(), Codec::Zstd);
+ assert_eq!(Codec::from_str("zstandard").unwrap(), Codec::Zstandard);
#[cfg(feature = "bzip")]
assert_eq!(Codec::from_str("bzip2").unwrap(), Codec::Bzip2);
diff --git a/lang/rust/src/decode.rs b/lang/rust/src/decode.rs
index 5639d28..26678bf 100644
--- a/lang/rust/src/decode.rs
+++ b/lang/rust/src/decode.rs
@@ -68,183 +68,217 @@ fn decode_seq_len<R: Read>(reader: &mut R) -> AvroResult<usize> {
/// Decode a `Value` from avro format given its `Schema`.
pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
- match *schema {
- Schema::Null => Ok(Value::Null),
- Schema::Boolean => {
- let mut buf = [0u8; 1];
- match reader.read_exact(&mut buf[..]) {
- Ok(_) => match buf[0] {
- 0u8 => Ok(Value::Boolean(false)),
- 1u8 => Ok(Value::Boolean(true)),
- _ => Err(Error::BoolValue(buf[0])),
- },
- Err(io_err) => {
- if let ErrorKind::UnexpectedEof = io_err.kind() {
- Ok(Value::Null)
- } else {
- Err(Error::ReadBoolean(io_err))
+ fn decode0<R: Read>(
+ schema: &Schema,
+ reader: &mut R,
+ schemas_by_name: &mut HashMap<String, Schema>,
+ ) -> AvroResult<Value> {
+ match *schema {
+ Schema::Null => Ok(Value::Null),
+ Schema::Boolean => {
+ let mut buf = [0u8; 1];
+ match reader.read_exact(&mut buf[..]) {
+ Ok(_) => match buf[0] {
+ 0u8 => Ok(Value::Boolean(false)),
+ 1u8 => Ok(Value::Boolean(true)),
+ _ => Err(Error::BoolValue(buf[0])),
+ },
+ Err(io_err) => {
+ if let ErrorKind::UnexpectedEof = io_err.kind() {
+ Ok(Value::Null)
+ } else {
+ Err(Error::ReadBoolean(io_err))
+ }
}
}
}
- }
- Schema::Decimal { ref inner, .. } => match &**inner {
- Schema::Fixed { .. } => match decode(inner, reader)? {
- Value::Fixed(_, bytes) => Ok(Value::Decimal(Decimal::from(bytes))),
- value => Err(Error::FixedValue(value.into())),
- },
- Schema::Bytes => match decode(inner, reader)? {
- Value::Bytes(bytes) => Ok(Value::Decimal(Decimal::from(bytes))),
- value => Err(Error::BytesValue(value.into())),
+ Schema::Decimal { ref inner, .. } => match &**inner {
+ Schema::Fixed { .. } => match decode0(inner, reader, schemas_by_name)? {
+ Value::Fixed(_, bytes) => Ok(Value::Decimal(Decimal::from(bytes))),
+ value => Err(Error::FixedValue(value.into())),
+ },
+ Schema::Bytes => match decode0(inner, reader, schemas_by_name)? {
+ Value::Bytes(bytes) => Ok(Value::Decimal(Decimal::from(bytes))),
+ value => Err(Error::BytesValue(value.into())),
+ },
+ schema => Err(Error::ResolveDecimalSchema(schema.into())),
},
- schema => Err(Error::ResolveDecimalSchema(schema.into())),
- },
- Schema::Uuid => Ok(Value::Uuid(
- Uuid::from_str(match decode(&Schema::String, reader)? {
- Value::String(ref s) => s,
- value => return Err(Error::GetUuidFromStringValue(value.into())),
- })
- .map_err(Error::ConvertStrToUuid)?,
- )),
- Schema::Int => decode_int(reader),
- Schema::Date => zag_i32(reader).map(Value::Date),
- Schema::TimeMillis => zag_i32(reader).map(Value::TimeMillis),
- Schema::Long => decode_long(reader),
- Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros),
- Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis),
- Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros),
- Schema::Duration => {
- let mut buf = [0u8; 12];
- reader.read_exact(&mut buf).map_err(Error::ReadDuration)?;
- Ok(Value::Duration(Duration::from(buf)))
- }
- Schema::Float => {
- let mut buf = [0u8; std::mem::size_of::<f32>()];
- reader.read_exact(&mut buf[..]).map_err(Error::ReadFloat)?;
- Ok(Value::Float(f32::from_le_bytes(buf)))
- }
- Schema::Double => {
- let mut buf = [0u8; std::mem::size_of::<f64>()];
- reader.read_exact(&mut buf[..]).map_err(Error::ReadDouble)?;
- Ok(Value::Double(f64::from_le_bytes(buf)))
- }
- Schema::Bytes => {
- let len = decode_len(reader)?;
- let mut buf = vec![0u8; len];
- reader.read_exact(&mut buf).map_err(Error::ReadBytes)?;
- Ok(Value::Bytes(buf))
- }
- Schema::String => {
- let len = decode_len(reader)?;
- let mut buf = vec![0u8; len];
- match reader.read_exact(&mut buf) {
- Ok(_) => Ok(Value::String(
- String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
- )),
- Err(io_err) => {
- if let ErrorKind::UnexpectedEof = io_err.kind() {
- Ok(Value::Null)
- } else {
- Err(Error::ReadString(io_err))
+ Schema::Uuid => Ok(Value::Uuid(
+ Uuid::from_str(match decode0(&Schema::String, reader, schemas_by_name)? {
+ Value::String(ref s) => s,
+ value => return Err(Error::GetUuidFromStringValue(value.into())),
+ })
+ .map_err(Error::ConvertStrToUuid)?,
+ )),
+ Schema::Int => decode_int(reader),
+ Schema::Date => zag_i32(reader).map(Value::Date),
+ Schema::TimeMillis => zag_i32(reader).map(Value::TimeMillis),
+ Schema::Long => decode_long(reader),
+ Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros),
+ Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis),
+ Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros),
+ Schema::Duration => {
+ let mut buf = [0u8; 12];
+ reader.read_exact(&mut buf).map_err(Error::ReadDuration)?;
+ Ok(Value::Duration(Duration::from(buf)))
+ }
+ Schema::Float => {
+ let mut buf = [0u8; std::mem::size_of::<f32>()];
+ reader.read_exact(&mut buf[..]).map_err(Error::ReadFloat)?;
+ Ok(Value::Float(f32::from_le_bytes(buf)))
+ }
+ Schema::Double => {
+ let mut buf = [0u8; std::mem::size_of::<f64>()];
+ reader.read_exact(&mut buf[..]).map_err(Error::ReadDouble)?;
+ Ok(Value::Double(f64::from_le_bytes(buf)))
+ }
+ Schema::Bytes => {
+ let len = decode_len(reader)?;
+ let mut buf = vec![0u8; len];
+ reader.read_exact(&mut buf).map_err(Error::ReadBytes)?;
+ Ok(Value::Bytes(buf))
+ }
+ Schema::String => {
+ let len = decode_len(reader)?;
+ let mut buf = vec![0u8; len];
+ match reader.read_exact(&mut buf) {
+ Ok(_) => Ok(Value::String(
+ String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
+ )),
+ Err(io_err) => {
+ if let ErrorKind::UnexpectedEof = io_err.kind() {
+ Ok(Value::Null)
+ } else {
+ Err(Error::ReadString(io_err))
+ }
}
}
}
- }
- Schema::Fixed { size, .. } => {
- let mut buf = vec![0u8; size];
- reader
- .read_exact(&mut buf)
- .map_err(|e| Error::ReadFixed(e, size))?;
- Ok(Value::Fixed(size, buf))
- }
- Schema::Array(ref inner) => {
- let mut items = Vec::new();
+ Schema::Fixed { ref name, size, .. } => {
+ schemas_by_name.insert(name.name.clone(), schema.clone());
+ let mut buf = vec![0u8; size];
+ reader
+ .read_exact(&mut buf)
+ .map_err(|e| Error::ReadFixed(e, size))?;
+ Ok(Value::Fixed(size, buf))
+ }
+ Schema::Array(ref inner) => {
+ let mut items = Vec::new();
- loop {
- let len = decode_seq_len(reader)?;
- if len == 0 {
- break;
- }
+ loop {
+ let len = decode_seq_len(reader)?;
+ if len == 0 {
+ break;
+ }
- items.reserve(len);
- for _ in 0..len {
- items.push(decode(inner, reader)?);
+ items.reserve(len);
+ for _ in 0..len {
+ items.push(decode0(inner, reader, schemas_by_name)?);
+ }
}
- }
- Ok(Value::Array(items))
- }
- Schema::Map(ref inner) => {
- let mut items = HashMap::new();
+ Ok(Value::Array(items))
+ }
+ Schema::Map(ref inner) => {
+ let mut items = HashMap::new();
- loop {
- let len = decode_seq_len(reader)?;
- if len == 0 {
- break;
- }
+ loop {
+ let len = decode_seq_len(reader)?;
+ if len == 0 {
+ break;
+ }
- items.reserve(len);
- for _ in 0..len {
- match decode(&Schema::String, reader)? {
- Value::String(key) => {
- let value = decode(inner, reader)?;
- items.insert(key, value);
+ items.reserve(len);
+ for _ in 0..len {
+ match decode0(&Schema::String, reader, schemas_by_name)? {
+ Value::String(key) => {
+ let value = decode0(inner, reader, schemas_by_name)?;
+ items.insert(key, value);
+ }
+ value => return Err(Error::MapKeyType(value.into())),
}
- value => return Err(Error::MapKeyType(value.into())),
}
}
- }
- Ok(Value::Map(items))
- }
- Schema::Union(ref inner) => match zag_i64(reader) {
- Ok(index) => {
- let variants = inner.variants();
- let variant = variants
- .get(usize::try_from(index).map_err(|e| Error::ConvertI64ToUsize(e, index))?)
- .ok_or_else(|| Error::GetUnionVariant {
- index,
- num_variants: variants.len(),
- })?;
- let value = decode(variant, reader)?;
- Ok(Value::Union(Box::new(value)))
+ Ok(Value::Map(items))
}
- Err(Error::ReadVariableIntegerBytes(io_err)) => {
- if let ErrorKind::UnexpectedEof = io_err.kind() {
- Ok(Value::Union(Box::new(Value::Null)))
- } else {
- Err(Error::ReadVariableIntegerBytes(io_err))
+ Schema::Union(ref inner) => match zag_i64(reader) {
+ Ok(index) => {
+ let variants = inner.variants();
+ let variant = variants
+ .get(
+ usize::try_from(index)
+ .map_err(|e| Error::ConvertI64ToUsize(e, index))?,
+ )
+ .ok_or_else(|| Error::GetUnionVariant {
+ index,
+ num_variants: variants.len(),
+ })?;
+ let value = decode0(variant, reader, schemas_by_name)?;
+ Ok(Value::Union(Box::new(value)))
}
- }
- Err(io_err) => Err(io_err),
- },
+ Err(Error::ReadVariableIntegerBytes(io_err)) => {
+ if let ErrorKind::UnexpectedEof = io_err.kind() {
+ Ok(Value::Union(Box::new(Value::Null)))
+ } else {
+ Err(Error::ReadVariableIntegerBytes(io_err))
+ }
+ }
+ Err(io_err) => Err(io_err),
+ },
- Schema::Record { ref fields, .. } => {
- // Benchmarks indicate ~10% improvement using this method.
- let mut items = Vec::with_capacity(fields.len());
- for field in fields {
- // TODO: This clone is also expensive. See if we can do away with it...
- items.push((field.name.clone(), decode(&field.schema, reader)?));
+ Schema::Record {
+ ref name,
+ ref fields,
+ ..
+ } => {
+ schemas_by_name.insert(name.name.clone(), schema.clone());
+ // Benchmarks indicate ~10% improvement using this method.
+ let mut items = Vec::with_capacity(fields.len());
+ for field in fields {
+ // TODO: This clone is also expensive. See if we can do away with it...
+ items.push((
+ field.name.clone(),
+ decode0(&field.schema, reader, schemas_by_name)?,
+ ));
+ }
+ Ok(Value::Record(items))
}
- Ok(Value::Record(items))
- }
- Schema::Enum { ref symbols, .. } => {
- Ok(if let Value::Int(raw_index) = decode_int(reader)? {
- let index = usize::try_from(raw_index)
- .map_err(|e| Error::ConvertI32ToUsize(e, raw_index))?;
- if (0..=symbols.len()).contains(&index) {
- let symbol = symbols[index].clone();
- Value::Enum(raw_index, symbol)
+ Schema::Enum {
+ ref name,
+ ref symbols,
+ ..
+ } => {
+ schemas_by_name.insert(name.name.clone(), schema.clone());
+ Ok(if let Value::Int(raw_index) = decode_int(reader)? {
+ let index = usize::try_from(raw_index)
+ .map_err(|e| Error::ConvertI32ToUsize(e, raw_index))?;
+ if (0..=symbols.len()).contains(&index) {
+ let symbol = symbols[index].clone();
+ Value::Enum(raw_index, symbol)
+ } else {
+ return Err(Error::GetEnumValue {
+ index,
+ nsymbols: symbols.len(),
+ });
+ }
+ } else {
+ return Err(Error::GetEnumSymbol);
+ })
+ }
+ Schema::Ref { ref name } => {
+ let name = &name.name;
+ if let Some(resolved) = schemas_by_name.get(name.as_str()) {
+ decode0(resolved, reader, &mut schemas_by_name.clone())
} else {
- return Err(Error::GetEnumValue {
- index,
- nsymbols: symbols.len(),
- });
+ Err(Error::SchemaResolutionError(name.clone()))
}
- } else {
- return Err(Error::GetEnumSymbol);
- })
+ }
}
}
+
+ let mut schemas_by_name: HashMap<String, Schema> = HashMap::new();
+ decode0(schema, reader, &mut schemas_by_name)
}
#[cfg(test)]
diff --git a/lang/rust/src/encode.rs b/lang/rust/src/encode.rs
index 5c68051..088def9 100644
--- a/lang/rust/src/encode.rs
+++ b/lang/rust/src/encode.rs
@@ -20,7 +20,7 @@ use crate::{
types::Value,
util::{zig_i32, zig_i64},
};
-use std::convert::TryInto;
+use std::{collections::HashMap, convert::TryInto};
/// Encode a `Value` into avro format.
///
@@ -51,104 +51,135 @@ fn encode_int(i: i32, buffer: &mut Vec<u8>) {
/// be valid with regards to the schema. Schema are needed only to guide the
/// encoding for complex type values.
pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
- match value {
- Value::Null => (),
- Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
- // Pattern | Pattern here to signify that these _must_ have the same encoding.
- Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) => encode_int(*i, buffer),
- Value::Long(i)
- | Value::TimestampMillis(i)
- | Value::TimestampMicros(i)
- | Value::TimeMicros(i) => encode_long(*i, buffer),
- Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
- Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
- Value::Decimal(decimal) => match schema {
- Schema::Decimal { inner, .. } => match *inner.clone() {
- Schema::Fixed { size, .. } => {
- let bytes = decimal.to_sign_extended_bytes_with_len(size).unwrap();
- let num_bytes = bytes.len();
- if num_bytes != size {
- panic!(
- "signed decimal bytes length {} not equal to fixed schema size {}",
- num_bytes, size
- );
+ fn encode_ref0(
+ value: &Value,
+ schema: &Schema,
+ buffer: &mut Vec<u8>,
+ schemas_by_name: &mut HashMap<String, Schema>,
+ ) {
+ match &schema {
+ Schema::Ref { ref name } => {
+ let resolved = schemas_by_name.get(name.name.as_str()).unwrap();
+ return encode_ref0(value, resolved, buffer, &mut schemas_by_name.clone());
+ }
+ Schema::Record { ref name, .. }
+ | Schema::Enum { ref name, .. }
+ | Schema::Fixed { ref name, .. } => {
+ schemas_by_name.insert(name.name.clone(), schema.clone());
+ }
+ _ => (),
+ }
+
+ match value {
+ Value::Null => (),
+ Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
+ // Pattern | Pattern here to signify that these _must_ have the same encoding.
+ Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) => encode_int(*i, buffer),
+ Value::Long(i)
+ | Value::TimestampMillis(i)
+ | Value::TimestampMicros(i)
+ | Value::TimeMicros(i) => encode_long(*i, buffer),
+ Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
+ Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
+ Value::Decimal(decimal) => match schema {
+ Schema::Decimal { inner, .. } => match *inner.clone() {
+ Schema::Fixed { size, .. } => {
+ let bytes = decimal.to_sign_extended_bytes_with_len(size).unwrap();
+ let num_bytes = bytes.len();
+ if num_bytes != size {
+ panic!(
+ "signed decimal bytes length {} not equal to fixed schema size {}",
+ num_bytes, size
+ );
+ }
+ encode(&Value::Fixed(size, bytes), inner, buffer)
}
- encode(&Value::Fixed(size, bytes), inner, buffer)
- }
- Schema::Bytes => encode(&Value::Bytes(decimal.try_into().unwrap()), inner, buffer),
- _ => panic!("invalid inner type for decimal: {:?}", inner),
+ Schema::Bytes => {
+ encode(&Value::Bytes(decimal.try_into().unwrap()), inner, buffer)
+ }
+ _ => panic!("invalid inner type for decimal: {:?}", inner),
+ },
+ _ => panic!("invalid schema type for decimal: {:?}", schema),
},
- _ => panic!("invalid type for decimal: {:?}", schema),
- },
- &Value::Duration(duration) => {
- let slice: [u8; 12] = duration.into();
- buffer.extend_from_slice(&slice);
- }
- Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
- Value::Bytes(bytes) => match *schema {
- Schema::Bytes => encode_bytes(bytes, buffer),
- Schema::Fixed { .. } => buffer.extend(bytes),
- _ => (),
- },
- Value::String(s) => match *schema {
- Schema::String => {
- encode_bytes(s, buffer);
+ &Value::Duration(duration) => {
+ let slice: [u8; 12] = duration.into();
+ buffer.extend_from_slice(&slice);
}
- Schema::Enum { ref symbols, .. } => {
- if let Some(index) = symbols.iter().position(|item| item == s) {
- encode_int(index as i32, buffer);
+ Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
+ Value::Bytes(bytes) => match *schema {
+ Schema::Bytes => encode_bytes(bytes, buffer),
+ Schema::Fixed { .. } => buffer.extend(bytes),
+ _ => error!("invalid schema type for bytes: {:?}", schema),
+ },
+ Value::String(s) => match *schema {
+ Schema::String => {
+ encode_bytes(s, buffer);
+ }
+ Schema::Enum { ref symbols, .. } => {
+ if let Some(index) = symbols.iter().position(|item| item == s) {
+ encode_int(index as i32, buffer);
+ }
+ }
+ _ => error!("invalid schema type for String: {:?}", schema),
+ },
+ Value::Fixed(_, bytes) => buffer.extend(bytes),
+ Value::Enum(i, _) => encode_int(*i, buffer),
+ Value::Union(item) => {
+ if let Schema::Union(ref inner) = *schema {
+ // Find the schema that is matched here. Due to validation, this should always
+ // return a value.
+ let (idx, inner_schema) = inner
+ .find_schema(item)
+ .expect("Invalid Union validation occurred");
+ encode_long(idx as i64, buffer);
+ encode_ref0(&*item, inner_schema, buffer, schemas_by_name);
+ } else {
+ error!("invalid schema type for Union: {:?}", schema);
}
}
- _ => (),
- },
- Value::Fixed(_, bytes) => buffer.extend(bytes),
- Value::Enum(i, _) => encode_int(*i, buffer),
- Value::Union(item) => {
- if let Schema::Union(ref inner) = *schema {
- // Find the schema that is matched here. Due to validation, this should always
- // return a value.
- let (idx, inner_schema) = inner
- .find_schema(item)
- .expect("Invalid Union validation occurred");
- encode_long(idx as i64, buffer);
- encode_ref(&*item, inner_schema, buffer);
- }
- }
- Value::Array(items) => {
- if let Schema::Array(ref inner) = *schema {
- if !items.is_empty() {
- encode_long(items.len() as i64, buffer);
- for item in items.iter() {
- encode_ref(item, inner, buffer);
+ Value::Array(items) => {
+ if let Schema::Array(ref inner) = *schema {
+ if !items.is_empty() {
+ encode_long(items.len() as i64, buffer);
+ for item in items.iter() {
+ encode_ref0(item, inner, buffer, schemas_by_name);
+ }
}
+ buffer.push(0u8);
+ } else {
+ error!("invalid schema type for Array: {:?}", schema);
}
- buffer.push(0u8);
}
- }
- Value::Map(items) => {
- if let Schema::Map(ref inner) = *schema {
- if !items.is_empty() {
- encode_long(items.len() as i64, buffer);
- for (key, value) in items {
- encode_bytes(key, buffer);
- encode_ref(value, inner, buffer);
+ Value::Map(items) => {
+ if let Schema::Map(ref inner) = *schema {
+ if !items.is_empty() {
+ encode_long(items.len() as i64, buffer);
+ for (key, value) in items {
+ encode_bytes(key, buffer);
+ encode_ref0(value, inner, buffer, schemas_by_name);
+ }
}
+ buffer.push(0u8);
+ } else {
+ error!("invalid schema type for Map: {:?}", schema);
}
- buffer.push(0u8);
}
- }
- Value::Record(fields) => {
- if let Schema::Record {
- fields: ref schema_fields,
- ..
- } = *schema
- {
- for (i, &(_, ref value)) in fields.iter().enumerate() {
- encode_ref(value, &schema_fields[i].schema, buffer);
+ Value::Record(fields) => {
+ if let Schema::Record {
+ fields: ref schema_fields,
+ ..
+ } = *schema
+ {
+ for (i, &(_, ref value)) in fields.iter().enumerate() {
+ encode_ref0(value, &schema_fields[i].schema, buffer, schemas_by_name);
+ }
}
}
}
}
+
+ let mut schemas_by_name = HashMap::new();
+ encode_ref0(value, schema, buffer, &mut schemas_by_name)
}
pub fn encode_to_vec(value: &Value, schema: &Schema) -> Vec<u8> {
diff --git a/lang/rust/src/error.rs b/lang/rust/src/error.rs
index f4b1fb5..f65ec72 100644
--- a/lang/rust/src/error.rs
+++ b/lang/rust/src/error.rs
@@ -369,6 +369,10 @@ pub enum Error {
/// Error while converting float to json value
#[error("failed to convert avro float to json: {0}")]
ConvertF64ToJson(f64),
+
+ /// Error while resolving Schema::Ref
+ #[error("Unresolved schema reference: {0}")]
+ SchemaResolutionError(String),
}
impl serde::ser::Error for Error {
diff --git a/lang/rust/src/schema.rs b/lang/rust/src/schema.rs
index 3617bb1..2697328 100644
--- a/lang/rust/src/schema.rs
+++ b/lang/rust/src/schema.rs
@@ -21,6 +21,7 @@ use digest::Digest;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{
+ ser,
ser::{SerializeMap, SerializeSeq},
Deserialize, Serialize, Serializer,
};
@@ -31,6 +32,7 @@ use std::{
convert::TryInto,
fmt,
str::FromStr,
+ sync::{Arc, Mutex},
};
use strum_macros::{EnumDiscriminants, EnumString};
@@ -141,6 +143,10 @@ pub enum Schema {
TimestampMicros,
/// An amount of time defined by a number of months, days and milliseconds.
Duration,
+ // A reference to another schema.
+ Ref {
+ name: Name,
+ },
}
impl PartialEq for Schema {
@@ -234,6 +240,11 @@ impl Name {
fn parse(complex: &Map<String, Value>) -> AvroResult<Self> {
let name = complex.name().ok_or(Error::GetNameField)?;
+ let type_name = match complex.get("type") {
+ Some(Value::Object(complex_type)) => complex_type.name().or(None),
+ _ => None,
+ };
+
let namespace = complex.string("namespace");
let aliases: Option<Vec<String>> = complex
@@ -248,7 +259,7 @@ impl Name {
});
Ok(Name {
- name,
+ name: type_name.unwrap_or(name),
namespace,
aliases,
})
@@ -420,11 +431,24 @@ fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalM
#[derive(Default)]
struct Parser {
input_schemas: HashMap<String, Value>,
+ // A map of name -> Schema::Ref
+ // Used to resolve cyclic references, i.e. when a
+ // field's type is a reference to its record's type
+ resolving_schemas: HashMap<String, Schema>,
input_order: Vec<String>,
+ // A map of name -> fully parsed Schema
+ // Used to avoid parsing the same schema twice
parsed_schemas: HashMap<String, Schema>,
}
impl Schema {
+ // Used to help resolve cyclic references while serializing Schema to JSON.
+ // Needed because serde[_json] does not support using contexts.
+ // TODO: See whether alternatives like
+ // https://users.rust-lang.org/t/serde-question-access-to-a-shared-context-data-within-serialize-and-deserialize/39546
+ // can be used
+ thread_local!(static SCHEMAS_BY_NAME: Arc<Mutex<HashMap<String, Schema>>> = Arc::new(Mutex::new(HashMap::new())));
+
/// Converts `self` into its [Parsing Canonical Form].
///
/// [Parsing Canonical Form]:
@@ -480,6 +504,7 @@ impl Schema {
}
let mut parser = Parser {
input_schemas,
+ resolving_schemas: HashMap::default(),
input_order,
parsed_schemas: HashMap::with_capacity(input.len()),
};
@@ -515,7 +540,8 @@ impl Parser {
.remove_entry(&next_name)
.expect("Key unexpectedly missing");
let parsed = self.parse(&value)?;
- self.parsed_schemas.insert(name, parsed);
+ self.parsed_schemas
+ .insert(get_schema_type_name(name, value), parsed);
}
let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len());
@@ -558,9 +584,11 @@ impl Parser {
}
/// Given a name, tries to retrieve the parsed schema from `parsed_schemas`.
- /// If a parsed schema is not found, it checks if a json with that name exists
- /// in `input_schemas` and then parses it (removing it from `input_schemas`)
- /// and adds the parsed schema to `parsed_schemas`
+ /// If a parsed schema is not found, it checks if a currently resolving
+ /// schema with that name exists.
+ /// If a resolving schema is not found, it checks if a json with that name exists
+ /// in `input_schemas` and then parses it (removing it from `input_schemas`)
+ /// and adds the parsed schema to `parsed_schemas`.
///
/// This method allows schemas definitions that depend on other types to
/// parse their dependencies (or look them up if already parsed).
@@ -568,12 +596,20 @@ impl Parser {
if let Some(parsed) = self.parsed_schemas.get(name) {
return Ok(parsed.clone());
}
+ if let Some(resolving_schema) = self.resolving_schemas.get(name) {
+ return Ok(resolving_schema.clone());
+ }
+
let value = self
.input_schemas
.remove(name)
.ok_or_else(|| Error::ParsePrimitive(name.into()))?;
+
let parsed = self.parse(&value)?;
- self.parsed_schemas.insert(name.to_string(), parsed.clone());
+ self.parsed_schemas.insert(
+ get_schema_type_name(name.to_string(), value),
+ parsed.clone(),
+ );
Ok(parsed)
}
@@ -772,6 +808,10 @@ impl Parser {
let mut lookup = HashMap::new();
+ let resolving_schema = Schema::Ref { name: name.clone() };
+ self.resolving_schemas
+ .insert(name.name.clone(), resolving_schema);
+
let fields: Vec<RecordField> = complex
.get("fields")
.and_then(|fields| fields.as_array())
@@ -798,6 +838,7 @@ impl Parser {
self.parsed_schemas
.insert(name.fullname(None), schema.clone());
+ self.resolving_schemas.remove(name.name.as_str());
Ok(schema)
}
@@ -893,12 +934,45 @@ impl Parser {
}
}
+fn get_schema_type_name(name: String, value: Value) -> String {
+ match value.get("type") {
+ Some(Value::Object(complex_type)) => match complex_type.name() {
+ Some(name) => name,
+ _ => name,
+ },
+ _ => name,
+ }
+}
+
impl Serialize for Schema {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
+ fn remember_schema(name: &Name, schema: &Schema) {
+ Schema::SCHEMAS_BY_NAME.with(|schemas_by_name| match schemas_by_name.try_lock() {
+ Ok(mut schemas) => {
+ schemas.insert((&name.name).clone(), schema.clone());
+ }
+ Err(poisoned) => {
+ error!("Wasn't able to lock schemas_by_name {:?}", poisoned);
+ }
+ });
+ }
+
match *self {
+ Schema::Ref { ref name } => {
+ let name = &name.name;
+ Schema::SCHEMAS_BY_NAME.with(|schemas_by_name| {
+ let schemas = schemas_by_name.lock().unwrap();
+ if schemas.contains_key(name.as_str()) {
+ serializer.serialize_str(name)
+ } else {
+ Err(ser::Error::custom(format!("Could not serialize Schema::Ref('{}') because it cannot be found in ({})",
+ name, schemas.keys().cloned().collect::<Vec<_>>().join(", "))))
+ }
+ })
+ }
Schema::Null => serializer.serialize_str("null"),
Schema::Boolean => serializer.serialize_str("boolean"),
Schema::Int => serializer.serialize_str("int"),
@@ -933,6 +1007,7 @@ impl Serialize for Schema {
ref fields,
..
} => {
+ remember_schema(name, self);
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "record")?;
if let Some(ref n) = name.namespace {
@@ -953,6 +1028,7 @@ impl Serialize for Schema {
ref symbols,
..
} => {
+ remember_schema(name, self);
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "enum")?;
map.serialize_entry("name", &name.name)?;
@@ -964,6 +1040,7 @@ impl Serialize for Schema {
ref doc,
ref size,
} => {
+ remember_schema(name, self);
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "fixed")?;
map.serialize_entry("name", &name.name)?;
@@ -1240,7 +1317,7 @@ mod tests {
#[test]
fn test_record_schema() {
- let schema = Schema::parse_str(
+ let parsed = Schema::parse_str(
r#"
{
"type": "record",
@@ -1282,7 +1359,78 @@ mod tests {
lookup,
};
- assert_eq!(expected, schema);
+ assert_eq!(parsed, expected);
+ }
+
+ // AVRO-3302
+ #[test]
+ fn test_record_schema_with_currently_parsing_schema() {
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type": "record",
+ "name": "test",
+ "fields": [{
+ "name": "recordField",
+ "type": {
+ "type": "record",
+ "name": "Node",
+ "fields": [
+ {"name": "label", "type": "string"},
+ {"name": "children", "type": {"type": "array", "items": "Node"}}
+ ]
+ }
+ }]
+ }
+ "#,
+ )
+ .unwrap();
+
+ let mut lookup = HashMap::new();
+ lookup.insert("recordField".to_owned(), 0);
+
+ let mut node_lookup = HashMap::new();
+ node_lookup.insert("children".to_owned(), 1);
+ node_lookup.insert("label".to_owned(), 0);
+
+ let expected = Schema::Record {
+ name: Name::new("test"),
+ doc: None,
+ fields: vec![RecordField {
+ name: "recordField".to_string(),
+ doc: None,
+ default: None,
+ schema: Schema::Record {
+ name: Name::new("Node"),
+ doc: None,
+ fields: vec![
+ RecordField {
+ name: "label".to_string(),
+ doc: None,
+ default: None,
+ schema: Schema::String,
+ order: RecordFieldOrder::Ascending,
+ position: 0,
+ },
+ RecordField {
+ name: "children".to_string(),
+ doc: None,
+ default: None,
+ schema: Schema::Array(Box::new(Schema::Ref {
+ name: Name::new("Node"),
+ })),
+ order: RecordFieldOrder::Ascending,
+ position: 1,
+ },
+ ],
+ lookup: node_lookup,
+ },
+ order: RecordFieldOrder::Ascending,
+ position: 0,
+ }],
+ lookup,
+ };
+ assert_eq!(schema, expected);
}
#[test]
diff --git a/lang/rust/src/schema_compatibility.rs b/lang/rust/src/schema_compatibility.rs
index 5ba46e1..7c815e0 100644
--- a/lang/rust/src/schema_compatibility.rs
+++ b/lang/rust/src/schema_compatibility.rs
@@ -433,7 +433,6 @@ mod tests {
.map(|s| s.canonical_form())
.collect::<Vec<String>>()
.join(",");
- dbg!(&schema_string);
Schema::parse_str(&format!("[{}]", schema_string)).unwrap()
}
diff --git a/lang/rust/src/types.rs b/lang/rust/src/types.rs
index 75f0509..48a1813 100644
--- a/lang/rust/src/types.rs
+++ b/lang/rust/src/types.rs
@@ -323,6 +323,7 @@ impl Value {
/// for the full set of rules of schema validation.
pub fn validate(&self, schema: &Schema) -> bool {
match (self, schema) {
+ (_, &Schema::Ref { name: _ }) => true,
(&Value::Null, &Schema::Null) => true,
(&Value::Boolean(_), &Schema::Boolean) => true,
(&Value::Int(_), &Schema::Int) => true,
@@ -383,7 +384,10 @@ impl Value {
}
})
}
- _ => false,
+ (v, s) => {
+ error!("Unsupported value-schema combination:\n{:?}\n{:?}", v, s);
+ false
+ }
}
}
@@ -394,45 +398,79 @@ impl Value {
/// in the Avro specification for the full set of rules of schema
/// resolution.
pub fn resolve(mut self, schema: &Schema) -> AvroResult<Self> {
- // Check if this schema is a union, and if the reader schema is not.
- if SchemaKind::from(&self) == SchemaKind::Union
- && SchemaKind::from(schema) != SchemaKind::Union
- {
- // Pull out the Union, and attempt to resolve against it.
- let v = match self {
- Value::Union(b) => *b,
- _ => unreachable!(),
- };
- self = v;
- }
- match *schema {
- Schema::Null => self.resolve_null(),
- Schema::Boolean => self.resolve_boolean(),
- Schema::Int => self.resolve_int(),
- Schema::Long => self.resolve_long(),
- Schema::Float => self.resolve_float(),
- Schema::Double => self.resolve_double(),
- Schema::Bytes => self.resolve_bytes(),
- Schema::String => self.resolve_string(),
- Schema::Fixed { size, .. } => self.resolve_fixed(size),
- Schema::Union(ref inner) => self.resolve_union(inner),
- Schema::Enum { ref symbols, .. } => self.resolve_enum(symbols),
- Schema::Array(ref inner) => self.resolve_array(inner),
- Schema::Map(ref inner) => self.resolve_map(inner),
- Schema::Record { ref fields, .. } => self.resolve_record(fields),
- Schema::Decimal {
- scale,
- precision,
- ref inner,
- } => self.resolve_decimal(precision, scale, inner),
- Schema::Date => self.resolve_date(),
- Schema::TimeMillis => self.resolve_time_millis(),
- Schema::TimeMicros => self.resolve_time_micros(),
- Schema::TimestampMillis => self.resolve_timestamp_millis(),
- Schema::TimestampMicros => self.resolve_timestamp_micros(),
- Schema::Duration => self.resolve_duration(),
- Schema::Uuid => self.resolve_uuid(),
+ pub fn resolve0(
+ value: &mut Value,
+ schema: &Schema,
+ schemas_by_name: &mut HashMap<String, Schema>,
+ ) -> AvroResult<Value> {
+ // Check if this schema is a union, and if the reader schema is not.
+ if SchemaKind::from(&value.clone()) == SchemaKind::Union
+ && SchemaKind::from(schema) != SchemaKind::Union
+ {
+ // Pull out the Union, and attempt to resolve against it.
+ let v = match value {
+ Value::Union(b) => &**b,
+ _ => unreachable!(),
+ };
+ *value = v.clone();
+ }
+ let val: Value = value.clone();
+ match *schema {
+ Schema::Ref { ref name } => {
+ if let Some(resolved) = schemas_by_name.get(name.name.as_str()) {
+ resolve0(value, resolved, &mut schemas_by_name.clone())
+ } else {
+ Err(Error::SchemaResolutionError(name.name.clone()))
+ }
+ }
+ Schema::Null => val.resolve_null(),
+ Schema::Boolean => val.resolve_boolean(),
+ Schema::Int => val.resolve_int(),
+ Schema::Long => val.resolve_long(),
+ Schema::Float => val.resolve_float(),
+ Schema::Double => val.resolve_double(),
+ Schema::Bytes => val.resolve_bytes(),
+ Schema::String => val.resolve_string(),
+ Schema::Fixed { ref name, size, .. } => {
+ schemas_by_name.insert(name.name.clone(), schema.clone());
+ val.resolve_fixed(size)
+ }
+ Schema::Union(ref inner) => val.resolve_union(inner),
+ Schema::Enum {
+ ref name,
+ ref symbols,
+ ..
+ } => {
+ schemas_by_name.insert(name.name.clone(), schema.clone());
+ val.resolve_enum(symbols)
+ }
+ Schema::Array(ref inner) => val.resolve_array(inner),
+ Schema::Map(ref inner) => val.resolve_map(inner),
+ Schema::Record {
+ ref name,
+ ref fields,
+ ..
+ } => {
+ schemas_by_name.insert(name.name.clone(), schema.clone());
+ val.resolve_record(fields)
+ }
+ Schema::Decimal {
+ scale,
+ precision,
+ ref inner,
+ } => val.resolve_decimal(precision, scale, inner),
+ Schema::Date => val.resolve_date(),
+ Schema::TimeMillis => val.resolve_time_millis(),
+ Schema::TimeMicros => val.resolve_time_micros(),
+ Schema::TimestampMillis => val.resolve_timestamp_millis(),
+ Schema::TimestampMicros => val.resolve_timestamp_micros(),
+ Schema::Duration => val.resolve_duration(),
+ Schema::Uuid => val.resolve_uuid(),
+ }
}
+
+ let mut schemas_by_name: HashMap<String, Schema> = HashMap::new();
+ resolve0(&mut self, schema, &mut schemas_by_name)
}
fn resolve_uuid(self) -> Result<Self, Error> {
diff --git a/lang/rust/src/util.rs b/lang/rust/src/util.rs
index f9daf28..e2b353b 100644
--- a/lang/rust/src/util.rs
+++ b/lang/rust/src/util.rs
@@ -21,7 +21,7 @@ use std::{convert::TryFrom, i64, io::Read, sync::Once};
/// Maximum number of bytes that can be allocated when decoding
/// Avro-encoded values. This is a protection against ill-formed
-/// data, whose length field might be interpreted as enourmous.
+/// data, whose length field might be interpreted as enormous.
/// See max_allocation_bytes to change this limit.
pub static mut MAX_ALLOCATION_BYTES: usize = 512 * 1024 * 1024;
static MAX_ALLOCATION_BYTES_ONCE: Once = Once::new();
@@ -153,19 +153,40 @@ mod tests {
#[test]
fn test_zig_i64() {
let mut s = Vec::new();
- zig_i64(std::i32::MAX as i64, &mut s);
+
+ zig_i64(0, &mut s);
+ assert_eq!(s, [0]);
+
+ s.clear();
+ zig_i64(-1, &mut s);
+ assert_eq!(s, [1]);
+
+ s.clear();
+ zig_i64(1, &mut s);
+ assert_eq!(s, [2]);
+
+ s.clear();
+ zig_i64(-64, &mut s);
+ assert_eq!(s, [127]);
+
+ s.clear();
+ zig_i64(64, &mut s);
+ assert_eq!(s, [128, 1]);
+
+ s.clear();
+ zig_i64(i32::MAX as i64, &mut s);
assert_eq!(s, [254, 255, 255, 255, 15]);
s.clear();
- zig_i64(std::i32::MAX as i64 + 1, &mut s);
+ zig_i64(i32::MAX as i64 + 1, &mut s);
assert_eq!(s, [128, 128, 128, 128, 16]);
s.clear();
- zig_i64(std::i32::MIN as i64, &mut s);
+ zig_i64(i32::MIN as i64, &mut s);
assert_eq!(s, [255, 255, 255, 255, 15]);
s.clear();
- zig_i64(std::i32::MIN as i64 - 1, &mut s);
+ zig_i64(i32::MIN as i64 - 1, &mut s);
assert_eq!(s, [129, 128, 128, 128, 16]);
s.clear();
@@ -180,27 +201,27 @@ mod tests {
#[test]
fn test_zig_i32() {
let mut s = Vec::new();
- zig_i32(std::i32::MAX / 2, &mut s);
+ zig_i32(i32::MAX / 2, &mut s);
assert_eq!(s, [254, 255, 255, 255, 7]);
s.clear();
- zig_i32(std::i32::MIN / 2, &mut s);
+ zig_i32(i32::MIN / 2, &mut s);
assert_eq!(s, [255, 255, 255, 255, 7]);
s.clear();
- zig_i32(-(std::i32::MIN / 2), &mut s);
+ zig_i32(-(i32::MIN / 2), &mut s);
assert_eq!(s, [128, 128, 128, 128, 8]);
s.clear();
- zig_i32(std::i32::MIN / 2 - 1, &mut s);
+ zig_i32(i32::MIN / 2 - 1, &mut s);
assert_eq!(s, [129, 128, 128, 128, 8]);
s.clear();
- zig_i32(std::i32::MAX, &mut s);
+ zig_i32(i32::MAX, &mut s);
assert_eq!(s, [254, 255, 255, 255, 15]);
s.clear();
- zig_i32(std::i32::MIN, &mut s);
+ zig_i32(i32::MIN, &mut s);
assert_eq!(s, [255, 255, 255, 255, 15]);
}
diff --git a/lang/rust/tests/schema.rs b/lang/rust/tests/schema.rs
index cc96429..77b6569 100644
--- a/lang/rust/tests/schema.rs
+++ b/lang/rust/tests/schema.rs
@@ -17,9 +17,11 @@
use avro_rs::{
schema::{Name, RecordField},
- Error, Schema,
+ types::{Record, Value},
+ Codec, Error, Reader, Schema, Writer,
};
use lazy_static::lazy_static;
+use log::debug;
fn init() {
let _ = env_logger::builder()
@@ -739,29 +741,28 @@ fn test_parse_list_with_cross_deps_basic() {
}
#[test]
-/// Test that if a cycle of dependencies occurs in the input schema jsons, the algorithm terminates
-/// and returns an error. N.B. In the future, when recursive types are supported, this should be
-/// revisited.
-fn test_parse_list_recursive_type_error() {
+fn test_parse_list_recursive_type() {
init();
let schema_str_1 = r#"{
"name": "A",
+ "doc": "A's schema",
"type": "record",
"fields": [
- {"name": "field_one", "type": "B"}
+ {"name": "a_field_one", "type": "B"}
]
}"#;
let schema_str_2 = r#"{
"name": "B",
+ "doc": "B's schema",
"type": "record",
"fields": [
- {"name": "field_one", "type": "A"}
+ {"name": "b_field_one", "type": "A"}
]
}"#;
let schema_strs_first = [schema_str_1, schema_str_2];
let schema_strs_second = [schema_str_2, schema_str_1];
- let _ = Schema::parse_list(&schema_strs_first).expect_err("Test failed");
- let _ = Schema::parse_list(&schema_strs_second).expect_err("Test failed");
+ let _ = Schema::parse_list(&schema_strs_first).expect("Test failed");
+ let _ = Schema::parse_list(&schema_strs_second).expect("Test failed");
}
#[test]
@@ -1306,6 +1307,73 @@ fn test_root_error_is_not_swallowed_on_parse_error() -> Result<(), String> {
}
}
+// AVRO-3302
+#[test]
+fn test_record_schema_with_cyclic_references() {
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type": "record",
+ "name": "test",
+ "fields": [{
+ "name": "recordField",
+ "type": {
+ "type": "record",
+ "name": "Node",
+ "fields": [
+ {"name": "label", "type": "string"},
+ {"name": "children", "type": {"type": "array", "items": "Node"}}
+ ]
+ }
+ }]
+ }
+ "#,
+ )
+ .unwrap();
+
+ let mut datum = Record::new(&schema).unwrap();
+ datum.put(
+ "recordField",
+ Value::Record(vec![
+ ("label".into(), Value::String("level_1".into())),
+ (
+ "children".into(),
+ Value::Array(vec![Value::Record(vec![
+ ("label".into(), Value::String("level_2".into())),
+ (
+ "children".into(),
+ Value::Array(vec![Value::Record(vec![
+ ("label".into(), Value::String("level_3".into())),
+ (
+ "children".into(),
+ Value::Array(vec![Value::Record(vec![
+ ("label".into(), Value::String("level_4".into())),
+ ("children".into(), Value::Array(vec![])),
+ ])]),
+ ),
+ ])]),
+ ),
+ ])]),
+ ),
+ ]),
+ );
+
+ let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null);
+ if let Err(err) = writer.append(datum) {
+ panic!("An error occurred while writing datum: {:?}", err)
+ }
+ let bytes = writer.into_inner().unwrap();
+ assert_eq!(316, bytes.len());
+
+ match Reader::new(&mut bytes.as_slice()) {
+ Ok(mut reader) => match reader.next() {
+ Some(value) => debug!("{:?}", value.unwrap()),
+ None => panic!("No value was read!"),
+ },
+ Err(err) => panic!("An error occurred while reading datum: {:?}", err),
+ }
+}
+
/*
// TODO: (#93) add support for logical type and attributes and uncomment (may need some tweaks to compile)
#[test]