Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/04/07 16:12:22 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5860: when inferring the schema of compressed CSV, decompress before newline-delimited chunking

alamb commented on code in PR #5860:
URL: https://github.com/apache/arrow-datafusion/pull/5860#discussion_r1160795323


##########
testing:
##########


Review Comment:
   I verified this is a commit on master of the `testing` submodule 👍  https://github.com/apache/arrow-testing/commit/47f7b56b25683202c1fd957668e13f2abafc0f12



##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -65,6 +65,59 @@ impl Default for CsvFormat {
 }
 
 impl CsvFormat {
+    /// Return a newline-delimited stream of `Bytes` from the specified
+    /// file, decompressing if necessary.
+    /// Each returned `Bytes` contains a whole number of newline-delimited rows.
+    async fn read_to_delimited_chunks(

Review Comment:
   I feel like there must be a simpler way to express this code, but this does appear to work.
   
   I wonder if we could use `BoxStream` rather than `impl Stream....` 🤔 
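
   For anyone following along, here is a minimal standalone sketch (plain `futures` crate, not DataFusion code; the function names are made up) of the difference between returning `impl Stream` and returning a `BoxStream` via `StreamExt::boxed`:

```rust
use futures::stream::{self, BoxStream, Stream, StreamExt};

// `impl Stream`: callers see an opaque type; the concrete adapter chain
// built inside the function is hidden but fixed at compile time.
fn chunks_impl_stream() -> impl Stream<Item = Result<u64, std::io::Error>> {
    stream::iter(vec![Ok(1), Ok(2), Ok(3)])
}

// `BoxStream`: the stream is boxed and type-erased, which keeps the
// signature short when the body chains several stream adapters.
fn chunks_box_stream() -> BoxStream<'static, Result<u64, std::io::Error>> {
    stream::iter(vec![Ok(1), Ok(2), Ok(3)]).boxed()
}

fn main() {
    futures::executor::block_on(async {
        // Both variants are consumed the same way.
        let mut boxed = chunks_box_stream();
        while let Some(chunk) = boxed.next().await {
            println!("boxed: {:?}", chunk);
        }

        let mut opaque = chunks_impl_stream();
        while let Some(chunk) = opaque.next().await {
            println!("opaque: {:?}", chunk);
        }
    });
}
```

   The boxed form trades one allocation and dynamic dispatch for a much shorter signature, which is often the more readable choice when the return type would otherwise be a long adapter chain.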



##########
datafusion/core/src/datasource/file_format/file_type.rs:
##########
@@ -111,6 +115,58 @@ impl FileCompressionType {
         self.variant.is_compressed()
     }
 
+    /// Given a `Stream`, create a `Stream` whose data are compressed according to this `FileCompressionType`.
+    pub fn convert_to_compress_stream<
+        T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static,
+    >(
+        &self,
+        s: T,
+    ) -> Result<Box<dyn Stream<Item = Result<Bytes>> + Send + Unpin>> {
+        #[cfg(feature = "compression")]
+        let err_converter = |e: std::io::Error| match e
+            .get_ref()
+            .and_then(|e| e.downcast_ref::<DataFusionError>())
+        {
+            Some(_) => {
+                *(e.into_inner()
+                    .unwrap()
+                    .downcast::<DataFusionError>()
+                    .unwrap())
+            }
+            None => DataFusionError::from(e),
+        };
+
+        Ok(match self.variant {
+            #[cfg(feature = "compression")]
+            GZIP => Box::new(
+                ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
+                    .map_err(err_converter),

Review Comment:
   I am not sure why we need the `err_converter` -- this worked for me locally:
   
   ```suggestion
               GZIP => Box::new(
                   ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
                       .map_err(DataFusionError::from),
   ```
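
   To make the comparison concrete, here is a standalone sketch (not DataFusion code; `MyError` stands in for `DataFusionError`) of the two conversions being discussed: the manual converter tries to recover an inner error that was previously boxed into a `std::io::Error`, while the plain `From` conversion simply wraps the I/O error:

```rust
use std::fmt;
use std::io;

// Illustrative stand-in for DataFusionError.
#[derive(Debug)]
enum MyError {
    Io(io::Error),
    Other(String),
}

impl fmt::Display for MyError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            MyError::Io(e) => write!(f, "io error: {}", e),
            MyError::Other(msg) => write!(f, "error: {}", msg),
        }
    }
}

impl std::error::Error for MyError {}

impl From<io::Error> for MyError {
    fn from(e: io::Error) -> Self {
        MyError::Io(e)
    }
}

fn main() {
    // An inner MyError wrapped into an io::Error, which is what happens when
    // a reader adapter surfaces errors from the underlying stream as io::Error.
    let io_err = io::Error::new(
        io::ErrorKind::Other,
        MyError::Other("original failure".to_string()),
    );

    // Manual approach (like `err_converter`): downcast to recover the original
    // MyError, falling back to From when there is no inner MyError.
    let recovered: MyError = match io_err
        .get_ref()
        .and_then(|e| e.downcast_ref::<MyError>())
    {
        Some(_) => *io_err
            .into_inner()
            .unwrap()
            .downcast::<MyError>()
            .unwrap(),
        None => MyError::from(io_err),
    };
    println!("recovered: {:?}", recovered);

    // Simpler approach from the suggestion above: convert via From, which
    // wraps the io::Error instead of unwrapping the original error.
    let wrapped = MyError::from(io::Error::new(io::ErrorKind::Other, "plain io error"));
    println!("wrapped: {:?}", wrapped);
}
```

   The suggestion relies on `DataFusionError` implementing `From<std::io::Error>`, which is what makes the one-line `map_err` possible; the remaining question is only whether the downcast ever recovers a more specific inner error worth preserving.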



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org