You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/09/26 15:55:56 UTC

[arrow-rs] branch master updated: Faster Serde Integration (~80% faster) (#4861)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new fbd9008d31 Faster Serde Integration (~80% faster) (#4861)
fbd9008d31 is described below

commit fbd9008d31e51018494c48eff032a77b93fab56a
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Sep 26 16:55:50 2023 +0100

    Faster Serde Integration (~80% faster) (#4861)
    
    * Store decoded numerics in JSON tape
    
    * Add arrow-json serde benchmarks
    
    * Fix timestamp serialize
    
    * Clippy
---
 arrow-json/Cargo.toml                    |  7 ++++
 arrow-json/benches/serde.rs              | 62 ++++++++++++++++++++++++++++++++
 arrow-json/src/reader/primitive_array.rs | 46 +++++++++++++++++++-----
 arrow-json/src/reader/serializer.rs      | 56 +++++++++++++++--------------
 arrow-json/src/reader/tape.rs            | 51 ++++++++++++++++++++++++--
 arrow-json/src/reader/timestamp_array.rs |  7 ++++
 6 files changed, 192 insertions(+), 37 deletions(-)

diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml
index 977ed4390c..df38a52811 100644
--- a/arrow-json/Cargo.toml
+++ b/arrow-json/Cargo.toml
@@ -54,3 +54,10 @@ serde = { version = "1.0", default-features = false, features = ["derive"] }
 futures = "0.3"
 tokio = { version = "1.27", default-features = false, features = ["io-util"] }
 bytes = "1.4"
+criterion = { version = "0.5", default-features = false }
+rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
+
+[[bench]]
+name = "serde"
+harness = false
+
diff --git a/arrow-json/benches/serde.rs b/arrow-json/benches/serde.rs
new file mode 100644
index 0000000000..7636b9c9df
--- /dev/null
+++ b/arrow-json/benches/serde.rs
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_json::ReaderBuilder;
+use arrow_schema::{DataType, Field, Schema};
+use criterion::*;
+use rand::{thread_rng, Rng};
+use serde::Serialize;
+use std::sync::Arc;
+
+#[allow(deprecated)] // NOTE(review): the reason for this allow is not stated; confirm it is still needed
+fn do_bench<R: Serialize>(c: &mut Criterion, name: &str, rows: &[R], schema: &Schema) {
+    let schema = Arc::new(schema.clone());
+    c.bench_function(name, |b| { // each timed iteration builds a fresh decoder, then serializes
+        b.iter(|| {
+            let builder = ReaderBuilder::new(schema.clone()).with_batch_size(64);
+            let mut decoder = builder.build_decoder().unwrap();
+            decoder.serialize(rows) // flush() is never called inside this closure
+        })
+    });
+}
+
+fn criterion_benchmark(c: &mut Criterion) { // 2048 rows per case; small/medium/large name the value range
+    let mut rng = thread_rng(); // unseeded, so generated inputs differ between runs
+    let schema = Schema::new(vec![Field::new("i32", DataType::Int32, false)]);
+    let v: Vec<i32> = (0..2048).map(|_| rng.gen_range(0..10000)).collect();
+
+    do_bench(c, "small_i32", &v, &schema);
+    let v: Vec<i32> = (0..2048).map(|_| rng.gen()).collect();
+    do_bench(c, "large_i32", &v, &schema);
+
+    let schema = Schema::new(vec![Field::new("i64", DataType::Int64, false)]);
+    let v: Vec<i64> = (0..2048).map(|_| rng.gen_range(0..10000)).collect();
+    do_bench(c, "small_i64", &v, &schema);
+    let v: Vec<i64> = (0..2048).map(|_| rng.gen_range(0..i32::MAX as _)).collect();
+    do_bench(c, "medium_i64", &v, &schema);
+    let v: Vec<i64> = (0..2048).map(|_| rng.gen()).collect();
+    do_bench(c, "large_i64", &v, &schema);
+
+    let schema = Schema::new(vec![Field::new("f32", DataType::Float32, false)]);
+    let v: Vec<f32> = (0..2048).map(|_| rng.gen_range(0.0..10000.)).collect();
+    do_bench(c, "small_f32", &v, &schema);
+    let v: Vec<f32> = (0..2048).map(|_| rng.gen_range(0.0..f32::MAX)).collect();
+    do_bench(c, "large_f32", &v, &schema);
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/arrow-json/src/reader/primitive_array.rs b/arrow-json/src/reader/primitive_array.rs
index c78e4d9140..6cf0bac867 100644
--- a/arrow-json/src/reader/primitive_array.rs
+++ b/arrow-json/src/reader/primitive_array.rs
@@ -91,11 +91,12 @@ impl<P: ArrowPrimitiveType> PrimitiveArrayDecoder<P> {
 impl<P> ArrayDecoder for PrimitiveArrayDecoder<P>
 where
     P: ArrowPrimitiveType + Parser,
-    P::Native: ParseJsonNumber,
+    P::Native: ParseJsonNumber + NumCast,
 {
     fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
         let mut builder = PrimitiveBuilder::<P>::with_capacity(pos.len())
             .with_data_type(self.data_type.clone());
+        let d = &self.data_type;
 
         for p in pos {
             match tape.get(*p) {
@@ -103,10 +104,7 @@ where
                 TapeElement::String(idx) => {
                     let s = tape.get_string(idx);
                     let value = P::parse(s).ok_or_else(|| {
-                        ArrowError::JsonError(format!(
-                            "failed to parse \"{s}\" as {}",
-                            self.data_type
-                        ))
+                        ArrowError::JsonError(format!("failed to parse \"{s}\" as {d}",))
                     })?;
 
                     builder.append_value(value)
@@ -115,14 +113,44 @@ where
                     let s = tape.get_string(idx);
                     let value =
                         ParseJsonNumber::parse(s.as_bytes()).ok_or_else(|| {
-                            ArrowError::JsonError(format!(
-                                "failed to parse {s} as {}",
-                                self.data_type
-                            ))
+                            ArrowError::JsonError(format!("failed to parse {s} as {d}",))
                         })?;
 
                     builder.append_value(value)
                 }
+                TapeElement::F32(v) => {
+                    let v = f32::from_bits(v); // tape stores the raw f32 bit pattern
+                    let value = NumCast::from(v).ok_or_else(|| {
+                        ArrowError::JsonError(format!("failed to parse {v} as {d}",))
+                    })?;
+                    builder.append_value(value)
+                }
+                TapeElement::I32(v) => {
+                    let value = NumCast::from(v).ok_or_else(|| {
+                        ArrowError::JsonError(format!("failed to parse {v} as {d}",))
+                    })?;
+                    builder.append_value(value)
+                }
+                TapeElement::F64(high) => match tape.get(p + 1) { // low word is the next element
+                    TapeElement::F32(low) => {
+                        let v = f64::from_bits((high as u64) << 32 | low as u64);
+                        let value = NumCast::from(v).ok_or_else(|| {
+                            ArrowError::JsonError(format!("failed to parse {v} as {d}",))
+                        })?;
+                        builder.append_value(value)
+                    }
+                    _ => unreachable!(),
+                },
+                TapeElement::I64(high) => match tape.get(p + 1) { // low word is the next element
+                    TapeElement::I32(low) => {
+                        let v = ((high as i64) << 32) | (low as u32 as i64); // zero-extend: low holds raw bits
+                        let value = NumCast::from(v).ok_or_else(|| {
+                            ArrowError::JsonError(format!("failed to parse {v} as {d}",))
+                        })?;
+                        builder.append_value(value)
+                    }
+                    _ => unreachable!(),
+                },
                 _ => return Err(tape.error(*p, "primitive")),
             }
         }
diff --git a/arrow-json/src/reader/serializer.rs b/arrow-json/src/reader/serializer.rs
index 2aa72de943..2fd250bdfc 100644
--- a/arrow-json/src/reader/serializer.rs
+++ b/arrow-json/src/reader/serializer.rs
@@ -77,22 +77,6 @@ impl<'a> TapeSerializer<'a> {
     }
 }
 
-/// The tape stores all values as strings, and so must serialize numeric types
-///
-/// Formatting to a string only to parse it back again is rather wasteful,
-/// it may be possible to tweak the tape representation to avoid this
-///
-/// Need to use macro as const generic expressions are unstable
-/// <https://github.com/rust-lang/rust/issues/76560>
-macro_rules! serialize_numeric {
-    ($s:ident, $t:ty, $v:ident) => {{
-        let mut buffer = [0_u8; <$t>::FORMATTED_SIZE];
-        let s = lexical_core::write($v, &mut buffer);
-        $s.serialize_number(s);
-        Ok(())
-    }};
-}
-
 impl<'a, 'b> Serializer for &'a mut TapeSerializer<'b> {
     type Ok = ();
 
@@ -115,43 +99,63 @@ impl<'a, 'b> Serializer for &'a mut TapeSerializer<'b> {
     }
 
     fn serialize_i8(self, v: i8) -> Result<(), SerializerError> {
-        serialize_numeric!(self, i8, v)
+        self.serialize_i32(v as _) // lossless widening into the I32 tape encoding
     }
 
     fn serialize_i16(self, v: i16) -> Result<(), SerializerError> {
-        serialize_numeric!(self, i16, v)
+        self.serialize_i32(v as _) // lossless widening into the I32 tape encoding
     }
 
     fn serialize_i32(self, v: i32) -> Result<(), SerializerError> {
-        serialize_numeric!(self, i32, v)
+        self.elements.push(TapeElement::I32(v)); // stored inline; no string formatting round-trip
+        Ok(())
     }
 
     fn serialize_i64(self, v: i64) -> Result<(), SerializerError> {
-        serialize_numeric!(self, i64, v)
+        let low = v as i32; // truncating cast: raw low 32 bits, may read as negative
+        let high = (v >> 32) as i32; // arithmetic shift keeps the sign in the high word
+        self.elements.push(TapeElement::I64(high)); // high word first ...
+        self.elements.push(TapeElement::I32(low)); // ... immediately followed by the low word
+        Ok(())
     }
 
     fn serialize_u8(self, v: u8) -> Result<(), SerializerError> {
-        serialize_numeric!(self, u8, v)
+        self.serialize_i32(v as _) // lossless widening into the I32 tape encoding
     }
 
     fn serialize_u16(self, v: u16) -> Result<(), SerializerError> {
-        serialize_numeric!(self, u16, v)
+        self.serialize_i32(v as _) // lossless widening into the I32 tape encoding
     }
 
     fn serialize_u32(self, v: u32) -> Result<(), SerializerError> {
-        serialize_numeric!(self, u32, v)
+        match i32::try_from(v) { // values above i32::MAX need the two-word i64 encoding
+            Ok(v) => self.serialize_i32(v),
+            Err(_) => self.serialize_i64(v as _),
+        }
     }
 
     fn serialize_u64(self, v: u64) -> Result<(), SerializerError> {
-        serialize_numeric!(self, u64, v)
+        match i64::try_from(v) {
+            Ok(v) => self.serialize_i64(v),
+            Err(_) => { // above i64::MAX: fall back to a lexical_core-formatted Number
+                let mut buffer = [0_u8; u64::FORMATTED_SIZE];
+                let s = lexical_core::write(v, &mut buffer);
+                self.serialize_number(s);
+                Ok(())
+            }
+        }
     }
 
     fn serialize_f32(self, v: f32) -> Result<(), SerializerError> {
-        serialize_numeric!(self, f32, v)
+        self.elements.push(TapeElement::F32(v.to_bits())); // raw IEEE-754 bit pattern
+        Ok(())
     }
 
     fn serialize_f64(self, v: f64) -> Result<(), SerializerError> {
-        serialize_numeric!(self, f64, v)
+        let bits = v.to_bits();
+        self.elements.push(TapeElement::F64((bits >> 32) as u32)); // high 32 bits of the bit pattern
+        self.elements.push(TapeElement::F32(bits as u32)); // low 32 bits (truncating cast)
+        Ok(())
     }
 
     fn serialize_char(self, v: char) -> Result<(), SerializerError> {
diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs
index 5eca7b43dc..801e8f29d5 100644
--- a/arrow-json/src/reader/tape.rs
+++ b/arrow-json/src/reader/tape.rs
@@ -18,6 +18,7 @@
 use crate::reader::serializer::TapeSerializer;
 use arrow_schema::ArrowError;
 use serde::Serialize;
+use std::fmt::Write;
 
 /// We decode JSON to a flattened tape representation,
 /// allowing for efficient traversal of the JSON data
@@ -54,6 +55,25 @@ pub enum TapeElement {
     ///
     /// Contains the offset into the [`Tape`] string data
     Number(u32),
+
+    /// The high 32 bits of an i64
+    ///
+    /// Followed by [`Self::I32`] holding the low 32 bits as a raw bit pattern (zero-extend via `u32` when recombining)
+    I64(i32),
+
+    /// A 32-bit signed integer
+    ///
+    /// May be preceded by [`Self::I64`], in which case this holds the low 32 bits of an i64
+    I32(i32),
+
+    /// The high 32 bits of a 64-bit float's bit pattern
+    ///
+    /// Followed by [`Self::F32`] containing the low 32 bits
+    F64(u32),
+
+    /// The bit pattern of a 32-bit float, or the low 32 bits of a 64-bit float if preceded by [`Self::F64`]
+    F32(u32),
+
     /// A true literal
     True,
     /// A false literal
@@ -104,10 +124,15 @@ impl<'a> Tape<'a> {
             | TapeElement::Number(_)
             | TapeElement::True
             | TapeElement::False
-            | TapeElement::Null => Ok(cur_idx + 1),
+            | TapeElement::Null
+            | TapeElement::I32(_)
+            | TapeElement::F32(_) => Ok(cur_idx + 1),
+            TapeElement::I64(_) | TapeElement::F64(_) => Ok(cur_idx + 2), // high word + following low word
             TapeElement::StartList(end_idx) => Ok(end_idx + 1),
             TapeElement::StartObject(end_idx) => Ok(end_idx + 1),
-            _ => Err(self.error(cur_idx, expected)),
+            TapeElement::EndObject(_) | TapeElement::EndList(_) => { // explicit arms instead of `_`
+                Err(self.error(cur_idx, expected))
+            }
         }
     }
 
@@ -153,6 +178,28 @@ impl<'a> Tape<'a> {
             TapeElement::True => out.push_str("true"),
             TapeElement::False => out.push_str("false"),
             TapeElement::Null => out.push_str("null"),
+            TapeElement::I64(high) => match self.get(idx + 1) { // low word is the next element
+                TapeElement::I32(low) => {
+                    let val = ((high as i64) << 32) | (low as u32 as i64); // zero-extend: low holds raw bits
+                    let _ = write!(out, "{val}");
+                    return idx + 2;
+                }
+                _ => unreachable!(),
+            },
+            TapeElement::I32(val) => {
+                let _ = write!(out, "{val}");
+            }
+            TapeElement::F64(high) => match self.get(idx + 1) { // low word is the next element
+                TapeElement::F32(low) => {
+                    let val = f64::from_bits((high as u64) << 32 | low as u64);
+                    let _ = write!(out, "{val}");
+                    return idx + 2;
+                }
+                _ => unreachable!(),
+            },
+            TapeElement::F32(val) => {
+                let _ = write!(out, "{}", f32::from_bits(val));
+            }
         }
         idx + 1
     }
diff --git a/arrow-json/src/reader/timestamp_array.rs b/arrow-json/src/reader/timestamp_array.rs
index b80915f6a5..0967261410 100644
--- a/arrow-json/src/reader/timestamp_array.rs
+++ b/arrow-json/src/reader/timestamp_array.rs
@@ -96,6 +96,13 @@ where
 
                     builder.append_value(value)
                 }
+                TapeElement::I32(v) => builder.append_value(v as i64),
+                TapeElement::I64(high) => match tape.get(p + 1) {
+                    TapeElement::I32(low) => { // low word holds raw bits: zero-extend, don't sign-extend
+                        builder.append_value(((high as i64) << 32) | (low as u32 as i64))
+                    }
+                    _ => unreachable!(),
+                },
                 _ => return Err(tape.error(*p, "primitive")),
             }
         }