You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/09/26 15:55:56 UTC
[arrow-rs] branch master updated: Faster Serde Integration (~80% faster) (#4861)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new fbd9008d31 Faster Serde Integration (~80% faster) (#4861)
fbd9008d31 is described below
commit fbd9008d31e51018494c48eff032a77b93fab56a
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Sep 26 16:55:50 2023 +0100
Faster Serde Integration (~80% faster) (#4861)
* Store decoded numerics in JSON tape
* Add arrow-json serde benchmarks
* Fix timestamp serialize
* Clippy
---
arrow-json/Cargo.toml | 7 ++++
arrow-json/benches/serde.rs | 62 ++++++++++++++++++++++++++++++++
arrow-json/src/reader/primitive_array.rs | 46 +++++++++++++++++++-----
arrow-json/src/reader/serializer.rs | 56 +++++++++++++++--------------
arrow-json/src/reader/tape.rs | 51 ++++++++++++++++++++++++--
arrow-json/src/reader/timestamp_array.rs | 7 ++++
6 files changed, 192 insertions(+), 37 deletions(-)
diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml
index 977ed4390c..df38a52811 100644
--- a/arrow-json/Cargo.toml
+++ b/arrow-json/Cargo.toml
@@ -54,3 +54,10 @@ serde = { version = "1.0", default-features = false, features = ["derive"] }
futures = "0.3"
tokio = { version = "1.27", default-features = false, features = ["io-util"] }
bytes = "1.4"
+criterion = { version = "0.5", default-features = false }
+rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
+
+[[bench]]
+name = "serde"
+harness = false
+
diff --git a/arrow-json/benches/serde.rs b/arrow-json/benches/serde.rs
new file mode 100644
index 0000000000..7636b9c9df
--- /dev/null
+++ b/arrow-json/benches/serde.rs
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_json::ReaderBuilder;
+use arrow_schema::{DataType, Field, Schema};
+use criterion::*;
+use rand::{thread_rng, Rng};
+use serde::Serialize;
+use std::sync::Arc;
+
+/// Benchmarks serializing `rows` into a freshly built JSON decoder for `schema`.
+///
+/// Only the serde-to-tape step is timed: the decoder is never flushed, so no
+/// Arrow arrays are built.
+// NOTE(review): no deprecated API is visibly used below — confirm this `allow`
+// is still needed.
+#[allow(deprecated)]
+fn do_bench<R: Serialize>(c: &mut Criterion, name: &str, rows: &[R], schema: &Schema) {
+    let schema = Arc::new(schema.clone());
+    c.bench_function(name, |b| {
+        b.iter(|| {
+            let builder = ReaderBuilder::new(schema.clone()).with_batch_size(64);
+            let mut decoder = builder.build_decoder().unwrap();
+            // Unwrap so a serialization failure aborts the run instead of the
+            // benchmark silently timing a failed call.
+            decoder.serialize(rows).unwrap()
+        })
+    });
+}
+
+/// Registers one benchmark per numeric type and value range.
+///
+/// Each case serializes 2048 random values; the `small_` / `medium_` / `large_`
+/// prefixes describe the range the values are drawn from.
+// NOTE(review): rows are bare numbers rather than objects keyed by the schema's
+// field name; presumably fine because `do_bench` never flushes — confirm.
+fn criterion_benchmark(c: &mut Criterion) {
+    let mut rng = thread_rng();
+    let schema = Schema::new(vec![Field::new("i32", DataType::Int32, false)]);
+    let v: Vec<i32> = (0..2048).map(|_| rng.gen_range(0..10000)).collect();
+
+    do_bench(c, "small_i32", &v, &schema);
+    let v: Vec<i32> = (0..2048).map(|_| rng.gen()).collect();
+    do_bench(c, "large_i32", &v, &schema);
+
+    let schema = Schema::new(vec![Field::new("i64", DataType::Int64, false)]);
+    let v: Vec<i64> = (0..2048).map(|_| rng.gen_range(0..10000)).collect();
+    do_bench(c, "small_i64", &v, &schema);
+    let v: Vec<i64> = (0..2048).map(|_| rng.gen_range(0..i32::MAX as _)).collect();
+    do_bench(c, "medium_i64", &v, &schema);
+    let v: Vec<i64> = (0..2048).map(|_| rng.gen()).collect();
+    do_bench(c, "large_i64", &v, &schema);
+
+    let schema = Schema::new(vec![Field::new("f32", DataType::Float32, false)]);
+    let v: Vec<f32> = (0..2048).map(|_| rng.gen_range(0.0..10000.)).collect();
+    do_bench(c, "small_f32", &v, &schema);
+    let v: Vec<f32> = (0..2048).map(|_| rng.gen_range(0.0..f32::MAX)).collect();
+    do_bench(c, "large_f32", &v, &schema);
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/arrow-json/src/reader/primitive_array.rs b/arrow-json/src/reader/primitive_array.rs
index c78e4d9140..6cf0bac867 100644
--- a/arrow-json/src/reader/primitive_array.rs
+++ b/arrow-json/src/reader/primitive_array.rs
@@ -91,11 +91,12 @@ impl<P: ArrowPrimitiveType> PrimitiveArrayDecoder<P> {
impl<P> ArrayDecoder for PrimitiveArrayDecoder<P>
where
P: ArrowPrimitiveType + Parser,
- P::Native: ParseJsonNumber,
+ P::Native: ParseJsonNumber + NumCast,
{
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
let mut builder = PrimitiveBuilder::<P>::with_capacity(pos.len())
.with_data_type(self.data_type.clone());
+ let d = &self.data_type;
for p in pos {
match tape.get(*p) {
@@ -103,10 +104,7 @@ where
TapeElement::String(idx) => {
let s = tape.get_string(idx);
let value = P::parse(s).ok_or_else(|| {
- ArrowError::JsonError(format!(
- "failed to parse \"{s}\" as {}",
- self.data_type
- ))
+ ArrowError::JsonError(format!("failed to parse \"{s}\" as {d}",))
})?;
builder.append_value(value)
@@ -115,14 +113,44 @@ where
let s = tape.get_string(idx);
let value =
ParseJsonNumber::parse(s.as_bytes()).ok_or_else(|| {
- ArrowError::JsonError(format!(
- "failed to parse {s} as {}",
- self.data_type
- ))
+ ArrowError::JsonError(format!("failed to parse {s} as {d}",))
})?;
builder.append_value(value)
}
+ TapeElement::F32(v) => {
+ let v = f32::from_bits(v);
+ let value = NumCast::from(v).ok_or_else(|| {
+ ArrowError::JsonError(format!("failed to parse {v} as {d}",))
+ })?;
+ builder.append_value(value)
+ }
+ TapeElement::I32(v) => {
+ let value = NumCast::from(v).ok_or_else(|| {
+ ArrowError::JsonError(format!("failed to parse {v} as {d}",))
+ })?;
+ builder.append_value(value)
+ }
+ TapeElement::F64(high) => match tape.get(p + 1) {
+ TapeElement::F32(low) => {
+ let v = f64::from_bits((high as u64) << 32 | low as u64);
+ let value = NumCast::from(v).ok_or_else(|| {
+ ArrowError::JsonError(format!("failed to parse {v} as {d}",))
+ })?;
+ builder.append_value(value)
+ }
+ _ => unreachable!(),
+ },
+ TapeElement::I64(high) => match tape.get(p + 1) {
+ TapeElement::I32(low) => {
+ let v = (high as i64) << 32 | low as i64;
+ let value = NumCast::from(v).ok_or_else(|| {
+ ArrowError::JsonError(format!("failed to parse {v} as {d}",))
+ })?;
+ builder.append_value(value)
+ }
+ _ => unreachable!(),
+ },
_ => return Err(tape.error(*p, "primitive")),
}
}
diff --git a/arrow-json/src/reader/serializer.rs b/arrow-json/src/reader/serializer.rs
index 2aa72de943..2fd250bdfc 100644
--- a/arrow-json/src/reader/serializer.rs
+++ b/arrow-json/src/reader/serializer.rs
@@ -77,22 +77,6 @@ impl<'a> TapeSerializer<'a> {
}
}
-/// The tape stores all values as strings, and so must serialize numeric types
-///
-/// Formatting to a string only to parse it back again is rather wasteful,
-/// it may be possible to tweak the tape representation to avoid this
-///
-/// Need to use macro as const generic expressions are unstable
-/// <https://github.com/rust-lang/rust/issues/76560>
-macro_rules! serialize_numeric {
- ($s:ident, $t:ty, $v:ident) => {{
- let mut buffer = [0_u8; <$t>::FORMATTED_SIZE];
- let s = lexical_core::write($v, &mut buffer);
- $s.serialize_number(s);
- Ok(())
- }};
-}
-
impl<'a, 'b> Serializer for &'a mut TapeSerializer<'b> {
type Ok = ();
@@ -115,43 +99,63 @@ impl<'a, 'b> Serializer for &'a mut TapeSerializer<'b> {
}
fn serialize_i8(self, v: i8) -> Result<(), SerializerError> {
- serialize_numeric!(self, i8, v)
+ self.serialize_i32(v as _)
}
fn serialize_i16(self, v: i16) -> Result<(), SerializerError> {
- serialize_numeric!(self, i16, v)
+ self.serialize_i32(v as _)
}
fn serialize_i32(self, v: i32) -> Result<(), SerializerError> {
- serialize_numeric!(self, i32, v)
+ self.elements.push(TapeElement::I32(v));
+ Ok(())
}
fn serialize_i64(self, v: i64) -> Result<(), SerializerError> {
- serialize_numeric!(self, i64, v)
+ let low = v as i32;
+ let high = (v >> 32) as i32;
+ self.elements.push(TapeElement::I64(high));
+ self.elements.push(TapeElement::I32(low));
+ Ok(())
}
fn serialize_u8(self, v: u8) -> Result<(), SerializerError> {
- serialize_numeric!(self, u8, v)
+ self.serialize_i32(v as _)
}
fn serialize_u16(self, v: u16) -> Result<(), SerializerError> {
- serialize_numeric!(self, u16, v)
+ self.serialize_i32(v as _)
}
fn serialize_u32(self, v: u32) -> Result<(), SerializerError> {
- serialize_numeric!(self, u32, v)
+ match i32::try_from(v) {
+ Ok(v) => self.serialize_i32(v),
+ Err(_) => self.serialize_i64(v as _),
+ }
}
fn serialize_u64(self, v: u64) -> Result<(), SerializerError> {
- serialize_numeric!(self, u64, v)
+ match i64::try_from(v) {
+ Ok(v) => self.serialize_i64(v),
+ Err(_) => {
+ let mut buffer = [0_u8; u64::FORMATTED_SIZE];
+ let s = lexical_core::write(v, &mut buffer);
+ self.serialize_number(s);
+ Ok(())
+ }
+ }
}
fn serialize_f32(self, v: f32) -> Result<(), SerializerError> {
- serialize_numeric!(self, f32, v)
+ self.elements.push(TapeElement::F32(v.to_bits()));
+ Ok(())
}
fn serialize_f64(self, v: f64) -> Result<(), SerializerError> {
- serialize_numeric!(self, f64, v)
+ let bits = v.to_bits();
+ self.elements.push(TapeElement::F64((bits >> 32) as u32));
+ self.elements.push(TapeElement::F32(bits as u32));
+ Ok(())
}
fn serialize_char(self, v: char) -> Result<(), SerializerError> {
diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs
index 5eca7b43dc..801e8f29d5 100644
--- a/arrow-json/src/reader/tape.rs
+++ b/arrow-json/src/reader/tape.rs
@@ -18,6 +18,7 @@
use crate::reader::serializer::TapeSerializer;
use arrow_schema::ArrowError;
use serde::Serialize;
+use std::fmt::Write;
/// We decode JSON to a flattened tape representation,
/// allowing for efficient traversal of the JSON data
@@ -54,6 +55,25 @@ pub enum TapeElement {
///
/// Contains the offset into the [`Tape`] string data
Number(u32),
+
+ /// The high 32 bits of an i64
+ ///
+ /// Followed by [`Self::I32`] containing the low 32 bits
+ I64(i32),
+
+ /// A 32-bit signed integer
+ ///
+ /// May be preceded by [`Self::I64`] containing the high 32 bits
+ I32(i32),
+
+ /// The high 32 bits of a 64-bit float
+ ///
+ /// Followed by [`Self::F32`] containing the low 32 bits
+ F64(u32),
+
+ /// A 32-bit float, or the low 32 bits of a 64-bit float if preceded by [`Self::F64`]
+ F32(u32),
+
/// A true literal
True,
/// A false literal
@@ -104,10 +124,15 @@ impl<'a> Tape<'a> {
| TapeElement::Number(_)
| TapeElement::True
| TapeElement::False
- | TapeElement::Null => Ok(cur_idx + 1),
+ | TapeElement::Null
+ | TapeElement::I32(_)
+ | TapeElement::F32(_) => Ok(cur_idx + 1),
+ TapeElement::I64(_) | TapeElement::F64(_) => Ok(cur_idx + 2),
TapeElement::StartList(end_idx) => Ok(end_idx + 1),
TapeElement::StartObject(end_idx) => Ok(end_idx + 1),
- _ => Err(self.error(cur_idx, expected)),
+ TapeElement::EndObject(_) | TapeElement::EndList(_) => {
+ Err(self.error(cur_idx, expected))
+ }
}
}
@@ -153,6 +178,28 @@ impl<'a> Tape<'a> {
TapeElement::True => out.push_str("true"),
TapeElement::False => out.push_str("false"),
TapeElement::Null => out.push_str("null"),
+ TapeElement::I64(high) => match self.get(idx + 1) {
+ TapeElement::I32(low) => {
+ let val = (high as i64) << 32 | low as i64;
+ let _ = write!(out, "{val}");
+ return idx + 2;
+ }
+ _ => unreachable!(),
+ },
+ TapeElement::I32(val) => {
+ let _ = write!(out, "{val}");
+ }
+ TapeElement::F64(high) => match self.get(idx + 1) {
+ TapeElement::F32(low) => {
+ let val = f64::from_bits((high as u64) << 32 | low as u64);
+ let _ = write!(out, "{val}");
+ return idx + 2;
+ }
+ _ => unreachable!(),
+ },
+ TapeElement::F32(val) => {
+ let _ = write!(out, "{}", f32::from_bits(val));
+ }
}
idx + 1
}
diff --git a/arrow-json/src/reader/timestamp_array.rs b/arrow-json/src/reader/timestamp_array.rs
index b80915f6a5..0967261410 100644
--- a/arrow-json/src/reader/timestamp_array.rs
+++ b/arrow-json/src/reader/timestamp_array.rs
@@ -96,6 +96,13 @@ where
builder.append_value(value)
}
+ TapeElement::I32(v) => builder.append_value(v as i64),
+ TapeElement::I64(high) => match tape.get(p + 1) {
+ TapeElement::I32(low) => {
+ builder.append_value((high as i64) << 32 | low as i64)
+ }
+ _ => unreachable!(),
+ },
_ => return Err(tape.error(*p, "primitive")),
}
}