Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/14 06:37:23 UTC

[GitHub] [arrow-datafusion] jorgecarleitao commented on a change in pull request #1556: Officially maintained Arrow2 branch

jorgecarleitao commented on a change in pull request #1556:
URL: https://github.com/apache/arrow-datafusion/pull/1556#discussion_r784507391



##########
File path: datafusion/Cargo.toml
##########
@@ -74,14 +76,24 @@ regex = { version = "^1.4.3", optional = true }
 lazy_static = { version = "^1.4.0" }
 smallvec = { version = "1.6", features = ["union"] }
 rand = "0.8"
-avro-rs = { version = "0.13", features = ["snappy"], optional = true }
 num-traits = { version = "0.2", optional = true }
 pyo3 = { version = "0.14", optional = true }
+avro-schema = { version = "0.2", optional = true }

Review comment:
       Is this needed?

##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -536,9 +536,10 @@ mod tests {
     fn from_qualified_schema_into_arrow_schema() -> Result<()> {
         let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
         let arrow_schema: Schema = schema.into();
-        let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, \

Review comment:
       note how `dict_id` and `dict_is_ordered`, two confusing attributes of `Field`, were removed. `dict_is_ordered` now lives on `DataType::Dictionary` itself, which lets implementations leverage this flag directly in compute kernels; `dict_id` is now only required when writing to IPC (via a different mechanism).
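   As a rough illustration, a toy sketch of the shape of the change (NOT arrow2's actual definitions):
   
   ```rust
   // Toy types only: the point is that the ordering flag now travels with the
   // dictionary *type*, while `Field` no longer carries `dict_id` / `dict_is_ordered`.
   enum DataType {
       Boolean,
       Utf8,
       // key type, value type, is_ordered
       Dictionary(Box<DataType>, Box<DataType>, bool),
   }
   
   struct Field {
       name: String,
       data_type: DataType,
       nullable: bool,
   }
   
   // A compute kernel can branch on the flag straight from the type:
   fn is_ordered_dictionary(dt: &DataType) -> bool {
       matches!(dt, DataType::Dictionary(_, _, true))
   }
   ```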

##########
File path: ballista/rust/core/src/execution_plans/shuffle_writer.rs
##########
@@ -532,6 +541,7 @@ mod tests {
             .unwrap();
 
         let num_rows = stats
+            // see https://github.com/jorgecarleitao/arrow2/pull/416 for fix

Review comment:
       ```suggestion
   ```

##########
File path: ballista/rust/core/Cargo.toml
##########
@@ -35,23 +35,21 @@ async-trait = "0.1.36"
 futures = "0.3"
 hashbrown = "0.11"
 log = "0.4"
-prost = "0.8"
+prost = "0.9"
 serde = {version = "1", features = ["derive"]}
 sqlparser = "0.13"
 tokio = "1.0"
-tonic = "0.5"
+tonic = "0.6"
 uuid = { version = "0.8", features = ["v4"] }
 chrono = { version = "0.4", default-features = false }
 
-# workaround for https://github.com/apache/arrow-datafusion/issues/1498
-# should be able to remove when we update arrow-flight
-quote = "=1.0.10"
-arrow-flight = { version = "6.4.0"  }
+arrow-format = { version = "0.3", features = ["flight-data", "flight-service"] }

Review comment:
       `arrow-format` is an auxiliary crate that only contains the IPC and Flight definitions. This is so that we can more easily follow changes to the Arrow spec there (and/or the particular libs we use to derive the proto/flatbuffers code).
   

##########
File path: datafusion/src/physical_plan/file_format/json.rs
##########
@@ -50,6 +54,43 @@ impl NdJsonExec {
     }
 }
 
+// TODO: implement iterator in upstream json::Reader type
+struct JsonBatchReader<R: Read> {
+    reader: R,
+    schema: SchemaRef,
+    batch_size: usize,
+    proj: Option<Vec<String>>,
+}
+
+impl<R: BufRead> Iterator for JsonBatchReader<R> {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // json::read::read_rows iterates on the empty vec and reads at most n rows
+        let mut rows = vec![String::default(); self.batch_size];

Review comment:
       This can be moved to `JsonBatchReader` and re-used across batches (that is the idea, at least)
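   A minimal sketch of that idea: keep the row buffer on the reader and reuse its allocations across batches (`fill_rows` here is a hypothetical placeholder for arrow2's `json::read` row-reading call, and the deserialization step is elided):
   
   ```rust
   use std::io::BufRead;
   
   struct RecordBatch; // stand-in for the arrow RecordBatch
   
   struct JsonBatchReader<R: BufRead> {
       reader: R,
       // owned by the reader so the String allocations are reused across batches
       rows: Vec<String>,
   }
   
   impl<R: BufRead> JsonBatchReader<R> {
       fn new(reader: R, batch_size: usize) -> Self {
           Self {
               reader,
               rows: vec![String::new(); batch_size],
           }
       }
   }
   
   impl<R: BufRead> Iterator for JsonBatchReader<R> {
       type Item = std::io::Result<RecordBatch>;
   
       fn next(&mut self) -> Option<Self::Item> {
           // reuse the buffer: clear the contents, keep the allocations
           for row in self.rows.iter_mut() {
               row.clear();
           }
           let read = match fill_rows(&mut self.reader, &mut self.rows) {
               Ok(n) => n,
               Err(e) => return Some(Err(e)),
           };
           if read == 0 {
               return None;
           }
           // here `&self.rows[..read]` would be deserialized into a RecordBatch
           Some(Ok(RecordBatch))
       }
   }
   
   /// Hypothetical helper: read up to `rows.len()` lines into the reused buffer.
   fn fill_rows<R: BufRead>(reader: &mut R, rows: &mut [String]) -> std::io::Result<usize> {
       let mut read = 0;
       for row in rows.iter_mut() {
           if reader.read_line(row)? == 0 {
               break;
           }
           read += 1;
       }
       Ok(read)
   }
   ```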

##########
File path: datafusion-examples/examples/simple_udaf.rs
##########
@@ -37,11 +37,11 @@ fn create_context() -> Result<ExecutionContext> {
     // define data in two partitions
     let batch1 = RecordBatch::try_new(
         schema.clone(),
-        vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
+        vec![Arc::new(Float32Array::from_values(vec![2.0, 4.0, 8.0]))],

Review comment:
       ```suggestion
           vec![Arc::new(Float32Array::from_slice([2.0, 4.0, 8.0]))],
   ```

##########
File path: datafusion/Cargo.toml
##########
@@ -39,25 +39,27 @@ path = "src/lib.rs"
 
 [features]
 default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
-simd = ["arrow/simd"]
+# FIXME: https://github.com/jorgecarleitao/arrow2/issues/580
+#simd = ["arrow/simd"]
+simd = []
 crypto_expressions = ["md-5", "sha2", "blake2", "blake3"]
 regex_expressions = ["regex"]
 unicode_expressions = ["unicode-segmentation"]
-pyarrow = ["pyo3", "arrow/pyarrow"]
+# FIXME: add pyarrow support to arrow2 pyarrow = ["pyo3", "arrow/pyarrow"]
+pyarrow = ["pyo3"]
 # Used for testing ONLY: causes all values to hash to the same value (test for collisions)
 force_hash_collisions = []
 # Used to enable the avro format
-avro = ["avro-rs", "num-traits"]
+avro = ["arrow/io_avro", "arrow/io_avro_async", "arrow/io_avro_compression", "num-traits", "avro-schema"]
 
 [dependencies]
 ahash = { version = "0.7", default-features = false }
 hashbrown = { version = "0.11", features = ["raw"] }
-arrow = { version = "6.4.0", features = ["prettyprint"] }
-parquet = { version = "6.4.0", features = ["arrow"] }
+parquet = { package = "parquet2", version = "0.8", default_features = false, features = ["stream"] }
 sqlparser = "0.13"
 paste = "^1.0"
 num_cpus = "1.13.0"
-chrono = { version = "0.4", default-features = false }
+chrono = { version = "0.4", default-features = false, features = ["clock"] }

Review comment:
       Note that this feature has a known vulnerability. `arrow-rs` depends on it; `arrow2` does not (casting from utf8 to datetime is consistent with the arrow spec's definition of timezone-aware timestamps). It seems that datafusion depends on this (due to how postgres casts strings to datetimes?).

##########
File path: datafusion-examples/examples/simple_udaf.rs
##########
@@ -37,11 +37,11 @@ fn create_context() -> Result<ExecutionContext> {
     // define data in two partitions
     let batch1 = RecordBatch::try_new(
         schema.clone(),
-        vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
+        vec![Arc::new(Float32Array::from_values(vec![2.0, 4.0, 8.0]))],
     )?;
     let batch2 = RecordBatch::try_new(
         schema.clone(),
-        vec![Arc::new(Float32Array::from(vec![64.0]))],
+        vec![Arc::new(Float32Array::from_values(vec![64.0]))],

Review comment:
       ```suggestion
           vec![Arc::new(Float32Array::from_slice([64.0]))],
   ```

##########
File path: datafusion/src/physical_plan/hash_join.rs
##########
@@ -681,8 +666,8 @@ fn build_join_indexes(
     match join_type {
         JoinType::Inner | JoinType::Semi | JoinType::Anti => {
             // Using a buffer builder to avoid slower normal builder
-            let mut left_indices = UInt64BufferBuilder::new(0);
-            let mut right_indices = UInt32BufferBuilder::new(0);
+            let mut left_indices = Vec::<u64>::new();

Review comment:
       this is another major difference: arrow2 is interoperable with `Vec`, since `Buffer<T>` implements `From<Vec<T>>`.
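   A quick sketch of what that interop looks like (using the `arrow2` paths directly; within datafusion the crate is renamed to `arrow`):
   
   ```rust
   use arrow2::buffer::Buffer;
   
   fn main() {
       // build indices with a plain Vec: push, reserve, extend_from_slice, ...
       let mut left_indices: Vec<u64> = Vec::new();
       for i in 0..10u64 {
           left_indices.push(i);
       }
   
       // hand the Vec over to an Arrow buffer via `From<Vec<T>>`
       let buffer: Buffer<u64> = left_indices.into();
       assert_eq!(buffer.len(), 10);
   }
   ```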

##########
File path: datafusion/Cargo.toml
##########
@@ -39,25 +39,27 @@ path = "src/lib.rs"
 
 [features]
 default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
-simd = ["arrow/simd"]
+# FIXME: https://github.com/jorgecarleitao/arrow2/issues/580
+#simd = ["arrow/simd"]
+simd = []

Review comment:
       ```suggestion
   simd = ["arrow/simd"]
   ```

##########
File path: datafusion/src/datasource/file_format/parquet.rs
##########
@@ -342,12 +332,12 @@ mod tests {
 
     use super::*;
     use arrow::array::{
-        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
-        TimestampNanosecondArray,
+        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
     };
     use futures::StreamExt;
 
     #[tokio::test]
+    /// Parquet2 lacks the ability to set batch size for parquet reader

Review comment:
       it is by design: parquet's unit of parallel work is the column chunk. Setting a batch size breaks that unit of work, since it requires shared state and synchronization across column chunks (longer version here: https://medium.com/@henry.yijieshen/for-csv-or-the-like-we-could-split-file-by-offsets-and-take-special-care-for-those-records-spread-3ff43195f90)
   
   The upside is that arrow2 supports parallel deserialization out of the box, e.g. [this example](https://github.com/jorgecarleitao/arrow2/blob/main/examples/parquet_read_parallel/src/main.rs), which lets us break from the usual one-thread-per-file constraint found in other systems. In Polars we saw a 1.5-2x speedup from doing this (they use rayon, but the principle applies).
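   For intuition, a hand-wavy rayon sketch of per-column-chunk parallelism (`read_column_chunk` / `assemble_batch` are hypothetical placeholders, not arrow2 APIs):
   
   ```rust
   use rayon::prelude::*;
   
   // Hypothetical stand-ins for parquet metadata / arrow array / batch types.
   struct ColumnChunkMeta;
   struct Array;
   struct RecordBatch;
   
   /// Hypothetical: decompress + decode one column chunk into an Arrow array.
   /// Each call is independent, so no shared state or synchronization is needed.
   fn read_column_chunk(_meta: &ColumnChunkMeta) -> Array {
       Array
   }
   
   /// Hypothetical: zip the decoded columns back into a batch.
   fn assemble_batch(_columns: Vec<Array>) -> RecordBatch {
       RecordBatch
   }
   
   fn read_row_group_in_parallel(column_chunks: &[ColumnChunkMeta]) -> RecordBatch {
       // the unit of parallel work is the column chunk: decode each one on its
       // own thread, then assemble the batch at the end
       let columns: Vec<Array> = column_chunks.par_iter().map(read_column_chunk).collect();
       assemble_batch(columns)
   }
   ```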
   

##########
File path: ballista-examples/src/bin/ballista-sql.rs
##########
@@ -27,7 +27,7 @@ async fn main() -> Result<()> {
         .build()?;
     let ctx = BallistaContext::remote("localhost", 50050, &config);
 
-    let testdata = datafusion::arrow::util::test_util::arrow_test_data();

Review comment:
       this was made private in `arrow2` because all of arrow2's tests live outside `src/`, so adding tests does not require re-compiling the crate (and it is easier to tell which changes touch the source and which touch the tests).

##########
File path: datafusion/Cargo.toml
##########
@@ -74,14 +76,24 @@ regex = { version = "^1.4.3", optional = true }
 lazy_static = { version = "^1.4.0" }
 smallvec = { version = "1.6", features = ["union"] }
 rand = "0.8"
-avro-rs = { version = "0.13", features = ["snappy"], optional = true }
 num-traits = { version = "0.2", optional = true }
 pyo3 = { version = "0.14", optional = true }
+avro-schema = { version = "0.2", optional = true }
+
+[dependencies.arrow]
+package = "arrow2"
+version="0.8"
+features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "io_print", "ahash",
+    "compute_merge_sort", "compute_concatenate", "compute_regex_match", "compute_arithmetics",
+    "compute_cast", "compute_partition", "compute_temporal", "compute_take", "compute_aggregate",
+    "compute_comparison", "compute_if_then_else", "compute_nullif", "compute_boolean", "compute_length",
+    "compute_limit", "compute_boolean_kleene", "compute_like", "compute_filter", "compute_window",]

Review comment:
       `"compute"` feature is an alias for all `compute_*` features.

##########
File path: datafusion/tests/parquet_pruning.rs
##########
@@ -697,13 +730,17 @@ fn make_timestamp_batch(offset: Duration) -> RecordBatch {
         .map(|(i, _)| format!("Row {} + {}", i, offset))
         .collect::<Vec<_>>();
 
-    let arr_nanos = TimestampNanosecondArray::from_opt_vec(ts_nanos, None);
-    let arr_micros = TimestampMicrosecondArray::from_opt_vec(ts_micros, None);
-    let arr_millis = TimestampMillisecondArray::from_opt_vec(ts_millis, None);
-    let arr_seconds = TimestampSecondArray::from_opt_vec(ts_seconds, None);
+    let arr_nanos = PrimitiveArray::<i64>::from(ts_nanos)
+        .to(DataType::Timestamp(TimeUnit::Nanosecond, None));

Review comment:
       this is an example of why it is so easy to add timezone support in arrow2: the physical type (`i64`) is separated from the logical type (`DataType::Timestamp(TimeUnit::Nanosecond, None)`), so we just "attach" a logical type to the (physical) array.
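   For instance, attaching a timezone is just a different logical type over the same physical `i64` values (sketch against the `arrow2` crate; the timezone string and exact paths are illustrative):
   
   ```rust
   use arrow2::array::{Array, PrimitiveArray};
   use arrow2::datatypes::{DataType, TimeUnit};
   
   fn main() {
       let ts_nanos: Vec<Option<i64>> = vec![Some(1_577_836_800_000_000_000), None];
   
       // the same physical i64 array...
       let plain = PrimitiveArray::<i64>::from(ts_nanos);
   
       // ...with different logical types "attached" on top of it
       let naive = plain
           .clone()
           .to(DataType::Timestamp(TimeUnit::Nanosecond, None));
       let utc = plain.to(DataType::Timestamp(
           TimeUnit::Nanosecond,
           Some("+00:00".to_string()),
       ));
   
       assert_eq!(naive.len(), utc.len());
   }
   ```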
   

##########
File path: ballista/rust/core/src/execution_plans/shuffle_writer.rs
##########
@@ -567,6 +577,7 @@ mod tests {
             .downcast_ref::<StructArray>()
             .unwrap();
         let num_rows = stats
+            // see https://github.com/jorgecarleitao/arrow2/pull/416 for fix

Review comment:
       ```suggestion
   ```

##########
File path: datafusion-examples/examples/flight_client.rs
##########
@@ -57,23 +55,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     // the schema should be the first message returned, else client should error
     let flight_data = stream.message().await?.unwrap();
     // convert FlightData to a stream
-    let schema = Arc::new(Schema::try_from(&flight_data)?);
+    let (schema, ipc_schema) =
+        deserialize_schemas(flight_data.data_body.as_slice()).unwrap();
+    let schema = Arc::new(schema);
     println!("Schema: {:?}", schema);
 
     // all the remaining stream messages should be dictionary and record batches
     let mut results = vec![];
-    let dictionaries_by_field = vec![None; schema.fields().len()];
+    let dictionaries_by_field = HashMap::new();

Review comment:
       this is a subtle change for interoperability with the arrow spec - arrow2 implements the complete arrow specification (all official integration tests pass against C++, [diff on the tests](https://github.com/jorgecarleitao/arrow2/blob/main/integration-testing/unskip.patch)).

##########
File path: ballista/rust/core/src/utils.rs
##########
@@ -30,16 +30,17 @@ use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::PartitionStats;
 
 use crate::config::BallistaConfig;
+use arrow::io::ipc::write::WriteOptions;

Review comment:
       ```suggestion
   use datafusion::arrow::io::ipc::write::WriteOptions;
   ```
   
   for consistency ^_^

##########
File path: datafusion/src/avro_to_arrow/reader.rs
##########
@@ -101,30 +100,49 @@ impl ReaderBuilder {
     }
 
     /// Create a new `Reader` from the `ReaderBuilder`
-    pub fn build<'a, R>(self, source: R) -> Result<Reader<'a, R>>
+    pub fn build<R>(self, source: R) -> Result<Reader<R>>
     where
         R: Read + Seek,
     {
         let mut source = source;
 
         // check if schema should be inferred
-        let schema = match self.schema {
-            Some(schema) => schema,
-            None => Arc::new(super::read_avro_schema_from_reader(&mut source)?),
-        };
         source.seek(SeekFrom::Start(0))?;
-        Reader::try_new(source, schema, self.batch_size, self.projection)
+        let (mut avro_schemas, mut schema, codec, file_marker) =
+            read::read_metadata(&mut source)?;
+        if let Some(proj) = self.projection {

Review comment:
       projections on avro files are not tested in arrow2. Note that a projection on a row-based format is not that appealing, because we still need to read the whole row to extract the selected columns, so the bulk of the cost remains.
   
   A more defensive approach here is to perform the projection after the read (i.e. drop columns from the `RecordBatch`). Filed upstream: https://github.com/jorgecarleitao/arrow2/issues/764
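   A minimal sketch of that defensive projection, with stand-in types rather than the real arrow structs:
   
   ```rust
   // Stand-ins for the arrow Field / column / RecordBatch types.
   #[derive(Clone)]
   struct Field {
       name: String,
   }
   
   #[derive(Clone)]
   struct ColumnRef; // e.g. Arc<dyn Array>
   
   struct RecordBatch {
       fields: Vec<Field>,
       columns: Vec<ColumnRef>,
   }
   
   /// Project after reading: keep only the requested columns of a fully-read batch.
   fn project(batch: &RecordBatch, projection: &[String]) -> RecordBatch {
       let indices: Vec<usize> = batch
           .fields
           .iter()
           .enumerate()
           .filter(|(_, field)| projection.contains(&field.name))
           .map(|(i, _)| i)
           .collect();
   
       RecordBatch {
           fields: indices.iter().map(|&i| batch.fields[i].clone()).collect(),
           columns: indices.iter().map(|&i| batch.columns[i].clone()).collect(),
       }
   }
   ```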

##########
File path: datafusion/src/datasource/object_store/mod.rs
##########
@@ -33,6 +33,12 @@ use local::LocalFileSystem;
 
 use crate::error::{DataFusionError, Result};
 
+/// Both Read and Seek
+pub trait ReadSeek: Read + Seek {}
+
+impl<R: Read + Seek> ReadSeek for std::io::BufReader<R> {}
+impl<R: AsRef<[u8]>> ReadSeek for std::io::Cursor<R> {}

Review comment:
       Not sure the trait is needed, but 
   
   ```suggestion
   impl<T: Read + Seek> ReadSeek for T {}
   ```
   
   should generalize for everything.
   

##########
File path: datafusion/src/datasource/file_format/csv.rs
##########
@@ -96,18 +97,30 @@ impl FileFormat for CsvFormat {
         let mut records_to_read = self.schema_infer_max_rec.unwrap_or(std::usize::MAX);
 
         while let Some(obj_reader) = readers.next().await {
-            let mut reader = obj_reader?.sync_reader()?;
-            let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
+            let mut reader = csv::read::ReaderBuilder::new()
+                .delimiter(self.delimiter)
+                .has_headers(self.has_header)
+                .from_reader(obj_reader?.sync_reader()?);
+
+            let schema = csv::read::infer_schema(
                 &mut reader,
-                self.delimiter,
                 Some(records_to_read),
                 self.has_header,
+                &csv::read::infer,
             )?;
-            if records_read == 0 {
-                continue;
-            }
+
+            // if records_read == 0 {
+            //     continue;
+            // }
+            // schemas.push(schema.clone());
+            // records_to_read -= records_read;
+            // if records_to_read == 0 {
+            //     break;
+            // }
+            //
+            // FIXME: return recods_read from infer_schema

Review comment:
       Addressed here: https://github.com/jorgecarleitao/arrow2/pull/765

##########
File path: datafusion/src/physical_plan/projection.rs
##########
@@ -70,16 +70,15 @@ impl ProjectionExec {
                     e.data_type(&input_schema)?,
                     e.nullable(&input_schema)?,
                 );
-                field.set_metadata(get_field_metadata(e, &input_schema));
+                if let Some(metadata) = get_field_metadata(e, &input_schema) {
+                    field = field.with_metadata(metadata);

Review comment:
       ```field.metadata = metadata;```
   
   would also work when `field` is `mut` (since all of `Field`'s members are public)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org