You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/27 18:15:22 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2797: Add support for month & year intervals

alamb commented on code in PR #2797:
URL: https://github.com/apache/arrow-datafusion/pull/2797#discussion_r907668027


##########
datafusion/physical-expr/src/expressions/datetime.rs:
##########
@@ -86,76 +89,114 @@ impl PhysicalExpr for DateIntervalExpr {
         let dates = self.lhs.evaluate(batch)?;
         let intervals = self.rhs.evaluate(batch)?;
 
-        let interval = match intervals {
-            ColumnarValue::Scalar(interval) => match interval {
-                ScalarValue::IntervalDayTime(Some(interval)) => interval as i32,
-                ScalarValue::IntervalYearMonth(Some(_)) => {
-                    return Err(DataFusionError::Execution(
-                        "DateIntervalExpr does not support IntervalYearMonth".to_string(),
-                    ))
-                }
-                ScalarValue::IntervalMonthDayNano(Some(_)) => {
-                    return Err(DataFusionError::Execution(
-                        "DateIntervalExpr does not support IntervalMonthDayNano"
-                            .to_string(),
-                    ))
-                }
-                other => {
-                    return Err(DataFusionError::Execution(format!(
-                        "DateIntervalExpr does not support non-interval type {:?}",
-                        other
-                    )))
-                }
-            },
-            _ => {
-                return Err(DataFusionError::Execution(
-                    "Columnar execution is not yet supported for DateIntervalExpr"
-                        .to_string(),
-                ))
+        // Unwrap days since epoch
+        let operand = match dates {
+            ColumnarValue::Scalar(scalar) => scalar,
+            _ => Err(DataFusionError::Execution(
+                "Columnar execution is not yet supported for DateIntervalExpr"
+                    .to_string(),
+            ))?,
+        };
+
+        // Convert to NaiveDate
+        let epoch = NaiveDate::from_ymd(1970, 1, 1);
+        let prior = match operand {
+            ScalarValue::Date32(Some(date)) => {
+                epoch.add(chrono::Duration::days(date as i64))
             }
+            ScalarValue::Date64(Some(date)) => epoch.add(chrono::Duration::days(date)),
+            _ => Err(DataFusionError::Execution(format!(
+                "Invalid lhs type for DateIntervalExpr: {:?}",
+                operand
+            )))?,
         };
 
-        match dates {
-            ColumnarValue::Scalar(scalar) => match scalar {
-                ScalarValue::Date32(Some(date)) => match &self.op {
-                    Operator::Plus => Ok(ColumnarValue::Scalar(ScalarValue::Date32(
-                        Some(date + interval),
-                    ))),
-                    Operator::Minus => Ok(ColumnarValue::Scalar(ScalarValue::Date32(
-                        Some(date - interval),
-                    ))),
-                    _ => {
-                        // this should be unreachable because we check the operators in `try_new`
-                        Err(DataFusionError::Execution(
-                            "Invalid operator for DateIntervalExpr".to_string(),
-                        ))
-                    }
-                },
-                ScalarValue::Date64(Some(date)) => match &self.op {
-                    Operator::Plus => Ok(ColumnarValue::Scalar(ScalarValue::Date64(
-                        Some(date + interval as i64),
-                    ))),
-                    Operator::Minus => Ok(ColumnarValue::Scalar(ScalarValue::Date64(
-                        Some(date - interval as i64),
-                    ))),
-                    _ => {
-                        // this should be unreachable because we check the operators in `try_new`
-                        Err(DataFusionError::Execution(
-                            "Invalid operator for DateIntervalExpr".to_string(),
-                        ))
-                    }
-                },
-                _ => {
-                    // this should be unreachable because we check the types in `try_new`
-                    Err(DataFusionError::Execution(
-                        "Invalid lhs type for DateIntervalExpr".to_string(),
-                    ))
-                }
-            },
+        // Unwrap interval to add
+        let scalar = match &intervals {
+            ColumnarValue::Scalar(interval) => interval,
             _ => Err(DataFusionError::Execution(
                 "Columnar execution is not yet supported for DateIntervalExpr"
                     .to_string(),
-            )),
-        }
+            ))?,
+        };
+
+        // Negate for subtraction
+        let interval = match &scalar {
+            ScalarValue::IntervalDayTime(Some(interval)) => *interval,
+            ScalarValue::IntervalYearMonth(Some(interval)) => *interval as i64,
+            ScalarValue::IntervalMonthDayNano(Some(_interval)) => {
+                Err(DataFusionError::Execution(
+                    "DateIntervalExpr does not support IntervalMonthDayNano".to_string(),
+                ))?
+            }
+            other => Err(DataFusionError::Execution(format!(
+                "DateIntervalExpr does not support non-interval type {:?}",
+                other
+            )))?,
+        };
+        let interval = match &self.op {
+            Operator::Plus => interval,
+            Operator::Minus => -interval,
+            _ => {
+                // this should be unreachable because we check the operators in `try_new`
+                Err(DataFusionError::Execution(
+                    "Invalid operator for DateIntervalExpr".to_string(),
+                ))?
+            }
+        };
+
+        // Add interval
+        let posterior = match scalar {

Review Comment:
   It would be great to eventually put some/all of this logic into the arrow-rs kernels -- for example
   
   https://docs.rs/arrow/16.0.0/arrow/compute/kernels/temporal/index.html
   
   or perhaps https://docs.rs/arrow/16.0.0/arrow/compute/kernels/arithmetic/fn.add.html
   
   That would also likely result in columnar execution support (aka adding a column of integers)
   
   Maybe we can (I will file a ticket) start with kernels in DataFusion and then port them to arrow-rs



##########
datafusion/core/tests/sql/timestamp.rs:
##########
@@ -814,3 +814,83 @@ async fn group_by_timestamp_millis() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn interval_year() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "select date '1994-01-01' + interval '1' year as date;";
+    let results = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec![
+        "+------------+",
+        "| date       |",
+        "+------------+",
+        "| 1995-01-01 |",
+        "+------------+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn add_interval_month() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "select date '1994-01-31' + interval '1' month as date;";
+    let results = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec![
+        "+------------+",
+        "| date       |",
+        "+------------+",
+        "| 1994-02-28 |",
+        "+------------+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn sub_interval_month() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "select date '1994-03-31' - interval '1' month as date;";
+    let results = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec![
+        "+------------+",
+        "| date       |",
+        "+------------+",
+        "| 1994-02-28 |",
+        "+------------+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn sub_month_wrap() -> Result<()> {

Review Comment:
   nice



##########
datafusion/physical-expr/src/expressions/datetime.rs:
##########
@@ -86,76 +89,114 @@ impl PhysicalExpr for DateIntervalExpr {
         let dates = self.lhs.evaluate(batch)?;
         let intervals = self.rhs.evaluate(batch)?;
 
-        let interval = match intervals {
-            ColumnarValue::Scalar(interval) => match interval {
-                ScalarValue::IntervalDayTime(Some(interval)) => interval as i32,
-                ScalarValue::IntervalYearMonth(Some(_)) => {
-                    return Err(DataFusionError::Execution(
-                        "DateIntervalExpr does not support IntervalYearMonth".to_string(),
-                    ))
-                }
-                ScalarValue::IntervalMonthDayNano(Some(_)) => {
-                    return Err(DataFusionError::Execution(
-                        "DateIntervalExpr does not support IntervalMonthDayNano"
-                            .to_string(),
-                    ))
-                }
-                other => {
-                    return Err(DataFusionError::Execution(format!(
-                        "DateIntervalExpr does not support non-interval type {:?}",
-                        other
-                    )))
-                }
-            },
-            _ => {
-                return Err(DataFusionError::Execution(
-                    "Columnar execution is not yet supported for DateIntervalExpr"
-                        .to_string(),
-                ))
+        // Unwrap days since epoch
+        let operand = match dates {
+            ColumnarValue::Scalar(scalar) => scalar,
+            _ => Err(DataFusionError::Execution(
+                "Columnar execution is not yet supported for DateIntervalExpr"
+                    .to_string(),
+            ))?,
+        };
+
+        // Convert to NaiveDate
+        let epoch = NaiveDate::from_ymd(1970, 1, 1);
+        let prior = match operand {
+            ScalarValue::Date32(Some(date)) => {
+                epoch.add(chrono::Duration::days(date as i64))
             }
+            ScalarValue::Date64(Some(date)) => epoch.add(chrono::Duration::days(date)),
+            _ => Err(DataFusionError::Execution(format!(
+                "Invalid lhs type for DateIntervalExpr: {:?}",
+                operand
+            )))?,
         };
 
-        match dates {
-            ColumnarValue::Scalar(scalar) => match scalar {
-                ScalarValue::Date32(Some(date)) => match &self.op {
-                    Operator::Plus => Ok(ColumnarValue::Scalar(ScalarValue::Date32(
-                        Some(date + interval),
-                    ))),
-                    Operator::Minus => Ok(ColumnarValue::Scalar(ScalarValue::Date32(
-                        Some(date - interval),
-                    ))),
-                    _ => {
-                        // this should be unreachable because we check the operators in `try_new`
-                        Err(DataFusionError::Execution(
-                            "Invalid operator for DateIntervalExpr".to_string(),
-                        ))
-                    }
-                },
-                ScalarValue::Date64(Some(date)) => match &self.op {
-                    Operator::Plus => Ok(ColumnarValue::Scalar(ScalarValue::Date64(
-                        Some(date + interval as i64),
-                    ))),
-                    Operator::Minus => Ok(ColumnarValue::Scalar(ScalarValue::Date64(
-                        Some(date - interval as i64),
-                    ))),
-                    _ => {
-                        // this should be unreachable because we check the operators in `try_new`
-                        Err(DataFusionError::Execution(
-                            "Invalid operator for DateIntervalExpr".to_string(),
-                        ))
-                    }
-                },
-                _ => {
-                    // this should be unreachable because we check the types in `try_new`
-                    Err(DataFusionError::Execution(
-                        "Invalid lhs type for DateIntervalExpr".to_string(),
-                    ))
-                }
-            },
+        // Unwrap interval to add
+        let scalar = match &intervals {
+            ColumnarValue::Scalar(interval) => interval,
             _ => Err(DataFusionError::Execution(
                 "Columnar execution is not yet supported for DateIntervalExpr"
                     .to_string(),
-            )),
-        }
+            ))?,
+        };
+
+        // Negate for subtraction
+        let interval = match &scalar {
+            ScalarValue::IntervalDayTime(Some(interval)) => *interval,
+            ScalarValue::IntervalYearMonth(Some(interval)) => *interval as i64,
+            ScalarValue::IntervalMonthDayNano(Some(_interval)) => {
+                Err(DataFusionError::Execution(
+                    "DateIntervalExpr does not support IntervalMonthDayNano".to_string(),
+                ))?
+            }
+            other => Err(DataFusionError::Execution(format!(
+                "DateIntervalExpr does not support non-interval type {:?}",
+                other
+            )))?,
+        };
+        let interval = match &self.op {
+            Operator::Plus => interval,
+            Operator::Minus => -interval,
+            _ => {
+                // this should be unreachable because we check the operators in `try_new`
+                Err(DataFusionError::Execution(
+                    "Invalid operator for DateIntervalExpr".to_string(),
+                ))?
+            }
+        };
+
+        // Add interval
+        let posterior = match scalar {
+            ScalarValue::IntervalDayTime(Some(_)) => {
+                prior.add(chrono::Duration::days(interval))
+            }
+            ScalarValue::IntervalYearMonth(Some(_)) => {
+                let target = add_months(prior, interval);
+                let target_plus = add_months(target, 1);
+                let last_day = target_plus.sub(chrono::Duration::days(1));
+                let day = min(prior.day(), last_day.day());

Review Comment:
   I don't really understand this logic -- I would have expected the code to split the interval into its three fields (month, day, nano) and then use the various Chrono operations to do the timestamp arithmetic
   
   https://github.com/apache/arrow/blob/master/format/Schema.fbs#L354-L375



##########
datafusion/common/src/scalar.rs:
##########
@@ -75,9 +76,9 @@ pub enum ScalarValue {
     LargeBinary(Option<Vec<u8>>),
     /// list of nested ScalarValue
     List(Option<Vec<ScalarValue>>, Box<DataType>),
-    /// Date stored as a signed 32bit int
+    /// Date stored as a signed 32bit int days since UNIX epoch 1970-01-01
     Date32(Option<i32>),
-    /// Date stored as a signed 64bit int
+    /// Date stored as a signed 64bit int days since UNIX epoch 1970-01-01

Review Comment:
   ```suggestion
       /// Date stored as a signed 64bit int milliseconds since UNIX epoch 1970-01-01
   ```
   
   I think 64bits are milliseconds rather than days -- format reference: https://github.com/apache/arrow/blob/master/format/Schema.fbs#L202-L210



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org