You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/02/01 11:51:36 UTC

[avro] 21/30: AVRO-3302: Add interop tests for rust (#1456)

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit e8ce233387929144ae2a8ca78865956b443624b1
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Wed Jan 19 09:44:47 2022 +0200

    AVRO-3302: Add interop tests for rust (#1456)
    
    * AVRO-3302: Add interop tests for Rust module
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302: Implement test_interop_data for Rust
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302: Run JS interop tests to read .avro files created by Rust
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Fix the path to lang/js
    
    Add debug for failing Maven build
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302: git checkout & install Rust stable
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302: Fix formatting
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Remove debug statement
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Revert removed debug statement
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302: Improve documentation
    
    First check 'parsed_schemas', then 'resolving_schemas' and finally
    'input_schemas'.
    Enable a test case that is working now.
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Seems to work ?!
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Code formatting
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Properly encode/decode Schema::Ref
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Remove debug statements
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Print messages for successful reads
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Rename Codec::Zstd to Codec::Zstandard for consistency
    
    This is the name used by the other Avro modules (e.g. Java & Perl)
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Fix formatting
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Collect all errors during interop test and panic at the end
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Format imports for +nightly
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Use Perl to read the interop .avro files created by Rust
    
    Perl supports most of the optional codecs
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Code cleanup
    
    Revert changes which are not really needed.
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Remove a FIXME that cannot be addressed
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit 98f4f4a87bb61cacf91f50253f8154c8b36de690)
---
 .github/workflows/test-lang-rust-ci.yml     |  71 ++++++
 lang/rust/build.sh                          |  35 ++-
 lang/rust/examples/generate_interop_data.rs |  99 ++++++++
 lang/rust/examples/test_interop_data.rs     |  59 +++++
 lang/rust/src/codec.rs                      |  16 +-
 lang/rust/src/decode.rs                     | 338 +++++++++++++++-------------
 lang/rust/src/encode.rs                     | 199 +++++++++-------
 lang/rust/src/error.rs                      |   4 +
 lang/rust/src/schema.rs                     | 164 +++++++++++++-
 lang/rust/src/schema_compatibility.rs       |   1 -
 lang/rust/src/types.rs                      | 116 ++++++----
 lang/rust/src/util.rs                       |  43 +++-
 lang/rust/tests/schema.rs                   |  86 ++++++-
 13 files changed, 916 insertions(+), 315 deletions(-)

diff --git a/.github/workflows/test-lang-rust-ci.yml b/.github/workflows/test-lang-rust-ci.yml
index dacf461..91364f9 100644
--- a/.github/workflows/test-lang-rust-ci.yml
+++ b/.github/workflows/test-lang-rust-ci.yml
@@ -77,3 +77,74 @@ jobs:
         with:
           command: test
           args: --manifest-path lang/rust/Cargo.toml --doc
+
+  interop:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v2
+
+      - name: Rust Toolchain
+        uses: actions-rs/toolchain@v1
+        with:
+          profile: minimal
+          toolchain: stable
+          override: true
+
+      - name: Cache Local Maven Repository
+        uses: actions/cache@v2
+        with:
+          path: ~/.m2/repository
+          key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
+          restore-keys: |
+            ${{ runner.os }}-maven-
+
+      - name: Install Java Avro for Interop Test
+        working-directory: .
+        run: mvn -B install -DskipTests
+
+      - name: Create Interop Data Directory
+        working-directory: .
+        run: mkdir -p build/interop/data
+
+      - name: Generate Interop Resources
+        working-directory: lang/java/avro
+        run: mvn -B -P interop-data-generate generate-resources
+
+      - name: Generate interop data
+        run: ./build.sh interop-data-generate
+
+      - name: Rust reads interop files created by Java and Rust
+        run: ./build.sh interop-data-test
+
+      - uses: shogo82148/actions-setup-perl@v1
+        with:
+          perl-version: 5.32
+
+      - name: Install Dependencies
+        run: |
+          sudo apt-get -qqy install --no-install-recommends libcompress-raw-zlib-perl \
+                                                            libcpan-uploader-perl \
+                                                            libencode-perl \
+                                                            libio-string-perl \
+                                                            libjansson-dev \
+                                                            libjson-xs-perl \
+                                                            libmodule-install-perl \
+                                                            libmodule-install-readmefrompod-perl \
+                                                            libobject-tiny-perl \
+                                                            libsnappy-dev \
+                                                            libtest-exception-perl \
+                                                            libtest-pod-perl
+          cpanm --mirror https://www.cpan.org/ install Compress::Zstd \
+                                                       Error::Simple \
+                                                       Module::Install::Repository \
+                                                       Object::Tiny \
+                                                       Regexp::Common \
+                                                       Try::Tiny \
+                                                       inc::Module::Install
+
+
+      - name: Perl reads interop files created by Java and Rust
+        working-directory: lang/perl
+        run: ./build.sh interop-data-test
diff --git a/lang/rust/build.sh b/lang/rust/build.sh
index d9a2484..2f0a824 100755
--- a/lang/rust/build.sh
+++ b/lang/rust/build.sh
@@ -15,7 +15,25 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-set -e
+set -e  # exit on error
+
+root_dir=$(pwd)
+build_dir="../../build/rust"
+dist_dir="../../dist/rust"
+
+
+function clean {
+  if [ -d $build_dir ]; then
+    find $build_dir | xargs chmod 755
+    rm -rf $build_dir
+  fi
+}
+
+
+function prepare_build {
+  clean
+  mkdir -p $build_dir
+}
 
 cd `dirname "$0"`
 
@@ -35,10 +53,21 @@ do
       cargo build --release --lib --all-features
       cargo package
       mkdir -p  ../../dist/rust
-      cp target/package/avro-rs-*.crate ../../dist/rust
+      cp target/package/avro-rs-*.crate $dist_dir
+      ;;
+    interop-data-generate)
+      prepare_build
+      export RUST_LOG=avro_rs=debug
+      export RUST_BACKTRACE=1
+      cargo run --all-features --example generate_interop_data
+      ;;
+
+    interop-data-test)
+      prepare_build
+      cargo run --all-features --example test_interop_data
       ;;
     *)
-      echo "Usage: $0 {lint|test|dist|clean}" >&2
+      echo "Usage: $0 {lint|test|dist|clean|interop-data-generate|interop-data-test}" >&2
       exit 1
   esac
 done
