You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/07/11 11:41:23 UTC

[GitHub] [arrow-rs] alamb commented on a change in pull request #537: Implement `RecordBatch::concat`

alamb commented on a change in pull request #537:
URL: https://github.com/apache/arrow-rs/pull/537#discussion_r667467518



##########
File path: arrow/src/record_batch.rs
##########
@@ -639,4 +669,108 @@ mod tests {
         assert_eq!(batch.column(0).as_ref(), boolean.as_ref());
         assert_eq!(batch.column(1).as_ref(), int.as_ref());
     }
+
+    #[test]
+    fn concat_empty_record_batches() {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let batch = RecordBatch::concat(&schema, &[]).unwrap();
+        assert_eq!(batch.schema().as_ref(), schema.as_ref());
+        assert_eq!(0, batch.num_rows());
+    }
+
+    #[test]
+    fn concat_record_batches_of_same_schema() {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let batch1 = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![1, 2]))],
+        )
+        .unwrap();
+        let batch2 = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![3, 4]))],
+        )
+        .unwrap();
+        let new_batch = RecordBatch::concat(&schema, &[batch1, batch2]).unwrap();
+        assert_eq!(new_batch.schema().as_ref(), schema.as_ref());
+        assert_eq!(4, new_batch.num_rows());
+    }
+
+    #[test]
+    fn concat_record_batches_of_similar_schemas() {

Review comment:
       I would suggest this case be treated as an error -- if a user wants this behavior, I think they should have to explicitly pick out the columns they want and create record batches with the same schema.

##########
File path: arrow/src/record_batch.rs
##########
@@ -353,6 +354,35 @@ impl RecordBatch {
         let schema = Arc::new(Schema::new(fields));
         RecordBatch::try_new(schema, columns)
     }
+
+    /// Concatenates `batches` together into a single record batch.
+    pub fn concat(schema: &SchemaRef, batches: &[Self]) -> Result<Self> {
+        if batches.is_empty() {
+            return Ok(RecordBatch::new_empty(schema.clone()));
+        }
+        let field_num = schema.fields().len();
+        if let Some((i, _)) = batches
+            .iter()
+            .enumerate()
+            .find(|&(_, batch)| batch.num_columns() < field_num)

Review comment:
       I think if the batches don't have the same schema, an error should result, even if there is some plausible behavior.
   
   My rationale is that it is likely a user error if the schemas don't match, so failing fast will help the programmer identify the problem more easily
   
   So perhaps this function should check `batch.schema() != schema` (i.e., compare the entire schema of each record batch against the expected schema)

##########
File path: arrow/src/record_batch.rs
##########
@@ -639,4 +669,108 @@ mod tests {
         assert_eq!(batch.column(0).as_ref(), boolean.as_ref());
         assert_eq!(batch.column(1).as_ref(), int.as_ref());
     }
+
+    #[test]
+    fn concat_empty_record_batches() {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let batch = RecordBatch::concat(&schema, &[]).unwrap();
+        assert_eq!(batch.schema().as_ref(), schema.as_ref());
+        assert_eq!(0, batch.num_rows());
+    }
+
+    #[test]
+    fn concat_record_batches_of_same_schema() {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let batch1 = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![1, 2]))],
+        )
+        .unwrap();
+        let batch2 = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![3, 4]))],
+        )
+        .unwrap();
+        let new_batch = RecordBatch::concat(&schema, &[batch1, batch2]).unwrap();
+        assert_eq!(new_batch.schema().as_ref(), schema.as_ref());
+        assert_eq!(4, new_batch.num_rows());
+    }
+
+    #[test]
+    fn concat_record_batches_of_similar_schemas() {
+        let schema1 =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let schema2 = Arc::new(Schema::new(vec![
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Utf8, false),
+        ]));
+        let batch1 = RecordBatch::try_new(
+            schema1.clone(),
+            vec![Arc::new(Int32Array::from(vec![1, 2]))],
+        )
+        .unwrap();
+        let batch2 = RecordBatch::try_new(
+            schema2,
+            vec![
+                Arc::new(Int32Array::from(vec![3, 4])),
+                Arc::new(StringArray::from(vec!["a", "b"])),
+            ],
+        )
+        .unwrap();
+        let new_batch = RecordBatch::concat(&schema1, &[batch1, batch2]).unwrap();
+        assert_eq!(new_batch.schema().as_ref(), schema1.as_ref());
+        assert_eq!(4, new_batch.num_rows());
+    }
+
+    #[test]
+    fn concat_record_batches_of_different_column_num() {
+        let schema1 = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, false),
+        ]));
+        let schema2 =
+            Arc::new(Schema::new(vec![Field::new("c", DataType::Int32, false)]));
+        let batch1 = RecordBatch::try_new(
+            schema1.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2])),
+                Arc::new(StringArray::from(vec!["a", "b"])),
+            ],
+        )
+        .unwrap();
+        let batch2 =
+            RecordBatch::try_new(schema2, vec![Arc::new(Int32Array::from(vec![3, 4]))])
+                .unwrap();
+        match RecordBatch::concat(&schema1, &[batch1, batch2]) {
+            Err(ArrowError::InvalidArgumentError(s)) => {
+                assert_eq!(
+            "batches[1] has insufficient number of columns as 2 fields in schema.", s)
+            }
+            _ => panic!("should not be other result"),
+        }

Review comment:
       Another pattern I have seen for checking errors is to use `unwrap_err()`, something like:
   
   ```suggestion
           let error = RecordBatch::concat(&schema1, &[batch1, batch2]).unwrap_err();
           assert_eq!(error.to_string(), "batches[1] has insufficient number of columns as 2 fields in schema.");
   ```
   
   This one is fine too, but I figured I would mention it.

##########
File path: arrow/src/record_batch.rs
##########
@@ -639,4 +669,108 @@ mod tests {
         assert_eq!(batch.column(0).as_ref(), boolean.as_ref());
         assert_eq!(batch.column(1).as_ref(), int.as_ref());
     }
+
+    #[test]
+    fn concat_empty_record_batches() {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let batch = RecordBatch::concat(&schema, &[]).unwrap();
+        assert_eq!(batch.schema().as_ref(), schema.as_ref());
+        assert_eq!(0, batch.num_rows());
+    }
+
+    #[test]
+    fn concat_record_batches_of_same_schema() {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

Review comment:
       I suggest making the positive case have 2 columns rather than just a single one to get coverage on the loops




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org