Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/12/18 02:25:51 UTC

[GitHub] [arrow-rs] corneliusroemer opened a new issue #1059: Allow csv reader builder to do schema inference even when reading csv from stdin

corneliusroemer opened a new issue #1059:
URL: https://github.com/apache/arrow-rs/issues/1059


   **Is your feature request related to a problem or challenge? Please describe what you are trying to do.**
   I would like to stream 100+GB of SARS-CoV-2 sequence data into .parquet with zstd compression (works really well on these sequences)
   
   I would like to do this without having to hard-code the schema, for example through a CLI like https://github.com/domoritz/csv2parquet/blob/main/src/main.rs
   
   However, that CLI requires me to provide a `file` and does not allow me to read from `stdin`. Why? Because the reader builder requires its input to be seekable, which stdin is not.
   
   **Describe the solution you'd like**
   It would be good if the reader builder were more flexible and could infer the schema from the first, say, 100 lines, which can still be kept in memory.
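   
   A minimal sketch of that idea using only the standard library, not the arrow-rs API: read the first `n` lines into memory for inspection, then chain them back in front of the rest of the stream so nothing is lost. `split_head` is a hypothetical helper, and splitting on `\n` assumes that fields contain no quoted newlines.
   
   ```rust
   use std::io::{self, BufRead, Chain, Cursor, Read};
   
   /// Hypothetical helper: pulls up to `n` lines out of `input` and returns them
   /// together with a reader that replays those lines followed by the remainder.
   fn split_head<R: BufRead>(
       mut input: R,
       n: usize,
   ) -> io::Result<(Vec<u8>, Chain<Cursor<Vec<u8>>, R>)> {
       let mut head = Vec::new();
       for _ in 0..n {
           // read_until appends to `head` and returns 0 at end of input
           if input.read_until(b'\n', &mut head)? == 0 {
               break;
           }
       }
       let replay = Cursor::new(head.clone()).chain(input);
       Ok((head, replay))
   }
   
   fn main() -> io::Result<()> {
       // A byte slice stands in for stdin here; `io::stdin().lock()` is also `BufRead`.
       let data: &[u8] = b"strain,sequence\nA,ACGT\nB,TTGA\nC,GGCC\n";
       let (head, mut replay) = split_head(data, 2)?;
       // `head` is what a schema inference step would inspect.
       assert_eq!(head, b"strain,sequence\nA,ACGT\n");
       // `replay` still yields the complete input, including the inspected lines.
       let mut all = Vec::new();
       replay.read_to_end(&mut all)?;
       assert_eq!(all, data);
       Ok(())
   }
   ```
   
   Memory use here is bounded by the size of the first `n` lines, since only `head` is held in memory and the rest is streamed.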
   
   **Describe alternatives you've considered**
   I could add a schema option to the CLI tool, but that's annoying and unnecessary because I just want a very simple schema: str/str. I could also do schema inference myself, but that is quite difficult and would be better provided by arrow-rs directly.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] jorgecarleitao edited a comment on issue #1059: Allow csv reader builder to do schema inference even when reading csv from stdin

Posted by GitBox <gi...@apache.org>.
jorgecarleitao edited a comment on issue #1059:
URL: https://github.com/apache/arrow-rs/issues/1059#issuecomment-997172436


   Sorry that I did not express myself very well. I meant something like this:
   
   ```rust
   use std::io::{self, Read, Seek, SeekFrom};
   
   struct ReaderA<R: Read> {
       pub reader: R,
       // `None` until the first real seek; afterwards the logical read position
       position: Option<u64>,
       // bytes already pulled from `reader`, kept so that they can be replayed
       pub buffer: Vec<u8>,
   }
   
   impl<R: Read> Read for ReaderA<R> {
       #[inline]
       fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
           let length = buf.len();
           if let Some(position) = &mut self.position {
               let start = *position as usize;
               // we have seeked to somewhere in our buffer. Read from buffer first
               if start + length < self.buffer.len() {
                   // our buffer fills `buf` => memcopy all
                   buf.copy_from_slice(&self.buffer[start..start + length]);
                   *position += length as u64;
                   Ok(length)
               } else if start <= self.buffer.len() {
                   // edge case where the read covers `self.buffer` and `self.reader`:
                   // read from both accordingly
                   let buffer_remaining = self.buffer.len() - start;
                   buf[..buffer_remaining].copy_from_slice(&self.buffer[start..]);
   
                   // read the remaining from the reader
                   let read = self.reader.read(&mut buf[buffer_remaining..])?;
                   *position += (buffer_remaining + read) as u64;
   
                   if *position > self.buffer.len() as u64 {
                       // release memory
                       self.buffer = Vec::new();
                   }
   
                   Ok(buffer_remaining + read)
               } else {
                   // we are past `self.buffer`: read directly, keep tracking the position
                   let read = self.reader.read(buf)?;
                   *position += read as u64;
                   Ok(read)
               }
           } else {
               // no seek was done so far: read, and keep a copy of what was actually read
               let read = self.reader.read(buf)?;
               self.buffer.extend_from_slice(&buf[..read]);
               Ok(read)
           }
       }
   }
   
   impl<R: Read> Seek for ReaderA<R> {
       #[inline]
       fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
           let buffered = self.buffer.len() as u64;
           // before the first seek, everything read so far sits in the buffer
           let current = self.position.unwrap_or(buffered);
           let target = match pos {
               SeekFrom::Start(position) => position,
               SeekFrom::End(_) => {
                   return Err(io::Error::new(
                       io::ErrorKind::Unsupported,
                       "This reader does not support seeking from end",
                   ))
               }
               SeekFrom::Current(offset) => match (current as i64).checked_add(offset) {
                   Some(t) if t >= 0 => t as u64,
                   _ => {
                       return Err(io::Error::new(
                           io::ErrorKind::InvalidInput,
                           "invalid seek offset",
                       ))
                   }
               },
           };
           // a seek to the current position is a no-op and must not end buffering
           if target != current {
               // only positions still held in the buffer can be revisited
               if current > buffered || target > buffered {
                   return Err(io::Error::new(
                       io::ErrorKind::InvalidInput,
                       "seek target is outside the buffered data",
                   ));
               }
               self.position = Some(target);
           }
           Ok(target)
       }
   }
   
   fn main() {}
   ```
   
   I.e. when we do not have seek and need to use data from a `Read` more than once, we can store it in a buffer and replay it whenever it is requested (via `seek` back + `read`). This idiom uses only as much memory as a non-seekable source requires, namely the data that needs to be used twice (once for inference, once for reading).
   
   I hope this is a bit more understandable.
   
   Note that this is a struct to be used specifically for the CSV reader - it is quite useless otherwise since it incurs an extra memcopy and memory usage.





[GitHub] [arrow-rs] domoritz commented on issue #1059: Allow csv reader builder to do schema inference even when reading csv from stdin

Posted by GitBox <gi...@apache.org>.
domoritz commented on issue #1059:
URL: https://github.com/apache/arrow-rs/issues/1059#issuecomment-997143243


   Is this related to https://github.com/apache/arrow-rs/issues/189?





[GitHub] [arrow-rs] corneliusroemer commented on issue #1059: Allow csv reader builder to do schema inference even when reading csv from stdin

Posted by GitBox <gi...@apache.org>.
corneliusroemer commented on issue #1059:
URL: https://github.com/apache/arrow-rs/issues/1059#issuecomment-997530963


   Thanks @jorgecarleitao. I'm a beginner at Rust so please bear with me. How do you decide how big the buffer will get? Will all of the input be read into this buffer?
   
   Does your code allow releasing the buffer once schema inference has been completed? That would be best: read into memory until schema inference is done, then release. Schema inference options would determine how much is read into memory. The user could set this to, say, the first 100 lines to keep buffer memory usage down.





[GitHub] [arrow-rs] domoritz commented on issue #1059: Allow csv reader builder to do schema inference even when reading csv from stdin

Posted by GitBox <gi...@apache.org>.
domoritz commented on issue #1059:
URL: https://github.com/apache/arrow-rs/issues/1059#issuecomment-997204135


   Could we use the buffered reader from the standard library if it had a way to set the buffer size? 
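   
   For reference, the standard library's `BufReader` does let the caller choose the size via `BufReader::with_capacity`, and `fill_buf` exposes the buffered bytes without consuming them. A minimal sketch of peeking that way, with a byte slice standing in for stdin; `peek_prefix` is a hypothetical helper. The limitation is that only what fits in the buffer (and what one underlying read returns) can be inspected, so a first record longer than the capacity would not be visible in full.
   
   ```rust
   use std::io::{self, BufRead, BufReader, Read};
   
   /// Hypothetical helper: returns a copy of whatever the reader currently has
   /// buffered (filling the buffer once if it is empty) without consuming it.
   fn peek_prefix<R: Read>(reader: &mut BufReader<R>) -> io::Result<Vec<u8>> {
       Ok(reader.fill_buf()?.to_vec())
   }
   
   fn main() -> io::Result<()> {
       let data: &[u8] = b"strain,sequence\nA,ACGT\nB,TTGA\n";
       // Capacity chosen by the caller; 16 bytes here just to make the limit visible.
       let mut reader = BufReader::with_capacity(16, data);
   
       // The peeked bytes are a prefix of the input, at most one buffer long.
       let peeked = peek_prefix(&mut reader)?;
       assert!(!peeked.is_empty() && peeked.len() <= 16);
       assert!(data.starts_with(&peeked));
   
       // Nothing was consumed, so a normal read still sees the input from the start.
       let mut all = Vec::new();
       reader.read_to_end(&mut all)?;
       assert_eq!(all, data);
       Ok(())
   }
   ```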





[GitHub] [arrow-rs] corneliusroemer commented on issue #1059: Allow csv reader builder to do schema inference even when reading csv from stdin

Posted by GitBox <gi...@apache.org>.
corneliusroemer commented on issue #1059:
URL: https://github.com/apache/arrow-rs/issues/1059#issuecomment-997162988


   @jorgecarleitao I agree that implementing a custom `seekable` `Reader` would be necessary to solve this without dropping the `seekable` requirement. But I'm not sure your suggestion is the way forward since we shouldn't read all of `stdin` into memory. Memory is even scarcer than storage.
   
   Say you have 1TB uncompressed that compresses to 1GB. I can decompress the 1GB to storage, then read it in with a normal file reader (which is seekable). Problem: I need 1TB of space and the time to write it.
   
   What doesn't work is reading the 1TB into memory. No way.
   
   Alternative: read 1GB into memory, infer schema, then stream.
   
   Your suggestion seems to read it all into memory, doesn't it? When would you be allowed to drop early parts of the buffer?
   
   Couldn't one drop all back-buffer once one has gone beyond `max_read_records` in `builder = builder.infer_schema(opts.max_read_records);`? Once the max has been read, there's no need for seeking anymore. Seeking shouldn't happen anymore so one can drop whatever is behind you.
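   
   To illustrate the point about dropping the back-buffer: the bytes that need to stay in memory are only those up to the end of the last record used for inference. A rough sketch follows; `bytes_needed` is a hypothetical helper, not part of arrow-rs, and it counts newline bytes, so it assumes a header line and fields without quoted newlines.
   
   ```rust
   /// Hypothetical helper: byte length of the prefix holding the header line plus
   /// `max_read_records` records, or `None` if `buffered` does not yet contain
   /// that many complete lines.
   fn bytes_needed(buffered: &[u8], max_read_records: usize) -> Option<usize> {
       let lines = max_read_records + 1; // +1 for the header line
       let mut seen = 0;
       for (i, &b) in buffered.iter().enumerate() {
           if b == b'\n' {
               seen += 1;
               if seen == lines {
                   return Some(i + 1);
               }
           }
       }
       None
   }
   
   fn main() {
       let buffered = b"strain,sequence\nA,ACGT\nB,TTGA\nC,GG";
       // Header + 2 records end at byte 30: everything before that offset must be
       // replayable, everything after it can be streamed without buffering.
       assert_eq!(bytes_needed(buffered, 2), Some(30));
       // The third record is still incomplete, so more input is needed.
       assert_eq!(bytes_needed(buffered, 3), None);
   }
   ```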





[GitHub] [arrow-rs] jorgecarleitao commented on issue #1059: Allow csv reader builder to do schema inference even when reading csv from stdin

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on issue #1059:
URL: https://github.com/apache/arrow-rs/issues/1059#issuecomment-997150785


   I think that this may be achieved with the current code using a generic struct over `R: Read` that wraps a `reader: R` and
   
   1. implements `Read` and buffers the result.
   2. implements `Seek` to positions within the buffered data (and panics otherwise)
   3. when the read happens over the buffered result, it reads first from the buffer up to the length of the buffered result, and then from the inner reader
   
   





[GitHub] [arrow-rs] jorgecarleitao commented on issue #1059: Allow csv reader builder to do schema inference even when reading csv from stdin

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on issue #1059:
URL: https://github.com/apache/arrow-rs/issues/1059#issuecomment-997627871


   > I'm a beginner at Rust so please bear with me.
   
   No worries, we are all learning :)
   
   > How do you decide how big the buffer will get?
   
   I think we can't: a single CSV line can be megabytes long. Part of parsing CSV is deciding how many bytes are needed per "cell" (row, column).
   
   The code above keeps allocating for as long as the inference requires (so the size is driven by the number of lines used for inference and the size of the first lines in the specific CSV).
   
   > Does your code allow to release the buffer once schema inference has been completed? That's how it would be best: read into memory until schema inference is done, then release.
   
   Not sure I understood: isn't the idea that we want to both infer _and_ parse those lines into arrow, but we can't seek to repeat the operation? If that is the case, I think that we can't release the buffer once the inference is done: we can only release it once the data has been inferred _and_ parsed into columns. The code above works under this hypothesis: once we move the position past the buffer size, we release (see the `// release memory` comment in the code).

