Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/19 12:42:56 UTC

[GitHub] [arrow-datafusion] tustvold opened a new pull request, #2578: Extract Listing URI logic into ListingTableUri structure

tustvold opened a new pull request, #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578

   _Draft, as it builds on top of #2572_
   
   # Which issue does this PR close?
   
   Closes #2562 
   Part of #2489
   
   # Rationale for this change
   
   See the linked tickets: the previous logic was inconsistent and varied by call site.
   
   # What changes are included in this PR?
   
   This extracts a `ListingTableUrl` that handles all the logic of translating a user-provided string into a canonical representation that can be used by the rest of DataFusion.
   
   I'm having some difficulty with some of the ballista tests, likely related to #2546, and would appreciate some help with this. Perhaps @thinkharderdev ?
   
   # Are there any user-facing changes?
   
   Yes, this changes both the public API and the interpretation of certain strings. In particular:
   
   * A string without a scheme must refer to an existing path on the local filesystem; otherwise parsing returns an error
   * Globbing is only applied to strings without a scheme, as glob expressions are not part of the URL specification
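   
   A minimal, standard-library-only sketch of the two rules above. It is not the PR's code (which relies on `url::Url::parse` and `std::path::Path::canonicalize`); `TableInput`, `scheme_of` and `classify` are hypothetical names, the scheme check follows the RFC 3986 grammar `ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )`, and the existence check is skipped, so it only shows how an input is routed.
   
   ```rust
   /// Characters that start a glob expression (the same set as `GLOB_START_CHARS` in the PR)
   const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
   
   /// Hypothetical classification of a user-provided table location
   #[derive(Debug, PartialEq)]
   enum TableInput<'a> {
       /// Has a scheme: treated as a URL, glob characters are left uninterpreted
       Url { scheme: &'a str },
       /// No scheme: a local filesystem path that may contain a glob expression
       LocalPath { has_glob: bool },
   }
   
   /// Returns the scheme if `s` starts with `scheme ":"` as defined by RFC 3986
   fn scheme_of(s: &str) -> Option<&str> {
       let (scheme, _) = s.split_once(':')?;
       let mut chars = scheme.chars();
       let first = chars.next()?;
       let valid = first.is_ascii_alphabetic()
           && chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.'));
       if valid {
           Some(scheme)
       } else {
           None
       }
   }
   
   fn classify(s: &str) -> TableInput<'_> {
       let has_glob = s.contains(&GLOB_START_CHARS[..]);
       // Check absolute paths first: `C:\data` would otherwise be read as a URL with scheme `C`
       if std::path::Path::new(s).is_absolute() {
           return TableInput::LocalPath { has_glob };
       }
       match scheme_of(s) {
           Some(scheme) => TableInput::Url { scheme },
           None => TableInput::LocalPath { has_glob },
       }
   }
   ```
   
   Checking `is_absolute` before looking for a scheme mirrors the drive-letter comment in `ListingTableUrl::parse`.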
   
   FYI @timvw 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold merged pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold merged PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

alamb commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884746154


##########
benchmarks/src/bin/tpch.rs:
##########
@@ -50,6 +50,7 @@ use datafusion::{
 
 use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
+use datafusion::datasource::listing::ListingTableUrl;

Review Comment:
   I wonder if it is time to move `datasource` to its own crate, if possible 🤔 



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   minor comment: maybe this file should be called "url.rs" for consistency



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::object_store::ObjectStoreUrl;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use itertools::Itertools;
+use std::path::is_separator;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, or the string is an absolute filesystem path
+    /// as determined by [`std::path::Path::is_absolute`], the string will be
+    /// interpreted as a path on the local filesystem using the operating
+    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered
+    /// a glob expression and resolved as described in the section below.
+    ///
+    /// Otherwise, the path will be resolved to an absolute path, returning
+    /// an error if it does not exist, and converted to a [file URI]
+    ///
+    /// If you wish to specify a path that does not exist on the local
+    /// machine you must provide it as a fully-qualified [file URI]
+    /// e.g. `file:///myfile.txt`
+    ///
+    /// ## Glob Paths
+    ///
+    /// If no scheme is provided, and the path contains a glob expression, it will
+    /// be resolved as follows.
+    ///
+    /// The string up to the first path segment containing a glob expression will be extracted,
+    /// and resolved in the same manner as a normal scheme-less path. That is, resolved to
+    /// an absolute path on the local filesystem, returning an error if it does not exist,
+    /// and converted to a [file URI]
+    ///
+    /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
+    /// filter when listing files from object storage
+    ///
+    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let s = s.as_ref();
+
+        // This is necessary to handle the case of a path starting with a drive letter
+        if std::path::Path::new(s).is_absolute() {
+            return Self::parse_path(s);
+        }
+
+        match Url::parse(s) {
+            Ok(url) => Ok(Self { url, glob: None }),
+            Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
+            Err(e) => Err(DataFusionError::External(Box::new(e))),
+        }
+    }
+
+    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
+    fn parse_path(s: &str) -> Result<Self> {
+        let (prefix, glob) = match split_glob_expression(s) {
+            Some((prefix, glob)) => {
+                let glob = Pattern::new(glob)
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                (prefix, Some(glob))
+            }
+            None => (s, None),
+        };
+
+        let path = std::path::Path::new(prefix).canonicalize()?;
+        let url = match path.is_file() {
+            true => Url::from_file_path(path).unwrap(),
+            false => Url::from_directory_path(path).unwrap(),
+        };
+
+        Ok(Self { url, glob })
+    }
+
+    /// Returns the URL scheme
+    pub fn scheme(&self) -> &str {
+        self.url.scheme()
+    }
+
+    /// Returns the path as expected by [`ObjectStore`]
+    ///
+    /// In particular, for file scheme URLs this is an absolute path
+    /// on the local filesystem in the OS-specific path representation
+    ///
+    /// For other URLs, this is the host and path of the URL,
+    /// delimited by `/`, and with no leading `/`
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    fn prefix(&self) -> &str {
+        match self.scheme() {
+            "file" => match cfg!(target_family = "windows") {
+                true => self.url.path().strip_prefix('/').unwrap(),
+                false => self.url.path(),
+            },
+            _ => &self.url[url::Position::BeforeHost..url::Position::AfterPath],
+        }
+    }
+
+    /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
+    /// an iterator of the remaining path segments
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    pub(crate) fn strip_prefix<'a, 'b: 'a>(
+        &'a self,
+        path: &'b str,
+    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
+        let prefix = self.prefix();
+        // Ignore empty path segments
+        let diff = itertools::diff_with(
+            path.split(is_separator).filter(|s| !s.is_empty()),
+            prefix.split(is_separator).filter(|s| !s.is_empty()),
+            |a, b| a == b,
+        );
+
+        match diff {
+            // Match with remaining
+            Some(itertools::Diff::Shorter(_, subpath)) => Some(subpath),
+            _ => None,
+        }
+    }
+
+    /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
+    pub(crate) fn list_all_files<'a>(
+        &'a self,
+        store: &'a dyn ObjectStore,
+        file_extension: &'a str,
+    ) -> BoxStream<'a, Result<FileMeta>> {
+        futures::stream::once(async move {
+            let prefix = self.prefix();
+            store.list_file(prefix.as_ref()).await
+        })
+        .try_flatten()
+        .map_err(DataFusionError::IoError)
+        .try_filter(move |meta| {
+            let path = meta.path();
+
+            let extension_match = path.ends_with(file_extension);
+            let glob_match = match &self.glob {
+                Some(glob) => match self.strip_prefix(path) {
+                    Some(mut segments) => {
+                        let stripped = segments.join("/");
+                        glob.matches(&stripped)
+                    }
+                    None => false,
+                },
+                None => true,
+            };
+
+            futures::future::ready(extension_match && glob_match)
+        })
+        .boxed()
+    }
+
+    /// Returns this [`ListingTableUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+
+    /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`]
+    pub fn object_store(&self) -> ObjectStoreUrl {
+        let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
+        ObjectStoreUrl::parse(url).unwrap()
+    }
+}
+
+impl AsRef<str> for ListingTableUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ListingTableUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ListingTableUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
+
+/// Splits `path` at the first path segment containing a glob expression, returning
+/// `None` if no glob expression found.
+///
+/// Path delimiters are determined using [`std::path::is_separator`] which
+/// permits `/` as a path delimiter even on Windows platforms.
+///
+fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
+    let mut last_separator = 0;
+
+    for (byte_idx, char) in path.char_indices() {
+        if GLOB_START_CHARS.contains(&char) {
+            if last_separator == 0 {
+                return Some((".", path));
+            }
+            return Some(path.split_at(last_separator));
+        }
+
+        if std::path::is_separator(char) {
+            last_separator = byte_idx + char.len_utf8();
+        }
+    }
+    None
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_prefix_path() {
+        let root = std::env::current_dir().unwrap();
+        let root = root.to_string_lossy();
+
+        let url = ListingTableUrl::parse(&root).unwrap();
+        let child = format!("{}/partition/file", root);
+
+        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
+        assert_eq!(prefix, vec!["partition", "file"]);
+    }
+
+    #[test]
+    fn test_prefix_s3() {
+        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
+        assert_eq!(url.prefix(), "bucket/foo/bar");
+
+        let path = "bucket/foo/bar/partition/foo.parquet";
+        let prefix: Vec<_> = url.strip_prefix(path).unwrap().collect();
+        assert_eq!(prefix, vec!["partition", "foo.parquet"]);
+    }
+
+    #[test]
+    fn test_split_glob() {
+        fn test(input: &str, expected: Option<(&str, &str)>) {
+            assert_eq!(
+                split_glob_expression(input),
+                expected,
+                "testing split_glob_expression with {}",
+                input
+            );
+        }
+
+        // no glob patterns
+        test("/", None);
+        test("/a.txt", None);
+        test("/a", None);
+        test("/a/", None);
+        test("/a/b", None);
+        test("/a/b/", None);
+        test("/a/b.txt", None);
+        test("/a/b/c.txt", None);
+        // glob patterns, thus we build the longest path (os-specific)
+        test("*.txt", Some((".", "*.txt")));
+        test("/*.txt", Some(("/", "*.txt")));
+        test("/a/*b.txt", Some(("/a/", "*b.txt")));
+        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
+        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
+        test("/a/b*.txt", Some(("/a/", "b*.txt")));
+        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));
+
+        // https://github.com/apache/arrow-datafusion/issues/2465

Review Comment:
   https://github.com/apache/arrow-datafusion/issues/2465 is closed -- is this meant to refer to another ticket? Or is it saying this is a test for that issue 🤔 
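   
   `strip_prefix` uses `itertools::diff_with` and only succeeds in the `Diff::Shorter` case, i.e. when the table prefix is a strict prefix of the file path. The same check can be sketched with the standard library alone; `strip_prefix_segments` is a hypothetical name, and unlike the PR it splits only on `/` rather than on `std::path::is_separator`.
   
   ```rust
   /// Returns the segments of `path` that remain after `prefix`, or `None` if
   /// `prefix` is not a strict prefix of `path`. Empty segments are ignored, so
   /// leading, trailing and doubled `/` do not affect the comparison.
   fn strip_prefix_segments<'a>(prefix: &str, path: &'a str) -> Option<Vec<&'a str>> {
       let mut remaining = path.split('/').filter(|s| !s.is_empty());
       for expected in prefix.split('/').filter(|s| !s.is_empty()) {
           // `?` bails out if the path runs out of segments before the prefix does
           if remaining.next()? != expected {
               return None; // first mismatching segment
           }
       }
       let rest: Vec<&str> = remaining.collect();
       // An exact match leaves nothing over, mirroring `diff_with` reporting no difference
       if rest.is_empty() {
           None
       } else {
           Some(rest)
       }
   }
   ```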



##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -601,50 +557,29 @@ mod tests {
         assert_eq!(
             None,
             parse_partitions_for_path(
-                "bucket/mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket/mytable/v1/file.csv",
                 &[String::from("mypartition")]
             )
         );
         assert_eq!(
             Some(vec!["v1", "v2"]),
             parse_partitions_for_path(
-                "bucket/mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
                 &[String::from("mypartition"), String::from("otherpartition")]
             )
         );
         assert_eq!(
             Some(vec!["v1"]),
             parse_partitions_for_path(
-                "bucket/mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
                 &[String::from("mypartition")]
             )
         );
     }
 
-    #[cfg(target_os = "windows")]

Review Comment:
   Why were these tests removed? I think ensuring we have proper Windows coverage is important
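   
   One reason Windows coverage matters here: `split_glob_expression` and `strip_prefix` split on `std::path::is_separator`, which accepts `/` on every platform but `\` only on Windows. A sketch of a check that stays meaningful on both platforms by branching on `cfg!(windows)` (`segments` and `expected_mixed` are hypothetical helpers, not tests from the PR):
   
   ```rust
   use std::path::is_separator;
   
   /// Splits `path` on the platform's separators, dropping empty segments
   fn segments(path: &str) -> Vec<&str> {
       path.split(is_separator).filter(|s| !s.is_empty()).collect()
   }
   
   /// Expected segments of `data/table\part.csv` on the current platform
   fn expected_mixed() -> Vec<&'static str> {
       if cfg!(windows) {
           // On Windows both `/` and `\` are separators
           vec!["data", "table", "part.csv"]
       } else {
           // On Unix `\` is an ordinary filename character
           vec!["data", "table\\part.csv"]
       }
   }
   ```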



##########
datafusion/core/src/lib.rs:
##########
@@ -52,11 +52,11 @@
 //!    .to_string();
 //!
 //! let expected = vec![
-//!     "+---+--------------------------+",
-//!     "| a | MIN(tests/example.csv.b) |",
-//!     "+---+--------------------------+",
-//!     "| 1 | 2                        |",
-//!     "+---+--------------------------+"
+//!     "+---+----------------+",
+//!     "| a | MIN(?table?.b) |",

Review Comment:
   It seems OK to me. I am not sure whether using the CSV filename as the table name was intentional (maybe it is for some sort of DataFrame compatibility?).
   
   cc @andygrove 



##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -68,6 +70,8 @@ lazy_static! {
 pub struct FileScanConfig {
     /// Store from which the `files` should be fetched
     pub object_store: Arc<dyn ObjectStore>,
+    /// Object store URL
+    pub object_store_url: ObjectStoreUrl,

Review Comment:
   worth a tracking ticket?



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, or the string is an absolute filesystem path
+    /// as determined [`std::path::Path::is_absolute`], the string will be
+    /// interpreted as a path on the local filesystem using the operating
+    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered

Review Comment:
   I think we should also explicitly say here that globs are only supported with local files, not when the path is a URL.



##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -476,33 +426,34 @@ mod tests {
         let filter = Expr::eq(col("mypartition"), lit("val1"));
         let pruned = pruned_partition_list(
             store.as_ref(),
-            "tablepath/",
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),

Review Comment:
   these tests certainly look much nicer
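   
   The `parse_partitions_for_path` assertions in the earlier `helpers.rs` hunk follow Hive-style `column=value` directory naming. A hypothetical, standard-library-only sketch that satisfies the same three cases; it takes the table prefix as a plain `/`-delimited string (`/bucket/mytable`, which is what `file:///bucket/mytable` reduces to on Unix) instead of a `ListingTableUrl`, and is not the DataFusion implementation:
   
   ```rust
   /// Extracts partition values from `file_path`, assuming each partition column
   /// appears, in order, as a `column=value` directory directly below `table_prefix`
   fn parse_partitions_for_path<'a>(
       table_prefix: &str,
       file_path: &'a str,
       partition_cols: &[String],
   ) -> Option<Vec<&'a str>> {
       let mut segments = file_path.split('/').filter(|s| !s.is_empty());
       // The file must live under the table prefix
       for expected in table_prefix.split('/').filter(|s| !s.is_empty()) {
           if segments.next()? != expected {
               return None;
           }
       }
       // Consume one directory per partition column; any deeper directories are ignored
       partition_cols
           .iter()
           .map(|col| {
               let segment = segments.next()?;
               segment.strip_prefix(col.as_str())?.strip_prefix('=')
           })
           .collect()
   }
   ```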



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
+    fn parse_path(s: &str) -> Result<Self> {

Review Comment:
   Is there any use case for this being `pub`? For example, to allow users to force a string to be parsed as a path?



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, or the string is an absolute filesystem path

Review Comment:
   👍  this is great. Thank you for this writeup. I think it makes sense to me
   
   @yjshen  is this behavior ok with you?



##########
datafusion/core/src/datasource/object_store.rs:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store.
+//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
+//! and query data inside these systems.
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::local::{LocalFileSystem, LOCAL_SCHEME};
+use datafusion_data_access::object_store::ObjectStore;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+use url::Url;
+
+/// A parsed URL identifying a particular [`ObjectStore`]
+#[derive(Debug, Clone)]
+pub struct ObjectStoreUrl {
+    url: Url,
+}
+
+impl ObjectStoreUrl {
+    /// Parse an [`ObjectStoreUrl`] from a string
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let mut parsed =
+            Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        let remaining = &parsed[url::Position::BeforePath..];
+        if !remaining.is_empty() && remaining != "/" {
+            return Err(DataFusionError::Execution(format!(
+                "ObjectStoreUrl must only contain scheme and authority, got: {}",
+                remaining
+            )));
+        }
+
+        // Always set path for consistency
+        parsed.set_path("/");
+        Ok(Self { url: parsed })
+    }
+
+    /// An [`ObjectStoreUrl`] for the local filesystem
+    pub fn local_filesystem() -> Self {
+        Self::parse("file://").unwrap()
+    }
+
+    /// Returns this [`ObjectStoreUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+}
+
+impl AsRef<str> for ObjectStoreUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ObjectStoreUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ObjectStoreUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+/// Object store registry
+pub struct ObjectStoreRegistry {
+    /// A map from scheme to object store that serve list / read operations for the store
+    pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+}
+
+impl std::fmt::Debug for ObjectStoreRegistry {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("ObjectStoreRegistry")
+            .field(
+                "schemes",
+                &self.object_stores.read().keys().collect::<Vec<_>>(),
+            )
+            .finish()
+    }
+}
+
+impl Default for ObjectStoreRegistry {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ObjectStoreRegistry {
+    /// Create the registry that object stores can be registered into.
+    /// The [`LocalFileSystem`] store is registered by default to support reading local files natively.
+    pub fn new() -> Self {
+        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
+        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
+
+        Self {
+            object_stores: RwLock::new(map),
+        }
+    }
+
+    /// Adds a new store to this registry.
+    /// If a store for the same scheme existed before, it is replaced in the registry and returned.
+    pub fn register_store(
+        &self,
+        scheme: String,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        let mut stores = self.object_stores.write();
+        stores.insert(scheme, store)
+    }
+
+    /// Get the store registered for scheme
+    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+        let stores = self.object_stores.read();
+        stores.get(scheme).cloned()
+    }
+
+    /// Get a suitable store for the provided URL. For example:
+    ///
+    /// - URL with scheme `file://` or no scheme will return the default LocalFS store
+    /// - URL with scheme `s3://` will return the S3 store if it's registered
+    ///
+    pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
+        let url = url.as_ref();
+        let stores = self.object_stores.read();
+        let store = stores.get(url.scheme()).map(Clone::clone).ok_or_else(|| {

Review Comment:
   Is the `Clone::clone` needed here? I thought `get` already returned an owned `Arc`?
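   
   For reference on the question above: `HashMap::get` returns `Option<&V>`, so producing an owned `Arc` does require cloning through the reference. `.map(Clone::clone)` and `.cloned()` (the form used in `get` above) are equivalent, and both only increment the reference count. A sketch with a hypothetical `Store` trait standing in for `ObjectStore`:
   
   ```rust
   use std::collections::HashMap;
   use std::sync::Arc;
   
   /// Hypothetical stand-in for `ObjectStore`
   trait Store: Send + Sync {
       fn name(&self) -> &str;
   }
   
   struct Local;
   
   impl Store for Local {
       fn name(&self) -> &str {
           "local"
       }
   }
   
   fn lookup(stores: &HashMap<String, Arc<dyn Store>>, scheme: &str) -> Option<Arc<dyn Store>> {
       // `get` yields `Option<&Arc<dyn Store>>`; `.cloned()` turns it into an owned `Arc`
       // by incrementing the reference count, without copying the store itself
       stores.get(scheme).cloned()
   }
   ```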



##########
datafusion/core/src/catalog/schema.rs:
##########
@@ -156,31 +156,33 @@ impl ObjectStoreSchemaProvider {
             .register_store(scheme.into(), object_store)
     }
 
-    /// Retrieves a `ObjectStore` instance by scheme
-    pub fn object_store<'a>(
+    /// Retrieves a `ObjectStore` instance for a given Url
+    pub fn object_store(
         &self,
-        uri: &'a str,
-    ) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
+        url: impl AsRef<url::Url>,

Review Comment:
   this seems like a reasonable change so that object store paths are always identified by url 👍 
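   
   The new signature takes `url: impl AsRef<url::Url>`, so callers can pass a `ListingTableUrl` or an `ObjectStoreUrl` directly, since the diff implements `AsRef<Url>` for both. The same pattern in a standard-library-only sketch, with a hypothetical `Location` type standing in for `url::Url` and two hypothetical wrappers:
   
   ```rust
   /// Hypothetical stand-in for `url::Url`
   #[derive(Debug, Clone, PartialEq)]
   struct Location {
       scheme: String,
   }
   
   /// Hypothetical wrappers, analogous to `ListingTableUrl` and `ObjectStoreUrl`
   struct TableLocation(Location);
   struct StoreLocation(Location);
   
   impl AsRef<Location> for TableLocation {
       fn as_ref(&self) -> &Location {
           &self.0
       }
   }
   
   impl AsRef<Location> for StoreLocation {
       fn as_ref(&self) -> &Location {
           &self.0
       }
   }
   
   /// Accepts anything that can be viewed as a `Location`, mirroring
   /// `object_store(&self, url: impl AsRef<url::Url>)`
   fn scheme_for(location: impl AsRef<Location>) -> String {
       location.as_ref().scheme.clone()
   }
   ```
   
   Taking `impl AsRef<_>` by value still lets callers pass a reference, thanks to the standard library's blanket `impl AsRef<U> for &T`, so no clone is forced on them.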
   



##########
datafusion/core/src/datasource/object_store.rs:
##########
@@ -0,0 +1,206 @@
+impl ObjectStoreUrl {
+    /// Parse an [`ObjectStoreUrl`] from a string
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let mut parsed =
+            Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        let remaining = &parsed[url::Position::BeforePath..];
+        if !remaining.is_empty() && remaining != "/" {
+            return Err(DataFusionError::Execution(format!(
+                "ObjectStoreUrl must only contain scheme and authority, got: {}",

Review Comment:
   Does it technically require an `authority`? I thought authority refers to `hostname:port`, whereas I think the object store only requires a hostname.
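For reference, RFC 3986 defines the authority as `[ userinfo "@" ] host [ ":" port ]`, so credentials and port are optional and a bare hostname (or bucket name) is already a complete authority. The std-only sketch below models the check `ObjectStoreUrl::parse` performs (nothing but an empty path or `/` may follow `scheme://authority`) and splits the authority into its parts. `split_object_store_url` and `parse_authority` are hypothetical helpers; the PR relies on the `url` crate, which also handles IPv6 literals and percent-encoding that this sketch ignores.

```rust
/// The three RFC 3986 authority components; only `host` is mandatory.
#[derive(Debug, PartialEq)]
struct Authority<'a> {
    userinfo: Option<&'a str>,
    host: &'a str,
    port: Option<u16>,
}

fn parse_authority(s: &str) -> Option<Authority<'_>> {
    // Credentials, if any, end at the last '@'.
    let (userinfo, host_port) = match s.rsplit_once('@') {
        Some((userinfo, host_port)) => (Some(userinfo), host_port),
        None => (None, s),
    };
    // A port, if any, follows the last ':' (IPv6 literals are not handled).
    let (host, port) = match host_port.rsplit_once(':') {
        Some((host, port)) => (host, Some(port.parse::<u16>().ok()?)),
        None => (host_port, None),
    };
    Some(Authority { userinfo, host, port })
}

/// Accepts `scheme://authority` optionally followed by a single `/`.
fn split_object_store_url(s: &str) -> Result<(&str, Authority<'_>), String> {
    let (scheme, rest) = s
        .split_once("://")
        .ok_or_else(|| format!("missing scheme in {}", s))?;
    // The authority ends at the first '/', '?' or '#'.
    let end = rest
        .find(|c: char| c == '/' || c == '?' || c == '#')
        .unwrap_or(rest.len());
    let (authority, remaining) = rest.split_at(end);
    if !remaining.is_empty() && remaining != "/" {
        return Err(format!(
            "ObjectStoreUrl must only contain scheme and authority, got: {}",
            remaining
        ));
    }
    let authority =
        parse_authority(authority).ok_or_else(|| format!("invalid port in {}", s))?;
    Ok((scheme, authority))
}
```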



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::object_store::ObjectStoreUrl;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use itertools::Itertools;
+use std::path::is_separator;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, or the string is an absolute filesystem path
+    /// as determined by [`std::path::Path::is_absolute`], the string will be
+    /// interpreted as a path on the local filesystem using the operating
+    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered
+    /// a glob expression and resolved as described in the section below.
+    ///
+    /// Otherwise, the path will be resolved to an absolute path, returning
+    /// an error if it does not exist, and converted to a [file URI]
+    ///
+    /// If you wish to specify a path that does not exist on the local
+    /// machine you must provide it as a fully-qualified [file URI]
+    /// e.g. `file:///myfile.txt`
+    ///
+    /// ## Glob Paths
+    ///
+    /// If no scheme is provided, and the path contains a glob expression, it will
+    /// be resolved as follows.
+    ///
+    /// The string up to the first path segment containing a glob expression will be extracted,
+    /// and resolved in the same manner as a normal scheme-less path. That is, resolved to
+    /// an absolute path on the local filesystem, returning an error if it does not exist,
+    /// and converted to a [file URI]
+    ///
+    /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
+    /// filter when listing files from object storage
+    ///
+    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let s = s.as_ref();
+
+        // This is necessary to handle the case of a path starting with a drive letter
+        if std::path::Path::new(s).is_absolute() {
+            return Self::parse_path(s);
+        }
+
+        match Url::parse(s) {
+            Ok(url) => Ok(Self { url, glob: None }),
+            Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
+            Err(e) => Err(DataFusionError::External(Box::new(e))),
+        }
+    }
+
+    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
+    fn parse_path(s: &str) -> Result<Self> {
+        let (prefix, glob) = match split_glob_expression(s) {
+            Some((prefix, glob)) => {
+                let glob = Pattern::new(glob)
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                (prefix, Some(glob))
+            }
+            None => (s, None),
+        };
+
+        let path = std::path::Path::new(prefix).canonicalize()?;
+        let url = match path.is_file() {
+            true => Url::from_file_path(path).unwrap(),
+            false => Url::from_directory_path(path).unwrap(),
+        };
+
+        Ok(Self { url, glob })
+    }
+
+    /// Returns the URL scheme
+    pub fn scheme(&self) -> &str {
+        self.url.scheme()
+    }
+
+    /// Returns the path as expected by [`ObjectStore`]
+    ///
+    /// In particular for file scheme URLs, this is an absolute path
+    /// on the local filesystem in the OS-specific path representation
+    ///
+    /// For other URLs, this is the host and path of the URL,
+    /// delimited by `/`, and with no leading `/`
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    fn prefix(&self) -> &str {
+        match self.scheme() {
+            "file" => match cfg!(target_family = "windows") {
+                true => self.url.path().strip_prefix('/').unwrap(),
+                false => self.url.path(),
+            },
+            _ => &self.url[url::Position::BeforeHost..url::Position::AfterPath],
+        }
+    }
+
+    /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
+    /// an iterator of the remaining path segments
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    pub(crate) fn strip_prefix<'a, 'b: 'a>(
+        &'a self,
+        path: &'b str,
+    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
+        let prefix = self.prefix();
+        // Ignore empty path segments
+        let diff = itertools::diff_with(
+            path.split(is_separator).filter(|s| !s.is_empty()),
+            prefix.split(is_separator).filter(|s| !s.is_empty()),
+            |a, b| a == b,
+        );
+
+        match diff {
+            // Match with remaining
+            Some(itertools::Diff::Shorter(_, subpath)) => Some(subpath),
+            _ => None,
+        }
+    }
+
+    /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
+    pub(crate) fn list_all_files<'a>(
+        &'a self,
+        store: &'a dyn ObjectStore,
+        file_extension: &'a str,
+    ) -> BoxStream<'a, Result<FileMeta>> {
+        futures::stream::once(async move {
+            let prefix = self.prefix();
+            store.list_file(prefix.as_ref()).await
+        })
+        .try_flatten()
+        .map_err(DataFusionError::IoError)
+        .try_filter(move |meta| {
+            let path = meta.path();
+
+            let extension_match = path.ends_with(file_extension);
+            let glob_match = match &self.glob {
+                Some(glob) => match self.strip_prefix(path) {
+                    Some(mut segments) => {
+                        let stripped = segments.join("/");
+                        glob.matches(&stripped)
+                    }
+                    None => false,
+                },
+                None => true,
+            };
+
+            futures::future::ready(extension_match && glob_match)
+        })
+        .boxed()
+    }
+
+    /// Returns this [`ListingTableUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+
+    /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`]
+    pub fn object_store(&self) -> ObjectStoreUrl {
+        let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
+        ObjectStoreUrl::parse(url).unwrap()
+    }
+}
+
+impl AsRef<str> for ListingTableUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ListingTableUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ListingTableUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
+
+/// Splits `path` at the first path segment containing a glob expression, returning
+/// `None` if no glob expression found.
+///
+/// Path delimiters are determined using [`std::path::is_separator`] which
+/// permits `/` as a path delimiter even on Windows platforms.
+///
+fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
+    let mut last_separator = 0;
+
+    for (byte_idx, char) in path.char_indices() {
+        if GLOB_START_CHARS.contains(&char) {
+            if last_separator == 0 {
+                return Some((".", path));
+            }
+            return Some(path.split_at(last_separator));
+        }
+
+        if std::path::is_separator(char) {
+            last_separator = byte_idx + char.len_utf8();
+        }
+    }
+    None
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]

Review Comment:
   I am a big fan of the consolidated logic and ability to test here 💯 
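The rules documented for `ListingTableUrl::parse` in the diff above boil down to a routing decision. This std-only sketch copies `split_glob_expression` from the diff and replaces `url::Url::parse` with a simplified, hypothetical `has_scheme` check, so it will not match the real parser on every input: it does not report malformed URLs, and it does not canonicalize paths or check that they exist.

```rust
use std::path::{is_separator, Path};

const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Copied from the diff: split `path` at the first segment that contains
/// a glob character, or return `None` if there is no glob.
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
    let mut last_separator = 0;
    for (byte_idx, c) in path.char_indices() {
        if GLOB_START_CHARS.contains(&c) {
            if last_separator == 0 {
                return Some((".", path));
            }
            return Some(path.split_at(last_separator));
        }
        if is_separator(c) {
            last_separator = byte_idx + c.len_utf8();
        }
    }
    None
}

/// Hypothetical, simplified scheme test: `ALPHA *( ALPHA / DIGIT / "+" / "-" / "." ) ":"`.
fn has_scheme(s: &str) -> bool {
    match s.split_once(':') {
        Some((scheme, _)) => {
            let mut chars = scheme.chars();
            matches!(chars.next(), Some(c) if c.is_ascii_alphabetic())
                && chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.'))
        }
        None => false,
    }
}

#[derive(Debug, PartialEq)]
enum Route<'a> {
    /// Handed to the URL parser unchanged; no globbing.
    Url(&'a str),
    /// Local path: `prefix` would be canonicalized, `glob` filters listed files.
    LocalPath { prefix: &'a str, glob: Option<&'a str> },
}

fn route(s: &str) -> Route<'_> {
    // Same order as `ListingTableUrl::parse`: absolute paths first (so a
    // Windows `C:\...` path is not read as a URL with scheme `c`), then URLs.
    if !Path::new(s).is_absolute() && has_scheme(s) {
        return Route::Url(s);
    }
    match split_glob_expression(s) {
        Some((prefix, glob)) => Route::LocalPath { prefix, glob: Some(glob) },
        None => Route::LocalPath { prefix: s, glob: None },
    }
}
```

Globbing only happens on the local-path branch, consistent with the PR description that glob expressions are not applied to strings with a scheme.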



##########
datafusion/core/tests/sql/mod.rs:
##########
@@ -811,25 +813,57 @@ pub fn table_with_sequence(
     Ok(Arc::new(MemTable::try_new(schema, partitions)?))
 }
 
-// Normalizes parts of an explain plan that vary from run to run (such as path)
-fn normalize_for_explain(s: &str) -> String {
-    // Convert things like /Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv
-    // to ARROW_TEST_DATA/csv/aggregate_test_100.csv
-    let data_path = datafusion::test_util::arrow_test_data();
-    let s = s.replace(&data_path, "ARROW_TEST_DATA");
+pub struct ExplainNormalizer {

Review Comment:
   👍 
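The removed `normalize_for_explain` replaced the machine-specific test-data path with `ARROW_TEST_DATA`. The body of the new `ExplainNormalizer` is not shown in this excerpt, so the following is only a hypothetical sketch of how such a struct might hold an ordered list of substitutions instead of a single hard-coded one (`Normalizer` and its methods are illustrative names):

```rust
/// Hypothetical normalizer: an ordered list of (actual, placeholder) pairs
/// applied to explain output so that it is stable across machines.
#[derive(Debug, Default)]
struct Normalizer {
    replacements: Vec<(String, String)>,
}

impl Normalizer {
    fn new() -> Self {
        Self::default()
    }

    /// Registers one substitution; pairs are applied in insertion order.
    fn with_replacement(mut self, actual: impl Into<String>, placeholder: impl Into<String>) -> Self {
        self.replacements.push((actual.into(), placeholder.into()));
        self
    }

    /// Applies every registered substitution to `s`.
    fn normalize(&self, s: &str) -> String {
        self.replacements
            .iter()
            .fold(s.to_string(), |acc, (actual, placeholder)| {
                acc.replace(actual.as_str(), placeholder)
            })
    }
}
```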



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_prefix_path() {
+        let root = std::env::current_dir().unwrap();
+        let root = root.to_string_lossy();
+
+        let url = ListingTableUrl::parse(&root).unwrap();
+        let child = format!("{}/partition/file", root);
+
+        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
+        assert_eq!(prefix, vec!["partition", "file"]);
+    }
+
+    #[test]
+    fn test_prefix_s3() {
+        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
+        assert_eq!(url.prefix(), "bucket/foo/bar");
+
+        let path = "bucket/foo/bar/partition/foo.parquet";
+        let prefix: Vec<_> = url.strip_prefix(path).unwrap().collect();
+        assert_eq!(prefix, vec!["partition", "foo.parquet"]);

Review Comment:
   Maybe also add a negative test (e.g. what happens if `path = "other-bucket/foo/bar/partition/foo.parquet"`)?
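A negative case is easy to pin down once the rule is stated in isolation. The std-only sketch below reimplements the matching rule of `strip_prefix` (segment by segment, empty segments ignored, `None` unless at least one segment remains) without `itertools`. It is a model for writing test cases, not the PR's code, and it takes the prefix as a plain string rather than a `ListingTableUrl`.

```rust
use std::path::is_separator;

/// Splits on the platform's path separators and drops empty segments,
/// as the `strip_prefix` in the diff does.
fn segments<'a>(s: &'a str) -> impl Iterator<Item = &'a str> + 'a {
    s.split(is_separator).filter(|seg| !seg.is_empty())
}

/// Returns the segments of `path` after `prefix`, or `None` if `path` does
/// not start with every segment of `prefix` or has nothing left after it.
/// The PR expresses the same rule with `itertools::diff_with`, keeping only
/// the `Diff::Shorter` case.
fn strip_prefix<'a>(prefix: &str, path: &'a str) -> Option<Vec<&'a str>> {
    let mut path_segments = segments(path);
    for expected in segments(prefix) {
        if path_segments.next() != Some(expected) {
            return None;
        }
    }
    let rest: Vec<&'a str> = path_segments.collect();
    if rest.is_empty() {
        None
    } else {
        Some(rest)
    }
}
```

Matching is per segment, so `bucket/foo/barbaz/...` does not match the prefix `bucket/foo/bar` even though it shares a string prefix.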



##########
datafusion/core/Cargo.toml:
##########
@@ -67,7 +67,9 @@ datafusion-physical-expr = { path = "../physical-expr", version = "8.0.0" }
 datafusion-row = { path = "../row", version = "8.0.0" }
 datafusion-sql = { path = "../sql", version = "8.0.0" }
 futures = "0.3"
+glob = "0.3.0"
 hashbrown = { version = "0.12", features = ["raw"] }
+itertools = "0.10"

Review Comment:
   But datafusion doesn't depend on prost, right? If we could avoid another dependency, that would be good in my opinion.
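For context, in the diff above `itertools` is used for `diff_with` in `strip_prefix` and for `Itertools::join` in `list_all_files`. If avoiding the dependency is preferred, the `join` call has a short std-only equivalent; `join_segments` below is a hypothetical helper, not part of the PR:

```rust
/// std-only equivalent of `Itertools::join("/")` for an iterator of `&str`,
/// written without allocating an intermediate `Vec`.
fn join_segments<'a>(mut segments: impl Iterator<Item = &'a str>) -> String {
    let mut joined = String::new();
    if let Some(first) = segments.next() {
        joined.push_str(first);
        for segment in segments {
            joined.push('/');
            joined.push_str(segment);
        }
    }
    joined
}
```

Alternatively, `segments.collect::<Vec<_>>().join("/")` is a one-liner at the cost of one extra allocation per listed file.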



##########
datafusion/core/src/datasource/object_store.rs:
##########
@@ -0,0 +1,206 @@
+//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store.
+//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
+//! and query data inside these systems.
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::local::{LocalFileSystem, LOCAL_SCHEME};
+use datafusion_data_access::object_store::ObjectStore;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+use url::Url;
+
+/// A parsed URL identifying a particular [`ObjectStore`]
+#[derive(Debug, Clone)]
+pub struct ObjectStoreUrl {
+    url: Url,
+}
+
+impl ObjectStoreUrl {
+    /// Parse an [`ObjectStoreUrl`] from a string
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let mut parsed =
+            Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        let remaining = &parsed[url::Position::BeforePath..];
+        if !remaining.is_empty() && remaining != "/" {
+            return Err(DataFusionError::Execution(format!(
+                "ObjectStoreUrl must only contain scheme and authority, got: {}",
+                remaining
+            )));
+        }
+
+        // Always set path for consistency
+        parsed.set_path("/");
+        Ok(Self { url: parsed })
+    }
+
+    /// An [`ObjectStoreUrl`] for the local filesystem
+    pub fn local_filesystem() -> Self {
+        Self::parse("file://").unwrap()
+    }
+
+    /// Returns this [`ObjectStoreUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+}
+
+impl AsRef<str> for ObjectStoreUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ObjectStoreUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ObjectStoreUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+/// Object store registry
+pub struct ObjectStoreRegistry {
+    /// A map from scheme to object store that serve list / read operations for the store
+    pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+}
+
+impl std::fmt::Debug for ObjectStoreRegistry {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("ObjectStoreRegistry")
+            .field(
+                "schemes",
+                &self.object_stores.read().keys().collect::<Vec<_>>(),
+            )
+            .finish()
+    }
+}
+
+impl Default for ObjectStoreRegistry {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ObjectStoreRegistry {
+    /// Create the registry that object stores can be registered into.
+    /// The [`LocalFileSystem`] store is registered by default to support reading local files natively.
+    pub fn new() -> Self {
+        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
+        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
+
+        Self {
+            object_stores: RwLock::new(map),
+        }
+    }
+
+    /// Adds a new store to this registry.
+    /// If a store for the same scheme existed before, it is replaced in the registry and returned.
+    pub fn register_store(
+        &self,
+        scheme: String,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        let mut stores = self.object_stores.write();
+        stores.insert(scheme, store)
+    }
+
+    /// Get the store registered for scheme
+    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+        let stores = self.object_stores.read();
+        stores.get(scheme).cloned()
+    }
+
+    /// Get a suitable store for the provided URL. For example:
+    ///
+    /// - URL with scheme `file://` or no scheme will return the default LocalFS store
+    /// - URL with scheme `s3://` will return the S3 store if it's registered
+    ///
+    pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
+        let url = url.as_ref();
+        let stores = self.object_stores.read();
+        let store = stores.get(url.scheme()).map(Clone::clone).ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "No suitable object store found for {}",
+                url
+            ))
+        })?;
+
+        Ok(store)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::datasource::listing::ListingTableUrl;
+    use datafusion_data_access::object_store::local::LocalFileSystem;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_object_store_url() {
+        let listing = ListingTableUrl::parse("file:///").unwrap();
+        let store = listing.object_store();
+        assert_eq!(store.as_str(), "file:///");
+
+        let file = ObjectStoreUrl::parse("file://").unwrap();
+        assert_eq!(file.as_str(), "file:///");
+
+        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
+        let store = listing.object_store();
+        assert_eq!(store.as_str(), "s3://bucket/");
+
+        let url = ObjectStoreUrl::parse("s3://bucket").unwrap();
+        assert_eq!(url.as_str(), "s3://bucket/");
+
+        let err = ObjectStoreUrl::parse("s3://bucket?").unwrap_err();
+        assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?");
+
+        let err = ObjectStoreUrl::parse("s3://bucket?foo=bar").unwrap_err();

Review Comment:
   I think it is also worth checking URLs with usernames and passwords:
   
   s3://host:port/foo
   s3://username@host:port/foo
   s3://username:password@host:port/foo
   



##########
datafusion/core/src/datasource/object_store.rs:
##########
@@ -0,0 +1,206 @@
+//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store.

Review Comment:
   👍 




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r877009325


##########
ballista/rust/core/src/serde/physical_plan/mod.rs:
##########
@@ -1029,14 +1029,10 @@ fn decode_scan_config(
         .map(|f| f.try_into())
         .collect::<Result<Vec<_>, _>>()?;
 
-    let object_store = if let Some(file) = file_groups.get(0).and_then(|h| h.get(0)) {
-        runtime.object_store(file.file_meta.path())?.0
-    } else {
-        Arc::new(LocalFileSystem {})
-    };
+    // TODO: This will not roundtrip object storage correctly as it loses the scheme

Review Comment:
   I'm not really sure how to handle this, as at this point the scheme has been stripped from the paths. I suspect something else needs to be encoded in FileScanExecConf, but I'm not very familiar with this code.




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r883525171


##########
datafusion/core/tests/sql/explain_analyze.rs:
##########
@@ -642,7 +642,7 @@ order by
 #[tokio::test]
 async fn test_physical_plan_display_indent() {
     // Hard code target_partitions as it appears in the RepartitionExec output
-    let config = SessionConfig::new().with_target_partitions(3);
+    let config = SessionConfig::new().with_target_partitions(9000);

Review Comment:
   This was causing an amusing issue: the Mac runner happened to have 3 CPUs, so it would replace the 3 with NUM_CPUS :facepalm: 
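Assuming the test normalization substitutes the CPU count textually (this excerpt does not show how it is implemented), the collision is easy to reproduce. `normalize_cpus` is a hypothetical, simplified stand-in, and the plan lines in the checks are illustrative:

```rust
/// Hypothetical, simplified normalizer: replaces every occurrence of the
/// machine's CPU count with a placeholder so output is runner-independent.
/// Being a plain substring replace, it also rewrites unrelated numbers that
/// happen to contain the same digits.
fn normalize_cpus(plan: &str, num_cpus: usize) -> String {
    plan.replace(&num_cpus.to_string(), "NUM_CPUS")
}
```

Under this model a distinctive value such as 9000 only collides on runners whose CPU count appears as a substring of it (for example 9); matching whole tokens rather than substrings would avoid the collision entirely.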




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r877048813


##########
ballista/rust/core/src/serde/physical_plan/mod.rs:
##########
@@ -1029,14 +1029,10 @@ fn decode_scan_config(
         .map(|f| f.try_into())
         .collect::<Result<Vec<_>, _>>()?;
 
-    let object_store = if let Some(file) = file_groups.get(0).and_then(|h| h.get(0)) {
-        runtime.object_store(file.file_meta.path())?.0
-    } else {
-        Arc::new(LocalFileSystem {})
-    };
+    // TODO: This will not roundtrip object storage correctly as it loses the scheme

Review Comment:
   Yeah, I think it should probably round-trip the scheme and host explicitly and use them to determine the object store. Currently ObjectStoreRegistry is a tad confused w.r.t. host, but it's something that I intend to fix soon, so it would be good to cover it as well.
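A rough sketch of what round-tripping the scheme and host could look like: extract the `scheme://authority/` part of a file URL on encode, serialize it next to the scheme-less paths, and use it on decode to pick the object store. The helper `store_url` is hypothetical and uses plain string handling instead of the `url` crate, so it ignores edge cases such as query strings or user info.

```rust
/// Hypothetical helper: return the `scheme://authority/` portion of a URL,
/// or `None` if the string has no scheme.
fn store_url(full: &str) -> Option<String> {
    let (scheme, rest) = full.split_once("://")?;
    if scheme.is_empty() {
        return None;
    }
    // The authority runs up to the first `/` after `://` (empty for `file:///...`)
    let authority = rest.split('/').next().unwrap_or("");
    Some(format!("{}://{}/", scheme.to_ascii_lowercase(), authority))
}
```

On decode, the serialized value would then be the key used to look up the registered store, rather than inferring it from the first file path.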




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884745100


##########
datafusion/core/tests/sql/explain_analyze.rs:
##########
@@ -642,7 +642,7 @@ order by
 #[tokio::test]
 async fn test_physical_plan_display_indent() {
     // Hard code target_partitions as it appears in the RepartitionExec output
-    let config = SessionConfig::new().with_target_partitions(3);
+    let config = SessionConfig::new().with_target_partitions(9000);

Review Comment:
   Here is another way it is handled ("explain normalizer"): 
   
   
   https://github.com/tustvold/arrow-datafusion/blob/url-refactor/datafusion/core/tests/sql/mod.rs#L820-L858
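A minimal sketch of the normalizer idea, assuming it substitutes the working directory with a fixed placeholder. The names `normalize_for_explain`, `normalize_rows` and the `WORKING_DIR` placeholder are assumptions for illustration; the linked helper may differ. The substitution only matches when plan paths are absolute, which is consistent with canonicalizing the test data directory elsewhere in this PR.

```rust
/// Hypothetical: replace the absolute working directory in one EXPLAIN cell.
/// In a real test `working_dir` would come from `std::env::current_dir()`.
fn normalize_for_explain(line: &str, working_dir: &str) -> String {
    line.replace(working_dir, "WORKING_DIR")
}

/// Apply the normalization to every cell of an EXPLAIN result.
fn normalize_rows(rows: Vec<Vec<String>>, working_dir: &str) -> Vec<Vec<String>> {
    rows.into_iter()
        .map(|row| {
            row.iter()
                .map(|cell| normalize_for_explain(cell, working_dir))
                .collect::<Vec<String>>()
        })
        .collect()
}
```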




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884793962


##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -601,50 +557,29 @@ mod tests {
         assert_eq!(
             None,
             parse_partitions_for_path(
-                "bucket/mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket/mytable/v1/file.csv",
                 &[String::from("mypartition")]
             )
         );
         assert_eq!(
             Some(vec!["v1", "v2"]),
             parse_partitions_for_path(
-                "bucket/mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
                 &[String::from("mypartition"), String::from("otherpartition")]
             )
         );
         assert_eq!(
             Some(vec!["v1"]),
             parse_partitions_for_path(
-                "bucket/mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
                 &[String::from("mypartition")]
             )
         );
     }
 
-    #[cfg(target_os = "windows")]

Review Comment:
   An earlier version normalized paths; it still does, but this is not strictly enforced. I will keep this until it is replaced with the Path abstraction from `object_store`.
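For reference, lexical normalization (resolving `..` without touching the filesystem) can be sketched with `std::path::Component`, modelled on the `normalize_path` helper this PR removes from `data-access`. Unlike `Path::canonicalize`, the path need not exist and symlinks are not resolved; unlike the removed helper, this sketch does not preserve a trailing separator.

```rust
use std::path::{Component, Path, PathBuf};

/// Lexically normalize `path`, collapsing `..` segments where possible.
fn normalize_path<P: AsRef<Path>>(path: P) -> PathBuf {
    let mut normalized = PathBuf::new();
    for component in path.as_ref().components() {
        match component {
            // Drop the previous segment; keep `..` if there is nothing to pop
            Component::ParentDir => {
                if !normalized.pop() {
                    normalized.push(component);
                }
            }
            other => normalized.push(other),
        }
    }
    normalized
}
```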




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r877010770


##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,217 @@
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, the string will be interpreted as a
+    /// path on the local filesystem, using the operating system's
+    /// standard path delimiter - i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered
+    /// a glob expression and resolved as described in the section below.
+    ///
+    /// Otherwise, the path will be resolved to an absolute path, returning
+    /// an error if it does not exist, and converted to a [file URI]
+    ///
+    /// If you wish to specify a path that does not exist on the local
+    /// machine you must provide it as a fully-qualified [file URI]
+    /// e.g. `file:///myfile.txt`
+    ///
+    /// ## Glob Paths
+    ///
+    /// If no scheme is provided, and the path contains a glob expression, it will
+    /// be resolved as follows.
+    ///
+    /// The string up to the first path segment containing a glob expression will be extracted,
+    /// and resolved in the same manner as a normal scheme-less path. That is, resolved to
+    /// an absolute path on the local filesystem, returning an error if it does not exist,
+    /// and converted to a [file URI]
+    ///
+    /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
+    /// filter when listing files from object storage
+    ///
+    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let s = s.as_ref();
+        Ok(match Url::parse(s) {
+            Ok(url) => Self { url, glob: None },
+            Err(url::ParseError::RelativeUrlWithoutBase) => {
+                let (prefix, glob) = match split_glob_expression(s) {
+                    Some((prefix, glob)) => {
+                        let glob = Pattern::new(glob)
+                            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                        (prefix, Some(glob))
+                    }
+                    None => (s, None),
+                };
+
+                let path = std::path::Path::new(prefix).canonicalize()?;
+                let url = match path.is_file() {
+                    true => Url::from_file_path(path).unwrap(),
+                    false => Url::from_directory_path(path).unwrap(),
+                };
+
+                Self { url, glob }
+            }
+            Err(e) => return Err(DataFusionError::External(Box::new(e))),
+        })
+    }
+
+    /// Returns the URL scheme
+    pub fn scheme(&self) -> &str {
+        self.url.scheme()
+    }
+
+    /// Returns the path as expected by [`ObjectStore`]
+    ///
+    /// In particular for file scheme URLs, this has a leading `/`
+    /// and describes an absolute path on the local filesystem
+    ///
+    /// For other URLs, this also contains the host component
+    /// and lacks a leading `/`
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    fn prefix(&self) -> &str {

Review Comment:
   As part of #2489 I hope to bring some consistency to what the paths passed to the ObjectStore actually are. It's currently a bit confused
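To make the current inconsistency concrete, here is a simplified model of the `prefix` rule documented above, operating on already-split URL parts rather than a `url::Url`; the function signature is hypothetical. `file` URLs keep the leading `/` of their path, while other schemes prepend the host and have no leading `/`.

```rust
/// Hypothetical model of `ListingTableUrl::prefix` as documented in this diff.
fn prefix(scheme: &str, host: &str, path: &str) -> String {
    match scheme {
        // Absolute local path, leading `/` retained
        "file" => path.to_string(),
        // Host followed by the path, no leading `/`
        _ => format!("{}{}", host, path),
    }
}
```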



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,217 @@
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, the string will be interpreted as a
+    /// path on the local filesystem, using the operating system's
+    /// standard path delimiter - i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered
+    /// a glob expression and resolved as described in the section below.
+    ///
+    /// Otherwise, the path will be resolved to an absolute path, returning
+    /// an error if it does not exist, and converted to a [file URI]
+    ///
+    /// If you wish to specify a path that does not exist on the local
+    /// machine you must provide it as a fully-qualified [file URI]
+    /// e.g. `file:///myfile.txt`
+    ///
+    /// ## Glob Paths
+    ///
+    /// If no scheme is provided, and the path contains a glob expression, it will
+    /// be resolved as follows.
+    ///
+    /// The string up to the first path segment containing a glob expression will be extracted,
+    /// and resolved in the same manner as a normal scheme-less path. That is, resolved to
+    /// an absolute path on the local filesystem, returning an error if it does not exist,
+    /// and converted to a [file URI]
+    ///
+    /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
+    /// filter when listing files from object storage
+    ///
+    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let s = s.as_ref();
+        Ok(match Url::parse(s) {
+            Ok(url) => Self { url, glob: None },
+            Err(url::ParseError::RelativeUrlWithoutBase) => {
+                let (prefix, glob) = match split_glob_expression(s) {
+                    Some((prefix, glob)) => {
+                        let glob = Pattern::new(glob)
+                            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                        (prefix, Some(glob))
+                    }
+                    None => (s, None),
+                };
+
+                let path = std::path::Path::new(prefix).canonicalize()?;
+                let url = match path.is_file() {
+                    true => Url::from_file_path(path).unwrap(),
+                    false => Url::from_directory_path(path).unwrap(),
+                };
+
+                Self { url, glob }
+            }
+            Err(e) => return Err(DataFusionError::External(Box::new(e))),
+        })
+    }
+
+    /// Returns the URL scheme
+    pub fn scheme(&self) -> &str {
+        self.url.scheme()
+    }
+
+    /// Returns the path as expected by [`ObjectStore`]
+    ///
+    /// In particular for file scheme URLs, this has a leading `/`
+    /// and describes an absolute path on the local filesystem
+    ///
+    /// For other URLs, this also contains the host component
+    /// and lacks a leading `/`
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    fn prefix(&self) -> &str {
+        match self.scheme() {
+            "file" => self.url.path(),
+            _ => &self.url[url::Position::BeforeHost..url::Position::AfterPath],
+        }
+    }
+
+    /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
+    /// an iterator of the remaining path segments
+    pub(crate) fn strip_prefix<'a, 'b: 'a>(
+        &'a self,
+        path: &'b str,
+    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
+        // Ignore empty path segments
+        let diff = itertools::diff_with(
+            path.split('/').filter(|s| !s.is_empty()),
+            self.prefix().split('/').filter(|s| !s.is_empty()),
+            |a, b| a == b,
+        );
+
+        match diff {
+            // Match with remaining
+            Some(itertools::Diff::Shorter(_, subpath)) => Some(subpath),
+            _ => None,
+        }
+    }
+
+    /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
+    pub(crate) fn list_all_files<'a>(

Review Comment:
   This combines the various ObjectStore listing methods into a single method, which is clearer and more consistent.
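A simplified, in-memory sketch of what a combined listing step does: keep paths under the table prefix (compared segment by segment, ignoring empty segments, in the spirit of the `itertools::diff_with` logic in `strip_prefix`), apply an optional glob filter to the remaining subpath, then filter by file extension. The glob is modelled as a plain predicate and the signatures are hypothetical; the real method returns a stream from an `ObjectStore`.

```rust
/// Remaining segments of `path` after `prefix`, or `None` if `prefix`
/// is not a strict segment-wise prefix of `path`.
fn strip_prefix<'a>(prefix: &str, path: &'a str) -> Option<Vec<&'a str>> {
    let mut path_segments = path.split('/').filter(|s| !s.is_empty());
    for expected in prefix.split('/').filter(|s| !s.is_empty()) {
        // Bail out if the path runs out of segments or a segment differs
        if path_segments.next()? != expected {
            return None;
        }
    }
    let rest: Vec<&'a str> = path_segments.collect();
    // A path equal to the prefix has no remaining segments, so it is not a match
    if rest.is_empty() {
        None
    } else {
        Some(rest)
    }
}

/// Hypothetical stand-in for `list_all_files` over an in-memory listing.
fn list_all_files<'a>(
    paths: &[&'a str],
    prefix: &str,
    glob: Option<&dyn Fn(&str) -> bool>,
    file_extension: &str,
) -> Vec<&'a str> {
    paths
        .iter()
        .copied()
        .filter(|path| match strip_prefix(prefix, path) {
            // Apply the optional glob to the path relative to the prefix
            Some(segments) => {
                let subpath = segments.join("/");
                glob.map_or(true, |pattern| pattern(subpath.as_str()))
            }
            None => false,
        })
        .filter(|path| path.ends_with(file_extension))
        .collect()
}
```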



##########
datafusion/core/src/catalog/schema.rs:
##########
@@ -157,10 +157,7 @@ impl ObjectStoreSchemaProvider {
     }
 
     /// Retrieves a `ObjectStore` instance by scheme
-    pub fn object_store<'a>(
-        &self,
-        uri: &'a str,
-    ) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
+    pub fn object_store(&self, uri: &ListingTableUrl) -> Result<Arc<dyn ObjectStore>> {

Review Comment:
   The scheme stripping logic is now part of ListingTableUrl



##########
datafusion/core/src/test_util.rs:
##########
@@ -203,7 +203,7 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result<PathBuf, Box<dyn
         if !trimmed.is_empty() {
             let pb = PathBuf::from(trimmed);
             if pb.is_dir() {
-                return Ok(pb);
+                return Ok(pb.canonicalize().unwrap());

Review Comment:
   This is necessary for normalize_for_explain to work correctly



##########
datafusion/core/src/lib.rs:
##########
@@ -52,11 +52,11 @@
 //!    .to_string();
 //!
 //! let expected = vec![
-//!     "+---+--------------------------+",
-//!     "| a | MIN(tests/example.csv.b) |",
-//!     "+---+--------------------------+",
-//!     "| 1 | 2                        |",
-//!     "+---+--------------------------+"
+//!     "+---+----------------+",
+//!     "| a | MIN(?table?.b) |",

Review Comment:
   I'm not entirely sure what change caused this... I wonder if we aren't running doctests in CI :thinking: 



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -504,7 +504,7 @@ mod tests {
                 "bucket/key-prefix/file3",
                 "bucket/key-prefix/file4",
             ],
-            "bucket/key-prefix/",
+            "file:///bucket/key-prefix/",

Review Comment:
   These files don't actually exist, so we need to fully qualify the URLs
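A simplified model of why these strings need a scheme, assuming any string containing `://` is a URL (the PR uses `Url::parse`, and checks `Path::is_absolute` first so that Windows drive paths such as `C:\data` are not mistaken for a scheme). Anything else is canonicalized, and `Path::canonicalize` returns an error for paths that do not exist. `Location` and `parse_location` are hypothetical names.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical result of parsing a listing location.
#[derive(Debug, PartialEq)]
enum Location {
    Url(String),
    LocalPath(PathBuf),
}

fn parse_location(s: &str) -> std::io::Result<Location> {
    let path = Path::new(s);
    // Absolute OS paths are always local; otherwise `://` marks a URL (simplification)
    if !path.is_absolute() && s.contains("://") {
        return Ok(Location::Url(s.to_string()));
    }
    // `canonicalize` fails if the path does not exist on this machine
    path.canonicalize().map(Location::LocalPath)
}
```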



##########
datafusion/data-access/src/object_store/mod.rs:
##########
@@ -137,144 +85,3 @@ pub trait ObjectStore: Sync + Send + Debug {
     /// Get object reader for one file
     fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
 }
-
-/// Normalize a path without requiring it to exist on the filesystem (path::canonicalize)
-pub fn normalize_path<P: AsRef<Path>>(path: P) -> PathBuf {
-    let ends_with_slash = path
-        .as_ref()
-        .to_str()
-        .map_or(false, |s| s.ends_with(path::MAIN_SEPARATOR));
-    let mut normalized = PathBuf::new();
-    for component in path.as_ref().components() {
-        match &component {
-            Component::ParentDir => {
-                if !normalized.pop() {
-                    normalized.push(component);
-                }
-            }
-            _ => {
-                normalized.push(component);
-            }
-        }
-    }
-    if ends_with_slash {
-        normalized.push("");
-    }
-    normalized
-}
-
-const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
-
-/// Determine whether the path contains a globbing character
-fn contains_glob_start_char(path: &str) -> bool {
-    path.chars().any(|c| GLOB_START_CHARS.contains(&c))
-}
-
-/// Filters the file_stream to only contain files that end with suffix
-fn filter_suffix(file_stream: FileMetaStream, suffix: &str) -> Result<FileMetaStream> {
-    let suffix = suffix.to_owned();
-    Ok(Box::pin(
-        file_stream.try_filter(move |f| ready(f.path().ends_with(&suffix))),
-    ))
-}
-
-fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String {
-    // in case the glob_pattern is not actually a glob pattern, take the entire thing
-    if !contains_glob_start_char(glob_pattern) {
-        glob_pattern.to_string()
-    } else {
-        // take all the components of the path (left-to-right) which do not contain a glob pattern
-        let components_in_glob_pattern = Path::new(glob_pattern).components();
-        let mut path_buf_for_longest_search_path_without_glob_pattern = PathBuf::new();
-        for component_in_glob_pattern in components_in_glob_pattern {
-            let component_as_str =
-                component_in_glob_pattern.as_os_str().to_str().unwrap();
-            if contains_glob_start_char(component_as_str) {
-                break;
-            }
-            path_buf_for_longest_search_path_without_glob_pattern
-                .push(component_in_glob_pattern);
-        }
-
-        let mut result = path_buf_for_longest_search_path_without_glob_pattern
-            .to_str()
-            .unwrap()
-            .to_string();
-
-        // when we're not at the root, append a separator
-        if path_buf_for_longest_search_path_without_glob_pattern
-            .components()
-            .count()
-            > 1
-        {
-            result.push(path::MAIN_SEPARATOR);
-        }
-        result
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[tokio::test]
-    async fn test_is_glob_path() -> Result<()> {
-        assert!(!contains_glob_start_char("/"));
-        assert!(!contains_glob_start_char("/test"));
-        assert!(!contains_glob_start_char("/test/"));
-        assert!(contains_glob_start_char("/test*"));
-        Ok(())
-    }
-
-    fn test_longest_base_path(input: &str, expected: &str) {
-        assert_eq!(
-            find_longest_search_path_without_glob_pattern(input),
-            expected,
-            "testing find_longest_search_path_without_glob_pattern with {}",
-            input
-        );
-    }
-
-    #[tokio::test]
-    async fn test_find_longest_search_path_without_glob_pattern() -> Result<()> {

Review Comment:
   This test is reformulated and moved to ListingTableUrl
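The splitting rule from the `ListingTableUrl::parse` docs can be sketched as follows: find the first glob character, then split just after the last path separator that precedes it, so the whole segment containing the glob goes into the pattern. This is an illustrative reimplementation under that reading of the docs, not the PR's `split_glob_expression`.

```rust
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Split `path` into (prefix, glob) at the start of the first segment
/// containing a glob character, or return `None` if there is none.
fn split_glob(path: &str) -> Option<(&str, &str)> {
    let glob_start = path.find(|c: char| GLOB_START_CHARS.contains(&c))?;
    // Byte index just after the last separator before the glob character
    let split = path[..glob_start]
        .rfind(std::path::is_separator)
        .map_or(0, |i| i + 1);
    Some(path.split_at(split))
}
```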



##########
datafusion/data-access/src/object_store/mod.rs:
##########
@@ -78,54 +74,6 @@ pub trait ObjectStore: Sync + Send + Debug {
     /// Returns all the files in path `prefix`
     async fn list_file(&self, prefix: &str) -> Result<FileMetaStream>;
 
-    /// Calls `list_file` with a suffix filter

Review Comment:
   This logic is now encapsulated in ListingTableUrl




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r883601666


##########
datafusion/core/Cargo.toml:
##########
@@ -85,6 +87,7 @@ sqlparser = "0.17"
 tempfile = "3"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
 tokio-stream = "0.1"
+url = "2.2"

Review Comment:
   This is a new dependency, but it is brought in by hyper and friends, so again it is pretty ubiquitous.



##########
datafusion/core/Cargo.toml:
##########
@@ -67,7 +67,9 @@ datafusion-physical-expr = { path = "../physical-expr", version = "8.0.0" }
 datafusion-row = { path = "../row", version = "8.0.0" }
 datafusion-sql = { path = "../sql", version = "8.0.0" }
 futures = "0.3"
+glob = "0.3.0"

Review Comment:
   This is moved from data_access



##########
datafusion/core/Cargo.toml:
##########
@@ -67,7 +67,9 @@ datafusion-physical-expr = { path = "../physical-expr", version = "8.0.0" }
 datafusion-row = { path = "../row", version = "8.0.0" }
 datafusion-sql = { path = "../sql", version = "8.0.0" }
 futures = "0.3"
+glob = "0.3.0"
 hashbrown = { version = "0.12", features = ["raw"] }
+itertools = "0.10"

Review Comment:
   This is technically a new dependency of this crate, but it is so ubiquitous (it is a dependency of prost, etc.) that this is probably ok.




[GitHub] [arrow-datafusion] thinkharderdev commented on pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#issuecomment-1131700909

   Aside from the serialization issue we discussed above, I think this looks great. 



[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884798823


##########
datafusion/core/src/datasource/object_store.rs:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store.
+//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
+//! and query data inside these systems.
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::local::{LocalFileSystem, LOCAL_SCHEME};
+use datafusion_data_access::object_store::ObjectStore;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+use url::Url;
+
+/// A parsed URL identifying a particular [`ObjectStore`]
+#[derive(Debug, Clone)]
+pub struct ObjectStoreUrl {
+    url: Url,
+}
+
+impl ObjectStoreUrl {
+    /// Parse an [`ObjectStoreUrl`] from a string
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let mut parsed =
+            Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        let remaining = &parsed[url::Position::BeforePath..];
+        if !remaining.is_empty() && remaining != "/" {
+            return Err(DataFusionError::Execution(format!(
+                "ObjectStoreUrl must only contain scheme and authority, got: {}",
+                remaining
+            )));
+        }
+
+        // Always set path for consistency
+        parsed.set_path("/");
+        Ok(Self { url: parsed })
+    }
+
+    /// An [`ObjectStoreUrl`] for the local filesystem
+    pub fn local_filesystem() -> Self {
+        Self::parse("file://").unwrap()
+    }
+
+    /// Returns this [`ObjectStoreUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+}
+
+impl AsRef<str> for ObjectStoreUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ObjectStoreUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ObjectStoreUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+/// Object store registry
+pub struct ObjectStoreRegistry {
+    /// A map from scheme to object store that serve list / read operations for the store
+    pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+}
+
+impl std::fmt::Debug for ObjectStoreRegistry {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("ObjectStoreRegistry")
+            .field(
+                "schemes",
+                &self.object_stores.read().keys().collect::<Vec<_>>(),
+            )
+            .finish()
+    }
+}
+
+impl Default for ObjectStoreRegistry {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ObjectStoreRegistry {
+    /// Create the registry that object stores can be registered into.
+    /// The [`LocalFileSystem`] store is registered by default to support reading local files natively.
+    pub fn new() -> Self {
+        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
+        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
+
+        Self {
+            object_stores: RwLock::new(map),
+        }
+    }
+
+    /// Adds a new store to this registry.
+    /// If a store of the same prefix existed before, it is replaced in the registry and returned.
+    pub fn register_store(
+        &self,
+        scheme: String,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        let mut stores = self.object_stores.write();
+        stores.insert(scheme, store)
+    }
+
+    /// Get the store registered for scheme
+    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+        let stores = self.object_stores.read();
+        stores.get(scheme).cloned()
+    }
+
+    /// Get a suitable store for the provided URL. For example:
+    ///
+    /// - URL with scheme `file://` or no scheme will return the default LocalFS store
+    /// - URL with scheme `s3://` will return the S3 store if it's registered
+    ///
+    pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
+        let url = url.as_ref();
+        let stores = self.object_stores.read();
+        let store = stores.get(url.scheme()).map(Clone::clone).ok_or_else(|| {

Review Comment:
   Will update to use the member function
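A sketch of the suggested change, where `get_by_url` resolves the scheme and delegates to `get` instead of repeating the map lookup. It uses a stand-in `ObjectStore` trait, `std::sync::RwLock` in place of `parking_lot`, a string-based URL instead of `url::Url`, and `String` errors instead of `DataFusionError`; all of these are simplifications.

```rust
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, RwLock};

/// Stand-in for the real `ObjectStore` trait; `name` only identifies the store.
trait ObjectStore: Debug + Send + Sync {
    fn name(&self) -> &str;
}

#[derive(Debug)]
struct LocalFileSystem;

impl ObjectStore for LocalFileSystem {
    fn name(&self) -> &str {
        "local"
    }
}

/// Simplified registry keyed by URL scheme.
#[derive(Default)]
struct ObjectStoreRegistry {
    object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
}

impl ObjectStoreRegistry {
    fn register_store(&self, scheme: &str, store: Arc<dyn ObjectStore>) -> Option<Arc<dyn ObjectStore>> {
        self.object_stores.write().unwrap().insert(scheme.to_string(), store)
    }

    fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
        self.object_stores.read().unwrap().get(scheme).cloned()
    }

    /// Resolve the scheme, then reuse `get` rather than duplicating the lookup.
    fn get_by_url(&self, url: &str) -> Result<Arc<dyn ObjectStore>, String> {
        let scheme = url
            .split_once("://")
            .map(|(scheme, _)| scheme)
            .ok_or_else(|| format!("no scheme in URL: {url}"))?;
        self.get(scheme)
            .ok_or_else(|| format!("no object store registered for scheme '{scheme}'"))
    }
}
```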




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884785430


##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::object_store::ObjectStoreUrl;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use itertools::Itertools;
+use std::path::is_separator;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, or the string is an absolute filesystem path
+    /// as determined by [`std::path::Path::is_absolute`], the string will be
+    /// interpreted as a path on the local filesystem using the operating
+    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered
+    /// a glob expression and resolved as described in the section below.
+    ///
+    /// Otherwise, the path will be resolved to an absolute path, returning
+    /// an error if it does not exist, and converted to a [file URI]
+    ///
+    /// If you wish to specify a path that does not exist on the local
+    /// machine you must provide it as a fully-qualified [file URI]
+    /// e.g. `file:///myfile.txt`
+    ///
+    /// ## Glob Paths
+    ///
+    /// If no scheme is provided, and the path contains a glob expression, it will
+    /// be resolved as follows.
+    ///
+    /// The string up to the first path segment containing a glob expression will be extracted,
+    /// and resolved in the same manner as a normal scheme-less path. That is, resolved to
+    /// an absolute path on the local filesystem, returning an error if it does not exist,
+    /// and converted to a [file URI]
+    ///
+    /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
+    /// filter when listing files from object storage
+    ///
+    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let s = s.as_ref();
+
+        // This is necessary to handle the case of a path starting with a drive letter
+        if std::path::Path::new(s).is_absolute() {
+            return Self::parse_path(s);
+        }
+
+        match Url::parse(s) {
+            Ok(url) => Ok(Self { url, glob: None }),
+            Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
+            Err(e) => Err(DataFusionError::External(Box::new(e))),
+        }
+    }
+
+    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
+    fn parse_path(s: &str) -> Result<Self> {
+        let (prefix, glob) = match split_glob_expression(s) {
+            Some((prefix, glob)) => {
+                let glob = Pattern::new(glob)
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                (prefix, Some(glob))
+            }
+            None => (s, None),
+        };
+
+        let path = std::path::Path::new(prefix).canonicalize()?;
+        let url = match path.is_file() {
+            true => Url::from_file_path(path).unwrap(),
+            false => Url::from_directory_path(path).unwrap(),
+        };
+
+        Ok(Self { url, glob })
+    }
+
+    /// Returns the URL scheme
+    pub fn scheme(&self) -> &str {
+        self.url.scheme()
+    }
+
+    /// Returns the path as expected by [`ObjectStore`]
+    ///
+    /// In particular, for file scheme URLs, this is an absolute path
+    /// on the local filesystem in the OS-specific path representation.
+    ///
+    /// For other URLs, this is the host and path of the URL,
+    /// delimited by `/`, and with no leading `/`.
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    fn prefix(&self) -> &str {
+        match self.scheme() {
+            "file" => match cfg!(target_family = "windows") {
+                true => self.url.path().strip_prefix('/').unwrap(),
+                false => self.url.path(),
+            },
+            _ => &self.url[url::Position::BeforeHost..url::Position::AfterPath],
+        }
+    }
+
+    /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
+    /// an iterator of the remaining path segments
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    pub(crate) fn strip_prefix<'a, 'b: 'a>(
+        &'a self,
+        path: &'b str,
+    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
+        let prefix = self.prefix();
+        // Ignore empty path segments
+        let diff = itertools::diff_with(
+            path.split(is_separator).filter(|s| !s.is_empty()),
+            prefix.split(is_separator).filter(|s| !s.is_empty()),
+            |a, b| a == b,
+        );
+
+        match diff {
+            // Match with remaining
+            Some(itertools::Diff::Shorter(_, subpath)) => Some(subpath),
+            _ => None,
+        }
+    }
+
+    /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
+    pub(crate) fn list_all_files<'a>(
+        &'a self,
+        store: &'a dyn ObjectStore,
+        file_extension: &'a str,
+    ) -> BoxStream<'a, Result<FileMeta>> {
+        futures::stream::once(async move {
+            let prefix = self.prefix();
+            store.list_file(prefix.as_ref()).await
+        })
+        .try_flatten()
+        .map_err(DataFusionError::IoError)
+        .try_filter(move |meta| {
+            let path = meta.path();
+
+            let extension_match = path.ends_with(file_extension);
+            let glob_match = match &self.glob {
+                Some(glob) => match self.strip_prefix(path) {
+                    Some(mut segments) => {
+                        let stripped = segments.join("/");
+                        glob.matches(&stripped)
+                    }
+                    None => false,
+                },
+                None => true,
+            };
+
+            futures::future::ready(extension_match && glob_match)
+        })
+        .boxed()
+    }
+
+    /// Returns this [`ListingTableUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+
+    /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`]
+    pub fn object_store(&self) -> ObjectStoreUrl {
+        let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
+        ObjectStoreUrl::parse(url).unwrap()
+    }
+}
+
+impl AsRef<str> for ListingTableUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ListingTableUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ListingTableUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
+
+/// Splits `path` at the first path segment containing a glob expression, returning
+/// `None` if no glob expression is found.
+///
+/// Path delimiters are determined using [`std::path::is_separator`] which
+/// permits `/` as a path delimiter even on Windows platforms.
+///
+fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
+    let mut last_separator = 0;
+
+    for (byte_idx, char) in path.char_indices() {
+        if GLOB_START_CHARS.contains(&char) {
+            if last_separator == 0 {
+                return Some((".", path));
+            }
+            return Some(path.split_at(last_separator));
+        }
+
+        if std::path::is_separator(char) {
+            last_separator = byte_idx + char.len_utf8();
+        }
+    }
+    None
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_prefix_path() {
+        let root = std::env::current_dir().unwrap();
+        let root = root.to_string_lossy();
+
+        let url = ListingTableUrl::parse(&root).unwrap();
+        let child = format!("{}/partition/file", root);
+
+        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
+        assert_eq!(prefix, vec!["partition", "file"]);
+    }
+
+    #[test]
+    fn test_prefix_s3() {
+        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
+        assert_eq!(url.prefix(), "bucket/foo/bar");
+
+        let path = "bucket/foo/bar/partition/foo.parquet";
+        let prefix: Vec<_> = url.strip_prefix(path).unwrap().collect();
+        assert_eq!(prefix, vec!["partition", "foo.parquet"]);
+    }
+
+    #[test]
+    fn test_split_glob() {
+        fn test(input: &str, expected: Option<(&str, &str)>) {
+            assert_eq!(
+                split_glob_expression(input),
+                expected,
+                "testing split_glob_expression with {}",
+                input
+            );
+        }
+
+        // no glob patterns
+        test("/", None);
+        test("/a.txt", None);
+        test("/a", None);
+        test("/a/", None);
+        test("/a/b", None);
+        test("/a/b/", None);
+        test("/a/b.txt", None);
+        test("/a/b/c.txt", None);
+        // glob patterns, thus we build the longest path (os-specific)
+        test("*.txt", Some((".", "*.txt")));
+        test("/*.txt", Some(("/", "*.txt")));
+        test("/a/*b.txt", Some(("/a/", "*b.txt")));
+        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
+        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
+        test("/a/b*.txt", Some(("/a/", "b*.txt")));
+        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));
+
+        // https://github.com/apache/arrow-datafusion/issues/2465

Review Comment:
   It was copied over from the original test that this one was adapted from.
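`strip_prefix` compares the listed path with the table prefix one segment at a time, skipping empty segments, and only succeeds when the prefix runs out before the path does. A std-only sketch of that comparison follows; `strip_prefix_segments` is a hypothetical helper that replaces the `itertools::diff_with` call with a plain loop and returns a `Vec` instead of an iterator.

```rust
use std::path::is_separator;

/// Hypothetical std-only sketch of the comparison in `ListingTableUrl::strip_prefix`.
/// Returns the segments of `path` that follow `prefix`, or `None` if `prefix`
/// is not a segment-wise prefix of `path` or nothing remains after it.
fn strip_prefix_segments<'a>(prefix: &str, path: &'a str) -> Option<Vec<&'a str>> {
    // Empty segments are ignored, so "/a//b/" and "a/b" compare equal
    let mut path_segments = path.split(is_separator).filter(|s| !s.is_empty());
    let prefix_segments = prefix.split(is_separator).filter(|s| !s.is_empty());

    for expected in prefix_segments {
        match path_segments.next() {
            Some(actual) if actual == expected => {}
            // A mismatching segment, or a path shorter than the prefix
            _ => return None,
        }
    }

    // Like `Diff::Shorter`, only a path strictly longer than the prefix matches
    let remaining: Vec<&'a str> = path_segments.collect();
    if remaining.is_empty() {
        None
    } else {
        Some(remaining)
    }
}
```

The final check mirrors the match on `diff_with` in the diff: an identical path, a diverging path and a shorter path all yield `None`, and only `Diff::Shorter` produces the remaining segments.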



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884714926


##########
datafusion/core/src/datasource/object_store.rs:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store.
+//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
+//! and query data inside these systems.
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::local::{LocalFileSystem, LOCAL_SCHEME};
+use datafusion_data_access::object_store::ObjectStore;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+use url::Url;
+
+/// A parsed URL identifying a particular [`ObjectStore`]
+#[derive(Debug, Clone)]
+pub struct ObjectStoreUrl {
+    url: Url,
+}
+
+impl ObjectStoreUrl {
+    /// Parse an [`ObjectStoreUrl`] from a string
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let mut parsed =
+            Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        let remaining = &parsed[url::Position::BeforePath..];
+        if !remaining.is_empty() && remaining != "/" {
+            return Err(DataFusionError::Execution(format!(
+                "ObjectStoreUrl must only contain scheme and authority, got: {}",
+                remaining
+            )));
+        }
+
+        // Always set path for consistency
+        parsed.set_path("/");
+        Ok(Self { url: parsed })
+    }
+
+    /// An [`ObjectStoreUrl`] for the local filesystem
+    pub fn local_filesystem() -> Self {
+        Self::parse("file://").unwrap()
+    }
+
+    /// Returns this [`ObjectStoreUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+}
+
+impl AsRef<str> for ObjectStoreUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ObjectStoreUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ObjectStoreUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+/// Object store registry
+pub struct ObjectStoreRegistry {

Review Comment:
   This is moved from object_store_registry
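The module doc describes the registry as holding one object store per scheme. A minimal sketch of that shape, assuming a hypothetical `Store` trait in place of `ObjectStore` and `std::sync::RwLock` in place of the `parking_lot::RwLock` the diff imports (hence the `unwrap()` calls for lock poisoning). The type and method names are illustrative and may not match the real `ObjectStoreRegistry`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the `ObjectStore` trait
pub trait Store: Send + Sync {
    fn name(&self) -> &str;
}

/// Hypothetical stand-in for `LocalFileSystem`
pub struct LocalStore;

impl Store for LocalStore {
    fn name(&self) -> &str {
        "local"
    }
}

/// Sketch of a registry holding one store per URL scheme
pub struct Registry {
    stores: RwLock<HashMap<String, Arc<dyn Store>>>,
}

impl Registry {
    /// Creates a registry with the local filesystem registered under "file"
    pub fn new() -> Self {
        let mut stores: HashMap<String, Arc<dyn Store>> = HashMap::new();
        stores.insert("file".to_string(), Arc::new(LocalStore));
        Self { stores: RwLock::new(stores) }
    }

    /// Registers `store` for `scheme`, returning the store it replaced, if any
    pub fn register(&self, scheme: &str, store: Arc<dyn Store>) -> Option<Arc<dyn Store>> {
        self.stores.write().unwrap().insert(scheme.to_string(), store)
    }

    /// Resolves the store for the scheme of `url`, e.g. "s3" for "s3://bucket/key"
    pub fn get_by_url(&self, url: &str) -> Result<Arc<dyn Store>, String> {
        let (scheme, _) = url
            .split_once("://")
            .ok_or_else(|| format!("no scheme found in {}", url))?;
        let stores = self.stores.read().unwrap();
        stores
            .get(scheme)
            .cloned()
            .ok_or_else(|| format!("no store registered for scheme {}", scheme))
    }
}
```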





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884715120


##########
datafusion/core/src/datasource/object_store_registry.rs:
##########
@@ -1,138 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store.
-//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
-//! and query data inside these systems.
-
-use datafusion_common::{DataFusionError, Result};
-use datafusion_data_access::object_store::local::{LocalFileSystem, LOCAL_SCHEME};
-use datafusion_data_access::object_store::ObjectStore;
-use parking_lot::RwLock;
-use std::collections::HashMap;
-use std::fmt;
-use std::sync::Arc;
-
-/// Object store registry
-pub struct ObjectStoreRegistry {

Review Comment:
   Moved to object_store module





[GitHub] [arrow-datafusion] tustvold commented on pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#issuecomment-1140493117

   https://github.com/apache/arrow-datafusion/pull/2639 seems to have introduced logical conflicts... Fixing...




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884801557


##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::object_store::ObjectStoreUrl;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use itertools::Itertools;
+use std::path::is_separator;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, or the string is an absolute filesystem path
+    /// as determined by [`std::path::Path::is_absolute`], the string will be
+    /// interpreted as a path on the local filesystem using the operating
+    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered

Review Comment:
   Updated header to be "Glob File Paths"
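In `list_all_files`, a listed file is kept only if its path ends with `file_extension` and, when a glob was parsed, the part of the path below the table prefix (segments joined with `/`) matches that glob. The sketch below reproduces that predicate with std only. `keep_file` and `wildcard_match` are hypothetical; `wildcard_match` is a simplified stand-in for `glob::Pattern` that supports only `*` (any run of characters, including `/`) and `?`, not character classes such as `[123]`, and the sketch assumes `/` as the only separator.

```rust
/// Simplified, hypothetical stand-in for `glob::Pattern::matches`:
/// `*` matches any run of characters (including `/`), `?` matches exactly
/// one character. Character classes such as `[123]` are NOT supported.
fn wildcard_match(pattern: &str, text: &str) -> bool {
    let p: Vec<char> = pattern.chars().collect();
    let t: Vec<char> = text.chars().collect();
    let (mut pi, mut ti) = (0, 0);
    // Pattern index of the last `*` and the text index it currently resumes from
    let mut star: Option<(usize, usize)> = None;

    while ti < t.len() {
        if pi < p.len() && p[pi] == '*' {
            star = Some((pi, ti));
            pi += 1;
        } else if pi < p.len() && (p[pi] == '?' || p[pi] == t[ti]) {
            pi += 1;
            ti += 1;
        } else if let Some((star_pi, star_ti)) = star {
            // Backtrack: let the last `*` absorb one more character
            pi = star_pi + 1;
            ti = star_ti + 1;
            star = Some((star_pi, star_ti + 1));
        } else {
            return false;
        }
    }
    // Trailing `*`s can match the empty string
    while pi < p.len() && p[pi] == '*' {
        pi += 1;
    }
    pi == p.len()
}

/// Hypothetical version of the `try_filter` predicate in `list_all_files`
fn keep_file(prefix: &str, path: &str, file_extension: &str, glob: Option<&str>) -> bool {
    let extension_match = path.ends_with(file_extension);
    let glob_match = match glob {
        Some(pattern) => {
            // Strip the prefix segment by segment, ignoring empty segments
            let mut segments = path.split('/').filter(|s| !s.is_empty());
            let prefix_matches = prefix
                .split('/')
                .filter(|s| !s.is_empty())
                .all(|expected| segments.next() == Some(expected));
            let stripped: Vec<&str> = segments.collect();
            // No match if the path is outside the prefix or equal to it
            prefix_matches && !stripped.is_empty() && wildcard_match(pattern, &stripped.join("/"))
        }
        None => true,
    };
    extension_match && glob_match
}
```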





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884801124


##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   Heh, this was my trick to avoid a name collision with the url crate :laughing:





[GitHub] [arrow-datafusion] tustvold commented on pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#issuecomment-1133067944

   > I am under the impression that support for globbing (almost as an edge-case) does not belong in here
   
   I don't disagree with this, but aside from not supporting globbing, I couldn't see another way to support it whilst also treating the argument as a URI, which is critical for object store support. Alternative suggestions welcome...




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r877048813


##########
ballista/rust/core/src/serde/physical_plan/mod.rs:
##########
@@ -1029,14 +1029,10 @@ fn decode_scan_config(
         .map(|f| f.try_into())
         .collect::<Result<Vec<_>, _>>()?;
 
-    let object_store = if let Some(file) = file_groups.get(0).and_then(|h| h.get(0)) {
-        runtime.object_store(file.file_meta.path())?.0
-    } else {
-        Arc::new(LocalFileSystem {})
-    };
+    // TODO: This will not roundtrip object storage correctly as it loses the scheme

Review Comment:
   Yeah, I think it should probably round-trip the scheme and host explicitly and use them to determine the object store. Currently ObjectStoreRegistry doesn't handle the host consistently, but it's something that I intend to fix soon, so it would be good to cover it as well.





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884786799


##########
datafusion/core/src/datasource/object_store.rs:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store.
+//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
+//! and query data inside these systems.
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::local::{LocalFileSystem, LOCAL_SCHEME};
+use datafusion_data_access::object_store::ObjectStore;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+use url::Url;
+
+/// A parsed URL identifying a particular [`ObjectStore`]
+#[derive(Debug, Clone)]
+pub struct ObjectStoreUrl {
+    url: Url,
+}
+
+impl ObjectStoreUrl {
+    /// Parse an [`ObjectStoreUrl`] from a string
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let mut parsed =
+            Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        let remaining = &parsed[url::Position::BeforePath..];
+        if !remaining.is_empty() && remaining != "/" {
+            return Err(DataFusionError::Execution(format!(
+                "ObjectStoreUrl must only contain scheme and authority, got: {}",

Review Comment:
   Authority refers to the entire section between the scheme and the path, and may contain just a hostname, or also a username, password, port, etc. I figured it made sense to hedge for future functionality.
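`ObjectStoreUrl::parse` keeps only the scheme and authority and rejects anything after them other than a bare `/`, then always sets the path to `/`. A std-only sketch of that validation, assuming a hypothetical `normalize_object_store_url` that splits on `://` by hand. Unlike the `url` crate, it does no case normalization, percent-encoding or authority validation.

```rust
/// Hypothetical std-only sketch of the check in `ObjectStoreUrl::parse`:
/// keep `scheme://authority`, reject any path other than `/` as well as any
/// query or fragment, and always end the result with `/`.
fn normalize_object_store_url(s: &str) -> Result<String, String> {
    let (scheme, rest) = s
        .split_once("://")
        .ok_or_else(|| format!("missing scheme in {}", s))?;
    // The authority (e.g. `user@host:port`) runs up to the first '/', '?' or '#'
    let end = rest
        .find(|c: char| c == '/' || c == '?' || c == '#')
        .unwrap_or(rest.len());
    let (authority, remaining) = rest.split_at(end);
    if !remaining.is_empty() && remaining != "/" {
        return Err(format!(
            "ObjectStoreUrl must only contain scheme and authority, got: {}",
            remaining
        ));
    }
    // Always set the path to "/" for consistency
    Ok(format!("{}://{}/", scheme, authority))
}
```

With this shape, `hdfs://user@namenode:9000` is accepted and normalized to `hdfs://user@namenode:9000/`, while `s3://bucket/path/to/file` is rejected.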





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884336279


##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -68,6 +70,8 @@ lazy_static! {
 pub struct FileScanConfig {
     /// Store from which the `files` should be fetched
     pub object_store: Arc<dyn ObjectStore>,
+    /// Object store URL
+    pub object_store_url: ObjectStoreUrl,

Review Comment:
   This was the solution I came up with to allow ballista to round-trip the physical plan: store a canonical ObjectStoreUrl instead of relying on the files to contain the scheme.
   
   I think perhaps in a subsequent PR I'll try to remove the `object_store` member, which is now redundant.
   
   FYI @thinkharderdev 
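The removed Ballista code picked the store from the first file's path and fell back to `LocalFileSystem` when there were no files, so the scheme could be lost. Carrying the canonical store URL next to the files avoids that. A minimal sketch of the idea, assuming a hypothetical `ScanConfig`, a newline-based `encode`/`decode` standing in for the real protobuf serialization, and stores represented only by their names:

```rust
use std::collections::HashMap;

/// Hypothetical, heavily simplified scan config
#[derive(Debug, Clone, PartialEq)]
struct ScanConfig {
    /// Canonical store URL, e.g. "s3://bucket/"
    object_store_url: String,
    /// File paths as the store expects them
    files: Vec<String>,
}

/// Stand-in for serialization: the store URL on the first line, then one file per line
fn encode(config: &ScanConfig) -> String {
    let mut lines = vec![config.object_store_url.clone()];
    lines.extend(config.files.iter().cloned());
    lines.join("\n")
}

/// Decodes the config and resolves its store from `stores` (keyed by store URL).
/// Because the URL is encoded explicitly, an empty file list still resolves
/// to the right store instead of falling back to the local filesystem.
fn decode<'a>(
    encoded: &str,
    stores: &'a HashMap<String, String>,
) -> Result<(ScanConfig, &'a str), String> {
    let mut lines = encoded.lines();
    let object_store_url = lines.next().ok_or("empty encoding")?.to_string();
    let files: Vec<String> = lines.map(str::to_string).collect();
    let store = stores
        .get(&object_store_url)
        .ok_or_else(|| format!("no store registered for {}", object_store_url))?;
    Ok((ScanConfig { object_store_url, files }, store.as_str()))
}
```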





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884797982


##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::object_store::ObjectStoreUrl;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use itertools::Itertools;
+use std::path::is_separator;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, or the string is an absolute filesystem path
+    /// as determined by [`std::path::Path::is_absolute`], the string will be
+    /// interpreted as a path on the local filesystem using the operating
+    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered
+    /// a glob expression and resolved as described in the section below.
+    ///
+    /// Otherwise, the path will be resolved to an absolute path, returning
+    /// an error if it does not exist, and converted to a [file URI].
+    ///
+    /// If you wish to specify a path that does not exist on the local
+    /// machine, you must provide it as a fully-qualified [file URI],
+    /// e.g. `file:///myfile.txt`.
+    ///
+    /// ## Glob Paths
+    ///
+    /// If no scheme is provided, and the path contains a glob expression, it will
+    /// be resolved as follows.
+    ///
+    /// The string up to the first path segment containing a glob expression will be extracted,
+    /// and resolved in the same manner as a normal scheme-less path. That is, resolved to
+    /// an absolute path on the local filesystem, returning an error if it does not exist,
+    /// and converted to a [file URI].
+    ///
+    /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
+    /// filter when listing files from object storage.
+    ///
+    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let s = s.as_ref();
+
+        // This is necessary to handle the case of a path starting with a drive letter
+        if std::path::Path::new(s).is_absolute() {
+            return Self::parse_path(s);
+        }
+
+        match Url::parse(s) {
+            Ok(url) => Ok(Self { url, glob: None }),
+            Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
+            Err(e) => Err(DataFusionError::External(Box::new(e))),
+        }
+    }
+
+    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
+    fn parse_path(s: &str) -> Result<Self> {

Review Comment:
   I think I'd rather keep it private until such a use-case comes along. There's enough funkiness here that I'm loath to introduce more potential for strangeness :sweat_smile: 
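The order of checks in `parse` matters: the `is_absolute` test runs before `Url::parse` so that a Windows path such as `C:\data` is not read as a URL whose scheme is the drive letter, and only scheme-less input is split into a prefix and a glob. The sketch below mirrors that dispatch with std only. `classify` and `scheme_of` are hypothetical; `scheme_of` is a rough RFC 3986 scheme check standing in for `Url::parse` failing with `RelativeUrlWithoutBase`, and the sketch neither canonicalizes the prefix nor checks that it exists.

```rust
use std::path::{is_separator, Path};

/// How input to `ListingTableUrl::parse` is treated (hypothetical sketch)
#[derive(Debug, PartialEq)]
enum Input<'a> {
    /// Has a scheme: parsed as a URL, never globbed
    Url { scheme: &'a str },
    /// Scheme-less local path, optionally split into prefix and glob
    LocalPath { prefix: &'a str, glob: Option<&'a str> },
}

/// Rough RFC 3986 scheme check: ALPHA *( ALPHA / DIGIT / "+" / "-" / "." ) ":"
fn scheme_of(s: &str) -> Option<&str> {
    let (scheme, _) = s.split_once(':')?;
    let mut chars = scheme.chars();
    let first_ok = chars.next().map_or(false, |c| c.is_ascii_alphabetic());
    let rest_ok = chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.'));
    (first_ok && rest_ok).then(|| scheme)
}

fn classify(s: &str) -> Input<'_> {
    // Absolute paths are checked first so that a Windows path such as
    // `C:\data` is not taken for a URL whose scheme is the drive letter
    if !Path::new(s).is_absolute() {
        if let Some(scheme) = scheme_of(s) {
            return Input::Url { scheme };
        }
    }
    // Split after the last separator that precedes the first glob character
    match s.find(|c: char| matches!(c, '?' | '*' | '[')) {
        Some(glob_start) => match s[..glob_start].rfind(is_separator) {
            Some(sep) => Input::LocalPath { prefix: &s[..=sep], glob: Some(&s[sep + 1..]) },
            None => Input::LocalPath { prefix: ".", glob: Some(s) },
        },
        None => Input::LocalPath { prefix: s, glob: None },
    }
}
```

On Unix, `classify("/data/*/b.csv")` gives a `LocalPath` with prefix `/data/` and glob `*/b.csv`, while `classify("s3://bucket/*.parquet")` is a `Url` and is never globbed.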





[GitHub] [arrow-datafusion] thinkharderdev commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r877046195


##########
ballista/rust/core/src/serde/physical_plan/mod.rs:
##########
@@ -1029,14 +1029,10 @@ fn decode_scan_config(
         .map(|f| f.try_into())
         .collect::<Result<Vec<_>, _>>()?;
 
-    let object_store = if let Some(file) = file_groups.get(0).and_then(|h| h.get(0)) {
-        runtime.object_store(file.file_meta.path())?.0
-    } else {
-        Arc::new(LocalFileSystem {})
-    };
+    // TODO: This will not roundtrip object storage correctly as it loses the scheme

Review Comment:
   Yeah, I think originally I had added a `fn scheme(&self) -> String` method to `ObjectStore` so that we could add it to the serialized `FileScanExecConf`. 
   
   I got rid of it before merging because I thought using the scheme from the file URI would be simpler, but I guess we need to bring it back. 





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884745100


##########
datafusion/core/tests/sql/explain_analyze.rs:
##########
@@ -642,7 +642,7 @@ order by
 #[tokio::test]
 async fn test_physical_plan_display_indent() {
     // Hard code target_partitions as it appears in the RepartitionExec output
-    let config = SessionConfig::new().with_target_partitions(3);
+    let config = SessionConfig::new().with_target_partitions(9000);

Review Comment:
   Here is another way it is handled ("explain normalizer"): 
   
   
   https://github.com/tustvold/arrow-datafusion/blob/url-refactor/datafusion/core/tests/sql/mod.rs#L820-L858
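Assuming the normalizer works by substituting run-specific substrings (such as absolute paths) with stable placeholders before comparing plan text, the idea can be sketched as follows. `Normalizer` and the placeholder names are illustrative and are not taken from the linked code.

```rust
/// Illustrative normalizer: replaces run-specific substrings in plan text
/// with stable placeholders so expected output can be hard-coded in tests.
struct Normalizer {
    replacements: Vec<(String, String)>,
}

impl Normalizer {
    fn new(mut replacements: Vec<(String, String)>) -> Self {
        // Replace longer strings first so that a nested path is not partially
        // rewritten by a shorter path that is its prefix
        replacements.sort_by(|a, b| b.0.len().cmp(&a.0.len()));
        Self { replacements }
    }

    fn normalize(&self, plan: &str) -> String {
        let mut out = plan.to_string();
        for (from, to) in &self.replacements {
            out = out.replace(from.as_str(), to);
        }
        out
    }
}
```

Sorting longest-first means a nested directory is rewritten to its own placeholder before its parent's placeholder can claim the shared prefix.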





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r877015064


##########
datafusion/core/src/lib.rs:
##########
@@ -52,11 +52,11 @@
 //!    .to_string();
 //!
 //! let expected = vec![
-//!     "+---+--------------------------+",
-//!     "| a | MIN(tests/example.csv.b) |",
-//!     "+---+--------------------------+",
-//!     "| 1 | 2                        |",
-//!     "+---+--------------------------+"
+//!     "+---+----------------+",
+//!     "| a | MIN(?table?.b) |",

Review Comment:
   I'm not entirely sure what change caused this... I wonder if we aren't running doctests in CI :thinking: 
   
   Edit: it is because of https://github.com/apache/arrow-datafusion/pull/2578/files#diff-e9c2d69b5793675f76c48aee263e29006065f1b28c50f056ccc6efe7d062aa77L611. This makes CSV consistent with the other formats, which is probably a good thing.
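The change in the expected header can be illustrated with a small sketch. It assumes the column name is built as `FUNC(qualifier.column)` and that a scan without a table name falls back to the `?table?` placeholder seen in the new output. `aggregate_display_name` is a hypothetical helper, not DataFusion's API. In the old doctest output the CSV path served as the qualifier; in the new output the CSV scan gets the same unnamed-table qualifier as other formats.

```rust
/// Assumed placeholder qualifier for a table read without a name.
const UNNAMED_TABLE: &str = "?table?";

/// Hypothetical helper: display name of an aggregate over a qualified
/// column, e.g. `MIN(?table?.b)`.
fn aggregate_display_name(func: &str, table: Option<&str>, column: &str) -> String {
    // Fall back to the placeholder when no table name is known.
    let qualifier = table.unwrap_or(UNNAMED_TABLE);
    format!("{}({}.{})", func, qualifier, column)
}
```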





[GitHub] [arrow-datafusion] tustvold commented on pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#issuecomment-1139573514

   I'm working on fixing the Windows issues and will then do the Ballista dance, but I think this should be ready for review.




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884789305


##########
datafusion/core/src/datasource/object_store.rs:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store.
+//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
+//! and query data inside these systems.
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::local::{LocalFileSystem, LOCAL_SCHEME};
+use datafusion_data_access::object_store::ObjectStore;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+use url::Url;
+
+/// A parsed URL identifying a particular [`ObjectStore`]
+#[derive(Debug, Clone)]
+pub struct ObjectStoreUrl {
+    url: Url,
+}
+
+impl ObjectStoreUrl {
+    /// Parse an [`ObjectStoreUrl`] from a string
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let mut parsed =
+            Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        let remaining = &parsed[url::Position::BeforePath..];
+        if !remaining.is_empty() && remaining != "/" {
+            return Err(DataFusionError::Execution(format!(
+                "ObjectStoreUrl must only contain scheme and authority, got: {}",

Review Comment:
   While precise, I wonder if this error message might be confusing? It is probably fine.
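To make the rule behind that error concrete, here is a std-only approximation of the check in `ObjectStoreUrl::parse`. The real code uses the `url` crate and slices from `url::Position::BeforePath`; this sketch instead splits on `://` by hand, so it is a simplification that ignores URL edge cases, and `validate_object_store_url` is a hypothetical name. A URL passes only if nothing beyond an optional trailing `/` follows the authority.

```rust
/// Hypothetical, std-only approximation of the `ObjectStoreUrl` check:
/// accept `scheme://authority` (optionally with a trailing "/") and
/// reject anything carrying a path, query or fragment.
fn validate_object_store_url(s: &str) -> Result<(), String> {
    // Require an explicit, non-empty scheme such as "s3" or "file".
    let (scheme, rest) = match s.split_once("://") {
        Some(parts) => parts,
        None => return Err(format!("missing scheme in: {}", s)),
    };
    if scheme.is_empty() {
        return Err(format!("missing scheme in: {}", s));
    }
    // The authority ends at the first '/', '?' or '#'; everything after it
    // (path, query, fragment) must be empty or a lone "/".
    let authority_end = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(rest.len());
    let remaining = &rest[authority_end..];
    if !remaining.is_empty() && remaining != "/" {
        return Err(format!(
            "ObjectStoreUrl must only contain scheme and authority, got: {}",
            s
        ));
    }
    Ok(())
}
```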





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884796834


##########
datafusion/core/Cargo.toml:
##########
@@ -67,7 +67,9 @@ datafusion-physical-expr = { path = "../physical-expr", version = "8.0.0" }
 datafusion-row = { path = "../row", version = "8.0.0" }
 datafusion-sql = { path = "../sql", version = "8.0.0" }
 futures = "0.3"
+glob = "0.3.0"
 hashbrown = { version = "0.12", features = ["raw"] }
+itertools = "0.10"

Review Comment:
   Well, once we bring in the new `object_store` crate, we will depend on it anyway...
   
   Ultimately it is such a fundamental crate, like `bytes` and `parking_lot`, that I'm inclined to think it isn't really a problem...





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884839244


##########
datafusion/core/Cargo.toml:
##########
@@ -67,7 +67,9 @@ datafusion-physical-expr = { path = "../physical-expr", version = "8.0.0" }
 datafusion-row = { path = "../row", version = "8.0.0" }
 datafusion-sql = { path = "../sql", version = "8.0.0" }
 futures = "0.3"
+glob = "0.3.0"
 hashbrown = { version = "0.12", features = ["raw"] }
+itertools = "0.10"

Review Comment:
   ok, I still cringe a little, but I vote :shipit: 





[GitHub] [arrow-datafusion] timvw commented on pull request #2578: Extract Listing URI logic into ListingTableUri structure

Posted by GitBox <gi...@apache.org>.
timvw commented on PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#issuecomment-1132816001

   Looks good. (I am under the impression that support for globbing, almost as an edge case, does not belong here, but should instead be implemented in a more outward, user-facing API where it is a proper part of the spec.)
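For context on where globbing sits, the PR description says it is only applied to strings without a scheme, since glob expressions are not part of the URL specification. A minimal std-only sketch of that rule follows. It assumes `?`, `*` and `[` start a glob and treats any string containing `://` as having a scheme; `split_glob` and `GLOB_START_CHARS` are hypothetical names. The helper splits a path into the directory prefix to list and the pattern to match against.

```rust
/// Characters assumed to start a glob expression.
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Hypothetical helper: for a schemeless path containing a glob, split it
/// into (directory prefix to list, glob pattern). Returns `None` for
/// strings with a scheme or without any glob characters.
fn split_glob(path: &str) -> Option<(&str, &str)> {
    // Globbing only applies to strings without a scheme.
    if path.contains("://") {
        return None;
    }
    // Byte offset just past the most recent path separator.
    let mut last_separator = 0;
    for (idx, c) in path.char_indices() {
        if GLOB_START_CHARS.contains(&c) {
            return Some(if last_separator == 0 {
                // Glob in the first component: list the current directory.
                (".", path)
            } else {
                path.split_at(last_separator)
            });
        }
        if std::path::is_separator(c) {
            last_separator = idx + c.len_utf8();
        }
    }
    None
}
```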

