Posted to github@arrow.apache.org by "mustafasrepo (via GitHub)" <gi...@apache.org> on 2023/06/05 10:31:20 UTC

[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6526: Add support for appending data to external tables - CSV

mustafasrepo commented on code in PR #6526:
URL: https://github.com/apache/arrow-datafusion/pull/6526#discussion_r1217871090


##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -324,6 +349,188 @@ fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> Schem
     Schema::new(fields)
 }
 
+impl Default for CsvSerializer {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Define a struct for serializing CSV records to a stream
+pub struct CsvSerializer {
+    // CSV writer builder
+    builder: WriterBuilder,
+    // Inner buffer for avoiding reallocation
+    buffer: Vec<u8>,
+    // Flag to indicate whether there will be a header
+    header: bool,
+}
+
+impl CsvSerializer {
+    /// Constructor for the CsvSerializer object
+    pub fn new() -> Self {
+        Self {
+            builder: WriterBuilder::new(),
+            header: true,
+            buffer: Vec::with_capacity(4096),
+        }
+    }
+
+    /// Method for setting the CSV writer builder
+    pub fn with_builder(mut self, builder: WriterBuilder) -> Self {
+        self.builder = builder;
+        self
+    }
+
+    /// Method for setting the CSV writer header status
+    pub fn with_header(mut self, header: bool) -> Self {
+        self.header = header;
+        self
+    }
+}
+
+#[async_trait]
+impl BatchSerializer for CsvSerializer {
+    async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
+        let builder = self.builder.clone();
+        let mut writer = builder.has_headers(self.header).build(&mut self.buffer);
+        writer.write(&batch)?;
+        drop(writer);
+        self.header = false;
+        Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
+    }
+}
+
+/// This macro tries to abort all writers before returning a possible error.
+macro_rules! handle_err_or_continue {
+    ($result:expr, $writers:expr) => {
+        match $result {
+            Ok(value) => Ok(value),
+            Err(e) => {
+                // Abort all writers before returning the error:
+                for writer in $writers {
+                    let abort_future = writer.abort_writer()?;

Review Comment:
   I converted the macro to a function. Also, if we receive an error while aborting a writer, we no longer return early; we still try to abort all the remaining writers.
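
   For illustration, the function form might look roughly like the sketch below. It is a simplified, synchronous sketch and not the code from the PR: the `AbortableWriter` trait, the `WriteError` type, the `String` error values, and `MockWriter` are hypothetical stand-ins, and the real `abort_writer()` in the diff is followed by `?` and bound to `abort_future`, so it evidently returns a future rather than completing immediately. The point is only that a failed abort is recorded and the loop continues, instead of `?` returning on the first failure.

```rust
/// Hypothetical stand-in for the PR's writer type; only the abort step is modeled.
pub trait AbortableWriter {
    fn abort_writer(&mut self) -> Result<(), String>;
}

/// Hypothetical error type: the original failure plus any failures seen while aborting.
#[derive(Debug, PartialEq)]
pub struct WriteError {
    pub cause: String,
    pub abort_failures: Vec<String>,
}

/// Passes an `Ok` value through unchanged. On `Err`, tries to abort every
/// writer, collecting abort failures instead of returning on the first one.
pub fn handle_err_or_continue<T, W: AbortableWriter>(
    result: Result<T, String>,
    writers: &mut [W],
) -> Result<T, WriteError> {
    match result {
        Ok(value) => Ok(value),
        Err(cause) => {
            let mut abort_failures = Vec::new();
            for writer in writers.iter_mut() {
                // No `?` here: a failed abort must not skip the remaining writers.
                if let Err(e) = writer.abort_writer() {
                    abort_failures.push(e);
                }
            }
            Err(WriteError { cause, abort_failures })
        }
    }
}

/// Hypothetical test double that records whether an abort was attempted.
pub struct MockWriter {
    pub fail_on_abort: bool,
    pub aborted: bool,
}

impl AbortableWriter for MockWriter {
    fn abort_writer(&mut self) -> Result<(), String> {
        self.aborted = true;
        if self.fail_on_abort {
            Err("abort failed".to_string())
        } else {
            Ok(())
        }
    }
}
```

   With two `MockWriter`s where the first fails to abort, the second is still aborted, and the returned error carries both the original cause and the abort failure.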


