Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/07/31 18:41:57 UTC

[GitHub] [arrow-rs] HagaiHargil opened a new issue #646: `ipc::StreamReader` doesn't see new data after stream was emptied

HagaiHargil opened a new issue #646:
URL: https://github.com/apache/arrow-rs/issues/646


   **Describe the bug**
   When using `ipc::StreamReader`, if the stream buffer empties and is later re-filled, the existing `StreamReader` will not detect the newly added data.
   
   **To Reproduce**
   ```rust
   use std::fs::File;
   use std::sync::Arc;

   use arrow::array::Int32Array;
   use arrow::datatypes::{DataType, Field, Schema};
   use arrow::ipc::reader::StreamReader;
   use arrow::ipc::writer::StreamWriter;
   use arrow::record_batch::RecordBatch;

   fn test_intermittent_writing() {
       // Generate mock data
       let schema = Schema::new(vec![
           Field::new("id", DataType::Int32, false)
       ]);
       let second_schema = schema.clone();
   
       std::thread::spawn(|| start_writer_and_write(String::from("test_test.d"), second_schema));
       // Let the writer write something
       std::thread::sleep(std::time::Duration::from_secs(1));
       let mut stream = StreamReader::try_new(File::open("test_test.d").unwrap()).unwrap();
       let mut idx = 0;
       // Start looping and reading the stream. The first loop detects and prints
       // out the array. No other loops detect anything, even though after
       // several seconds new data appears in the stream.
       while idx < 12 {
           match stream.next() {
               Some(x) => println!("{:?}", x),
               None => println!("None"),
           }
           idx += 1;
           std::thread::sleep(std::time::Duration::from_secs(1));
       }
       println!("Stopped loop, checking whether the data is there");
       let mut stream = StreamReader::try_new(File::open("test_test.d").unwrap()).unwrap();
       println!("{:?}", stream.next().unwrap());  // prints the Batch
       println!("{:?}", stream.next().unwrap());  // also prints the second Batch
   }
   
   // Writer: emit one batch, pause long enough for the reader to drain the
   // stream, then emit a second batch.
   fn start_writer_and_write(stream_name: String, schema: Schema) {
       let stream = File::create(stream_name).unwrap();
       let mut stream = StreamWriter::try_new(stream, &schema).unwrap();
       let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
       let batch = RecordBatch::try_new(
           Arc::new(schema.clone()),
           vec![Arc::new(id_array)]
       ).unwrap();
       stream.write(&batch).unwrap();
       // Pause so the reader sees an empty stream before the second batch arrives
       std::thread::sleep(std::time::Duration::from_secs(10));
       stream.write(&batch).unwrap();
       println!("I wrote it, goodbye");
   }
   ```
   
   **Expected behavior**
   From my understanding, as long as the `StreamReader` is open, new data should be detected even if the stream was empty for some time.
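
   For illustration, this is the kind of polling pattern I would expect to be able to build on a long-lived reader. It is only a sketch; the `next_batch_blocking` helper and the 100 ms poll interval are names and numbers I made up, not anything from the library:

   ```rust
   use std::fs::File;
   use std::thread::sleep;
   use std::time::Duration;

   use arrow::error::Result;
   use arrow::ipc::reader::StreamReader;
   use arrow::record_batch::RecordBatch;

   // Hypothetical helper: keep polling a long-lived reader until the next batch
   // shows up, instead of treating the first `None` as the end of the stream.
   fn next_batch_blocking(reader: &mut StreamReader<File>) -> Result<RecordBatch> {
       loop {
           match reader.next() {
               Some(batch) => return batch,
               // No complete message available yet; wait and poll again.
               None => sleep(Duration::from_millis(100)),
           }
       }
   }
   ```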
   
   **Additional context**
   I noticed this behavior when writing to a stream from `pyarrow`, but the code above shows that a Rust-to-Rust version of this issue also exists.
   



[GitHub] [arrow-rs] HagaiHargil commented on issue #646: `ipc::StreamReader` doesn't see new data after stream was emptied

Posted by GitBox <gi...@apache.org>.
HagaiHargil commented on issue #646:
URL: https://github.com/apache/arrow-rs/issues/646#issuecomment-891605802


   I have exactly zero experience with the internals of this library, but it seems to me that `total_blocks`, the variable that tracks the number of data blocks in the stream, is never updated after its initialization [here](https://github.com/apache/arrow-rs/blob/6bf1988852f87da21a163203eec4c83a7b692901/arrow/src/ipc/reader.rs#L587).
   
   Then, when I call `.next()` on the iterator I arrive [here](https://github.com/apache/arrow-rs/blob/6bf1988852f87da21a163203eec4c83a7b692901/arrow/src/ipc/reader.rs#L738), which checks the number of remaining blocks. Since the number of data blocks is determined only at initialization time, the reader eventually exhausts the blocks it started with and from that point on always returns `None`.
   
   Please forgive me if this quick dive was unneeded and/or incorrect; I'm only hoping for a quick-ish fix of this issue.
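
   In the meantime, the fact that re-opening the file in the reproduction above made both batches visible suggests a workaround: rebuild the reader and skip the batches that were already consumed. This is only a sketch of that idea; `read_new_batches` and the `already_seen` counter are names I made up for illustration:

   ```rust
   use std::fs::File;
   use std::path::Path;

   use arrow::error::Result;
   use arrow::ipc::reader::StreamReader;
   use arrow::record_batch::RecordBatch;

   // Workaround sketch: re-open the stream file, skip the batches consumed on
   // earlier polls, and return whatever was appended since then.
   fn read_new_batches(path: &Path, already_seen: usize) -> Result<Vec<RecordBatch>> {
       let reader = StreamReader::try_new(File::open(path)?)?;
       reader
           .skip(already_seen)          // drop batches we have already handled
           .collect::<Result<Vec<_>>>() // propagate the first decoding error, if any
   }
   ```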

