Posted to github@arrow.apache.org by "metesynnada (via GitHub)" <gi...@apache.org> on 2023/02/08 07:59:58 UTC

[GitHub] [arrow-rs] metesynnada opened a new issue, #3674: Deadlock in arrow-csv reader for FIFO file reading

metesynnada opened a new issue, #3674:
URL: https://github.com/apache/arrow-rs/issues/3674

   **Describe the bug**
   
   The `arrow-csv` reader in arrow-rs blocks when reading CSV data from a FIFO, where an EOF is not received until the writer closes the file. The loop that reads the buffer waits for additional bytes even after the decoder has already received enough rows to fill a batch.
   
   For example, if the batch size is set to 64 and 64 rows are provided to the reader, the decoder will have enough data to create a **`RecordBatch`**. However, when the loop iterates for the second time, the code waits for additional bytes at **`self.reader.fill_buf()?`**, causing a deadlock. This breaks tests for streaming use cases, which worked before the PR https://github.com/apache/arrow-rs/pull/3604.
   
   ```rust
   impl<R: BufRead> BufReader<R> {
       fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
           loop {
               let buf = self.reader.fill_buf()?;
               let decoded = self.decoder.decode(buf)?;
               if decoded == 0 {
                   break;
               }
               self.reader.consume(decoded);
           }
   
           self.decoder.flush()
       }
   }
   ```
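
The blocking behaviour can be reproduced without arrow-csv. The following is a minimal, std-only sketch: `LineDecoder`, `OpenPipe`, and `next_batch` are hypothetical stand-ins invented for illustration, not arrow-csv APIs. `OpenPipe` returns `WouldBlock` where a real FIFO read would block forever, and the `stop_when_full` flag shows one possible way to avoid the extra read, not necessarily how the library addresses it.

```rust
use std::io::{self, BufRead, Read};

/// Toy stand-in for the CSV decoder: buffers complete lines up to `batch_size`.
struct LineDecoder {
    batch_size: usize,
    rows: Vec<String>,
    partial: Vec<u8>,
}

impl LineDecoder {
    fn new(batch_size: usize) -> Self {
        Self { batch_size, rows: Vec::new(), partial: Vec::new() }
    }

    /// Rows that can still be buffered before `flush` must be called.
    fn capacity(&self) -> usize {
        self.batch_size - self.rows.len()
    }

    /// Consumes bytes from `buf` until the batch is full; returns bytes consumed.
    fn decode(&mut self, buf: &[u8]) -> usize {
        let mut consumed = 0;
        for &b in buf {
            if self.capacity() == 0 {
                break;
            }
            consumed += 1;
            if b == b'\n' {
                let line = String::from_utf8_lossy(&self.partial).into_owned();
                self.rows.push(line);
                self.partial.clear();
            } else {
                self.partial.push(b);
            }
        }
        consumed
    }

    /// Returns the buffered rows, or `None` if nothing is buffered.
    fn flush(&mut self) -> Option<Vec<String>> {
        if self.rows.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.rows))
        }
    }
}

/// Hands out its data once, then reports `WouldBlock` to stand in for a
/// FIFO whose writer is still open (a real FIFO read would block instead).
struct OpenPipe {
    data: Vec<u8>,
    pos: usize,
}

impl Read for OpenPipe {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        if self.pos == self.data.len() {
            return Err(io::Error::new(io::ErrorKind::WouldBlock, "writer still open"));
        }
        let n = out.len().min(self.data.len() - self.pos);
        out[..n].copy_from_slice(&self.data[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Same loop shape as above. With `stop_when_full` set, the loop exits as
/// soon as the decoder holds a full batch instead of asking for more bytes.
fn next_batch<R: BufRead>(
    reader: &mut R,
    decoder: &mut LineDecoder,
    stop_when_full: bool,
) -> io::Result<Option<Vec<String>>> {
    loop {
        let buf = reader.fill_buf()?;
        let decoded = decoder.decode(buf);
        reader.consume(decoded);
        if decoded == 0 || (stop_when_full && decoder.capacity() == 0) {
            break;
        }
    }
    Ok(decoder.flush())
}
```

With a batch size of 2 and exactly two rows in the pipe, `next_batch(.., false)` reaches the second `fill_buf` and hits the simulated block, while `next_batch(.., true)` returns the two rows. With three rows in the pipe, the original loop shape also returns a batch, because the leftover buffered bytes let `decode` report 0 without touching the pipe again.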
   
   **To Reproduce**
   
   - Add `nix = "0.26.2"` to `dev-dependencies`.
   - Copy the code below into `arrow-csv/src/reader/records.rs`, or any convenient place in arrow-csv, and run the test.
   
   ```rust
   #[cfg(test)]
   mod pr {
       use crate::ReaderBuilder;
       use arrow_array::RecordBatch;
       use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
       use nix::sys::stat;
       use nix::unistd;
       use std::fs::{File, OpenOptions};
       use std::io::BufRead;
       use std::io::BufReader as StdBufReader;
       use std::io::Write;
       use std::path::PathBuf;
       use std::sync::{Arc, Mutex};
       use std::thread;
       use std::time::Duration;
       use tempfile::TempDir;
   
       fn create_fifo_file(
           tmp_dir: &TempDir,
           file_name: &str,
       ) -> Result<PathBuf, ArrowError> {
           let file_path = tmp_dir.path().join(file_name);
           if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) {
               Err(ArrowError::CsvError(e.to_string()))
           } else {
               Ok(file_path)
           }
       }
   
       fn write_to_fifo(mut file: &File, line: &str) -> Result<usize, ArrowError> {
           file.write(line.as_bytes()).or_else(|e| {
               // Broken pipe (EPIPE, errno 32): no reader has the FIFO open yet, so back off briefly
               if e.raw_os_error() == Some(32) {
                   thread::sleep(Duration::from_millis(100));
                   return Ok(0);
               }
               Err(ArrowError::CsvError(e.to_string()))
           })
       }
   
       fn read_from_csv<R: BufRead>(
           mut reader: R,
           schema: SchemaRef,
           batch_size: usize,
       ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
           let mut decoder = ReaderBuilder::new()
               .with_schema(schema)
               .with_batch_size(batch_size)
               .build_decoder();
           let mut next = move || {
               loop {
                   // Deadlock happens here: the decoder already holds a full batch, but the
                   // next iteration waits for more bytes before the first batch is produced.
                   let buf = reader.fill_buf()?;
                   let decoded = decoder.decode(buf)?;
                   if decoded == 0 {
                       break;
                   }
                   reader.consume(decoded);
               }
               decoder.flush()
           };
           Ok(std::iter::from_fn(move || next().transpose()))
       }
   
       const TEST_BATCH_SIZE: usize = 50;
   
       #[test]
       fn csv_reader_env() -> Result<(), ArrowError> {
           // We use a lock to wait for a batch creation
           let waiting = Arc::new(Mutex::new(true));
           let waiting_thread = waiting.clone();
           let tmp_dir = TempDir::new()?;
           let fifo_path = create_fifo_file(&tmp_dir, "fifo_file.csv")?;
           let fifo_path_thread = fifo_path.clone();
           let joinable_iterator = (0..TEST_BATCH_SIZE).map(|_| "a".to_string());
           let fifo_writer = thread::spawn(move || {
               let first_file = OpenOptions::new()
                   .write(true)
                   .open(fifo_path_thread)
                   .unwrap();
               for (cnt, string_col) in joinable_iterator.enumerate() {
                   let line = format!("{string_col},{cnt}\n");
                   write_to_fifo(&first_file, &line).unwrap();
               }
               // Keep the write end open so the reader does not receive an EOF from the FIFO.
               while *waiting_thread.lock().unwrap() {
                   thread::sleep(Duration::from_millis(200));
               }
           });
           let schema = Arc::new(Schema::new(vec![
               Field::new("a1", DataType::Utf8, false),
               Field::new("a2", DataType::UInt32, false),
           ]));
   
           let file = File::open(fifo_path).unwrap();
           let reader = StdBufReader::new(file);
   
           let mut read = read_from_csv(reader, schema.clone(), TEST_BATCH_SIZE)?;
   
           while let Some(Ok(_batch)) = read.next() {
               // Once we get a batch, set the flag to false so the writer thread can finish.
               *waiting.lock().unwrap() = false;
               println!("We get a record batch");
           }
           fifo_writer.join().unwrap();
           Ok(())
       }
   }
   ```
   
   **Expected behavior**
   
   - For the reproduction code: produce the `RecordBatch` and finish.
   - For the algorithm: it should produce a `RecordBatch` as soon as the necessary bytes have been received.
   
   **Additional context**
   NA


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on issue #3674: Arrow-csv reader cannot produce RecordBatch even if the bytes are necessary

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #3674:
URL: https://github.com/apache/arrow-rs/issues/3674#issuecomment-1423058446

   I wonder if the latency can be reduced by calling `flush()` on the underlying Decoder when the driver program knows (somehow) that it has received the end of a record and is not in the middle of decoding.
   
   https://docs.rs/arrow-csv/32.0.0/arrow_csv/reader/struct.Decoder.html#method.flush
   
   Perhaps something like
   
   ```rust
           let mut next = move || {
               loop {
                   // Force a flush to produce a RecordBatch if we have fed the entire input
                   // that is available and are sure the data has only complete rows.
                   // `check_have_read_to_boundary` is a placeholder for however the
                   // driver program knows this.
                   if check_have_read_to_boundary() {
                       return decoder.flush();
                   }
                   let buf = reader.fill_buf()?;
                   let decoded = decoder.decode(buf)?;
                   if decoded == 0 {
                       break;
                   }
                   reader.consume(decoded);
               }
               decoder.flush()
           };
   ```




[GitHub] [arrow-rs] tustvold commented on issue #3674: Deadlock in arrow-csv reader for FIFO file reading

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on issue #3674:
URL: https://github.com/apache/arrow-rs/issues/3674#issuecomment-1422196923

   > the code waits for additional bytes at `self.reader.fill_buf()?`, causing a deadlock
   
   I'm confused by this. What happens if fewer rows than the batch size are available, or more? This feels like it just slightly changes the buffering behaviour, which isn't really guaranteed. Not saying we can't change this, but I'd like to understand the issue better. It almost feels like the deadlock is the fault of an overly restrictive test?




[GitHub] [arrow-rs] metesynnada commented on issue #3674: Arrow-csv reader cannot produce RecordBatch even if the bytes are necessary

Posted by "metesynnada (via GitHub)" <gi...@apache.org>.
metesynnada commented on issue #3674:
URL: https://github.com/apache/arrow-rs/issues/3674#issuecomment-1423178479

   @tustvold thank you for your effort. https://github.com/apache/arrow-rs/pull/3677 meets our requirements.




[GitHub] [arrow-rs] ozankabak commented on issue #3674: Arrow-csv reader cannot produce RecordBatch even if the bytes are necessary

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on issue #3674:
URL: https://github.com/apache/arrow-rs/issues/3674#issuecomment-1422904786

   Deadlock doesn't seem to be the right word. However, the current behavior can result in unnecessarily long latencies if data comes in chunks aligned with batch boundaries. This is an edge case, but when it happens, it becomes a problem in streaming use cases. Thankfully, it is easily avoidable.




[GitHub] [arrow-rs] metesynnada commented on issue #3674: Deadlock in arrow-csv reader for FIFO file reading

Posted by "metesynnada (via GitHub)" <gi...@apache.org>.
metesynnada commented on issue #3674:
URL: https://github.com/apache/arrow-rs/issues/3674#issuecomment-1422339772

   I intentionally wrote the test this way to make the issue clearer.
   
   I think waiting for the next byte before producing a RecordBatch, even when we already have the necessary bytes, goes against streaming semantics.
   
   We use this test pattern for testing stream pipelines. The question we want to test is "If I provide a batch, do I get a batch as output?", but with actual files it becomes "If I provide batch_size + 1 rows, do I get a batch as output?".




[GitHub] [arrow-rs] tustvold commented on issue #3674: Arrow-csv reader cannot produce RecordBatch even if the bytes are necessary

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on issue #3674:
URL: https://github.com/apache/arrow-rs/issues/3674#issuecomment-1423086608

   Would you be able to test out https://github.com/apache/arrow-rs/pull/3677 and see if it meets your requirements? If so, I can polish it up with some tests, etc.




[GitHub] [arrow-rs] tustvold closed issue #3674: Arrow-csv reader cannot produce RecordBatch even if the bytes are necessary

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold closed issue #3674: Arrow-csv reader cannot produce RecordBatch even if the bytes are necessary
URL: https://github.com/apache/arrow-rs/issues/3674