diff --git a/lang/rust/examples/generate_interop_data.rs b/lang/rust/examples/generate_interop_data.rs
new file mode 100644
index 0000000..cb8efda
--- /dev/null
+++ b/lang/rust/examples/generate_interop_data.rs
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use avro_rs::{
+    schema::Schema,
+    types::{Record, Value},
+    Codec, Writer,
+};
+use std::collections::HashMap;
+use strum::IntoEnumIterator;
+
+fn create_datum(schema: &Schema) -> Record {
+    let mut datum = Record::new(schema).unwrap();
+    datum.put("intField", 12_i32);
+    datum.put("longField", 15234324_i64);
+    datum.put("stringField", "hey");
+    datum.put("boolField", true);
+    datum.put("floatField", 1234.0_f32);
+    datum.put("doubleField", -1234.0_f64);
+    datum.put("bytesField", b"12312adf".to_vec());
+    datum.put("nullField", Value::Null);
+    datum.put(
+        "arrayField",
+        Value::Array(vec![
+            Value::Double(5.0),
+            Value::Double(0.0),
+            Value::Double(12.0),
+        ]),
+    );
+    let mut map = HashMap::new();
+    map.insert(
+        "a".into(),
+        Value::Record(vec![("label".into(), Value::String("a".into()))]),
+    );
+    map.insert(
+        "bee".into(),
+        Value::Record(vec![("label".into(), Value::String("cee".into()))]),
+    );
+    datum.put("mapField", Value::Map(map));
+    datum.put("unionField", Value::Union(Box::new(Value::Double(12.0))));
+    datum.put("enumField", Value::Enum(2, "C".to_owned()));
+    datum.put("fixedField", Value::Fixed(16, b"1019181716151413".to_vec()));
+    datum.put(
+        "recordField",
+        Value::Record(vec![
+            ("label".into(), Value::String("outer".into())),
+            (
+                "children".into(),
+                Value::Array(vec![Value::Record(vec![
+                    ("label".into(), Value::String("inner".into())),
+                    ("children".into(), Value::Array(vec![])),
+                ])]),
+            ),
+        ]),
+    );
+
+    datum
+}
+
+fn main() -> anyhow::Result<()> {
+    let schema_str = std::fs::read_to_string("../../share/test/schemas/interop.avsc")
+        .expect("Unable to read the interop Avro schema");
+    let schema = Schema::parse_str(schema_str.as_str())?;
+
+    for codec in Codec::iter() {
+        let mut writer = Writer::with_codec(&schema, Vec::new(), codec);
+        let datum = create_datum(&schema);
+        writer.append(datum)?;
+        let bytes = writer.into_inner()?;
+
+        let codec_name = <&str>::from(codec);
+        let suffix = if codec_name == "null" {
+            "".to_owned()
+        } else {
+            format!("_{}", codec_name)
+        };
+
+        std::fs::write(
+            format!("../../build/interop/data/rust{}.avro", suffix),
+            bytes,
+        )?;
+    }
+
+    Ok(())
+}
diff --git a/lang/rust/examples/test_interop_data.rs b/lang/rust/examples/test_interop_data.rs
new file mode 100644
index 0000000..f86c6c4
--- /dev/null
+++ b/lang/rust/examples/test_interop_data.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use avro_rs::Reader;
+use std::ffi::OsStr;
+
+fn main() -> anyhow::Result<()> {
+    let data_dir = std::fs::read_dir("../../build/interop/data/")
+        .expect("Unable to list the interop data directory");
+
+    let mut errors = Vec::new();
+
+    for entry in data_dir {
+        let path = entry
+            .expect("Unable to read the interop data directory's files")
+            .path();
+
+        if path.is_file() {
+            let ext = path.extension().and_then(OsStr::to_str).unwrap();
+
+            if ext == "avro" {
+                println!("Checking {:?}", &path);
+                let content = std::fs::File::open(&path)?;
+                let reader = Reader::new(&content)?;
+                for value in reader {
+                    if let Err(e) = value {
+                        errors.push(format!(
+                            "There is a problem with reading of '{:?}', \n {:?}\n",
+                            &path, e
+                        ));
+                    }
+                }
+            }
+        }
+    }
+
+    if errors.is_empty() {
+        Ok(())
+    } else {
+        panic!(
+            "There were errors reading some .avro files:\n{}",
+            errors.join(", ")
+        );
+    }
+}
diff --git a/lang/rust/src/codec.rs b/lang/rust/src/codec.rs
index 0ba8abe..c9a584e 100644
--- a/lang/rust/src/codec.rs
+++ b/lang/rust/src/codec.rs
@@ -19,7 +19,7 @@
 use crate::{types::Value, AvroResult, Error};
 use libflate::deflate::{Decoder, Encoder};
 use std::io::{Read, Write};
