You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2023/04/12 22:06:26 UTC

[arrow] branch main updated: GH-34216: [Python] Support for reading JSON Datasets With Python (#34586)

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 49631057e9 GH-34216: [Python] Support for reading JSON Datasets With Python (#34586)
49631057e9 is described below

commit 49631057e9cdbf991e11e0be4b9aa0dadf616850
Author: Junming Chen <ju...@outlook.com>
AuthorDate: Thu Apr 13 06:06:18 2023 +0800

    GH-34216: [Python] Support for reading JSON Datasets With Python (#34586)
    
    This PR supports for reading JSON Datasets With Python. As mentioned in [#34216](https://github.com/apache/arrow/issues/34216), only the reading ability are supported.
    
    Please compare the difference between my implemenation of _json.pyx, _json.pyd and _csv.pyx _csv.pyd.
    Cause _csv.pyd utilize pointer for cpp class and my implementation doesn't.
    
    **What changes are included in this PR?**
    
    C++: add inclusion for file_json.h
    Python: reference C++ codes and support reading JSON Datasets
    
    **Are these changes tested?**
    Yes
    6 test samples added in tests/test_dataset.py
    
    * Closes: #34216
    
    Lead-authored-by: Junming Chen <ju...@outlook.com>
    Co-authored-by: Weston Pace <we...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/dataset/api.h                  |   3 +
 python/pyarrow/_dataset.pyx                  | 131 ++++++++++++++++++++++++++-
 python/pyarrow/_json.pxd                     |  36 ++++++++
 python/pyarrow/_json.pyx                     |  42 ++++++++-
 python/pyarrow/dataset.py                    |   6 +-
 python/pyarrow/includes/libarrow_dataset.pxd |   7 ++
 python/pyarrow/tests/test_dataset.py         |  98 ++++++++++++++++++++
 7 files changed, 314 insertions(+), 9 deletions(-)

diff --git a/cpp/src/arrow/dataset/api.h b/cpp/src/arrow/dataset/api.h
index 6554dfc8cb..c2ebd9d300 100644
--- a/cpp/src/arrow/dataset/api.h
+++ b/cpp/src/arrow/dataset/api.h
@@ -26,6 +26,9 @@
 #ifdef ARROW_CSV
 #include "arrow/dataset/file_csv.h"
 #endif
+#ifdef ARROW_JSON
+#include "arrow/dataset/file_json.h"
+#endif
 #include "arrow/dataset/file_ipc.h"
 #ifdef ARROW_ORC
 #include "arrow/dataset/file_orc.h"
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index ba0a2860ec..d2b5554ec1 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -36,6 +36,8 @@ from pyarrow._fs cimport FileSystem, FileSelector
 from pyarrow._csv cimport (
     ConvertOptions, ParseOptions, ReadOptions, WriteOptions)
 from pyarrow.util import _is_iterable, _is_path_like, _stringify_path
+from pyarrow._json cimport ParseOptions as JsonParseOptions
+from pyarrow._json cimport ReadOptions as JsonReadOptions
 
 
 _DEFAULT_BATCH_SIZE = 2**17
@@ -983,7 +985,7 @@ cdef class FileSystemDataset(Dataset):
         The top-level schema of the Dataset.
     format : FileFormat
         File format of the fragments, currently only ParquetFileFormat,
-        IpcFileFormat, and CsvFileFormat are supported.
+        IpcFileFormat, CsvFileFormat, and JsonFileFormat are supported.
     filesystem : FileSystem
         FileSystem of the fragments.
     root_partition : Expression, optional
@@ -1078,7 +1080,7 @@ cdef class FileSystemDataset(Dataset):
             The top-level schema of the DataDataset.
         format : FileFormat
             File format to create fragments from, currently only
-            ParquetFileFormat, IpcFileFormat, and CsvFileFormat are supported.
+            ParquetFileFormat, IpcFileFormat, CsvFileFormat, and JsonFileFormat are supported.
         filesystem : FileSystem
             The filesystem which files are from.
         partitions : list[Expression], optional
@@ -1179,6 +1181,7 @@ cdef class FileFormat(_Weakrefable):
         classes = {
             'ipc': IpcFileFormat,
             'csv': CsvFileFormat,
+            'json': JsonFileFormat,
             'parquet': _get_parquet_symbol('ParquetFileFormat'),
             'orc': _get_orc_fileformat(),
         }
@@ -1315,10 +1318,11 @@ cdef class Fragment(_Weakrefable):
         type_name = frombytes(sp.get().type_name())
 
         classes = {
-            # IpcFileFormat, CsvFileFormat and OrcFileFormat do not have
+            # IpcFileFormat, CsvFileFormat, JsonFileFormat and OrcFileFormat do not have
             # corresponding subclasses of FileFragment
             'ipc': FileFragment,
             'csv': FileFragment,
+            'json': FileFragment,
             'orc': FileFragment,
             'parquet': _get_parquet_symbol('ParquetFileFragment'),
         }
@@ -1928,6 +1932,7 @@ cdef class FragmentScanOptions(_Weakrefable):
 
         classes = {
             'csv': CsvFragmentScanOptions,
+            'json': JsonFragmentScanOptions,
             'parquet': _get_parquet_symbol('ParquetFragmentScanOptions'),
         }
 
@@ -2184,6 +2189,126 @@ cdef class CsvFileWriteOptions(FileWriteOptions):
         self.csv_options = <CCsvFileWriteOptions*> sp.get()
 
 
+cdef class JsonFileFormat(FileFormat):
+    """
+    FileFormat for JSON files.
+
+    Parameters
+    ----------
+    default_fragment_scan_options : JsonFragmentScanOptions
+        Default options for fragments scan.
+    parse_options : pyarrow.json.ParseOptions
+        Options regarding json parsing.
+    read_options : pyarrow.json.ReadOptions
+        General read options.
+    """
+    cdef:
+        CJsonFileFormat* json_format
+
+    # Avoid mistakingly creating attributes
+    __slots__ = ()
+
+    def __init__(self, default_fragment_scan_options=None,
+                 JsonParseOptions parse_options=None,
+                 JsonReadOptions read_options=None):
+        self.init(shared_ptr[CFileFormat](new CJsonFileFormat()))
+        if parse_options is not None or read_options is not None:
+            if default_fragment_scan_options is not None:
+                raise ValueError('If `default_fragment_scan_options` is '
+                                 'given, cannot specify read_options')
+            self.default_fragment_scan_options = JsonFragmentScanOptions(
+                parse_options=parse_options,
+                read_options=read_options)
+        elif isinstance(default_fragment_scan_options, dict):
+            self.default_fragment_scan_options = JsonFragmentScanOptions(
+                **default_fragment_scan_options)
+        elif isinstance(default_fragment_scan_options, JsonFragmentScanOptions):
+            self.default_fragment_scan_options = default_fragment_scan_options
+        elif default_fragment_scan_options is not None:
+            raise TypeError('`default_fragment_scan_options` must be either '
+                            'a dictionary or an instance of '
+                            'JsonFragmentScanOptions')
+
+    cdef void init(self, const shared_ptr[CFileFormat]& sp):
+        FileFormat.init(self, sp)
+        self.json_format = <CJsonFileFormat*> sp.get()
+
+    cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
+        if options.type_name == 'json':
+            self.json_format.default_fragment_scan_options = options.wrapped
+            self.default_fragment_scan_options.read_options = options.read_options
+            self.default_fragment_scan_options.parse_options = options.parse_options
+        else:
+            super()._set_default_fragment_scan_options(options)
+
+    def equals(self, JsonFileFormat other):
+        return (other and
+                self.default_fragment_scan_options ==
+                other.default_fragment_scan_options)
+
+    def __reduce__(self):
+        return JsonFileFormat, (self.default_fragment_scan_options,)
+
+    def __repr__(self):
+        return "<JsonFileFormat>"
+
+
+cdef class JsonFragmentScanOptions(FragmentScanOptions):
+    """
+    Scan-specific options for JSON fragments.
+
+    Parameters
+    ----------
+    parse_options : pyarrow.json.ParseOptions
+        Options regarding JSON parsing.
+    read_options : pyarrow.json.ReadOptions
+        General read options.
+    """
+    cdef:
+        CJsonFragmentScanOptions* json_options
+
+     # Avoid mistakingly creating attributes
+    __slots__ = ()
+
+    def __init__(self, JsonParseOptions parse_options=None,
+                 JsonReadOptions read_options=None):
+        self.init(shared_ptr[CFragmentScanOptions](
+            new CJsonFragmentScanOptions()))
+        if parse_options is not None:
+            self.parse_options = parse_options
+        if read_options is not None:
+            self.read_options = read_options
+
+    cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
+        FragmentScanOptions.init(self, sp)
+        self.json_options = <CJsonFragmentScanOptions*> sp.get()
+
+    @property
+    def parse_options(self):
+        return JsonParseOptions.wrap(self.json_options.parse_options)
+
+    @parse_options.setter
+    def parse_options(self, JsonParseOptions parse_options not None):
+        self.json_options.parse_options = parse_options.options
+
+    @property
+    def read_options(self):
+        return JsonReadOptions.wrap(self.json_options.read_options)
+
+    @read_options.setter
+    def read_options(self, JsonReadOptions read_options not None):
+        self.json_options.read_options = read_options.options
+
+    def equals(self, JsonFragmentScanOptions other):
+        return (
+            other and
+            self.read_options.equals(other.read_options) and
+            self.parse_options.equals(other.parse_options))
+
+    def __reduce__(self):
+        return JsonFragmentScanOptions, (self.parse_options, self.read_options)
+
+
 cdef class Partitioning(_Weakrefable):
 
     def __init__(self):
diff --git a/python/pyarrow/_json.pxd b/python/pyarrow/_json.pxd
new file mode 100644
index 0000000000..42a0a678a9
--- /dev/null
+++ b/python/pyarrow/_json.pxd
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+from pyarrow.includes.libarrow cimport *
+from pyarrow.lib cimport _Weakrefable
+
+
+cdef class ParseOptions(_Weakrefable):
+    cdef:
+        CJSONParseOptions options
+
+    @staticmethod
+    cdef ParseOptions wrap(CJSONParseOptions options)
+
+cdef class ReadOptions(_Weakrefable):
+    cdef:
+        CJSONReadOptions options
+
+    @staticmethod
+    cdef ReadOptions wrap(CJSONReadOptions options)
diff --git a/python/pyarrow/_json.pyx b/python/pyarrow/_json.pyx
index 4c6d964bd1..70cde6e23f 100644
--- a/python/pyarrow/_json.pyx
+++ b/python/pyarrow/_json.pyx
@@ -40,8 +40,6 @@ cdef class ReadOptions(_Weakrefable):
         This will determine multi-threading granularity as well as
         the size of individual chunks in the Table.
     """
-    cdef:
-        CJSONReadOptions options
 
     # Avoid mistakingly creating attributes
     __slots__ = ()
@@ -84,6 +82,24 @@ cdef class ReadOptions(_Weakrefable):
             self.block_size
         )
 
+    def equals(self, ReadOptions other):
+        return (
+            self.use_threads == other.use_threads and
+            self.block_size == other.block_size
+        )
+
+    def __eq__(self, other):
+        try:
+            return self.equals(other)
+        except TypeError:
+            return False
+
+    @staticmethod
+    cdef ReadOptions wrap(CJSONReadOptions options):
+        out = ReadOptions()
+        out.options = options  # shallow copy
+        return out
+
 
 cdef class ParseOptions(_Weakrefable):
     """
@@ -107,9 +123,6 @@ cdef class ParseOptions(_Weakrefable):
            the output
     """
 
-    cdef:
-        CJSONParseOptions options
-
     __slots__ = ()
 
     def __init__(self, explicit_schema=None, newlines_in_values=None,
@@ -198,6 +211,25 @@ cdef class ParseOptions(_Weakrefable):
 
         self.options.unexpected_field_behavior = v
 
+    def equals(self, ParseOptions other):
+        return (
+            self.explicit_schema == other.explicit_schema and
+            self.newlines_in_values == other.newlines_in_values and
+            self.unexpected_field_behavior == other.unexpected_field_behavior
+        )
+
+    def __eq__(self, other):
+        try:
+            return self.equals(other)
+        except TypeError:
+            return False
+
+    @staticmethod
+    cdef ParseOptions wrap(CJSONParseOptions options):
+        out = ParseOptions()
+        out.options = options  # shallow copy
+        return out
+
 
 cdef _get_reader(input_file, shared_ptr[CInputStream]* out):
     use_memory_map = False
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index c4da9686ec..8bec2080f3 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -23,6 +23,8 @@ from pyarrow.util import _is_iterable, _stringify_path, _is_path_like
 from pyarrow._dataset import (  # noqa
     CsvFileFormat,
     CsvFragmentScanOptions,
+    JsonFileFormat,
+    JsonFragmentScanOptions,
     Dataset,
     DatasetFactory,
     DirectoryPartitioning,
@@ -297,6 +299,8 @@ def _ensure_format(obj):
         if not _orc_available:
             raise ValueError(_orc_msg)
         return OrcFileFormat()
+    elif obj == "json":
+        return JsonFileFormat()
     else:
         raise ValueError("format '{}' is not supported".format(obj))
 
@@ -598,7 +602,7 @@ RecordBatch or Table, iterable of RecordBatch, RecordBatchReader, or URI
         Optionally provide the Schema for the Dataset, in which case it will
         not be inferred from the source.
     format : FileFormat or str
-        Currently "parquet", "ipc"/"arrow"/"feather", "csv", and "orc" are
+        Currently "parquet", "ipc"/"arrow"/"feather", "csv", "json", and "orc" are
         supported. For Feather, only version 2 files are supported.
     filesystem : FileSystem or URI string, default None
         If a single path is given as source and filesystem is None, then the
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index b554633e4b..201fb78217 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -277,6 +277,13 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         CCSVReadOptions read_options
         function[StreamWrapFunc] stream_transform_func
 
+    cdef cppclass CJsonFileFormat "arrow::dataset::JsonFileFormat"(CFileFormat):
+        pass
+
+    cdef cppclass CJsonFragmentScanOptions "arrow::dataset::JsonFragmentScanOptions"(CFragmentScanOptions):
+        CJSONParseOptions parse_options
+        CJSONReadOptions read_options
+
     cdef cppclass CPartitioning "arrow::dataset::Partitioning":
         c_string type_name() const
         CResult[CExpression] Parse(const c_string & path) const
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 20a1d51d00..66562b76c9 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -35,6 +35,7 @@ import pytest
 import pyarrow as pa
 import pyarrow.compute as pc
 import pyarrow.csv
+import pyarrow.json
 import pyarrow.feather
 import pyarrow.fs as fs
 from pyarrow.tests.util import (change_cwd, _filesystem_uri,
@@ -805,6 +806,12 @@ def test_file_format_pickling():
             skip_rows=3, column_names=['foo'])),
         ds.CsvFileFormat(read_options=pa.csv.ReadOptions(
             skip_rows=3, block_size=2**20)),
+        ds.JsonFileFormat(),
+        ds.JsonFileFormat(
+            parse_options=pa.json.ParseOptions(newlines_in_values=True,
+                                               unexpected_field_behavior="ignore")),
+        ds.JsonFileFormat(read_options=pa.json.ReadOptions(
+            use_threads=False, block_size=14)),
     ]
     try:
         formats.append(ds.OrcFileFormat())
@@ -835,6 +842,12 @@ def test_fragment_scan_options_pickling():
             convert_options=pa.csv.ConvertOptions(strings_can_be_null=True)),
         ds.CsvFragmentScanOptions(
             read_options=pa.csv.ReadOptions(block_size=2**16)),
+        ds.JsonFragmentScanOptions(),
+        ds.JsonFragmentScanOptions(
+            pa.json.ParseOptions(newlines_in_values=False,
+                                 unexpected_field_behavior="error")),
+        ds.JsonFragmentScanOptions(
+            read_options=pa.json.ReadOptions(use_threads=True, block_size=512)),
     ]
 
     if pq is not None:
@@ -972,6 +985,28 @@ def test_make_csv_fragment_from_buffer(dataset_reader):
     assert dataset_reader.to_table(pickled).equals(fragment.to_table())
 
 
+def test_make_json_fragment_from_buffer(dataset_reader):
+    content = '{"alpha" : "a", "num": 12, "animal" : "dog"}\n' + \
+        '{"alpha" : "b", "num": 11, "animal" : "cat"}\n' + \
+        '{"alpha" : "c", "num": 10, "animal" : "rabbit"}\n'
+    buffer = pa.py_buffer(content.encode('utf-8'))
+
+    json_format = ds.JsonFileFormat()
+    fragment = json_format.make_fragment(buffer)
+
+    # When buffer, fragment open returns a BufferReader, not NativeFile
+    assert isinstance(fragment.open(), pa.BufferReader)
+
+    expected = pa.table([['a', 'b', 'c'],
+                         [12, 11, 10],
+                         ['dog', 'cat', 'rabbit']],
+                        names=['alpha', 'num', 'animal'])
+    assert dataset_reader.to_table(fragment).equals(expected)
+
+    pickled = pickle.loads(pickle.dumps(fragment))
+    assert dataset_reader.to_table(pickled).equals(fragment.to_table())
+
+
 @pytest.mark.parquet
 def test_make_parquet_fragment_from_buffer(dataset_reader):
     arrays = [
@@ -3174,6 +3209,69 @@ def test_csv_fragment_options(tempdir, dataset_reader):
         pa.table({'col0': pa.array(['foo', 'spam', 'MYNULL'])}))
 
 
+@pytest.mark.pandas
+def test_json_format(tempdir, dataset_reader):
+    table = pa.table({'a': pa.array([1, 2, 3], type="int64"),
+                      'b': pa.array([.1, .2, .3], type="float64")})
+
+    path = str(tempdir / 'test.json')
+    out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{')
+    with open(path, 'w') as f:
+        f.write(out)
+
+    dataset = ds.dataset(path, format=ds.JsonFileFormat())
+    result = dataset_reader.to_table(dataset)
+    assert result.equals(table)
+
+    assert_dataset_fragment_convenience_methods(dataset)
+
+    dataset = ds.dataset(path, format='json')
+    result = dataset_reader.to_table(dataset)
+    assert result.equals(table)
+
+
+def test_json_format_options(tempdir, dataset_reader):
+    table = pa.table({'a': pa.array([1, 2, 3], type="int64"),
+                      'b': pa.array([.1, .2, .3], type="float64")})
+
+    path = str(tempdir / 'test.json')
+    out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{')
+    with open(path, 'w') as f:
+        f.write(out)
+
+    with pytest.raises(ValueError,
+                       match="try to increase block size"):
+        dataset = ds.dataset(path, format=ds.JsonFileFormat(
+            read_options=pa.json.ReadOptions(block_size=4)))
+
+    dataset = ds.dataset(path, format=ds.JsonFileFormat(
+        read_options=pa.json.ReadOptions(block_size=64)))
+    result = dataset_reader.to_table(dataset)
+    assert result.equals(table)
+
+
+def test_json_fragment_options(tempdir, dataset_reader):
+    table = pa.table({'a': pa.array([1, 2, 3], type="int64"),
+                      'b': pa.array([.1, .2, .3], type="float64")})
+
+    path = str(tempdir / 'test.json')
+    out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{')
+    with open(path, 'w') as f:
+        f.write(out)
+
+    with pytest.raises(ValueError,
+                       match="try to increase block size"):
+        options = ds.JsonFragmentScanOptions(
+            read_options=pa.json.ReadOptions(block_size=4))
+        dataset = ds.dataset(path, format=ds.JsonFileFormat(options))
+
+    options = ds.JsonFragmentScanOptions(
+        read_options=pa.json.ReadOptions(block_size=64))
+    dataset = ds.dataset(path, format=ds.JsonFileFormat(options))
+    result = dataset_reader.to_table(dataset)
+    assert result.equals(table)
+
+
 def test_encoding(tempdir, dataset_reader):
     path = str(tempdir / 'test.csv')