You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2022/12/22 08:56:44 UTC

[arrow-ballista] branch use_lz4 created (now 9c88bc41)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch use_lz4
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


      at 9c88bc41 Use lz4 for shuffle compression

This branch includes the following new commits:

     new 9c88bc41 Use lz4 for shuffle compression

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[arrow-ballista] 01/01: Use lz4 for shuffle compression

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch use_lz4
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git

commit 9c88bc4115260ff55cda6484fae357496fe6fb7b
Author: Daniƫl Heres <da...@coralogix.com>
AuthorDate: Thu Dec 22 09:56:27 2022 +0100

    Use lz4 for shuffle compression
---
 ballista/executor/src/flight_service.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs
index d12686ed..be762b1d 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -21,6 +21,7 @@ use std::convert::TryFrom;
 use std::fs::File;
 use std::pin::Pin;
 
+use arrow::ipc::CompressionType;
 use arrow_flight::SchemaAsIpc;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
@@ -228,7 +229,10 @@ async fn stream_flight_data<T>(
 where
     T: Read + Seek,
 {
-    let options = arrow::ipc::writer::IpcWriteOptions::default();
+    let options = arrow::ipc::writer::IpcWriteOptions::default()
+        .try_with_compression(Some(CompressionType::LZ4_FRAME)).map_err(
+            |err| Status::internal(format!("Couldn't create writer: {}", err.to_string()))
+    )?;
     let schema_flight_data = SchemaAsIpc::new(reader.schema().as_ref(), &options).into();
     send_response(&tx, Ok(schema_flight_data)).await?;