-use strum_macros::{EnumString, IntoStaticStr};
+use strum_macros::{EnumIter, EnumString, IntoStaticStr};
 
 #[cfg(feature = "bzip")]
 use bzip2::{
@@ -34,7 +34,7 @@ use crc32fast::Hasher;
 use xz2::read::{XzDecoder, XzEncoder};
 
 /// The compression codec used to compress blocks.
-#[derive(Clone, Copy, Debug, PartialEq, EnumString, IntoStaticStr)]
+#[derive(Clone, Copy, Debug, PartialEq, EnumIter, EnumString, IntoStaticStr)]
 #[strum(serialize_all = "kebab_case")]
 pub enum Codec {
     /// The `Null` codec simply passes through data uncompressed.
@@ -49,7 +49,7 @@ pub enum Codec {
     /// CRC32 checksum of the uncompressed data in the block.
     Snappy,
     #[cfg(feature = "zstandard")]
-    Zstd,
+    Zstandard,
     #[cfg(feature = "bzip")]
     /// The `BZip2` codec uses [BZip2](https://sourceware.org/bzip2/)
     /// compression library.
@@ -98,7 +98,7 @@ impl Codec {
                 *stream = encoded;
             }
             #[cfg(feature = "zstandard")]
-            Codec::Zstd => {
+            Codec::Zstandard => {
                 let mut encoder = zstd::Encoder::new(Vec::new(), 0).unwrap();
                 encoder.write_all(stream).map_err(Error::ZstdCompress)?;
                 *stream = encoder.finish().unwrap();
@@ -157,7 +157,7 @@ impl Codec {
                 decoded
             }
             #[cfg(feature = "zstandard")]
-            Codec::Zstd => {
+            Codec::Zstandard => {
                 let mut decoded = Vec::new();
                 let mut decoder = zstd::Decoder::new(&stream[..]).unwrap();
                 std::io::copy(&mut decoder, &mut decoded).map_err(Error::ZstdDecompress)?;
@@ -212,7 +212,7 @@ mod tests {
     #[cfg(feature = "zstandard")]
     #[test]
     fn zstd_compress_and_decompress() {
-        compress_and_decompress(Codec::Zstd);
+        compress_and_decompress(Codec::Zstandard);
     }
 
     #[cfg(feature = "bzip")]
@@ -245,7 +245,7 @@ mod tests {
         assert_eq!(<&str>::from(Codec::Snappy), "snappy");
 
         #[cfg(feature = "zstandard")]
-        assert_eq!(<&str>::from(Codec::Zstd), "zstd");
+        assert_eq!(<&str>::from(Codec::Zstandard), "zstandard");
 
         #[cfg(feature = "bzip")]
         assert_eq!(<&str>::from(Codec::Bzip2), "bzip2");
@@ -265,7 +265,7 @@ mod tests {
         assert_eq!(Codec::from_str("snappy").unwrap(), Codec::Snappy);
 
         #[cfg(feature = "zstandard")]
-        assert_eq!(Codec::from_str("zstd").unwrap(), Codec::Zstd);
+        assert_eq!(Codec::from_str("zstandard").unwrap(), Codec::Zstandard);
 
         #[cfg(feature = "bzip")]
         assert_eq!(Codec::from_str("bzip2").unwrap(), Codec::Bzip2);
diff --git a/lang/rust/src/decode.rs b/lang/rust/src/decode.rs
index 5639d28..26678bf 100644
--- a/lang/rust/src/decode.rs
+++ b/lang/rust/src/decode.rs
@@ -68,183 +68,217 @@ fn decode_seq_len<R: Read>(reader: &mut R) -> AvroResult<usize> {
 
 /// Decode a `Value` from avro format given its `Schema`.
 pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
-    match *schema {
-        Schema::Null => Ok(Value::Null),
-        Schema::Boolean => {
-            let mut buf = [0u8; 1];
-            match reader.read_exact(&mut buf[..]) {
-                Ok(_) => match buf[0] {
-                    0u8 => Ok(Value::Boolean(false)),
-                    1u8 => Ok(Value::Boolean(true)),
-                    _ => Err(Error::BoolValue(buf[0])),
-                },
-                Err(io_err) => {
-                    if let ErrorKind::UnexpectedEof = io_err.kind() {
-                        Ok(Value::Null)
-                    } else {
-                        Err(Error::ReadBoolean(io_err))
+    fn decode0<R: Read>(
+        schema: &Schema,
+        reader: &mut R,
+        schemas_by_name: &mut HashMap<String, Schema>,
+    ) -> AvroResult<Value> {
+        match *schema {
+            Schema::Null => Ok(Value::Null),
+            Schema::Boolean => {
+                let mut buf = [0u8; 1];
+                match reader.read_exact(&mut buf[..]) {
+                    Ok(_) => match buf[0] {
+                        0u8 => Ok(Value::Boolean(false)),
+                        1u8 => Ok(Value::Boolean(true)),
+                        _ => Err(Error::BoolValue(buf[0])),
+                    },
+                    Err(io_err) => {
+                        if let ErrorKind::UnexpectedEof = io_err.kind() {
+                            Ok(Value::Null)
+                        } else {
+                            Err(Error::ReadBoolean(io_err))
+                        }
                     }
                 }
             }
-        }
-        Schema::Decimal { ref inner, .. } => match &**inner {
-            Schema::Fixed { .. } => match decode(inner, reader)? {
-                Value::Fixed(_, bytes) => Ok(Value::Decimal(Decimal::from(bytes))),
-                value => Err(Error::FixedValue(value.into())),
-            },
-            Schema::Bytes => match decode(inner, reader)? {
-                Value::Bytes(bytes) => Ok(Value::Decimal(Decimal::from(bytes))),
-                value => Err(Error::BytesValue(value.into())),
+            Schema::Decimal { ref inner, .. } => match &**inner {
+                Schema::Fixed { .. } => match decode0(inner, reader, schemas_by_name)? {
+                    Value::Fixed(_, bytes) => Ok(Value::Decimal(Decimal::from(bytes))),
+                    value => Err(Error::FixedValue(value.into())),
+                },
+                Schema::Bytes => match decode0(inner, reader, schemas_by_name)? {
+                    Value::Bytes(bytes) => Ok(Value::Decimal(Decimal::from(bytes))),
+                    value => Err(Error::BytesValue(value.into())),
+                },
+                schema => Err(Error::ResolveDecimalSchema(schema.into())),
             },
-            schema => Err(Error::ResolveDecimalSchema(schema.into())),
-        },
-        Schema::Uuid => Ok(Value::Uuid(
-            Uuid::from_str(match decode(&Schema::String, reader)? {
-                Value::String(ref s) => s,
-                value => return Err(Error::GetUuidFromStringValue(value.into())),
-            })
-            .map_err(Error::ConvertStrToUuid)?,
-        )),
-        Schema::Int => decode_int(reader),
-        Schema::Date => zag_i32(reader).map(Value::Date),
-        Schema::TimeMillis => zag_i32(reader).map(Value::TimeMillis),
-        Schema::Long => decode_long(reader),
-        Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros),
-        Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis),
-        Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros),
-        Schema::Duration => {
-            let mut buf = [0u8; 12];
-            reader.read_exact(&mut buf).map_err(Error::ReadDuration)?;
-            Ok(Value::Duration(Duration::from(buf)))
-        }
-        Schema::Float => {
-            let mut buf = [0u8; std::mem::size_of::<f32>()];
-            reader.read_exact(&mut buf[..]).map_err(Error::ReadFloat)?;
-            Ok(Value::Float(f32::from_le_bytes(buf)))
-        }
-        Schema::Double => {
-            let mut buf = [0u8; std::mem::size_of::<f64>()];
-            reader.read_exact(&mut buf[..]).map_err(Error::ReadDouble)?;
-            Ok(Value::Double(f64::from_le_bytes(buf)))
-        }
-        Schema::Bytes => {
-            let len = decode_len(reader)?;
-            let mut buf = vec![0u8; len];
-            reader.read_exact(&mut buf).map_err(Error::ReadBytes)?;
-            Ok(Value::Bytes(buf))
-        }
-        Schema::String => {
-            let len = decode_len(reader)?;
-            let mut buf = vec![0u8; len];
-            match reader.read_exact(&mut buf) {
-                Ok(_) => Ok(Value::String(
-                    String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
-                )),
-                Err(io_err) => {
-                    if let ErrorKind::UnexpectedEof = io_err.kind() {
-                        Ok(Value::Null)
-                    } else {
-                        Err(Error::ReadString(io_err))
+            Schema::Uuid => Ok(Value::Uuid(
+                Uuid::from_str(match decode0(&Schema::String, reader, schemas_by_name)? {
+                    Value::String(ref s) => s,
+                    value => return Err(Error::GetUuidFromStringValue(value.into())),
+                })
+                .map_err(Error::ConvertStrToUuid)?,
+            )),
+            Schema::Int => decode_int(reader),
+            Schema::Date => zag_i32(reader).map(Value::Date),
+            Schema::TimeMillis => zag_i32(reader).map(Value::TimeMillis),
+            Schema::Long => decode_long(reader),
+            Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros),
+            Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis),
+            Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros),
+            Schema::Duration => {
+                let mut buf = [0u8; 12];
+                reader.read_exact(&mut buf).map_err(Error::ReadDuration)?;
+                Ok(Value::Duration(Duration::from(buf)))
+            }
+            Schema::Float => {
+                let mut buf = [0u8; std::mem::size_of::<f32>()];
+                reader.read_exact(&mut buf[..]).map_err(Error::ReadFloat)?;
+                Ok(Value::Float(f32::from_le_bytes(buf)))
+            }
+            Schema::Double => {
+                let mut buf = [0u8; std::mem::size_of::<f64>()];
+                reader.read_exact(&mut buf[..]).map_err(Error::ReadDouble)?;
+                Ok(Value::Double(f64::from_le_bytes(buf)))
+            }
+            Schema::Bytes => {
+                let len = decode_len(reader)?;
+                let mut buf = vec![0u8; len];
+                reader.read_exact(&mut buf).map_err(Error::ReadBytes)?;
+                Ok(Value::Bytes(buf))
+            }
+            Schema::String => {
+                let len = decode_len(reader)?;
+                let mut buf = vec![0u8; len];
+                match reader.read_exact(&mut buf) {
+                    Ok(_) => Ok(Value::String(
+                        String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
+                    )),
+                    Err(io_err) => {
+                        if let ErrorKind::UnexpectedEof = io_err.kind() {
+                            Ok(Value::Null)
+                        } else {
+                            Err(Error::ReadString(io_err))
+                        }
                     }
                 }
             }
-        }
-        Schema::Fixed { size, .. } => {
-            let mut buf = vec![0u8; size];
-            reader
-                .read_exact(&mut buf)
-                .map_err(|e| Error::ReadFixed(e, size))?;
-            Ok(Value::Fixed(size, buf))
-        }
-        Schema::Array(ref inner) => {
-            let mut items = Vec::new();
+            Schema::Fixed { ref name, size, .. } => {
+                schemas_by_name.insert(name.name.clone(), schema.clone());
+                let mut buf = vec![0u8; size];
+                reader
+                    .read_exact(&mut buf)
+                    .map_err(|e| Error::ReadFixed(e, size))?;
+                Ok(Value::Fixed(size, buf))
+            }
+            Schema::Array(ref inner) => {
+                let mut items = Vec::new();
 
-            loop {
-                let len = decode_seq_len(reader)?;
-                if len == 0 {
-                    break;
-                }
+                loop {
+                    let len = decode_seq_len(reader)?;
+                    if len == 0 {
+                        break;
+                    }
 
-                items.reserve(len);
-                for _ in 0..len {
-                    items.push(decode(inner, reader)?);
+                    items.reserve(len);
+                    for _ in 0..len {
+                        items.push(decode0(inner, reader, schemas_by_name)?);
+                    }
                 }
-            }
 
-            Ok(Value::Array(items))
-        }
-        Schema::Map(ref inner) => {
-            let mut items = HashMap::new();
+                Ok(Value::Array(items))
+            }
+            Schema::Map(ref inner) => {
+                let mut items = HashMap::new();
 
-            loop {
-                let len = decode_seq_len(reader)?;
-                if len == 0 {
-                    break;
-                }
+                loop {
+                    let len = decode_seq_len(reader)?;
+                    if len == 0 {
+                        break;
+                    }
 
-                items.reserve(len);
-                for _ in 0..len {
-                    match decode(&Schema::String, reader)? {
-                        Value::String(key) => {
-                            let value = decode(inner, reader)?;
-                            items.insert(key, value);
+                    items.reserve(len);
+                    for _ in 0..len {
+                        match decode0(&Schema::String, reader, schemas_by_name)? {
+                            Value::String(key) => {
+                                let value = decode0(inner, reader, schemas_by_name)?;
+                                items.insert(key, value);
+                            }
+                            value => return Err(Error::MapKeyType(value.into())),
                         }
-                        value => return Err(Error::MapKeyType(value.into())),
                     }
                 }
-            }
 
-            Ok(Value::Map(items))
-        }
-        Schema::Union(ref inner) => match zag_i64(reader) {
-            Ok(index) => {
-                let variants = inner.variants();
-                let variant = variants
-                    .get(usize::try_from(index).map_err(|e| Error::ConvertI64ToUsize(e, index))?)
-                    .ok_or_else(|| Error::GetUnionVariant {
-                        index,
-                        num_variants: variants.len(),
-                    })?;
-                let value = decode(variant, reader)?;
-                Ok(Value::Union(Box::new(value)))
+                Ok(Value::Map(items))
             }
-            Err(Error::ReadVariableIntegerBytes(io_err)) => {
-                if let ErrorKind::UnexpectedEof = io_err.kind() {
-                    Ok(Value::Union(Box::new(Value::Null)))
-                } else {
-                    Err(Error::ReadVariableIntegerBytes(io_err))
+            Schema::Union(ref inner) => match zag_i64(reader) {
+                Ok(index) => {
+                    let variants = inner.variants();
+                    let variant = variants
+                        .get(
+                            usize::try_from(index)
+                                .map_err(|e| Error::ConvertI64ToUsize(e, index))?,
+                        )
+                        .ok_or_else(|| Error::GetUnionVariant {
+                            index,
+                            num_variants: variants.len(),
+                        })?;
+                    let value = decode0(variant, reader, schemas_by_name)?;
+                    Ok(Value::Union(Box::new(value)))
                 }
-            }
-            Err(io_err) => Err(io_err),
-        },
+                Err(Error::ReadVariableIntegerBytes(io_err)) => {
+                    if let ErrorKind::UnexpectedEof = io_err.kind() {
+                        Ok(Value::Union(Box::new(Value::Null)))
+                    } else {
+                        Err(Error::ReadVariableIntegerBytes(io_err))
+                    }
+                }
+                Err(io_err) => Err(io_err),
+            },
 
-        Schema::Record { ref fields, .. } => {
-            // Benchmarks indicate ~10% improvement using this method.
-            let mut items = Vec::with_capacity(fields.len());
-            for field in fields {
-                // TODO: This clone is also expensive. See if we can do away with it...
-                items.push((field.name.clone(), decode(&field.schema, reader)?));
+            Schema::Record {
+                ref name,
+                ref fields,
+                ..
+            } => {
+                schemas_by_name.insert(name.name.clone(), schema.clone());
+                // Benchmarks indicate ~10% improvement using this method.
+                let mut items = Vec::with_capacity(fields.len());
+                for field in fields {
+                    // TODO: This clone is also expensive. See if we can do away with it...
+                    items.push((
+                        field.name.clone(),
+                        decode0(&field.schema, reader, schemas_by_name)?,
+                    ));
+                }
+                Ok(Value::Record(items))
             }
-            Ok(Value::Record(items))
-        }
-        Schema::Enum { ref symbols, .. } => {
-            Ok(if let Value::Int(raw_index) = decode_int(reader)? {
-                let index = usize::try_from(raw_index)
-                    .map_err(|e| Error::ConvertI32ToUsize(e, raw_index))?;
-                if (0..=symbols.len()).contains(&index) {
-                    let symbol = symbols[index].clone();
-                    Value::Enum(raw_index, symbol)
+            Schema::Enum {
+                ref name,
+                ref symbols,
+                ..
+            } => {
+                schemas_by_name.insert(name.name.clone(), schema.clone());
+                Ok(if let Value::Int(raw_index) = decode_int(reader)? {
+                    let index = usize::try_from(raw_index)
+                        .map_err(|e| Error::ConvertI32ToUsize(e, raw_index))?;
+                    if (0..=symbols.len()).contains(&index) {
+                        let symbol = symbols[index].clone();
+                        Value::Enum(raw_index, symbol)
+                    } else {
+                        return Err(Error::GetEnumValue {
+                            index,
+                            nsymbols: symbols.len(),
+                        });
+                    }
+                } else {
+                    return Err(Error::GetEnumSymbol);
+                })
+            }
+            Schema::Ref { ref name } => {
+                let name = &name.name;
+                if let Some(resolved) = schemas_by_name.get(name.as_str()) {
+                    decode0(resolved, reader, &mut schemas_by_name.clone())
                 } else {
-                    return Err(Error::GetEnumValue {
-                        index,
-                        nsymbols: symbols.len(),
-                    });
+                    Err(Error::SchemaResolutionError(name.clone()))
                 }
-            } else {
-                return Err(Error::GetEnumSymbol);
-            })
+            }
         }
     }
+
+    let mut schemas_by_name: HashMap<String, Schema> = HashMap::new();
+    decode0(schema, reader, &mut schemas_by_name)
 }
 
 #[cfg(test)]
diff --git a/lang/rust/src/encode.rs b/lang/rust/src/encode.rs
index 5c68051..088def9 100644
--- a/lang/rust/src/encode.rs
+++ b/lang/rust/src/encode.rs
@@ -20,7 +20,7 @@ use crate::{
     types::Value,
     util::{zig_i32, zig_i64},
 };
-use std::convert::TryInto;
+use std::{collections::HashMap, convert::TryInto};
 
 /// Encode a `Value` into avro format.
 ///
@@ -51,104 +51,135 @@ fn encode_int(i: i32, buffer: &mut Vec<u8>) {
 /// be valid with regards to the schema. Schema are needed only to guide the
 /// encoding for complex type values.
 pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
-    match value {
-        Value::Null => (),
-        Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
-        // Pattern | Pattern here to signify that these _must_ have the same encoding.
-        Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) => encode_int(*i, buffer),
-        Value::Long(i)
-        | Value::TimestampMillis(i)
-        | Value::TimestampMicros(i)
-        | Value::TimeMicros(i) => encode_long(*i, buffer),
-        Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
-        Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
-        Value::Decimal(decimal) => match schema {
-            Schema::Decimal { inner, .. } => match *inner.clone() {
-                Schema::Fixed { size, .. } => {
-                    let bytes = decimal.to_sign_extended_bytes_with_len(size).unwrap();
-                    let num_bytes = bytes.len();
-                    if num_bytes != size {
-                        panic!(
-                            "signed decimal bytes length {} not equal to fixed schema size {}",
-                            num_bytes, size
-                        );
+    fn encode_ref0(
+        value: &Value,
+        schema: &Schema,
+        buffer: &mut Vec<u8>,
+        schemas_by_name: &mut HashMap<String, Schema>,
+    ) {
+        match &schema {
+            Schema::Ref { ref name } => {
+                let resolved = schemas_by_name.get(name.name.as_str()).unwrap();
+                return encode_ref0(value, resolved, buffer, &mut schemas_by_name.clone());
+            }
+            Schema::Record { ref name, .. }
+            | Schema::Enum { ref name, .. }
+            | Schema::Fixed { ref name, .. } => {
+                schemas_by_name.insert(name.name.clone(), schema.clone());
+            }
+            _ => (),
+        }
+
+        match value {
+            Value::Null => (),
+            Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
+            // Pattern | Pattern here to signify that these _must_ have the same encoding.
+            Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) => encode_int(*i, buffer),
+            Value::Long(i)
+            | Value::TimestampMillis(i)
+            | Value::TimestampMicros(i)
+            | Value::TimeMicros(i) => encode_long(*i, buffer),
+            Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
+            Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
+            Value::Decimal(decimal) => match schema {
+                Schema::Decimal { inner, .. } => match *inner.clone() {
+                    Schema::Fixed { size, .. } => {
+                        let bytes = decimal.to_sign_extended_bytes_with_len(size).unwrap();
+                        let num_bytes = bytes.len();
+                        if num_bytes != size {
+                            panic!(
+                                "signed decimal bytes length {} not equal to fixed schema size {}",
+                                num_bytes, size
+                            );
+                        }
+                        encode(&Value::Fixed(size, bytes), inner, buffer)
                     }
-                    encode(&Value::Fixed(size, bytes), inner, buffer)
-                }
-                Schema::Bytes => encode(&Value::Bytes(decimal.try_into().unwrap()), inner, buffer),
-                _ => panic!("invalid inner type for decimal: {:?}", inner),
+                    Schema::Bytes => {
+                        encode(&Value::Bytes(decimal.try_into().unwrap()), inner, buffer)
+                    }
+                    _ => panic!("invalid inner type for decimal: {:?}", inner),
+                },
+                _ => panic!("invalid schema type for decimal: {:?}", schema),
             },
-            _ => panic!("invalid type for decimal: {:?}", schema),
-        },
-        &Value::Duration(duration) => {
-            let slice: [u8; 12] = duration.into();
-            buffer.extend_from_slice(&slice);
-        }
-        Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
-        Value::Bytes(bytes) => match *schema {
-            Schema::Bytes => encode_bytes(bytes, buffer),
-            Schema::Fixed { .. } => buffer.extend(bytes),
-            _ => (),
-        },
-        Value::String(s) => match *schema {
-            Schema::String => {
-                encode_bytes(s, buffer);
+            &Value::Duration(duration) => {
+                let slice: [u8; 12] = duration.into();
+                buffer.extend_from_slice(&slice);
             }
-            Schema::Enum { ref symbols, .. } => {
-                if let Some(index) = symbols.iter().position(|item| item == s) {
-                    encode_int(index as i32, buffer);
+            Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
+            Value::Bytes(bytes) => match *schema {
+                Schema::Bytes => encode_bytes(bytes, buffer),
+                Schema::Fixed { .. } => buffer.extend(bytes),
+                _ => error!("invalid schema type for bytes: {:?}", schema),
+            },
+            Value::String(s) => match *schema {
+                Schema::String => {
+                    encode_bytes(s, buffer);
+                }
+                Schema::Enum { ref symbols, .. } => {
+                    if let Some(index) = symbols.iter().position(|item| item == s) {
+                        encode_int(index as i32, buffer);
+                    }
+                }
+                _ => error!("invalid schema type for String: {:?}", schema),
+            },
+            Value::Fixed(_, bytes) => buffer.extend(bytes),
+            Value::Enum(i, _) => encode_int(*i, buffer),
+            Value::Union(item) => {
+                if let Schema::Union(ref inner) = *schema {
+                    // Find the schema that is matched here. Due to validation, this should always
+                    // return a value.
+                    let (idx, inner_schema) = inner
+                        .find_schema(item)
+                        .expect("Invalid Union validation occurred");
+                    encode_long(idx as i64, buffer);
+                    encode_ref0(&*item, inner_schema, buffer, schemas_by_name);
+                } else {
+                    error!("invalid schema type for Union: {:?}", schema);
                 }
             }
-            _ => (),
-        },
-        Value::Fixed(_, bytes) => buffer.extend(bytes),
-        Value::Enum(i, _) => encode_int(*i, buffer),
-        Value::Union(item) => {
-            if let Schema::Union(ref inner) = *schema {
-                // Find the schema that is matched here. Due to validation, this should always
-                // return a value.
-                let (idx, inner_schema) = inner
-                    .find_schema(item)
-                    .expect("Invalid Union validation occurred");
-                encode_long(idx as i64, buffer);
-                encode_ref(&*item, inner_schema, buffer);
-            }
-        }
-        Value::Array(items) => {
-            if let Schema::Array(ref inner) = *schema {
-                if !items.is_empty() {
-                    encode_long(items.len() as i64, buffer);
-                    for item in items.iter() {
-                        encode_ref(item, inner, buffer);
+            Value::Array(items) => {
+                if let Schema::Array(ref inner) = *schema {
+                    if !items.is_empty() {
+                        encode_long(items.len() as i64, buffer);
+                        for item in items.iter() {
+                            encode_ref0(item, inner, buffer, schemas_by_name);
+                        }
                     }
+                    buffer.push(0u8);
+                } else {
+                    error!("invalid schema type for Array: {:?}", schema);
                 }
-                buffer.push(0u8);
             }
-        }
-        Value::Map(items) => {
-            if let Schema::Map(ref inner) = *schema {
-                if !items.is_empty() {
-                    encode_long(items.len() as i64, buffer);
-                    for (key, value) in items {
-                        encode_bytes(key, buffer);
-                        encode_ref(value, inner, buffer);
+            Value::Map(items) => {
+                if let Schema::Map(ref inner) = *schema {
+                    if !items.is_empty() {
+                        encode_long(items.len() as i64, buffer);
+                        for (key, value) in items {
+                            encode_bytes(key, buffer);
+                            encode_ref0(value, inner, buffer, schemas_by_name);
+                        }
                     }
+                    buffer.push(0u8);
+                } else {
+                    error!("invalid schema type for Map: {:?}", schema);
                 }
-                buffer.push(0u8);
             }
-        }
-        Value::Record(fields) => {
-            if let Schema::Record {
-                fields: ref schema_fields,
-                ..
-            } = *schema
-            {
-                for (i, &(_, ref value)) in fields.iter().enumerate() {
-                    encode_ref(value, &schema_fields[i].schema, buffer);
+            Value::Record(fields) => {
+                if let Schema::Record {
+                    fields: ref schema_fields,
+                    ..
+                } = *schema
+                {
+                    for (i, &(_, ref value)) in fields.iter().enumerate() {
+                        encode_ref0(value, &schema_fields[i].schema, buffer, schemas_by_name);
+                    }
                 }
             }
         }
     }
+
+    let mut schemas_by_name = HashMap::new();
+    encode_ref0(value, schema, buffer, &mut schemas_by_name)
 }
 
 pub fn encode_to_vec(value: &Value, schema: &Schema) -> Vec<u8> {
diff --git a/lang/rust/src/error.rs b/lang/rust/src/error.rs
index f4b1fb5..f65ec72 100644
--- a/lang/rust/src/error.rs
+++ b/lang/rust/src/error.rs
@@ -369,6 +369,10 @@ pub enum Error {
     /// Error while converting float to json value
     #[error("failed to convert avro float to json: {0}")]
     ConvertF64ToJson(f64),
+
+    /// Error while resolving Schema::Ref
+    #[error("Unresolved schema reference: {0}")]
+    SchemaResolutionError(String),
 }
 
 impl serde::ser::Error for Error {
diff --git a/lang/rust/src/schema.rs b/lang/rust/src/schema.rs
index 3617bb1..2697328 100644
--- a/lang/rust/src/schema.rs
+++ b/lang/rust/src/schema.rs
@@ -21,6 +21,7 @@ use digest::Digest;
 use lazy_static::lazy_static;
 use regex::Regex;
 use serde::{
+    ser,
     ser::{SerializeMap, SerializeSeq},
     Deserialize, Serialize, Serializer,
 };
@@ -31,6 +32,7 @@ use std::{
     convert::TryInto,
     fmt,
     str::FromStr,
+    sync::{Arc, Mutex},
 };
 use strum_macros::{EnumDiscriminants, EnumString};
 
@@ -141,6 +143,10 @@ pub enum Schema {
     TimestampMicros,
     /// An amount of time defined by a number of months, days and milliseconds.
     Duration,
+    /// A reference to another (named) schema.
+    Ref {
+        name: Name,
+    },
 }
 
 impl PartialEq for Schema {
@@ -234,6 +240,11 @@ impl Name {
     fn parse(complex: &Map<String, Value>) -> AvroResult<Self> {
         let name = complex.name().ok_or(Error::GetNameField)?;
 
+        let type_name = match complex.get("type") {
+            Some(Value::Object(complex_type)) => complex_type.name().or(None),
+            _ => None,
+        };
+
         let namespace = complex.string("namespace");
 
         let aliases: Option<Vec<String>> = complex
@@ -248,7 +259,7 @@ impl Name {
             });
 
         Ok(Name {
-            name,
+            name: type_name.unwrap_or(name),
             namespace,
             aliases,
         })
@@ -420,11 +431,24 @@ fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalM
 #[derive(Default)]
 struct Parser {
     input_schemas: HashMap<String, Value>,
+    // A map of name -> Schema::Ref
+    // Used to resolve cyclic references, i.e. when a
+    // field's type is a reference to its record's type
+    resolving_schemas: HashMap<String, Schema>,
     input_order: Vec<String>,
+    // A map of name -> fully parsed Schema
+    // Used to avoid parsing the same schema twice
     parsed_schemas: HashMap<String, Schema>,
 }
 
 impl Schema {
+    // Used to help resolve cyclic references while serializing Schema to JSON.
+    // Needed because serde[_json] does not support using contexts.
+    // TODO: See whether alternatives like
+    // https://users.rust-lang.org/t/serde-question-access-to-a-shared-context-data-within-serialize-and-deserialize/39546
+    // can be used
+    thread_local!(static SCHEMAS_BY_NAME: Arc<Mutex<HashMap<String, Schema>>> = Arc::new(Mutex::new(HashMap::new())));
+
     /// Converts `self` into its [Parsing Canonical Form].
     ///
     /// [Parsing Canonical Form]:
@@ -480,6 +504,7 @@ impl Schema {
         }
         let mut parser = Parser {
             input_schemas,
+            resolving_schemas: HashMap::default(),
             input_order,
             parsed_schemas: HashMap::with_capacity(input.len()),
         };
@@ -515,7 +540,8 @@ impl Parser {
                 .remove_entry(&next_name)
                 .expect("Key unexpectedly missing");
             let parsed = self.parse(&value)?;
-            self.parsed_schemas.insert(name, parsed);
+            self.parsed_schemas
+                .insert(get_schema_type_name(name, value), parsed);
         }
 
         let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len());
@@ -558,9 +584,11 @@ impl Parser {
     }
 
     /// Given a name, tries to retrieve the parsed schema from `parsed_schemas`.
-    /// If a parsed schema is not found, it checks if a json  with that name exists
-    /// in `input_schemas` and then parses it  (removing it from `input_schemas`)
-    /// and adds the parsed schema to `parsed_schemas`
+    /// If a parsed schema is not found, it checks if a currently resolving
+    /// schema with that name exists.
+    /// If a resolving schema is not found, it checks if a json with that name exists
+    /// in `input_schemas` and then parses it (removing it from `input_schemas`)
+    /// and adds the parsed schema to `parsed_schemas`.
     ///
     /// This method allows schemas definitions that depend on other types to
     /// parse their dependencies (or look them up if already parsed).
@@ -568,12 +596,20 @@ impl Parser {
         if let Some(parsed) = self.parsed_schemas.get(name) {
             return Ok(parsed.clone());
         }
+        if let Some(resolving_schema) = self.resolving_schemas.get(name) {
+            return Ok(resolving_schema.clone());
+        }
+
         let value = self
             .input_schemas
             .remove(name)
             .ok_or_else(|| Error::ParsePrimitive(name.into()))?;
+
         let parsed = self.parse(&value)?;
-        self.parsed_schemas.insert(name.to_string(), parsed.clone());
+        self.parsed_schemas.insert(
+            get_schema_type_name(name.to_string(), value),
+            parsed.clone(),
+        );
         Ok(parsed)
     }
 
@@ -772,6 +808,10 @@ impl Parser {
 
         let mut lookup = HashMap::new();
 
+        let resolving_schema = Schema::Ref { name: name.clone() };
+        self.resolving_schemas
+            .insert(name.name.clone(), resolving_schema);
+
         let fields: Vec<RecordField> = complex
             .get("fields")
             .and_then(|fields| fields.as_array())
@@ -798,6 +838,7 @@ impl Parser {
 
         self.parsed_schemas
             .insert(name.fullname(None), schema.clone());
+        self.resolving_schemas.remove(name.name.as_str());
         Ok(schema)
     }
 
@@ -893,12 +934,45 @@ impl Parser {
     }
 }
 
+/// Picks the key under which a parsed schema is stored: the `name` of a nested
+/// `"type"` object when `value` has one, otherwise the given `name`.
+fn get_schema_type_name(name: String, value: Value) -> String {
+    let nested = match value.get("type") {
+        Some(Value::Object(complex_type)) => complex_type.name(),
+        _ => None,
+    };
+    nested.unwrap_or(name)
+}
+
 impl Serialize for Schema {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     where
         S: Serializer,
     {
+        // Registers `schema` under `name.name` in the thread-local SCHEMAS_BY_NAME map.
+        fn remember_schema(name: &Name, schema: &Schema) {
+            Schema::SCHEMAS_BY_NAME.with(|registry| {
+                // `try_lock` does not block; a failed attempt is only logged.
+                match registry.try_lock() {
+                    Ok(mut known) => drop(known.insert(name.name.clone(), schema.clone())),
+                    Err(err) => error!("Wasn't able to lock schemas_by_name {:?}", err),
+                }
+            });
+        }
+
         match *self {
+            Schema::Ref { ref name } => {
+                let name = &name.name;
+                Schema::SCHEMAS_BY_NAME.with(|schemas_by_name| {
+                    let schemas = schemas_by_name.lock().unwrap();
+                    if schemas.contains_key(name.as_str()) {
+                        serializer.serialize_str(name)
+                    } else {
+                        Err(ser::Error::custom(format!("Could not serialize Schema::Ref('{}') because it cannot be found in ({})",
+                                                       name, schemas.keys().cloned().collect::<Vec<_>>().join(", "))))
+                    }
+                })
+            }
             Schema::Null => serializer.serialize_str("null"),
             Schema::Boolean => serializer.serialize_str("boolean"),
             Schema::Int => serializer.serialize_str("int"),
@@ -933,6 +1007,7 @@ impl Serialize for Schema {
                 ref fields,
                 ..
             } => {
+                remember_schema(name, self);
                 let mut map = serializer.serialize_map(None)?;
                 map.serialize_entry("type", "record")?;
                 if let Some(ref n) = name.namespace {
@@ -953,6 +1028,7 @@ impl Serialize for Schema {
                 ref symbols,
                 ..
             } => {
+                remember_schema(name, self);
                 let mut map = serializer.serialize_map(None)?;
                 map.serialize_entry("type", "enum")?;
                 map.serialize_entry("name", &name.name)?;
@@ -964,6 +1040,7 @@ impl Serialize for Schema {
                 ref doc,
                 ref size,
             } => {
+                remember_schema(name, self);
                 let mut map = serializer.serialize_map(None)?;
                 map.serialize_entry("type", "fixed")?;
                 map.serialize_entry("name", &name.name)?;
@@ -1240,7 +1317,7 @@ mod tests {
 
     #[test]
     fn test_record_schema() {
-        let schema = Schema::parse_str(
+        let parsed = Schema::parse_str(
             r#"
             {
                 "type": "record",
@@ -1282,7 +1359,78 @@ mod tests {
             lookup,
         };
 
-        assert_eq!(expected, schema);
+        assert_eq!(parsed, expected);
+    }
+
+    // AVRO-3302
+    #[test]
+    fn test_record_schema_with_currently_parsing_schema() {
+        let schema = Schema::parse_str(
+            r#"
+            {
+                "type": "record",
+                "name": "test",
+                "fields": [{
+                    "name": "recordField",
+                    "type": {
+                        "type": "record",
+                        "name": "Node",
+                        "fields": [
+                            {"name": "label", "type": "string"},
+                            {"name": "children", "type": {"type": "array", "items": "Node"}}
+                        ]
+                    }
+                }]
+            }
+        "#,
+        )
+        .unwrap();
+
+        let mut lookup = HashMap::new();
+        lookup.insert("recordField".to_owned(), 0);
+
+        let mut node_lookup = HashMap::new();
+        node_lookup.insert("children".to_owned(), 1);
+        node_lookup.insert("label".to_owned(), 0);
+
+        let expected = Schema::Record {
+            name: Name::new("test"),
+            doc: None,
+            fields: vec![RecordField {
+                name: "recordField".to_string(),
+                doc: None,
+                default: None,
+                schema: Schema::Record {
+                    name: Name::new("Node"),
+                    doc: None,
+                    fields: vec![
+                        RecordField {
+                            name: "label".to_string(),
+                            doc: None,
+                            default: None,
+                            schema: Schema::String,
+                            order: RecordFieldOrder::Ascending,
+                            position: 0,
+                        },
+                        RecordField {
+                            name: "children".to_string(),
+                            doc: None,
+                            default: None,
+                            schema: Schema::Array(Box::new(Schema::Ref {
+                                name: Name::new("Node"),
+                            })),
+                            order: RecordFieldOrder::Ascending,
+                            position: 1,
+                        },
+                    ],
+                    lookup: node_lookup,
+                },
+                order: RecordFieldOrder::Ascending,
+                position: 0,
+            }],
+            lookup,
+        };
+        assert_eq!(schema, expected);
     }
 
     #[test]
diff --git a/lang/rust/src/schema_compatibility.rs b/lang/rust/src/schema_compatibility.rs
index 5ba46e1..7c815e0 100644
--- a/lang/rust/src/schema_compatibility.rs
+++ b/lang/rust/src/schema_compatibility.rs
@@ -433,7 +433,6 @@ mod tests {
             .map(|s| s.canonical_form())
             .collect::<Vec<String>>()
             .join(",");
-        dbg!(&schema_string);
         Schema::parse_str(&format!("[{}]", schema_string)).unwrap()
     }
 
diff --git a/lang/rust/src/types.rs b/lang/rust/src/types.rs
index 75f0509..48a1813 100644
--- a/lang/rust/src/types.rs
+++ b/lang/rust/src/types.rs
@@ -323,6 +323,7 @@ impl Value {
     /// for the full set of rules of schema validation.
     pub fn validate(&self, schema: &Schema) -> bool {
         match (self, schema) {
+            (_, &Schema::Ref { name: _ }) => true,
             (&Value::Null, &Schema::Null) => true,
             (&Value::Boolean(_), &Schema::Boolean) => true,
             (&Value::Int(_), &Schema::Int) => true,
@@ -383,7 +384,10 @@ impl Value {
                     }
                 })
             }
-            _ => false,
+            (v, s) => {
+                error!("Unsupported value-schema combination:\n{:?}\n{:?}", v, s);
+                false
+            }
         }
     }
 
@@ -394,45 +398,79 @@ impl Value {
     /// in the Avro specification for the full set of rules of schema
     /// resolution.
     pub fn resolve(mut self, schema: &Schema) -> AvroResult<Self> {
-        // Check if this schema is a union, and if the reader schema is not.
-        if SchemaKind::from(&self) == SchemaKind::Union
-            && SchemaKind::from(schema) != SchemaKind::Union
-        {
-            // Pull out the Union, and attempt to resolve against it.
-            let v = match self {
-                Value::Union(b) => *b,
-                _ => unreachable!(),
-            };
-            self = v;
-        }
-        match *schema {
-            Schema::Null => self.resolve_null(),
-            Schema::Boolean => self.resolve_boolean(),
-            Schema::Int => self.resolve_int(),
-            Schema::Long => self.resolve_long(),
-            Schema::Float => self.resolve_float(),
-            Schema::Double => self.resolve_double(),
-            Schema::Bytes => self.resolve_bytes(),
-            Schema::String => self.resolve_string(),
-            Schema::Fixed { size, .. } => self.resolve_fixed(size),
-            Schema::Union(ref inner) => self.resolve_union(inner),
-            Schema::Enum { ref symbols, .. } => self.resolve_enum(symbols),
-            Schema::Array(ref inner) => self.resolve_array(inner),
-            Schema::Map(ref inner) => self.resolve_map(inner),
-            Schema::Record { ref fields, .. } => self.resolve_record(fields),
-            Schema::Decimal {
-                scale,
-                precision,
-                ref inner,
-            } => self.resolve_decimal(precision, scale, inner),
-            Schema::Date => self.resolve_date(),
-            Schema::TimeMillis => self.resolve_time_millis(),
-            Schema::TimeMicros => self.resolve_time_micros(),
-            Schema::TimestampMillis => self.resolve_timestamp_millis(),
-            Schema::TimestampMicros => self.resolve_timestamp_micros(),
-            Schema::Duration => self.resolve_duration(),
-            Schema::Uuid => self.resolve_uuid(),
+        pub fn resolve0(
+            value: &mut Value,
+            schema: &Schema,
+            schemas_by_name: &mut HashMap<String, Schema>,
+        ) -> AvroResult<Value> {
+            // Check if this schema is a union, and if the reader schema is not.
+            if SchemaKind::from(&value.clone()) == SchemaKind::Union
+                && SchemaKind::from(schema) != SchemaKind::Union
+            {
+                // Pull out the Union, and attempt to resolve against it.
+                let v = match value {
+                    Value::Union(b) => &**b,
+                    _ => unreachable!(),
+                };
+                *value = v.clone();
+            }
+            let val: Value = value.clone();
+            match *schema {
+                Schema::Ref { ref name } => {
+                    if let Some(resolved) = schemas_by_name.get(name.name.as_str()) {
+                        resolve0(value, resolved, &mut schemas_by_name.clone())
+                    } else {
+                        Err(Error::SchemaResolutionError(name.name.clone()))
+                    }
+                }
+                Schema::Null => val.resolve_null(),
+                Schema::Boolean => val.resolve_boolean(),
+                Schema::Int => val.resolve_int(),
+                Schema::Long => val.resolve_long(),
+                Schema::Float => val.resolve_float(),
+                Schema::Double => val.resolve_double(),
+                Schema::Bytes => val.resolve_bytes(),
+                Schema::String => val.resolve_string(),
+                Schema::Fixed { ref name, size, .. } => {
+                    schemas_by_name.insert(name.name.clone(), schema.clone());
+                    val.resolve_fixed(size)
+                }
+                Schema::Union(ref inner) => val.resolve_union(inner),
+                Schema::Enum {
+                    ref name,
+                    ref symbols,
+                    ..
+                } => {
+                    schemas_by_name.insert(name.name.clone(), schema.clone());
+                    val.resolve_enum(symbols)
+                }
+                Schema::Array(ref inner) => val.resolve_array(inner),
+                Schema::Map(ref inner) => val.resolve_map(inner),
+                Schema::Record {
+                    ref name,
+                    ref fields,
+                    ..
+                } => {
+                    schemas_by_name.insert(name.name.clone(), schema.clone());
+                    val.resolve_record(fields)
+                }
+                Schema::Decimal {
+                    scale,
+                    precision,
+                    ref inner,
+                } => val.resolve_decimal(precision, scale, inner),
+                Schema::Date => val.resolve_date(),
+                Schema::TimeMillis => val.resolve_time_millis(),
+                Schema::TimeMicros => val.resolve_time_micros(),
+                Schema::TimestampMillis => val.resolve_timestamp_millis(),
+                Schema::TimestampMicros => val.resolve_timestamp_micros(),
+                Schema::Duration => val.resolve_duration(),
+                Schema::Uuid => val.resolve_uuid(),
+            }
         }
+
+        let mut schemas_by_name: HashMap<String, Schema> = HashMap::new();
+        resolve0(&mut self, schema, &mut schemas_by_name)
     }
 
     fn resolve_uuid(self) -> Result<Self, Error> {
diff --git a/lang/rust/src/util.rs b/lang/rust/src/util.rs
index f9daf28..e2b353b 100644
--- a/lang/rust/src/util.rs
+++ b/lang/rust/src/util.rs
@@ -21,7 +21,7 @@ use std::{convert::TryFrom, i64, io::Read, sync::Once};
 
 /// Maximum number of bytes that can be allocated when decoding
 /// Avro-encoded values. This is a protection against ill-formed
-/// data, whose length field might be interpreted as enourmous.
+/// data, whose length field might be interpreted as enormous.
 /// See max_allocation_bytes to change this limit.
 pub static mut MAX_ALLOCATION_BYTES: usize = 512 * 1024 * 1024;
 static MAX_ALLOCATION_BYTES_ONCE: Once = Once::new();
@@ -153,19 +153,40 @@ mod tests {
     #[test]
     fn test_zig_i64() {
         let mut s = Vec::new();
-        zig_i64(std::i32::MAX as i64, &mut s);
+
+        zig_i64(0, &mut s);
+        assert_eq!(s, [0]);
+
+        s.clear();
+        zig_i64(-1, &mut s);
+        assert_eq!(s, [1]);
+
+        s.clear();
+        zig_i64(1, &mut s);
+        assert_eq!(s, [2]);
+
+        s.clear();
+        zig_i64(-64, &mut s);
+        assert_eq!(s, [127]);
+
+        s.clear();
+        zig_i64(64, &mut s);
+        assert_eq!(s, [128, 1]);
+
+        s.clear();
+        zig_i64(i32::MAX as i64, &mut s);
         assert_eq!(s, [254, 255, 255, 255, 15]);
 
         s.clear();
-        zig_i64(std::i32::MAX as i64 + 1, &mut s);
+        zig_i64(i32::MAX as i64 + 1, &mut s);
         assert_eq!(s, [128, 128, 128, 128, 16]);
 
         s.clear();
-        zig_i64(std::i32::MIN as i64, &mut s);
+        zig_i64(i32::MIN as i64, &mut s);
         assert_eq!(s, [255, 255, 255, 255, 15]);
 
         s.clear();
-        zig_i64(std::i32::MIN as i64 - 1, &mut s);
+        zig_i64(i32::MIN as i64 - 1, &mut s);
         assert_eq!(s, [129, 128, 128, 128, 16]);
 
         s.clear();
@@ -180,27 +201,27 @@ mod tests {
     #[test]
     fn test_zig_i32() {
         let mut s = Vec::new();
-        zig_i32(std::i32::MAX / 2, &mut s);
+        zig_i32(i32::MAX / 2, &mut s);
         assert_eq!(s, [254, 255, 255, 255, 7]);
 
         s.clear();
-        zig_i32(std::i32::MIN / 2, &mut s);
+        zig_i32(i32::MIN / 2, &mut s);
         assert_eq!(s, [255, 255, 255, 255, 7]);
 
         s.clear();
-        zig_i32(-(std::i32::MIN / 2), &mut s);
+        zig_i32(-(i32::MIN / 2), &mut s);
         assert_eq!(s, [128, 128, 128, 128, 8]);
 
         s.clear();
-        zig_i32(std::i32::MIN / 2 - 1, &mut s);
+        zig_i32(i32::MIN / 2 - 1, &mut s);
         assert_eq!(s, [129, 128, 128, 128, 8]);
 
         s.clear();
-        zig_i32(std::i32::MAX, &mut s);
+        zig_i32(i32::MAX, &mut s);
         assert_eq!(s, [254, 255, 255, 255, 15]);
 
         s.clear();
-        zig_i32(std::i32::MIN, &mut s);
+        zig_i32(i32::MIN, &mut s);
         assert_eq!(s, [255, 255, 255, 255, 15]);
     }
 
diff --git a/lang/rust/tests/schema.rs b/lang/rust/tests/schema.rs
index cc96429..77b6569 100644
--- a/lang/rust/tests/schema.rs
+++ b/lang/rust/tests/schema.rs
@@ -17,9 +17,11 @@
 
 use avro_rs::{
     schema::{Name, RecordField},
-    Error, Schema,
+    types::{Record, Value},
+    Codec, Error, Reader, Schema, Writer,
 };
 use lazy_static::lazy_static;
+use log::debug;
 
 fn init() {
     let _ = env_logger::builder()
@@ -739,29 +741,28 @@ fn test_parse_list_with_cross_deps_basic() {
 }
 
 #[test]
-/// Test that if a cycle of dependencies occurs in the input schema jsons, the algorithm terminates
-/// and returns an error. N.B. In the future, when recursive types are supported, this should be
-/// revisited.
-fn test_parse_list_recursive_type_error() {
+fn test_parse_list_recursive_type() {
     init();
     let schema_str_1 = r#"{
         "name": "A",
+        "doc": "A's schema",
         "type": "record",
         "fields": [
-            {"name": "field_one", "type": "B"}
+            {"name": "a_field_one", "type": "B"}
         ]
     }"#;
     let schema_str_2 = r#"{
         "name": "B",
+        "doc": "B's schema",
         "type": "record",
         "fields": [
-            {"name": "field_one", "type": "A"}
+            {"name": "b_field_one", "type": "A"}
         ]
     }"#;
     let schema_strs_first = [schema_str_1, schema_str_2];
     let schema_strs_second = [schema_str_2, schema_str_1];
-    let _ = Schema::parse_list(&schema_strs_first).expect_err("Test failed");
-    let _ = Schema::parse_list(&schema_strs_second).expect_err("Test failed");
+    let _ = Schema::parse_list(&schema_strs_first).expect("Test failed");
+    let _ = Schema::parse_list(&schema_strs_second).expect("Test failed");
 }
 
 #[test]
@@ -1306,6 +1307,73 @@ fn test_root_error_is_not_swallowed_on_parse_error() -> Result<(), String> {
     }
 }
 
+// AVRO-3302
+#[test]
+fn test_record_schema_with_cyclic_references() {
+    let schema = Schema::parse_str(
+        r#"
+            {
+                "type": "record",
+                "name": "test",
+                "fields": [{
+                    "name": "recordField",
+                    "type": {
+                        "type": "record",
+                        "name": "Node",
+                        "fields": [
+                            {"name": "label", "type": "string"},
+                            {"name": "children", "type": {"type": "array", "items": "Node"}}
+                        ]
+                    }
+                }]
+            }
+        "#,
+    )
+    .unwrap();
+
+    let mut datum = Record::new(&schema).unwrap();
+    datum.put(
+        "recordField",
+        Value::Record(vec![
+            ("label".into(), Value::String("level_1".into())),
+            (
+                "children".into(),
+                Value::Array(vec![Value::Record(vec![
+                    ("label".into(), Value::String("level_2".into())),
+                    (
+                        "children".into(),
+                        Value::Array(vec![Value::Record(vec![
+                            ("label".into(), Value::String("level_3".into())),
+                            (
+                                "children".into(),
+                                Value::Array(vec![Value::Record(vec![
+                                    ("label".into(), Value::String("level_4".into())),
+                                    ("children".into(), Value::Array(vec![])),
+                                ])]),
+                            ),
+                        ])]),
+                    ),
+                ])]),
+            ),
+        ]),
+    );
+
+    let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null);
+    if let Err(err) = writer.append(datum) {
+        panic!("An error occurred while writing datum: {:?}", err)
+    }
+    let bytes = writer.into_inner().unwrap();
+    assert_eq!(316, bytes.len());
+
+    match Reader::new(&mut bytes.as_slice()) {
+        Ok(mut reader) => match reader.next() {
+            Some(value) => debug!("{:?}", value.unwrap()),
+            None => panic!("No value was read!"),
+        },
+        Err(err) => panic!("An error occurred while reading datum: {:?}", err),
+    }
+}
+
 /*
 // TODO: (#93) add support for logical type and attributes and uncomment (may need some tweaks to compile)
 #[test]