You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/07/02 11:10:59 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2797: Add support for month & year intervals

alamb commented on code in PR #2797:
URL: https://github.com/apache/arrow-datafusion/pull/2797#discussion_r912350722


##########
datafusion/physical-expr/src/expressions/datetime.rs:
##########
@@ -74,88 +77,123 @@ impl PhysicalExpr for DateIntervalExpr {
         self
     }
 
-    fn data_type(&self, input_schema: &Schema) -> datafusion_common::Result<DataType> {
+    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
         self.lhs.data_type(input_schema)
     }
 
-    fn nullable(&self, input_schema: &Schema) -> datafusion_common::Result<bool> {
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
         self.lhs.nullable(input_schema)
     }
 
-    fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result<ColumnarValue> {
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         let dates = self.lhs.evaluate(batch)?;
         let intervals = self.rhs.evaluate(batch)?;
 
-        let interval = match intervals {
-            ColumnarValue::Scalar(interval) => match interval {
-                ScalarValue::IntervalDayTime(Some(interval)) => interval as i32,
-                ScalarValue::IntervalYearMonth(Some(_)) => {
-                    return Err(DataFusionError::Execution(
-                        "DateIntervalExpr does not support IntervalYearMonth".to_string(),
-                    ))
-                }
-                ScalarValue::IntervalMonthDayNano(Some(_)) => {
-                    return Err(DataFusionError::Execution(
-                        "DateIntervalExpr does not support IntervalMonthDayNano"
-                            .to_string(),
-                    ))
-                }
-                other => {
-                    return Err(DataFusionError::Execution(format!(
-                        "DateIntervalExpr does not support non-interval type {:?}",
-                        other
-                    )))
-                }
-            },
-            _ => {
-                return Err(DataFusionError::Execution(
-                    "Columnar execution is not yet supported for DateIntervalExpr"
-                        .to_string(),
-                ))
-            }
+        // Unwrap days since epoch
+        let operand = match dates {
+            ColumnarValue::Scalar(scalar) => scalar,
+            _ => Err(DataFusionError::Execution(
+                "Columnar execution is not yet supported for DateIntervalExpr"
+                    .to_string(),
+            ))?,
         };
 
-        match dates {
-            ColumnarValue::Scalar(scalar) => match scalar {
-                ScalarValue::Date32(Some(date)) => match &self.op {
-                    Operator::Plus => Ok(ColumnarValue::Scalar(ScalarValue::Date32(
-                        Some(date + interval),
-                    ))),
-                    Operator::Minus => Ok(ColumnarValue::Scalar(ScalarValue::Date32(
-                        Some(date - interval),
-                    ))),
-                    _ => {
-                        // this should be unreachable because we check the operators in `try_new`
-                        Err(DataFusionError::Execution(
-                            "Invalid operator for DateIntervalExpr".to_string(),
-                        ))
-                    }
-                },
-                ScalarValue::Date64(Some(date)) => match &self.op {
-                    Operator::Plus => Ok(ColumnarValue::Scalar(ScalarValue::Date64(
-                        Some(date + interval as i64),
-                    ))),
-                    Operator::Minus => Ok(ColumnarValue::Scalar(ScalarValue::Date64(
-                        Some(date - interval as i64),
-                    ))),
-                    _ => {
-                        // this should be unreachable because we check the operators in `try_new`
-                        Err(DataFusionError::Execution(
-                            "Invalid operator for DateIntervalExpr".to_string(),
-                        ))
-                    }
-                },
-                _ => {
-                    // this should be unreachable because we check the types in `try_new`
-                    Err(DataFusionError::Execution(
-                        "Invalid lhs type for DateIntervalExpr".to_string(),
-                    ))
-                }
-            },
+        // Convert to NaiveDate
+        let epoch = NaiveDate::from_ymd(1970, 1, 1);
+        let prior = match operand {
+            ScalarValue::Date32(Some(d)) => epoch.add(Duration::days(d as i64)),
+            ScalarValue::Date64(Some(ms)) => epoch.add(Duration::milliseconds(ms)),
+            _ => Err(DataFusionError::Execution(format!(
+                "Invalid lhs type for DateIntervalExpr: {:?}",
+                operand
+            )))?,
+        };
+
+        // Unwrap interval to add
+        let scalar = match &intervals {
+            ColumnarValue::Scalar(interval) => interval,
             _ => Err(DataFusionError::Execution(
                 "Columnar execution is not yet supported for DateIntervalExpr"
                     .to_string(),
-            )),
-        }
+            ))?,
+        };
+
+        // Invert sign for subtraction
+        let sign = match &self.op {
+            Operator::Plus => 1,
+            Operator::Minus => -1,
+            _ => {
+                // this should be unreachable because we check the operators in `try_new`
+                Err(DataFusionError::Execution(
+                    "Invalid operator for DateIntervalExpr".to_string(),
+                ))?
+            }
+        };
+
+        // Do math
+        let posterior = match scalar {
+            ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
+            ScalarValue::IntervalYearMonth(Some(i)) => add_months(prior, *i * sign),
+            ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
+            other => Err(DataFusionError::Execution(format!(
+                "DateIntervalExpr does not support non-interval type {:?}",
+                other
+            )))?,
+        };
+
+        // convert back
+        let res = match operand {
+            ScalarValue::Date32(Some(_)) => {
+                let days = posterior.sub(epoch).num_days() as i32;
+                ColumnarValue::Scalar(ScalarValue::Date32(Some(days)))
+            }
+            ScalarValue::Date64(Some(_)) => {
+                let ms = posterior.sub(epoch).num_milliseconds();
+                ColumnarValue::Scalar(ScalarValue::Date64(Some(ms)))
+            }
+            _ => Err(DataFusionError::Execution(format!(
+                "Invalid lhs type for DateIntervalExpr: {}",
+                scalar
+            )))?,
+        };
+        Ok(res)
     }
 }
