You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/07/17 20:16:24 UTC

[GitHub] [arrow-datafusion] tustvold opened a new pull request, #2936: Add streaming JSON and CSV (#2935)

tustvold opened a new pull request, #2936:
URL: https://github.com/apache/arrow-datafusion/pull/2936

   # Which issue does this PR close?
   
   Closes #2935
   
    # Rationale for this change
   
   See ticket
   
   # What changes are included in this PR?
   
   It aligns the chunks received from object storage to record boundaries, and then feeds these through the decoders
   
   # Are there any user-facing changes?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2936: Add streaming JSON and CSV reading, `NewlineDelimitedStream' (#2935)

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2936:
URL: https://github.com/apache/arrow-datafusion/pull/2936#discussion_r923648048


##########
datafusion/core/src/physical_plan/file_format/csv.rs:
##########
@@ -198,12 +198,15 @@ impl FormatReader for CsvOpener {
         Box::pin(async move {
             match store.get(&file.location).await? {
                 GetResult::File(file, _) => {
-                    Ok(futures::stream::iter(config.open(file)).boxed())
+                    Ok(futures::stream::iter(config.open(file, true)).boxed())

Review Comment:
   🥳 🦜 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] codecov-commenter commented on pull request #2936: Add streaming JSON and CSV (#2935)

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #2936:
URL: https://github.com/apache/arrow-datafusion/pull/2936#issuecomment-1186604451

   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2936?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2936](https://codecov.io/gh/apache/arrow-datafusion/pull/2936?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ce4f59e) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/b5537e753078f1a97315d641ea3608b6635c1069?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b5537e7) will **increase** coverage by `0.02%`.
   > The diff coverage is `80.18%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #2936      +/-   ##
   ==========================================
   + Coverage   85.30%   85.33%   +0.02%     
   ==========================================
     Files         273      274       +1     
     Lines       49269    49450     +181     
   ==========================================
   + Hits        42029    42198     +169     
   - Misses       7240     7252      +12     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/2936?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [datafusion/core/src/datasource/listing/helpers.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2936/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3NyYy9kYXRhc291cmNlL2xpc3RpbmcvaGVscGVycy5ycw==) | `94.96% <ø> (ø)` | |
   | [...tafusion/core/src/physical\_plan/file\_format/csv.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2936/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3NyYy9waHlzaWNhbF9wbGFuL2ZpbGVfZm9ybWF0L2Nzdi5ycw==) | `91.75% <0.00%> (-0.48%)` | :arrow_down: |
   | [...afusion/core/src/physical\_plan/file\_format/json.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2936/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3NyYy9waHlzaWNhbF9wbGFuL2ZpbGVfZm9ybWF0L2pzb24ucnM=) | `89.07% <0.00%> (-2.00%)` | :arrow_down: |
   | [...tafusion/core/src/physical\_plan/file\_format/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2936/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3NyYy9waHlzaWNhbF9wbGFuL2ZpbGVfZm9ybWF0L21vZC5ycw==) | `97.36% <ø> (ø)` | |
   | [.../src/physical\_plan/file\_format/delimited\_stream.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2936/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3NyYy9waHlzaWNhbF9wbGFuL2ZpbGVfZm9ybWF0L2RlbGltaXRlZF9zdHJlYW0ucnM=) | `90.42% <90.42%> (ø)` | |
   | [datafusion/common/src/pyarrow.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2936/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb21tb24vc3JjL3B5YXJyb3cucnM=) | `0.00% <0.00%> (ø)` | |
   | [datafusion/proto/src/bytes/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2936/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9wcm90by9zcmMvYnl0ZXMvbW9kLnJz) | `82.75% <0.00%> (ø)` | |
   | [...usion/core/src/avro\_to\_arrow/arrow\_array\_reader.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2936/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3NyYy9hdnJvX3RvX2Fycm93L2Fycm93X2FycmF5X3JlYWRlci5ycw==) | `0.00% <0.00%> (ø)` | |
   | [datafusion/core/tests/sql/aggregates.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2936/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3Rlc3RzL3NxbC9hZ2dyZWdhdGVzLnJz) | `99.28% <0.00%> (+0.01%)` | :arrow_up: |
   | [datafusion/expr/src/aggregate\_function.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2936/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9leHByL3NyYy9hZ2dyZWdhdGVfZnVuY3Rpb24ucnM=) | `92.25% <0.00%> (+0.02%)` | :arrow_up: |
   | ... and [4 more](https://codecov.io/gh/apache/arrow-datafusion/pull/2936/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2936?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2936?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [b5537e7...ce4f59e](https://codecov.io/gh/apache/arrow-datafusion/pull/2936?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2936: Add streaming JSON and CSV reading, `NewlineDelimitedStream' (#2935)

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2936:
URL: https://github.com/apache/arrow-datafusion/pull/2936#discussion_r923625775


##########
datafusion/core/src/physical_plan/file_format/chunked_store.rs:
##########
@@ -0,0 +1,122 @@
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use futures::StreamExt;
+use object_store::path::Path;
+use object_store::Result;
+use object_store::{GetResult, ListResult, ObjectMeta, ObjectStore};
+use std::fmt::{Debug, Display, Formatter};
+use std::ops::Range;
+use std::sync::Arc;
+
+/// Wraps a [`ObjectStore`] and makes its get response return chunks
+///
+/// TODO: Upstream into object_store_rs
+#[derive(Debug)]
+pub struct ChunkedStore {
+    inner: Arc<dyn ObjectStore>,
+    chunk_size: usize,
+}
+
+impl ChunkedStore {
+    pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: usize) -> Self {
+        Self { inner, chunk_size }
+    }
+}
+
+impl Display for ChunkedStore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "ChunkedStore({})", self.inner)
+    }
+}
+
+#[async_trait]
+impl ObjectStore for ChunkedStore {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        self.inner.put(location, bytes).await
+    }
+
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        let bytes = self.inner.get(location).await?.bytes().await?;
+        let mut offset = 0;
+        let chunk_size = self.chunk_size;
+
+        Ok(GetResult::Stream(
+            futures::stream::iter(std::iter::from_fn(move || {
+                let remaining = bytes.len() - offset;
+                if remaining == 0 {
+                    return None;
+                }
+                let to_read = remaining.min(chunk_size);
+                let next_offset = offset + to_read;
+                let slice = bytes.slice(offset..next_offset);
+                offset = next_offset;
+                Some(Ok(slice))
+            }))
+            .boxed(),
+        ))
+    }
+
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+        self.inner.get_range(location, range).await
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        self.inner.head(location).await
+    }
+
+    async fn delete(&self, location: &Path) -> Result<()> {
+        self.inner.delete(location).await
+    }
+
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        self.inner.list(prefix).await
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        self.inner.list_with_delimiter(prefix).await
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        self.inner.copy(from, to).await
+    }
+
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        self.inner.copy_if_not_exists(from, to).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use object_store::memory::InMemory;
+
+    #[tokio::test]
+    async fn test_chunked() {
+        let location = Path::parse("test").unwrap();
+        let store = Arc::new(InMemory::new());
+        store
+            .put(&location, Bytes::from(vec![0; 1001]))
+            .await
+            .unwrap();
+
+        for chunk_size in [10, 20, 31] {
+            let store = ChunkedStore::new(store.clone(), chunk_size);
+            let mut s = match store.get(&location).await.unwrap() {
+                GetResult::Stream(s) => s,
+                _ => unreachable!(),
+            };
+
+            let mut remaining = 1001;
+            while let Some(next) = s.next().await {
+                let size = next.unwrap().len();
+                let expected = remaining.min(chunk_size);
+                assert_eq!(size, expected);
+                remaining -= expected;
+            }
+        }

Review Comment:
   I recommend also `assert_eq!(remaining, 0)` at the end of the test to ensure nothing is lost



##########
datafusion/core/src/physical_plan/file_format/csv.rs:
##########
@@ -198,12 +198,15 @@ impl FormatReader for CsvOpener {
         Box::pin(async move {
             match store.get(&file.location).await? {
                 GetResult::File(file, _) => {
-                    Ok(futures::stream::iter(config.open(file)).boxed())
+                    Ok(futures::stream::iter(config.open(file, true)).boxed())

Review Comment:
   Is `first-chunk` a bug fix?



##########
datafusion/core/src/physical_plan/file_format/json.rs:
##########
@@ -440,4 +443,38 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_chunked() {
+        let mut ctx = SessionContext::new();
+
+        for chunk_size in [10, 20, 30, 40] {
+            ctx.runtime_env().register_object_store(
+                "file",
+                "",
+                Arc::new(ChunkedStore::new(

Review Comment:
   very nice 👌 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on pull request #2936: Add streaming JSON and CSV reading, `NewlineDelimitedStream' (#2935)

Posted by GitBox <gi...@apache.org>.
tustvold commented on PR #2936:
URL: https://github.com/apache/arrow-datafusion/pull/2936#issuecomment-1187666048

   Marking as draft whilst I work on some tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2936: Add streaming JSON and CSV reading, `NewlineDelimitedStream' (#2935)

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2936:
URL: https://github.com/apache/arrow-datafusion/pull/2936#discussion_r923644501


##########
datafusion/core/src/physical_plan/file_format/csv.rs:
##########
@@ -198,12 +198,15 @@ impl FormatReader for CsvOpener {
         Box::pin(async move {
             match store.get(&file.location).await? {
                 GetResult::File(file, _) => {
-                    Ok(futures::stream::iter(config.open(file)).boxed())
+                    Ok(futures::stream::iter(config.open(file, true)).boxed())

Review Comment:
   Yup :smile: Tests FTW



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2936: Add streaming JSON and CSV reading, `NewlineDelimitedStream' (#2935)

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2936:
URL: https://github.com/apache/arrow-datafusion/pull/2936#discussion_r923453238


##########
datafusion/core/src/physical_plan/file_format/delimited_stream.rs:
##########
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result};
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use std::collections::VecDeque;
+
+/// The ASCII encoding of `"`
+const QUOTE: u8 = 34;
+
+/// The ASCII encoding of `\n`
+const NEWLINE: u8 = 10;
+
+/// The ASCII encoding of `\`
+const ESCAPE: u8 = 92;
+
+/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
+/// of [`Bytes`] containing a whole number of new line delimited records
+#[derive(Debug, Default)]
+struct LineDelimiter {
+    /// Complete chunks of [`Bytes`]
+    complete: VecDeque<Bytes>,
+    /// Remainder bytes that form the next record
+    remainder: Vec<u8>,
+    /// True if the last character was the escape character
+    is_escape: bool,
+    /// True if currently processing a quoted string
+    is_quote: bool,
+}
+
+impl LineDelimiter {
+    /// Creates a new [`LineDelimiter`] with the provided delimiter
+    fn new() -> Self {
+        Self::default()
+    }
+
+    /// Adds the next set of [`Bytes`]
+    fn push(&mut self, val: impl Into<Bytes>) {
+        let val: Bytes = val.into();
+
+        let is_escape = &mut self.is_escape;
+        let is_quote = &mut self.is_quote;
+        let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
+            if *is_escape {
+                *is_escape = false;
+                None
+            } else if *v == ESCAPE {
+                *is_escape = true;
+                None
+            } else if *v == QUOTE {
+                *is_quote = !*is_quote;
+                None
+            } else if *is_quote {
+                None
+            } else {
+                (*v == NEWLINE).then(|| idx + 1)
+            }
+        });
+
+        let start_offset = match self.remainder.is_empty() {
+            true => 0,
+            false => match record_ends.next() {
+                Some(idx) => {
+                    self.remainder.extend_from_slice(&val[0..idx]);
+                    self.complete
+                        .push_back(Bytes::from(std::mem::take(&mut self.remainder)));
+                    idx
+                }
+                None => {
+                    self.remainder.extend_from_slice(&val);
+                    return;
+                }
+            },
+        };
+        let end_offset = record_ends.last().unwrap_or(start_offset);
+        if start_offset != end_offset {
+            self.complete.push_back(val.slice(start_offset..end_offset));
+        }
+
+        if end_offset != val.len() {
+            self.remainder.extend_from_slice(&val[end_offset..])
+        }
+    }
+
+    /// Marks the end of the stream, delimiting any remaining bytes
+    ///
+    /// Returns `true` if there is no remaining data to be read
+    fn finish(&mut self) -> Result<bool> {
+        if !self.remainder.is_empty() {
+            if self.is_quote {
+                return Err(DataFusionError::Execution(
+                    "encountered unterminated string".to_string(),
+                ));
+            }
+
+            if self.is_escape {
+                return Err(DataFusionError::Execution(
+                    "encountered trailing escape character".to_string(),
+                ));
+            }
+
+            self.complete
+                .push_back(Bytes::from(std::mem::take(&mut self.remainder)))
+        }
+        Ok(self.complete.is_empty())
+    }
+}
+
+impl Iterator for LineDelimiter {
+    type Item = Bytes;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.complete.pop_front()
+    }
+}
+
+/// Given a [`Stream`] of [`Bytes`] returns a [`Stream`] where each
+/// yielded [`Bytes`] contains a whole number of new line delimited records
+pub fn newline_delimited_stream<S>(s: S) -> impl Stream<Item = Result<Bytes>>
+where
+    S: Stream<Item = Result<Bytes>> + Unpin,
+{
+    let delimiter = LineDelimiter::new();
+
+    futures::stream::unfold((s, delimiter), |(mut s, mut delimiter)| async move {
+        loop {
+            if let Some(next) = delimiter.next() {
+                return Some((Ok(next), (s, delimiter)));
+            }
+
+            match s.next().await {
+                Some(Ok(bytes)) => delimiter.push(bytes),
+                Some(Err(e)) => return Some((Err(e), (s, delimiter))),
+                None => match delimiter.finish() {
+                    Ok(true) => return None,
+                    Ok(false) => continue,
+                    Err(e) => return Some((Err(e), (s, delimiter))),
+                },
+            }
+        }
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::stream::TryStreamExt;
+
+    #[test]
+    fn test_delimiter() {
+        let mut delimiter = LineDelimiter::new();
+        delimiter.push("hello\nworld");
+        delimiter.push("\n\n");
+
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("hello\n"));
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("world\n"));
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("\n"));
+        assert!(delimiter.next().is_none());
+
+        delimiter.push("");

Review Comment:
   👍  that would be good to document (the intent of the test).
   
   I think the main suggestion of break up the test into smaller self contained blocks with descriptive names still holds even if this particular cut-off point would not be ideal.
   
   The total test size will be larger, but I think each test will be easier to understand what it is testing.
   
   maybe worth a thought



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2936: Add streaming JSON and CSV reading, `NewlineDelimitedStream' (#2935)

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2936:
URL: https://github.com/apache/arrow-datafusion/pull/2936#discussion_r923437122


##########
datafusion/core/src/physical_plan/file_format/delimited_stream.rs:
##########
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result};
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use std::collections::VecDeque;
+
+/// The ASCII encoding of `"`
+const QUOTE: u8 = 34;
+
+/// The ASCII encoding of `\n`
+const NEWLINE: u8 = 10;
+
+/// The ASCII encoding of `\`
+const ESCAPE: u8 = 92;
+
+/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
+/// of [`Bytes`] containing a whole number of new line delimited records
+#[derive(Debug, Default)]
+struct LineDelimiter {
+    /// Complete chunks of [`Bytes`]
+    complete: VecDeque<Bytes>,
+    /// Remainder bytes that form the next record
+    remainder: Vec<u8>,

Review Comment:
   I think the copy is unavoidable as the nature of the remainder, is you need to take data from two separate `Bytes`. It should only be a single "line" though, and so should be relatively minor from a performance standpoint



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ursabot commented on pull request #2936: Add streaming JSON and CSV reading, `NewlineDelimitedStream' (#2935)

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #2936:
URL: https://github.com/apache/arrow-datafusion/pull/2936#issuecomment-1188389286

   Benchmark runs are scheduled for baseline = b772c6df99fa23bbb75b65d06afb2e4dd3bba697 and contender = 944ef3d6eeb5e3b31c5561df9b7b8cbdef7367f5. 944ef3d6eeb5e3b31c5561df9b7b8cbdef7367f5 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/54592367924e4f87b47ca8ad049e36fe...507a146d2ae84e3c8009e0ab3ffae188/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] [test-mac-arm](https://conbench.ursa.dev/compare/runs/99c2301d21094d6b9de02fddb4877fc6...7071456bd3b841f4a726221a63fffab8/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/14fbd2ef8bfa447db18ee6b28efcaed1...3e239b1202d243ae83d593e419a71cd7/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/8dbbec480dc145a8980e5f7ae2032b2c...560bbabf683d4c249a60372509fb6706/)
   Buildkite builds:
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2936: Add streaming JSON and CSV reading, `NewlineDelimitedStream' (#2935)

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2936:
URL: https://github.com/apache/arrow-datafusion/pull/2936#discussion_r923385082


##########
datafusion/core/src/physical_plan/file_format/delimited_stream.rs:
##########
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result};
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use std::collections::VecDeque;
+
+/// The ASCII encoding of `"`
+const QUOTE: u8 = 34;

Review Comment:
   I wonder if the `b` constant syntax would make this easier to validate (and the same below)
   
   ```suggestion
   const QUOTE: u8 = b'"';
   ```



##########
datafusion/core/src/physical_plan/file_format/delimited_stream.rs:
##########
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result};
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use std::collections::VecDeque;
+
+/// The ASCII encoding of `"`
+const QUOTE: u8 = 34;
+
+/// The ASCII encoding of `\n`
+const NEWLINE: u8 = 10;
+
+/// The ASCII encoding of `\`
+const ESCAPE: u8 = 92;
+
+/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
+/// of [`Bytes`] containing a whole number of new line delimited records
+#[derive(Debug, Default)]
+struct LineDelimiter {
+    /// Complete chunks of [`Bytes`]
+    complete: VecDeque<Bytes>,
+    /// Remainder bytes that form the next record
+    remainder: Vec<u8>,
+    /// True if the last character was the escape character
+    is_escape: bool,
+    /// True if currently processing a quoted string
+    is_quote: bool,
+}
+
+impl LineDelimiter {
+    /// Creates a new [`LineDelimiter`] with the provided delimiter
+    fn new() -> Self {
+        Self::default()
+    }
+
+    /// Adds the next set of [`Bytes`]
+    fn push(&mut self, val: impl Into<Bytes>) {
+        let val: Bytes = val.into();
+
+        let is_escape = &mut self.is_escape;
+        let is_quote = &mut self.is_quote;
+        let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {

Review Comment:
   Documenting the escaping rules that `LineDelimiter` assumes might be good (like `\` style escapes with `"` quotes)



##########
datafusion/core/src/physical_plan/file_format/delimited_stream.rs:
##########
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result};
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use std::collections::VecDeque;
+
+/// The ASCII encoding of `"`
+const QUOTE: u8 = 34;
+
+/// The ASCII encoding of `\n`
+const NEWLINE: u8 = 10;
+
+/// The ASCII encoding of `\`
+const ESCAPE: u8 = 92;
+
+/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
+/// of [`Bytes`] containing a whole number of new line delimited records
+#[derive(Debug, Default)]
+struct LineDelimiter {
+    /// Complete chunks of [`Bytes`]
+    complete: VecDeque<Bytes>,
+    /// Remainder bytes that form the next record
+    remainder: Vec<u8>,
+    /// True if the last character was the escape character
+    is_escape: bool,
+    /// True if currently processing a quoted string
+    is_quote: bool,
+}
+
+impl LineDelimiter {
+    /// Creates a new [`LineDelimiter`] with the provided delimiter
+    fn new() -> Self {
+        Self::default()
+    }
+
+    /// Adds the next set of [`Bytes`]
+    fn push(&mut self, val: impl Into<Bytes>) {
+        let val: Bytes = val.into();
+
+        let is_escape = &mut self.is_escape;
+        let is_quote = &mut self.is_quote;
+        let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
+            if *is_escape {
+                *is_escape = false;
+                None
+            } else if *v == ESCAPE {
+                *is_escape = true;
+                None
+            } else if *v == QUOTE {
+                *is_quote = !*is_quote;
+                None
+            } else if *is_quote {
+                None
+            } else {
+                (*v == NEWLINE).then(|| idx + 1)
+            }
+        });
+
+        let start_offset = match self.remainder.is_empty() {
+            true => 0,
+            false => match record_ends.next() {
+                Some(idx) => {
+                    self.remainder.extend_from_slice(&val[0..idx]);
+                    self.complete
+                        .push_back(Bytes::from(std::mem::take(&mut self.remainder)));

Review Comment:
   If `remainder` was a `bytes` this would probably be cleaner (though the clause below to handle no records in the chunk would be more complicated); However, I suspect the "next `Bytes` actually has more than one record ending is the more common case



##########
datafusion/core/src/physical_plan/file_format/delimited_stream.rs:
##########
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result};
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use std::collections::VecDeque;
+
+/// The ASCII encoding of `"`
+const QUOTE: u8 = 34;
+
+/// The ASCII encoding of `\n`
+const NEWLINE: u8 = 10;
+
+/// The ASCII encoding of `\`
+const ESCAPE: u8 = 92;
+
+/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
+/// of [`Bytes`] containing a whole number of new line delimited records
+#[derive(Debug, Default)]
+struct LineDelimiter {
+    /// Complete chunks of [`Bytes`]
+    complete: VecDeque<Bytes>,
+    /// Remainder bytes that form the next record
+    remainder: Vec<u8>,
+    /// True if the last character was the escape character
+    is_escape: bool,
+    /// True if currently processing a quoted string
+    is_quote: bool,
+}
+
+impl LineDelimiter {
+    /// Creates a new [`LineDelimiter`] with the provided delimiter
+    fn new() -> Self {
+        Self::default()
+    }
+
+    /// Adds the next set of [`Bytes`]
+    fn push(&mut self, val: impl Into<Bytes>) {
+        let val: Bytes = val.into();
+
+        let is_escape = &mut self.is_escape;
+        let is_quote = &mut self.is_quote;
+        let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
+            if *is_escape {
+                *is_escape = false;
+                None
+            } else if *v == ESCAPE {
+                *is_escape = true;
+                None
+            } else if *v == QUOTE {
+                *is_quote = !*is_quote;
+                None
+            } else if *is_quote {
+                None
+            } else {
+                (*v == NEWLINE).then(|| idx + 1)
+            }
+        });
+
+        let start_offset = match self.remainder.is_empty() {
+            true => 0,
+            false => match record_ends.next() {
+                Some(idx) => {
+                    self.remainder.extend_from_slice(&val[0..idx]);
+                    self.complete
+                        .push_back(Bytes::from(std::mem::take(&mut self.remainder)));
+                    idx
+                }
+                None => {
+                    self.remainder.extend_from_slice(&val);
+                    return;
+                }
+            },
+        };
+        let end_offset = record_ends.last().unwrap_or(start_offset);
+        if start_offset != end_offset {
+            self.complete.push_back(val.slice(start_offset..end_offset));
+        }
+
+        if end_offset != val.len() {
+            self.remainder.extend_from_slice(&val[end_offset..])
+        }
+    }
+
+    /// Marks the end of the stream, delimiting any remaining bytes
+    ///
+    /// Returns `true` if there is no remaining data to be read
+    fn finish(&mut self) -> Result<bool> {
+        if !self.remainder.is_empty() {
+            if self.is_quote {
+                return Err(DataFusionError::Execution(
+                    "encountered unterminated string".to_string(),
+                ));
+            }
+
+            if self.is_escape {
+                return Err(DataFusionError::Execution(
+                    "encountered trailing escape character".to_string(),
+                ));
+            }
+
+            self.complete
+                .push_back(Bytes::from(std::mem::take(&mut self.remainder)))
+        }
+        Ok(self.complete.is_empty())
+    }
+}
+
+impl Iterator for LineDelimiter {
+    type Item = Bytes;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.complete.pop_front()
+    }
+}
+
+/// Given a [`Stream`] of [`Bytes`] returns a [`Stream`] where each
+/// yielded [`Bytes`] contains a whole number of new line delimited records

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/file_format/delimited_stream.rs:
##########
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result};
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use std::collections::VecDeque;
+
+/// The ASCII encoding of `"`
+const QUOTE: u8 = 34;
+
+/// The ASCII encoding of `\n`
+const NEWLINE: u8 = 10;
+
+/// The ASCII encoding of `\`
+const ESCAPE: u8 = 92;
+
+/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
+/// of [`Bytes`] containing a whole number of new line delimited records
+#[derive(Debug, Default)]
+struct LineDelimiter {
+    /// Complete chunks of [`Bytes`]
+    complete: VecDeque<Bytes>,
+    /// Remainder bytes that form the next record
+    remainder: Vec<u8>,

Review Comment:
   I wonder if you could use something like
   
   ```suggestion
       remainder: Bytes,
   ```
   
   As bytes has a `slice` method https://docs.rs/bytes/1.1.0/bytes/struct.Bytes.html#method.slice 🤔 
   
   Which might reduce some copies 🤷  



##########
datafusion/core/src/physical_plan/file_format/delimited_stream.rs:
##########
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result};
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use std::collections::VecDeque;
+
+/// The ASCII encoding of `"`
+const QUOTE: u8 = 34;
+
+/// The ASCII encoding of `\n`
+const NEWLINE: u8 = 10;
+
+/// The ASCII encoding of `\`
+const ESCAPE: u8 = 92;
+
+/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
+/// of [`Bytes`] containing a whole number of new line delimited records
+#[derive(Debug, Default)]
+struct LineDelimiter {
+    /// Complete chunks of [`Bytes`]
+    complete: VecDeque<Bytes>,
+    /// Remainder bytes that form the next record
+    remainder: Vec<u8>,
+    /// True if the last character was the escape character
+    is_escape: bool,
+    /// True if currently processing a quoted string
+    is_quote: bool,
+}
+
+impl LineDelimiter {
+    /// Creates a new [`LineDelimiter`] with the provided delimiter
+    fn new() -> Self {
+        Self::default()
+    }
+
+    /// Adds the next set of [`Bytes`]
+    fn push(&mut self, val: impl Into<Bytes>) {
+        let val: Bytes = val.into();
+
+        let is_escape = &mut self.is_escape;
+        let is_quote = &mut self.is_quote;
+        let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
+            if *is_escape {
+                *is_escape = false;
+                None
+            } else if *v == ESCAPE {
+                *is_escape = true;
+                None
+            } else if *v == QUOTE {
+                *is_quote = !*is_quote;
+                None
+            } else if *is_quote {
+                None
+            } else {
+                (*v == NEWLINE).then(|| idx + 1)
+            }
+        });
+
+        let start_offset = match self.remainder.is_empty() {
+            true => 0,
+            false => match record_ends.next() {
+                Some(idx) => {
+                    self.remainder.extend_from_slice(&val[0..idx]);
+                    self.complete
+                        .push_back(Bytes::from(std::mem::take(&mut self.remainder)));
+                    idx
+                }
+                None => {
+                    self.remainder.extend_from_slice(&val);
+                    return;
+                }
+            },
+        };
+        let end_offset = record_ends.last().unwrap_or(start_offset);
+        if start_offset != end_offset {
+            self.complete.push_back(val.slice(start_offset..end_offset));
+        }
+
+        if end_offset != val.len() {
+            self.remainder.extend_from_slice(&val[end_offset..])
+        }
+    }
+
+    /// Marks the end of the stream, delimiting any remaining bytes
+    ///
+    /// Returns `true` if there is no remaining data to be read
+    fn finish(&mut self) -> Result<bool> {
+        if !self.remainder.is_empty() {
+            if self.is_quote {
+                return Err(DataFusionError::Execution(
+                    "encountered unterminated string".to_string(),
+                ));
+            }
+
+            if self.is_escape {
+                return Err(DataFusionError::Execution(
+                    "encountered trailing escape character".to_string(),
+                ));
+            }
+
+            self.complete
+                .push_back(Bytes::from(std::mem::take(&mut self.remainder)))
+        }
+        Ok(self.complete.is_empty())
+    }
+}
+
+impl Iterator for LineDelimiter {
+    type Item = Bytes;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.complete.pop_front()
+    }
+}
+
+/// Given a [`Stream`] of [`Bytes`] returns a [`Stream`] where each
+/// yielded [`Bytes`] contains a whole number of new line delimited records
+pub fn newline_delimited_stream<S>(s: S) -> impl Stream<Item = Result<Bytes>>
+where
+    S: Stream<Item = Result<Bytes>> + Unpin,
+{
+    let delimiter = LineDelimiter::new();
+
+    futures::stream::unfold((s, delimiter), |(mut s, mut delimiter)| async move {
+        loop {
+            if let Some(next) = delimiter.next() {
+                return Some((Ok(next), (s, delimiter)));
+            }
+
+            match s.next().await {
+                Some(Ok(bytes)) => delimiter.push(bytes),
+                Some(Err(e)) => return Some((Err(e), (s, delimiter))),
+                None => match delimiter.finish() {
+                    Ok(true) => return None,
+                    Ok(false) => continue,
+                    Err(e) => return Some((Err(e), (s, delimiter))),
+                },
+            }
+        }
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::stream::TryStreamExt;
+
+    #[test]
+    fn test_delimiter() {
+        let mut delimiter = LineDelimiter::new();
+        delimiter.push("hello\nworld");
+        delimiter.push("\n\n");
+
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("hello\n"));
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("world\n"));
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("\n"));
+        assert!(delimiter.next().is_none());
+
+        delimiter.push("");

Review Comment:
   I would recommend making a new test here as a way to make the tests more self documenting.
   
   Unless it is important that data can be `push`ed into a `LineDelimiter` after `finish()` is called 🤔 
   
   ```suggestion
       #[test]
       fn test_delimiter_escaped() {
           delimiter.push("");
           
   ```



##########
datafusion/core/src/physical_plan/file_format/delimited_stream.rs:
##########
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result};
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use std::collections::VecDeque;
+
+/// The ASCII encoding of `"`
+const QUOTE: u8 = 34;
+
+/// The ASCII encoding of `\n`
+const NEWLINE: u8 = 10;
+
+/// The ASCII encoding of `\`
+const ESCAPE: u8 = 92;
+
+/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
+/// of [`Bytes`] containing a whole number of new line delimited records
+#[derive(Debug, Default)]
+struct LineDelimiter {
+    /// Complete chunks of [`Bytes`]
+    complete: VecDeque<Bytes>,
+    /// Remainder bytes that form the next record
+    remainder: Vec<u8>,
+    /// True if the last character was the escape character
+    is_escape: bool,
+    /// True if currently processing a quoted string
+    is_quote: bool,
+}
+
+impl LineDelimiter {
+    /// Creates a new [`LineDelimiter`] with the provided delimiter
+    fn new() -> Self {
+        Self::default()
+    }
+
+    /// Adds the next set of [`Bytes`]
+    fn push(&mut self, val: impl Into<Bytes>) {
+        let val: Bytes = val.into();
+
+        let is_escape = &mut self.is_escape;
+        let is_quote = &mut self.is_quote;
+        let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
+            if *is_escape {
+                *is_escape = false;
+                None
+            } else if *v == ESCAPE {
+                *is_escape = true;
+                None
+            } else if *v == QUOTE {
+                *is_quote = !*is_quote;
+                None
+            } else if *is_quote {
+                None
+            } else {
+                (*v == NEWLINE).then(|| idx + 1)
+            }
+        });
+
+        let start_offset = match self.remainder.is_empty() {
+            true => 0,
+            false => match record_ends.next() {
+                Some(idx) => {
+                    self.remainder.extend_from_slice(&val[0..idx]);
+                    self.complete
+                        .push_back(Bytes::from(std::mem::take(&mut self.remainder)));
+                    idx
+                }
+                None => {
+                    self.remainder.extend_from_slice(&val);
+                    return;
+                }
+            },
+        };
+        let end_offset = record_ends.last().unwrap_or(start_offset);
+        if start_offset != end_offset {
+            self.complete.push_back(val.slice(start_offset..end_offset));
+        }
+
+        if end_offset != val.len() {
+            self.remainder.extend_from_slice(&val[end_offset..])
+        }
+    }
+
+    /// Marks the end of the stream, delimiting any remaining bytes
+    ///
+    /// Returns `true` if there is no remaining data to be read
+    fn finish(&mut self) -> Result<bool> {
+        if !self.remainder.is_empty() {
+            if self.is_quote {
+                return Err(DataFusionError::Execution(
+                    "encountered unterminated string".to_string(),
+                ));
+            }
+
+            if self.is_escape {
+                return Err(DataFusionError::Execution(
+                    "encountered trailing escape character".to_string(),
+                ));
+            }
+
+            self.complete
+                .push_back(Bytes::from(std::mem::take(&mut self.remainder)))
+        }
+        Ok(self.complete.is_empty())
+    }
+}
+
+impl Iterator for LineDelimiter {
+    type Item = Bytes;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.complete.pop_front()
+    }
+}
+
+/// Given a [`Stream`] of [`Bytes`] returns a [`Stream`] where each
+/// yielded [`Bytes`] contains a whole number of new line delimited records
+pub fn newline_delimited_stream<S>(s: S) -> impl Stream<Item = Result<Bytes>>
+where
+    S: Stream<Item = Result<Bytes>> + Unpin,
+{
+    let delimiter = LineDelimiter::new();
+
+    futures::stream::unfold((s, delimiter), |(mut s, mut delimiter)| async move {
+        loop {
+            if let Some(next) = delimiter.next() {
+                return Some((Ok(next), (s, delimiter)));
+            }
+
+            match s.next().await {
+                Some(Ok(bytes)) => delimiter.push(bytes),
+                Some(Err(e)) => return Some((Err(e), (s, delimiter))),
+                None => match delimiter.finish() {
+                    Ok(true) => return None,
+                    Ok(false) => continue,
+                    Err(e) => return Some((Err(e), (s, delimiter))),
+                },
+            }
+        }
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::stream::TryStreamExt;
+
+    #[test]
+    fn test_delimiter() {

Review Comment:
   Is the case where some `push` doesn't have a delimiter covered? I couldn't find it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold merged pull request #2936: Add streaming JSON and CSV reading, `NewlineDelimitedStream' (#2935)

Posted by GitBox <gi...@apache.org>.
tustvold merged PR #2936:
URL: https://github.com/apache/arrow-datafusion/pull/2936


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2936: Add streaming JSON and CSV reading, `NewlineDelimitedStream' (#2935)

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2936:
URL: https://github.com/apache/arrow-datafusion/pull/2936#discussion_r923438607


##########
datafusion/core/src/physical_plan/file_format/delimited_stream.rs:
##########
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result};
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use std::collections::VecDeque;
+
+/// The ASCII encoding of `"`
+const QUOTE: u8 = 34;
+
+/// The ASCII encoding of `\n`
+const NEWLINE: u8 = 10;
+
+/// The ASCII encoding of `\`
+const ESCAPE: u8 = 92;
+
+/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
+/// of [`Bytes`] containing a whole number of new line delimited records
+#[derive(Debug, Default)]
+struct LineDelimiter {
+    /// Complete chunks of [`Bytes`]
+    complete: VecDeque<Bytes>,
+    /// Remainder bytes that form the next record
+    remainder: Vec<u8>,
+    /// True if the last character was the escape character
+    is_escape: bool,
+    /// True if currently processing a quoted string
+    is_quote: bool,
+}
+
+impl LineDelimiter {
+    /// Creates a new [`LineDelimiter`] with the provided delimiter
+    fn new() -> Self {
+        Self::default()
+    }
+
+    /// Adds the next set of [`Bytes`]
+    fn push(&mut self, val: impl Into<Bytes>) {
+        let val: Bytes = val.into();
+
+        let is_escape = &mut self.is_escape;
+        let is_quote = &mut self.is_quote;
+        let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
+            if *is_escape {
+                *is_escape = false;
+                None
+            } else if *v == ESCAPE {
+                *is_escape = true;
+                None
+            } else if *v == QUOTE {
+                *is_quote = !*is_quote;
+                None
+            } else if *is_quote {
+                None
+            } else {
+                (*v == NEWLINE).then(|| idx + 1)
+            }
+        });
+
+        let start_offset = match self.remainder.is_empty() {
+            true => 0,
+            false => match record_ends.next() {
+                Some(idx) => {
+                    self.remainder.extend_from_slice(&val[0..idx]);
+                    self.complete
+                        .push_back(Bytes::from(std::mem::take(&mut self.remainder)));
+                    idx
+                }
+                None => {
+                    self.remainder.extend_from_slice(&val);
+                    return;
+                }
+            },
+        };
+        let end_offset = record_ends.last().unwrap_or(start_offset);
+        if start_offset != end_offset {
+            self.complete.push_back(val.slice(start_offset..end_offset));
+        }
+
+        if end_offset != val.len() {
+            self.remainder.extend_from_slice(&val[end_offset..])
+        }
+    }
+
+    /// Marks the end of the stream, delimiting any remaining bytes
+    ///
+    /// Returns `true` if there is no remaining data to be read
+    fn finish(&mut self) -> Result<bool> {
+        if !self.remainder.is_empty() {
+            if self.is_quote {
+                return Err(DataFusionError::Execution(
+                    "encountered unterminated string".to_string(),
+                ));
+            }
+
+            if self.is_escape {
+                return Err(DataFusionError::Execution(
+                    "encountered trailing escape character".to_string(),
+                ));
+            }
+
+            self.complete
+                .push_back(Bytes::from(std::mem::take(&mut self.remainder)))
+        }
+        Ok(self.complete.is_empty())
+    }
+}
+
+impl Iterator for LineDelimiter {
+    type Item = Bytes;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.complete.pop_front()
+    }
+}
+
+/// Given a [`Stream`] of [`Bytes`] returns a [`Stream`] where each
+/// yielded [`Bytes`] contains a whole number of new line delimited records
+pub fn newline_delimited_stream<S>(s: S) -> impl Stream<Item = Result<Bytes>>
+where
+    S: Stream<Item = Result<Bytes>> + Unpin,
+{
+    let delimiter = LineDelimiter::new();
+
+    futures::stream::unfold((s, delimiter), |(mut s, mut delimiter)| async move {
+        loop {
+            if let Some(next) = delimiter.next() {
+                return Some((Ok(next), (s, delimiter)));
+            }
+
+            match s.next().await {
+                Some(Ok(bytes)) => delimiter.push(bytes),
+                Some(Err(e)) => return Some((Err(e), (s, delimiter))),
+                None => match delimiter.finish() {
+                    Ok(true) => return None,
+                    Ok(false) => continue,
+                    Err(e) => return Some((Err(e), (s, delimiter))),
+                },
+            }
+        }
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::stream::TryStreamExt;
+
+    #[test]
+    fn test_delimiter() {
+        let mut delimiter = LineDelimiter::new();
+        delimiter.push("hello\nworld");
+        delimiter.push("\n\n");
+
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("hello\n"));
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("world\n"));
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("\n"));
+        assert!(delimiter.next().is_none());
+
+        delimiter.push("");

Review Comment:
   I think it is important to test that more data can be added to a LineDelimiter after data has been pulled from it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2936: Add streaming JSON and CSV reading, `NewlineDelimitedStream' (#2935)

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2936:
URL: https://github.com/apache/arrow-datafusion/pull/2936#discussion_r923437953


##########
datafusion/core/src/physical_plan/file_format/delimited_stream.rs:
##########
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result};
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use std::collections::VecDeque;
+
+/// The ASCII encoding of `"`
+const QUOTE: u8 = 34;
+
+/// The ASCII encoding of `\n`
+const NEWLINE: u8 = 10;
+
+/// The ASCII encoding of `\`
+const ESCAPE: u8 = 92;
+
+/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
+/// of [`Bytes`] containing a whole number of new line delimited records
+#[derive(Debug, Default)]
+struct LineDelimiter {
+    /// Complete chunks of [`Bytes`]
+    complete: VecDeque<Bytes>,
+    /// Remainder bytes that form the next record
+    remainder: Vec<u8>,

Review Comment:
   makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org