You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/18 10:25:17 UTC

[GitHub] [arrow] alamb commented on a change in pull request #10066: ARROW-12306: [Rust][datafusion] Read CSV format text from stdin or memory

alamb commented on a change in pull request #10066:
URL: https://github.com/apache/arrow/pull/10066#discussion_r615375862



##########
File path: rust/datafusion/src/physical_plan/csv.rs
##########
@@ -272,25 +371,46 @@ impl ExecutionPlan for CsvExec {
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(CsvStream::try_new(
-            &self.filenames[partition],
-            self.schema.clone(),
-            self.has_header,
-            self.delimiter,
-            &self.projection,
-            self.batch_size,
-            self.limit,
-        )?))
+        match &self.source {
+            SourceReader::PartitionedFiles { filenames, .. } => {
+                Ok(Box::pin(CsvStream::try_new(
+                    &filenames[partition],
+                    self.schema.clone(),
+                    self.has_header,
+                    self.delimiter,
+                    &self.projection,
+                    self.batch_size,
+                    self.limit,
+                )?))
+            }
+            SourceReader::Reader(rdr) => {
+                if let Some(rdr) = rdr.lock().unwrap().take() {

Review comment:
       I recommend checking here that `partition==0` and returning an internal error otherwise:
   
   Something like
   
   ```
   if partition != 0 { 
      Err(DataFusionError::Internal("Only partition 0 is valid when CSV comes from a reader"))
   }
                               .
   ```

##########
File path: rust/datafusion/src/physical_plan/csv.rs
##########
@@ -272,25 +371,46 @@ impl ExecutionPlan for CsvExec {
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(CsvStream::try_new(
-            &self.filenames[partition],
-            self.schema.clone(),
-            self.has_header,
-            self.delimiter,
-            &self.projection,
-            self.batch_size,
-            self.limit,
-        )?))
+        match &self.source {
+            SourceReader::PartitionedFiles { filenames, .. } => {
+                Ok(Box::pin(CsvStream::try_new(
+                    &filenames[partition],
+                    self.schema.clone(),
+                    self.has_header,
+                    self.delimiter,
+                    &self.projection,
+                    self.batch_size,
+                    self.limit,
+                )?))
+            }
+            SourceReader::Reader(rdr) => {
+                if let Some(rdr) = rdr.lock().unwrap().take() {
+                    Ok(Box::pin(CsvStream::try_new_from_reader(
+                        rdr,
+                        self.schema.clone(),
+                        self.has_header,
+                        self.delimiter,
+                        &self.projection,
+                        self.batch_size,
+                        self.limit,
+                    )?))
+                } else {
+                    Err(DataFusionError::Execution(
+                        "You can only read once if the data comes from a reader"

Review comment:
       ```suggestion
                           "Error reading CSV: Data can only be read a single time when the source is a reader"
   ```

##########
File path: rust/datafusion/src/datasource/csv.rs
##########
@@ -142,3 +143,142 @@ impl TableProvider for CsvFile {
         self.statistics.clone()
     }
 }
+/// Loads CSV data from a reader
+pub struct CsvRead<R: Read> {
+    reader: Mutex<Option<R>>,
+    schema: SchemaRef,
+    has_header: bool,
+    delimiter: u8,
+    statistics: Statistics,
+}
+
+impl<R: Read> CsvRead<R> {
+    /// Attempt to initialize a `CsvRead` from a reader. The schema MUST be provided in options.
+    pub fn try_new(reader: R, options: CsvReadOptions) -> Result<Self> {
+        let schema = Arc::new(match options.schema {
+            Some(s) => s.clone(),
+            None => {
+                return Err(DataFusionError::Execution(
+                    "Schema must be provided".to_string(),

Review comment:
       ```suggestion
                       "Schema must be provided to CsvRead".to_string(),
   ```

##########
File path: rust/datafusion/src/physical_plan/csv.rs
##########
@@ -106,13 +107,71 @@ impl<'a> CsvReadOptions<'a> {
     }
 }
 
+///  SourceReader represents where the data comes from.
+enum SourceReader {
+    /// The data comes from partitioned files
+    PartitionedFiles {
+        /// Path to directory containing partitioned files with the same schema
+        path: String,
+        /// The individual files under path
+        filenames: Vec<String>,
+    },
+
+    /// The data comes from anything impl Read trait
+    Reader(Mutex<Option<Box<dyn Read + Send + Sync + 'static>>>),
+}
+
+impl std::fmt::Debug for SourceReader {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            SourceReader::PartitionedFiles { path, filenames } => f
+                .debug_struct("PartitionedFiles")
+                .field("path", path)
+                .field("filenames", filenames)
+                .finish()?,
+            SourceReader::Reader(_) => f.write_str("Reader")?,
+        };
+        Ok(())
+    }
+}
+
+impl Clone for SourceReader {
+    fn clone(&self) -> Self {
+        match self {
+            SourceReader::PartitionedFiles { path, filenames } => {
+                Self::PartitionedFiles {
+                    path: path.clone(),
+                    filenames: filenames.clone(),
+                }
+            }
+            SourceReader::Reader(_) => Self::Reader(Mutex::new(None)),

Review comment:
       This might cause some non trivial confusion -- namey that `Clone'`ing a SourceReader will *not* clone the underlying reader. Thus any `Clone`'d `CsvExec` won't be usable at at all (it will generate an error)
   
    I wonder if `CsvExec` really needs to be `Clone` at all -- like can we just remove the `Clone` derivation:
   
   ```
   #[derive(Debug)]
   pub struct CsvExec {
   ```

##########
File path: rust/datafusion/src/datasource/csv.rs
##########
@@ -142,3 +143,142 @@ impl TableProvider for CsvFile {
         self.statistics.clone()
     }
 }
+/// Loads CSV data from a reader
+pub struct CsvRead<R: Read> {

Review comment:
       I wonder if there is any way to reduce the duplication between `CsvRead<R>` and `CsvFile` -- as you have done for `CsvExec`. That way we can reuse the same tests for things like schema matching.
   
   Since the `Reader` gets `Box`ed anyways for execution, I don't think there is any performance difference using something that is generic over `R` vs a `Box<R>`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org