+
+fn add_m_d_nano(prior: NaiveDate, interval: i128, sign: i32) -> NaiveDate {

Review Comment:
   Perhaps we can add some unit tests in this module:
   
   ```rust
   #[cfg(test)]
   mod test {
     #[test]
     fn test_calculations() { ... }
   }
   ```



##########
datafusion/physical-expr/src/expressions/datetime.rs:
##########
@@ -74,88 +77,123 @@ impl PhysicalExpr for DateIntervalExpr {
         self
     }
 
-    fn data_type(&self, input_schema: &Schema) -> datafusion_common::Result<DataType> {
+    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
         self.lhs.data_type(input_schema)
     }
 
-    fn nullable(&self, input_schema: &Schema) -> datafusion_common::Result<bool> {
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
         self.lhs.nullable(input_schema)
     }
 
-    fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result<ColumnarValue> {
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         let dates = self.lhs.evaluate(batch)?;
         let intervals = self.rhs.evaluate(batch)?;
 
-        let interval = match intervals {
-            ColumnarValue::Scalar(interval) => match interval {
-                ScalarValue::IntervalDayTime(Some(interval)) => interval as i32,
-                ScalarValue::IntervalYearMonth(Some(_)) => {
-                    return Err(DataFusionError::Execution(
-                        "DateIntervalExpr does not support IntervalYearMonth".to_string(),
-                    ))
-                }
-                ScalarValue::IntervalMonthDayNano(Some(_)) => {
-                    return Err(DataFusionError::Execution(
-                        "DateIntervalExpr does not support IntervalMonthDayNano"
-                            .to_string(),
-                    ))
-                }
-                other => {
-                    return Err(DataFusionError::Execution(format!(
-                        "DateIntervalExpr does not support non-interval type {:?}",
-                        other
-                    )))
-                }
-            },
-            _ => {
-                return Err(DataFusionError::Execution(
-                    "Columnar execution is not yet supported for DateIntervalExpr"
-                        .to_string(),
-                ))
-            }
+        // Unwrap days since epoch
+        let operand = match dates {
+            ColumnarValue::Scalar(scalar) => scalar,
+            _ => Err(DataFusionError::Execution(
+                "Columnar execution is not yet supported for DateIntervalExpr"
+                    .to_string(),
+            ))?,
         };
 
-        match dates {
-            ColumnarValue::Scalar(scalar) => match scalar {
-                ScalarValue::Date32(Some(date)) => match &self.op {
-                    Operator::Plus => Ok(ColumnarValue::Scalar(ScalarValue::Date32(
-                        Some(date + interval),
-                    ))),
-                    Operator::Minus => Ok(ColumnarValue::Scalar(ScalarValue::Date32(
-                        Some(date - interval),
-                    ))),
-                    _ => {
-                        // this should be unreachable because we check the operators in `try_new`
-                        Err(DataFusionError::Execution(
-                            "Invalid operator for DateIntervalExpr".to_string(),
-                        ))
-                    }
-                },
-                ScalarValue::Date64(Some(date)) => match &self.op {
-                    Operator::Plus => Ok(ColumnarValue::Scalar(ScalarValue::Date64(
-                        Some(date + interval as i64),
-                    ))),
-                    Operator::Minus => Ok(ColumnarValue::Scalar(ScalarValue::Date64(
-                        Some(date - interval as i64),
-                    ))),
-                    _ => {
-                        // this should be unreachable because we check the operators in `try_new`
-                        Err(DataFusionError::Execution(
-                            "Invalid operator for DateIntervalExpr".to_string(),
-                        ))
-                    }
-                },
-                _ => {
-                    // this should be unreachable because we check the types in `try_new`
-                    Err(DataFusionError::Execution(
-                        "Invalid lhs type for DateIntervalExpr".to_string(),
-                    ))
-                }
-            },
+        // Convert to NaiveDate
+        let epoch = NaiveDate::from_ymd(1970, 1, 1);
+        let prior = match operand {
+            ScalarValue::Date32(Some(d)) => epoch.add(Duration::days(d as i64)),
+            ScalarValue::Date64(Some(ms)) => epoch.add(Duration::milliseconds(ms)),
+            _ => Err(DataFusionError::Execution(format!(
+                "Invalid lhs type for DateIntervalExpr: {:?}",
+                operand
+            )))?,
+        };
+
+        // Unwrap interval to add
+        let scalar = match &intervals {
+            ColumnarValue::Scalar(interval) => interval,
             _ => Err(DataFusionError::Execution(
                 "Columnar execution is not yet supported for DateIntervalExpr"
                     .to_string(),
-            )),
-        }
+            ))?,
+        };
+
+        // Invert sign for subtraction
+        let sign = match &self.op {
+            Operator::Plus => 1,
+            Operator::Minus => -1,
+            _ => {
+                // this should be unreachable because we check the operators in `try_new`
+                Err(DataFusionError::Execution(
+                    "Invalid operator for DateIntervalExpr".to_string(),
+                ))?
+            }
+        };
+
+        // Do math
+        let posterior = match scalar {
+            ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
+            ScalarValue::IntervalYearMonth(Some(i)) => add_months(prior, *i * sign),
+            ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
+            other => Err(DataFusionError::Execution(format!(
+                "DateIntervalExpr does not support non-interval type {:?}",
+                other
+            )))?,
+        };
+
+        // convert back
+        let res = match operand {
+            ScalarValue::Date32(Some(_)) => {
+                let days = posterior.sub(epoch).num_days() as i32;
+                ColumnarValue::Scalar(ScalarValue::Date32(Some(days)))
+            }
+            ScalarValue::Date64(Some(_)) => {
+                let ms = posterior.sub(epoch).num_milliseconds();
+                ColumnarValue::Scalar(ScalarValue::Date64(Some(ms)))
+            }
+            _ => Err(DataFusionError::Execution(format!(
+                "Invalid lhs type for DateIntervalExpr: {}",
+                scalar
+            )))?,
+        };
+        Ok(res)
     }
 }
