You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/22 14:38:42 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #12530: ARROW-14612: [C++] Support for filename-based partitioning

pitrou commented on a change in pull request #12530:
URL: https://github.com/apache/arrow/pull/12530#discussion_r832240231



##########
File path: python/pyarrow/dataset.py
##########
@@ -210,6 +211,26 @@ def partitioning(schema=None, field_names=None, flavor=None,
             raise ValueError(
                 "For the default directory flavor, need to specify "
                 "a Schema or a list of field names")
+    if flavor == "filename":
+        # default flavor
+        if schema is not None:
+            if field_names is not None:
+                raise ValueError(
+                    "Cannot specify both 'schema' and 'field_names'")
+            if dictionaries == 'infer':
+                return FilenamePartitioning.discover(schema=schema)
+            return FilenamePartitioning(schema, dictionaries)
+        elif field_names is not None:
+            if isinstance(field_names, list):
+                return FilenamePartitioning.discover(field_names)
+            else:
+                raise ValueError(
+                    "Expected list of field names, got {}".format(
+                        type(field_names)))
+        else:
+            raise ValueError(
+                "For the default filename flavor, need to specify "

Review comment:
       Why "default"?

##########
File path: python/pyarrow/dataset.py
##########
@@ -210,6 +211,26 @@ def partitioning(schema=None, field_names=None, flavor=None,
             raise ValueError(
                 "For the default directory flavor, need to specify "
                 "a Schema or a list of field names")
+    if flavor == "filename":

Review comment:
       Can you update the docstring to mention this possibility?

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1592,6 +1593,145 @@ cdef class HivePartitioning(Partitioning):
             res.append(pyarrow_wrap_array(arr))
         return res
 
+cdef class FilenamePartitioning(Partitioning):
+    """
+    A Partitioning based on a specified Schema.
+
+    The FilenamePartitioning expects one segment in the file name for each
+    field in the schema (all fields are required to be present) separated
+    by '_'. For example given schema<year:int16, month:int8> the name
+    "2009_11" would be parsed to ("year"_ == 2009 and "month"_ == 11).
+
+    Parameters
+    ----------
+    schema : Schema
+        The schema that describes the partitions present in the file path.
+    dictionaries : dict[str, Array]
+        If the type of any field of `schema` is a dictionary type, the
+        corresponding entry of `dictionaries` must be an array containing
+        every value which may be taken by the corresponding column or an
+        error will be raised in parsing.
+    segment_encoding : str, default "uri"
+        After splitting paths into segments, decode the segments. Valid
+        values are "uri" (URI-decode segments) and "none" (leave as-is).
+
+    Returns
+    -------
+    FilenamePartitioning
+
+    Examples
+    --------
+    >>> from pyarrow.dataset import FilenamePartitioning
+    >>> partition = FilenamePartitioning(
+    ...     pa.schema([("year", pa.int16()), ("month", pa.int8())]))
+    >>> print(partitioning.parse("2009_11"))
+    ((year == 2009:int16) and (month == 11:int8))
+    """
+
+    cdef:
+        CFilenamePartitioning* filename_partitioning
+
+    def __init__(self, Schema schema not None, dictionaries=None,
+                 segment_encoding="uri"):
+        cdef:
+            shared_ptr[CFilenamePartitioning] c_partitioning
+            CKeyValuePartitioningOptions c_options
+
+        c_options.segment_encoding = _get_segment_encoding(segment_encoding)
+        c_partitioning = make_shared[CFilenamePartitioning](
+            pyarrow_unwrap_schema(schema),
+            _partitioning_dictionaries(schema, dictionaries),
+            c_options,
+        )
+        self.init(<shared_ptr[CPartitioning]> c_partitioning)
+
+    cdef init(self, const shared_ptr[CPartitioning]& sp):
+        Partitioning.init(self, sp)
+        self.filename_partitioning = <CFilenamePartitioning*> sp.get()
+
+    @staticmethod
+    def discover(field_names=None, infer_dictionary=False,
+                 max_partition_dictionary_size=0,
+                 schema=None, segment_encoding="uri"):
+        """
+        Discover a FilenamePartitioning.
+
+        Parameters
+        ----------
+        field_names : list of str
+            The names to associate with the values from the subdirectory names.
+            If schema is given, will be populated from the schema.
+        infer_dictionary : bool, default False
+            When inferring a schema for partition fields, yield dictionary
+            encoded types instead of plain types. This can be more efficient
+            when materializing virtual columns, and Expressions parsed by the
+            finished Partitioning will include dictionaries of all unique
+            inspected values for each field.
+        max_partition_dictionary_size : int, default 0
+            Synonymous with infer_dictionary for backwards compatibility with
+            1.0: setting this to -1 or None is equivalent to passing
+            infer_dictionary=True.
+        schema : Schema, default None
+            Use this schema instead of inferring a schema from partition
+            values. Partition values will be validated against this schema
+            before accumulation into the Partitioning's dictionary.
+        segment_encoding : str, default "uri"
+            After splitting paths into segments, decode the segments. Valid
+            values are "uri" (URI-decode segments) and "none" (leave as-is).
+
+        Returns
+        -------
+        PartitioningFactory
+            To be used in the FileSystemFactoryOptions.
+        """
+        cdef:
+            CPartitioningFactoryOptions c_options
+            vector[c_string] c_field_names
+
+        if max_partition_dictionary_size in {-1, None}:
+            infer_dictionary = True
+        elif max_partition_dictionary_size != 0:
+            raise NotImplementedError("max_partition_dictionary_size must be "
+                                      "0, -1, or None")
+
+        if infer_dictionary:
+            c_options.infer_dictionary = True
+
+        if schema:
+            c_options.schema = pyarrow_unwrap_schema(schema)
+            c_field_names = [tobytes(f.name) for f in schema]
+        elif not field_names:
+            raise ValueError(

Review comment:
       This should be `TypeError`.

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1592,6 +1593,145 @@ cdef class HivePartitioning(Partitioning):
             res.append(pyarrow_wrap_array(arr))
         return res
 
+cdef class FilenamePartitioning(Partitioning):
+    """
+    A Partitioning based on a specified Schema.
+
+    The FilenamePartitioning expects one segment in the file name for each
+    field in the schema (all fields are required to be present) separated
+    by '_'. For example given schema<year:int16, month:int8> the name
+    "2009_11" would be parsed to ("year"_ == 2009 and "month"_ == 11).
+
+    Parameters
+    ----------
+    schema : Schema
+        The schema that describes the partitions present in the file path.
+    dictionaries : dict[str, Array]
+        If the type of any field of `schema` is a dictionary type, the
+        corresponding entry of `dictionaries` must be an array containing
+        every value which may be taken by the corresponding column or an
+        error will be raised in parsing.
+    segment_encoding : str, default "uri"
+        After splitting paths into segments, decode the segments. Valid
+        values are "uri" (URI-decode segments) and "none" (leave as-is).
+
+    Returns
+    -------
+    FilenamePartitioning
+
+    Examples
+    --------
+    >>> from pyarrow.dataset import FilenamePartitioning
+    >>> partition = FilenamePartitioning(
+    ...     pa.schema([("year", pa.int16()), ("month", pa.int8())]))
+    >>> print(partitioning.parse("2009_11"))
+    ((year == 2009:int16) and (month == 11:int8))
+    """
+
+    cdef:
+        CFilenamePartitioning* filename_partitioning
+
+    def __init__(self, Schema schema not None, dictionaries=None,
+                 segment_encoding="uri"):
+        cdef:
+            shared_ptr[CFilenamePartitioning] c_partitioning
+            CKeyValuePartitioningOptions c_options
+
+        c_options.segment_encoding = _get_segment_encoding(segment_encoding)
+        c_partitioning = make_shared[CFilenamePartitioning](
+            pyarrow_unwrap_schema(schema),
+            _partitioning_dictionaries(schema, dictionaries),
+            c_options,
+        )
+        self.init(<shared_ptr[CPartitioning]> c_partitioning)
+
+    cdef init(self, const shared_ptr[CPartitioning]& sp):
+        Partitioning.init(self, sp)
+        self.filename_partitioning = <CFilenamePartitioning*> sp.get()
+
+    @staticmethod
+    def discover(field_names=None, infer_dictionary=False,
+                 max_partition_dictionary_size=0,
+                 schema=None, segment_encoding="uri"):
+        """
+        Discover a FilenamePartitioning.
+
+        Parameters
+        ----------
+        field_names : list of str
+            The names to associate with the values from the subdirectory names.
+            If schema is given, will be populated from the schema.
+        infer_dictionary : bool, default False
+            When inferring a schema for partition fields, yield dictionary
+            encoded types instead of plain types. This can be more efficient
+            when materializing virtual columns, and Expressions parsed by the
+            finished Partitioning will include dictionaries of all unique
+            inspected values for each field.
+        max_partition_dictionary_size : int, default 0
+            Synonymous with infer_dictionary for backwards compatibility with
+            1.0: setting this to -1 or None is equivalent to passing
+            infer_dictionary=True.
+        schema : Schema, default None
+            Use this schema instead of inferring a schema from partition
+            values. Partition values will be validated against this schema
+            before accumulation into the Partitioning's dictionary.
+        segment_encoding : str, default "uri"
+            After splitting paths into segments, decode the segments. Valid
+            values are "uri" (URI-decode segments) and "none" (leave as-is).
+
+        Returns
+        -------
+        PartitioningFactory
+            To be used in the FileSystemFactoryOptions.
+        """
+        cdef:
+            CPartitioningFactoryOptions c_options
+            vector[c_string] c_field_names
+
+        if max_partition_dictionary_size in {-1, None}:
+            infer_dictionary = True
+        elif max_partition_dictionary_size != 0:
+            raise NotImplementedError("max_partition_dictionary_size must be "
+                                      "0, -1, or None")
+
+        if infer_dictionary:
+            c_options.infer_dictionary = True
+
+        if schema:
+            c_options.schema = pyarrow_unwrap_schema(schema)
+            c_field_names = [tobytes(f.name) for f in schema]
+        elif not field_names:
+            raise ValueError(
+                "Neither field_names nor schema was passed; "
+                "cannot infer field_names")
+        else:
+            c_field_names = [tobytes(s) for s in field_names]
+
+        c_options.segment_encoding = _get_segment_encoding(segment_encoding)
+
+        return PartitioningFactory.wrap(
+            CFilenamePartitioning.MakeFactory(c_field_names, c_options))
+
+    @property
+    def dictionaries(self):

Review comment:
       Is it possible to avoid copy-pasting this, and instead move it to the base class?

##########
File path: cpp/src/arrow/dataset/partition.h
##########
@@ -321,6 +330,30 @@ class ARROW_DS_EXPORT FunctionPartitioning : public Partitioning {
   std::string name_;
 };
 
+class ARROW_DS_EXPORT FilenamePartitioning : public KeyValuePartitioning {
+ public:
+  /// If a field in schema is of dictionary type, the corresponding element of
+  /// dictionaries must be contain the dictionary of values for that field.

Review comment:
       ```suggestion
     /// \brief Construct a FilenamePartitioning from its components.
     ///
     /// If a field in the `schema` is of dictionary type, the corresponding element of
     /// `dictionaries` must contain the dictionary of values for that field.
   ```

##########
File path: cpp/src/arrow/dataset/partition.cc
##########
@@ -61,6 +61,16 @@ Result<std::string> SafeUriUnescape(util::string_view encoded) {
   }
   return decoded;
 }
+
+std::string StripNonPrefix(const std::string& path) {

Review comment:
       Add a short comment describing this?

##########
File path: cpp/src/arrow/dataset/partition.cc
##########
@@ -561,6 +636,74 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
   std::vector<std::string> field_names_;
 };
 
+class FilenamePartitioningFactory : public KeyValuePartitioningFactory {
+ public:
+  FilenamePartitioningFactory(std::vector<std::string> field_names,
+                              PartitioningFactoryOptions options)
+      : KeyValuePartitioningFactory(options), field_names_(std::move(field_names)) {
+    Reset();
+    util::InitializeUTF8();
+  }
+
+  std::string type_name() const override { return "filename"; }
+
+  Result<std::shared_ptr<Schema>> Inspect(
+      const std::vector<std::string>& paths) override {
+    for (const auto& path : paths) {
+      size_t field_index = 0;

Review comment:
       Why not make this an `int`?

##########
File path: cpp/src/arrow/dataset/partition_test.cc
##########
@@ -291,7 +293,23 @@ TEST_F(TestPartitioning, DiscoverSchema) {
   AssertInspect({"/0/1", "/hello"}, {Str("alpha"), Int("beta")});
 }
 
-TEST_F(TestPartitioning, DictionaryInference) {
+TEST_F(TestPartitioning, DiscoverSchemaFilename) {
+  factory_ = FilenamePartitioning::MakeFactory({"alpha", "beta"});
+
+  // type is int32 if possible
+  AssertInspect({"0_1_"}, {Int("alpha"), Int("beta")});
+
+  // extra segments are ignored
+  AssertInspect({"0_1_what_"}, {Int("alpha"), Int("beta")});
+
+  // fall back to string if any segment for field alpha is not parseable as int
+  AssertInspect({"0_1_", "hello_1_"}, {Str("alpha"), Int("beta")});
+
+  // If there are too many digits fall back to string
+  AssertInspect({"3760212050_1_"}, {Str("alpha"), Int("beta")});
+}

Review comment:
       Also is URI-encoding tested somewhere?

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -569,6 +570,22 @@ def test_partitioning():
         with pytest.raises(pa.ArrowInvalid):
             partitioning.parse(shouldfail)
 
+    partitioning = ds.FilenamePartitioning(
+        pa.schema([
+            pa.field('group', pa.int64()),
+            pa.field('key', pa.float64())
+        ])
+    )
+    assert partitioning.dictionaries is None

Review comment:
       Are dictionaries tested somewhere else?

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1592,6 +1593,145 @@ cdef class HivePartitioning(Partitioning):
             res.append(pyarrow_wrap_array(arr))
         return res
 
+cdef class FilenamePartitioning(Partitioning):
+    """
+    A Partitioning based on a specified Schema.
+
+    The FilenamePartitioning expects one segment in the file name for each
+    field in the schema (all fields are required to be present) separated
+    by '_'. For example given schema<year:int16, month:int8> the name
+    "2009_11" would be parsed to ("year"_ == 2009 and "month"_ == 11).
+
+    Parameters
+    ----------
+    schema : Schema
+        The schema that describes the partitions present in the file path.
+    dictionaries : dict[str, Array]
+        If the type of any field of `schema` is a dictionary type, the
+        corresponding entry of `dictionaries` must be an array containing
+        every value which may be taken by the corresponding column or an
+        error will be raised in parsing.
+    segment_encoding : str, default "uri"
+        After splitting paths into segments, decode the segments. Valid
+        values are "uri" (URI-decode segments) and "none" (leave as-is).
+
+    Returns
+    -------
+    FilenamePartitioning
+
+    Examples
+    --------
+    >>> from pyarrow.dataset import FilenamePartitioning
+    >>> partition = FilenamePartitioning(
+    ...     pa.schema([("year", pa.int16()), ("month", pa.int8())]))
+    >>> print(partitioning.parse("2009_11"))
+    ((year == 2009:int16) and (month == 11:int8))
+    """
+
+    cdef:
+        CFilenamePartitioning* filename_partitioning
+
+    def __init__(self, Schema schema not None, dictionaries=None,
+                 segment_encoding="uri"):
+        cdef:
+            shared_ptr[CFilenamePartitioning] c_partitioning
+            CKeyValuePartitioningOptions c_options
+
+        c_options.segment_encoding = _get_segment_encoding(segment_encoding)
+        c_partitioning = make_shared[CFilenamePartitioning](
+            pyarrow_unwrap_schema(schema),
+            _partitioning_dictionaries(schema, dictionaries),
+            c_options,
+        )
+        self.init(<shared_ptr[CPartitioning]> c_partitioning)
+
+    cdef init(self, const shared_ptr[CPartitioning]& sp):
+        Partitioning.init(self, sp)
+        self.filename_partitioning = <CFilenamePartitioning*> sp.get()
+
+    @staticmethod
+    def discover(field_names=None, infer_dictionary=False,
+                 max_partition_dictionary_size=0,
+                 schema=None, segment_encoding="uri"):
+        """
+        Discover a FilenamePartitioning.
+
+        Parameters
+        ----------
+        field_names : list of str
+            The names to associate with the values from the subdirectory names.
+            If schema is given, will be populated from the schema.
+        infer_dictionary : bool, default False
+            When inferring a schema for partition fields, yield dictionary
+            encoded types instead of plain types. This can be more efficient
+            when materializing virtual columns, and Expressions parsed by the
+            finished Partitioning will include dictionaries of all unique
+            inspected values for each field.
+        max_partition_dictionary_size : int, default 0
+            Synonymous with infer_dictionary for backwards compatibility with
+            1.0: setting this to -1 or None is equivalent to passing

Review comment:
       There is no reason to try to enforce backwards compatibility since this is a new API, is there?

##########
File path: cpp/src/arrow/dataset/partition.h
##########
@@ -76,7 +78,8 @@ class ARROW_DS_EXPORT Partitioning {
   /// \brief Parse a path into a partition expression
   virtual Result<compute::Expression> Parse(const std::string& path) const = 0;
 
-  virtual Result<std::string> Format(const compute::Expression& expr) const = 0;
+  virtual Result<std::pair<std::string, std::string>> Format(
+      const compute::Expression& expr) const = 0;

Review comment:
       The new signature isn't immediately understandable (what does the pair represent?).
   Can you instead make it return a locally-defined struct e.g.:
   ```suggestion
     struct SomeStruct { std::string foo, bar };
     virtual Result<SomeStruct> Format(const compute::Expression& expr) const = 0;
   ```

##########
File path: cpp/src/arrow/dataset/partition_test.cc
##########
@@ -291,7 +293,23 @@ TEST_F(TestPartitioning, DiscoverSchema) {
   AssertInspect({"/0/1", "/hello"}, {Str("alpha"), Int("beta")});
 }
 
-TEST_F(TestPartitioning, DictionaryInference) {
+TEST_F(TestPartitioning, DiscoverSchemaFilename) {
+  factory_ = FilenamePartitioning::MakeFactory({"alpha", "beta"});
+
+  // type is int32 if possible
+  AssertInspect({"0_1_"}, {Int("alpha"), Int("beta")});
+
+  // extra segments are ignored
+  AssertInspect({"0_1_what_"}, {Int("alpha"), Int("beta")});
+
+  // fall back to string if any segment for field alpha is not parseable as int
+  AssertInspect({"0_1_", "hello_1_"}, {Str("alpha"), Int("beta")});
+
+  // If there are too many digits fall back to string
+  AssertInspect({"3760212050_1_"}, {Str("alpha"), Int("beta")});
+}

Review comment:
       Is it possible to test for errors here? (not enough fields for example, or bad syntax or bad utf-8 or...)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org