You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/11/02 09:09:14 UTC

[arrow-datafusion-python] branch master updated: Add `read_json` to `SessionContext` (#56)

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git


The following commit(s) were added to refs/heads/master by this push:
     new 12bb587  Add `read_json` to `SessionContext` (#56)
12bb587 is described below

commit 12bb5877eaf0ebd8b60c1732ee7255fc6b2ca647
Author: larskarg <la...@users.noreply.github.com>
AuthorDate: Wed Nov 2 04:09:08 2022 -0500

    Add `read_json` to `SessionContext` (#56)
    
    * Expose read_json
    
    * Add additional test cases
    
    * Address review comments
    
    * Fix Release Audit Tool error
    
    * Fix fmt issues
    
    * Add empty line to test data
    
    Co-authored-by: Lars <la...@Larss-MacBook-Pro.local>
---
 datafusion/tests/data_test_context/data.json |  3 +++
 datafusion/tests/test_context.py             | 34 ++++++++++++++++++++++++++++
 src/context.rs                               | 32 +++++++++++++++++++++++++-
 3 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/datafusion/tests/data_test_context/data.json b/datafusion/tests/data_test_context/data.json
new file mode 100644
index 0000000..ff895b6
--- /dev/null
+++ b/datafusion/tests/data_test_context/data.json
@@ -0,0 +1,3 @@
+{"A": "a", "B": 1}
+{"A": "b", "B": 2}
+{"A": "c", "B": 3}
diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py
index 19b9d0e..50bdf43 100644
--- a/datafusion/tests/test_context.py
+++ b/datafusion/tests/test_context.py
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import os
+
 import pyarrow as pa
 import pyarrow.dataset as ds
 
@@ -181,6 +183,38 @@ def test_table_exist(ctx):
     assert ctx.table_exist("t") is True
 
 
+def test_read_json(ctx):
+    path = os.path.dirname(os.path.abspath(__file__))
+
+    # Default
+    test_data_path = os.path.join(path, "data_test_context", "data.json")
+    df = ctx.read_json(test_data_path)
+    result = df.collect()
+
+    assert result[0].column(0) == pa.array(["a", "b", "c"])
+    assert result[0].column(1) == pa.array([1, 2, 3])
+
+    # Schema
+    schema = pa.schema(
+        [
+            pa.field("A", pa.string(), nullable=True),
+        ]
+    )
+    df = ctx.read_json(test_data_path, schema=schema)
+    result = df.collect()
+
+    assert result[0].column(0) == pa.array(["a", "b", "c"])
+    assert result[0].schema == schema
+
+    # File extension
+    test_data_path = os.path.join(path, "data_test_context", "data.json")
+    df = ctx.read_json(test_data_path, file_extension=".json")
+    result = df.collect()
+
+    assert result[0].column(0) == pa.array(["a", "b", "c"])
+    assert result[0].column(1) == pa.array([1, 2, 3])
+
+
 def test_read_csv(ctx):
     csv_df = ctx.read_csv(path="testing/data/csv/aggregate_test_100.csv")
     csv_df.select(column("c1")).show()
diff --git a/src/context.rs b/src/context.rs
index 9186240..21b3f06 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -29,7 +29,7 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::datasource::TableProvider;
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::{SessionConfig, SessionContext};
-use datafusion::prelude::{AvroReadOptions, CsvReadOptions, ParquetReadOptions};
+use datafusion::prelude::{AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions};
 
 use crate::catalog::{PyCatalog, PyTable};
 use crate::dataframe::PyDataFrame;
@@ -269,6 +269,36 @@ impl PySessionContext {
         Ok(self.ctx.session_id())
     }
 
+    #[allow(clippy::too_many_arguments)]
+    #[args(
+        schema = "None",
+        schema_infer_max_records = "1000",
+        file_extension = "\".json\"",
+        table_partition_cols = "vec![]"
+    )]
+    fn read_json(
+        &mut self,
+        path: PathBuf,
+        schema: Option<PyArrowType<Schema>>,
+        schema_infer_max_records: usize,
+        file_extension: &str,
+        table_partition_cols: Vec<String>,
+        py: Python,
+    ) -> PyResult<PyDataFrame> {
+        let path = path
+            .to_str()
+            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
+
+        let mut options = NdJsonReadOptions::default().table_partition_cols(table_partition_cols);
+        options.schema = schema.map(|s| Arc::new(s.0));
+        options.schema_infer_max_records = schema_infer_max_records;
+        options.file_extension = file_extension;
+
+        let result = self.ctx.read_json(path, options);
+        let df = wait_for_future(py, result).map_err(DataFusionError::from)?;
+        Ok(PyDataFrame::new(df))
+    }
+
     #[allow(clippy::too_many_arguments)]
     #[args(
         schema = "None",