You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/11/02 09:09:14 UTC
[arrow-datafusion-python] branch master updated: Add `read_json` to `SessionContext` (#56)
This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git
The following commit(s) were added to refs/heads/master by this push:
new 12bb587 Add `read_json` to `SessionContext` (#56)
12bb587 is described below
commit 12bb5877eaf0ebd8b60c1732ee7255fc6b2ca647
Author: larskarg <la...@users.noreply.github.com>
AuthorDate: Wed Nov 2 04:09:08 2022 -0500
Add `read_json` to `SessionContext` (#56)
* Expose read_json
* Add additional test cases
* Address review comments
* Fix Release Audit Tool error
* Fix fmt issues
* Add empty line to test data
Co-authored-by: Lars <la...@Larss-MacBook-Pro.local>
---
datafusion/tests/data_test_context/data.json | 3 +++
datafusion/tests/test_context.py | 34 ++++++++++++++++++++++++++++
src/context.rs | 32 +++++++++++++++++++++++++-
3 files changed, 68 insertions(+), 1 deletion(-)
diff --git a/datafusion/tests/data_test_context/data.json b/datafusion/tests/data_test_context/data.json
new file mode 100644
index 0000000..ff895b6
--- /dev/null
+++ b/datafusion/tests/data_test_context/data.json
@@ -0,0 +1,3 @@
+{"A": "a", "B": 1}
+{"A": "b", "B": 2}
+{"A": "c", "B": 3}
diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py
index 19b9d0e..50bdf43 100644
--- a/datafusion/tests/test_context.py
+++ b/datafusion/tests/test_context.py
@@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+import os
+
import pyarrow as pa
import pyarrow.dataset as ds
@@ -181,6 +183,38 @@ def test_table_exist(ctx):
assert ctx.table_exist("t") is True
+def test_read_json(ctx):
+ path = os.path.dirname(os.path.abspath(__file__))
+
+ # Default
+ test_data_path = os.path.join(path, "data_test_context", "data.json")
+ df = ctx.read_json(test_data_path)
+ result = df.collect()
+
+ assert result[0].column(0) == pa.array(["a", "b", "c"])
+ assert result[0].column(1) == pa.array([1, 2, 3])
+
+ # Schema
+ schema = pa.schema(
+ [
+ pa.field("A", pa.string(), nullable=True),
+ ]
+ )
+ df = ctx.read_json(test_data_path, schema=schema)
+ result = df.collect()
+
+ assert result[0].column(0) == pa.array(["a", "b", "c"])
+ assert result[0].schema == schema
+
+ # File extension
+ test_data_path = os.path.join(path, "data_test_context", "data.json")
+ df = ctx.read_json(test_data_path, file_extension=".json")
+ result = df.collect()
+
+ assert result[0].column(0) == pa.array(["a", "b", "c"])
+ assert result[0].column(1) == pa.array([1, 2, 3])
+
+
def test_read_csv(ctx):
csv_df = ctx.read_csv(path="testing/data/csv/aggregate_test_100.csv")
csv_df.select(column("c1")).show()
diff --git a/src/context.rs b/src/context.rs
index 9186240..21b3f06 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -29,7 +29,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::datasource::TableProvider;
use datafusion::datasource::MemTable;
use datafusion::execution::context::{SessionConfig, SessionContext};
-use datafusion::prelude::{AvroReadOptions, CsvReadOptions, ParquetReadOptions};
+use datafusion::prelude::{AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions};
use crate::catalog::{PyCatalog, PyTable};
use crate::dataframe::PyDataFrame;
@@ -269,6 +269,36 @@ impl PySessionContext {
Ok(self.ctx.session_id())
}
+ #[allow(clippy::too_many_arguments)]
+ #[args(
+ schema = "None",
+ schema_infer_max_records = "1000",
+ file_extension = "\".json\"",
+ table_partition_cols = "vec![]"
+ )]
+ fn read_json(
+ &mut self,
+ path: PathBuf,
+ schema: Option<PyArrowType<Schema>>,
+ schema_infer_max_records: usize,
+ file_extension: &str,
+ table_partition_cols: Vec<String>,
+ py: Python,
+ ) -> PyResult<PyDataFrame> {
+ let path = path
+ .to_str()
+ .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
+
+ let mut options = NdJsonReadOptions::default().table_partition_cols(table_partition_cols);
+ options.schema = schema.map(|s| Arc::new(s.0));
+ options.schema_infer_max_records = schema_infer_max_records;
+ options.file_extension = file_extension;
+
+ let result = self.ctx.read_json(path, options);
+ let df = wait_for_future(py, result).map_err(DataFusionError::from)?;
+ Ok(PyDataFrame::new(df))
+ }
+
#[allow(clippy::too_many_arguments)]
#[args(
schema = "None",