+
+fn add_m_d_nano(prior: NaiveDate, interval: i128, sign: i32) -> NaiveDate {
+    let interval = interval as u128;
+    let months = (interval >> 96) as i32 * sign;
+    let days = (interval >> 64) as i32 * sign;
+    let nanos = interval as i64 * sign as i64;
+    let a = add_months(prior, months);
+    let b = a.add(Duration::days(days as i64));
+    let c = b.add(Duration::nanoseconds(nanos));
+    c
+}
+
+fn add_day_time(prior: NaiveDate, interval: i64, sign: i32) -> NaiveDate {
+    let interval = interval as u64;
+    let days = (interval >> 32) as i32 * sign;
+    let ms = interval as i32 * sign;
+    let intermediate = prior.add(Duration::days(days as i64));
+    let posterior = intermediate.add(Duration::milliseconds(ms as i64));
+    posterior
+}
+
+fn add_months(prior: NaiveDate, interval: i32) -> NaiveDate {
+    let target = chrono_add_months(prior, interval);
+    let target_plus = chrono_add_months(target, 1);
+    let last_day = target_plus.sub(chrono::Duration::days(1));
+    let day = min(prior.day(), last_day.day());
+    NaiveDate::from_ymd(target.year(), target.month(), day)
+}
+
+fn chrono_add_months(dt: NaiveDate, delta: i32) -> NaiveDate {

Review Comment:
   
   👍  - I like filing a ticket / PR in the target repo and then leaving a link in the comments of DataFusion
   
   Btw, I poked around and found this similar code in arrow2 (from @jorgecarleitao): https://docs.rs/arrow2/latest/src/arrow2/temporal_conversions.rs.html#342-368
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org