You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2022/12/22 08:56:45 UTC

[arrow-ballista] 01/01: Use lz4 for shuffle compression

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch use_lz4
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git

commit 9c88bc4115260ff55cda6484fae357496fe6fb7b
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Thu Dec 22 09:56:27 2022 +0100

    Use lz4 for shuffle compression
---
 ballista/executor/src/flight_service.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs
index d12686ed..be762b1d 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -21,6 +21,7 @@ use std::convert::TryFrom;
 use std::fs::File;
 use std::pin::Pin;
 
+use arrow::ipc::CompressionType;
 use arrow_flight::SchemaAsIpc;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
@@ -228,7 +229,10 @@ async fn stream_flight_data<T>(
 where
     T: Read + Seek,
 {
-    let options = arrow::ipc::writer::IpcWriteOptions::default();
+    let options = arrow::ipc::writer::IpcWriteOptions::default()
+        .try_with_compression(Some(CompressionType::LZ4_FRAME)).map_err(
+            |err| Status::internal(format!("Couldn't create writer: {}", err.to_string()))
+    )?;
     let schema_flight_data = SchemaAsIpc::new(reader.schema().as_ref(), &options).into();
     send_response(&tx, Ok(schema_flight_data)).await?;