You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2022/12/22 08:56:45 UTC
[arrow-ballista] 01/01: Use lz4 for shuffle compression
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch use_lz4
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
commit 9c88bc4115260ff55cda6484fae357496fe6fb7b
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Thu Dec 22 09:56:27 2022 +0100
Use lz4 for shuffle compression
---
ballista/executor/src/flight_service.rs | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs
index d12686ed..be762b1d 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -21,6 +21,7 @@ use std::convert::TryFrom;
use std::fs::File;
use std::pin::Pin;
+use arrow::ipc::CompressionType;
use arrow_flight::SchemaAsIpc;
use ballista_core::error::BallistaError;
use ballista_core::serde::decode_protobuf;
@@ -228,7 +229,10 @@ async fn stream_flight_data<T>(
where
T: Read + Seek,
{
- let options = arrow::ipc::writer::IpcWriteOptions::default();
+ let options = arrow::ipc::writer::IpcWriteOptions::default()
+ .try_with_compression(Some(CompressionType::LZ4_FRAME)).map_err(
+ |err| Status::internal(format!("Couldn't create writer: {}", err.to_string()))
+ )?;
let schema_flight_data = SchemaAsIpc::new(reader.schema().as_ref(), &options).into();
send_response(&tx, Ok(schema_flight_data)).await?;