You are viewing a plain-text version of this content. The canonical link for it is here (the original hyperlink is not preserved in this plain-text copy).
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/23 14:57:06 UTC
[arrow-datafusion] branch main updated: Move some code before tests module (#6749)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 80d47cce60 Move some code before tests module (#6749)
80d47cce60 is described below
commit 80d47cce606ab7f22a28d1954aa38a0bda391eda
Author: Armin Primadi <ap...@gmail.com>
AuthorDate: Fri Jun 23 21:56:58 2023 +0700
Move some code before tests module (#6749)
---
datafusion/core/src/physical_plan/common.rs | 160 ++++++++++++++--------------
1 file changed, 80 insertions(+), 80 deletions(-)
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index 982bb4f2e6..19c05eaada 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -284,6 +284,86 @@ fn get_meet_of_orderings_helper(
}
}
+/// Writes record batches to a file in Arrow IPC file format, tracking batch/row/size counters.
+pub struct IPCWriter {
+ /// Path of the file being written
+ pub path: PathBuf,
+ /// Underlying Arrow IPC `FileWriter` over the created file
+ pub writer: FileWriter<File>,
+ /// Number of record batches written so far
+ pub num_batches: u64,
+ /// Total number of rows written so far
+ pub num_rows: u64,
+ /// Sum of in-memory batch sizes (`batch_byte_size`), not encoded bytes on disk
+ pub num_bytes: u64,
+}
+
+impl IPCWriter {
+ /// Creates (truncating if present) the file at `path` and opens an IPC writer for `schema`
+ pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
+ let file = File::create(path).map_err(|e| {
+ DataFusionError::Execution(format!(
+ "Failed to create partition file at {path:?}: {e:?}"
+ ))
+ })?;
+ Ok(Self {
+ num_batches: 0,
+ num_rows: 0,
+ num_bytes: 0,
+ path: path.into(),
+ writer: FileWriter::try_new(file, schema)?,
+ })
+ }
+
+ /// Like `new`, but configures the underlying `FileWriter` with `write_options`
+ pub fn new_with_options(
+ path: &Path,
+ schema: &Schema,
+ write_options: IpcWriteOptions,
+ ) -> Result<Self> {
+ let file = File::create(path).map_err(|e| {
+ DataFusionError::Execution(format!(
+ "Failed to create partition file at {path:?}: {e:?}"
+ ))
+ })?;
+ Ok(Self {
+ num_batches: 0,
+ num_rows: 0,
+ num_bytes: 0,
+ path: path.into(),
+ writer: FileWriter::try_new_with_options(file, schema, write_options)?,
+ })
+ }
+ /// Writes `batch`, then bumps the batch, row and in-memory byte counters
+ pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+ self.writer.write(batch)?;
+ self.num_batches += 1;
+ self.num_rows += batch.num_rows() as u64;
+ let num_bytes: usize = batch_byte_size(batch);
+ self.num_bytes += num_bytes as u64;
+ Ok(())
+ }
+
+ /// Finishes the underlying `FileWriter`, converting its error into a DataFusion error
+ pub fn finish(&mut self) -> Result<()> {
+ self.writer.finish().map_err(Into::into)
+ }
+
+ /// Returns the path of the file this writer writes to
+ pub fn path(&self) -> &Path {
+ &self.path
+ }
+}
+
+/// Returns the in-memory size of `batch` in bytes: the sum of each column's `get_array_memory_size`.
+pub fn batch_byte_size(batch: &RecordBatch) -> usize {
+ batch
+ .columns()
+ .iter()
+ .map(|array| array.get_array_memory_size())
+ .sum()
+}
+
#[cfg(test)]
mod tests {
use std::ops::Not;
@@ -643,83 +723,3 @@ mod tests {
Ok(())
}
}
-
-/// Write in Arrow IPC format.
-pub struct IPCWriter {
- /// path
- pub path: PathBuf,
- /// inner writer
- pub writer: FileWriter<File>,
- /// batches written
- pub num_batches: u64,
- /// rows written
- pub num_rows: u64,
- /// bytes written
- pub num_bytes: u64,
-}
-
-impl IPCWriter {
- /// Create new writer
- pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
- let file = File::create(path).map_err(|e| {
- DataFusionError::Execution(format!(
- "Failed to create partition file at {path:?}: {e:?}"
- ))
- })?;
- Ok(Self {
- num_batches: 0,
- num_rows: 0,
- num_bytes: 0,
- path: path.into(),
- writer: FileWriter::try_new(file, schema)?,
- })
- }
-
- /// Create new writer with IPC write options
- pub fn new_with_options(
- path: &Path,
- schema: &Schema,
- write_options: IpcWriteOptions,
- ) -> Result<Self> {
- let file = File::create(path).map_err(|e| {
- DataFusionError::Execution(format!(
- "Failed to create partition file at {path:?}: {e:?}"
- ))
- })?;
- Ok(Self {
- num_batches: 0,
- num_rows: 0,
- num_bytes: 0,
- path: path.into(),
- writer: FileWriter::try_new_with_options(file, schema, write_options)?,
- })
- }
- /// Write one single batch
- pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
- self.writer.write(batch)?;
- self.num_batches += 1;
- self.num_rows += batch.num_rows() as u64;
- let num_bytes: usize = batch_byte_size(batch);
- self.num_bytes += num_bytes as u64;
- Ok(())
- }
-
- /// Finish the writer
- pub fn finish(&mut self) -> Result<()> {
- self.writer.finish().map_err(Into::into)
- }
-
- /// Path write to
- pub fn path(&self) -> &Path {
- &self.path
- }
-}
-
-/// Returns the total number of bytes of memory occupied physically by this batch.
-pub fn batch_byte_size(batch: &RecordBatch) -> usize {
- batch
- .columns()
- .iter()
- .map(|array| array.get_array_memory_size())
- .sum()
-}