Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/21 07:15:34 UTC

[GitHub] [arrow-datafusion] mustafasrepo opened a new pull request, #3916: Feature/window bound scalar

mustafasrepo opened a new pull request, #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916

   # Support for non-u64 types for Window Bound
   
   # Which issue does this PR close?
   
   This PR closes #3571.
   
   # Rationale for this change
   
   With this change, we add support for queries like the one below.
   
   ```sql
   SELECT 
   	COUNT(*) OVER(ORDER BY ts RANGE BETWEEN '1 DAY' PRECEDING AND '1 DAY' FOLLOWING) as cnt 
   FROM t
   ```
   
   # What changes are included in this PR?
   
   - Closing this issue requires parsing expressions inside window frame clauses, so we have updated the `sqlparser` version.
   - The `assert_contains` and `assert_not_contains` macros are used in tests in several places and were duplicated at each use site. We have moved them to [https://github.com/synnada-ai/arrow-datafusion/blob/feature/window_bound_scalar/datafusion/common/src/test_util.rs](https://github.com/synnada-ai/arrow-datafusion/blob/feature/window_bound_scalar/datafusion/common/src/test_util.rs) to prevent code duplication.
   - Utility code for datetime arithmetic has been moved to [https://github.com/synnada-ai/arrow-datafusion/blob/feature/window_bound_scalar/datafusion/common/src/scalar.rs](https://github.com/synnada-ai/arrow-datafusion/blob/feature/window_bound_scalar/datafusion/common/src/scalar.rs). It is now reached through the `.add` and `.sub` APIs of the `ScalarValue` enum.
   - We removed type coercions between `ScalarValue` types. Arithmetic operations are done only between values of the same type (for `datetime` types, arithmetic is done only with `INTERVAL` types).
   - Previously, we rejected queries of the form
   
   ```sql
   SELECT
         COUNT(c1) OVER (ORDER BY c2 RANGE BETWEEN 1 PRECEDING AND 2 PRECEDING)
         FROM aggregate_test_100
   ```
   
   during window frame creation. Since we now support richer variants such as
   
   ```sql
   SELECT 
   	COUNT(*) OVER(ORDER BY ts RANGE BETWEEN '1 DAY' PRECEDING AND '1 MONTH' PRECEDING) as cnt 
   FROM t
   ```
   
   it is not easy to reject such queries during creation. We now check whether the window frame is valid after physical schema information is available, when the validation can be done with the appropriate types. This doesn't change anything for end users.
   
   - `sqlparser` also changed its `Expr` type. We have incorporated those changes. However, they were possibly written with some new capabilities in mind; we did only the bare minimum to support the current test suite.
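
The deferred validation described in the list above can be illustrated with a small sketch. This is not the code in this PR: the `Bound` enum, `position`, and `validate_frame` are hypothetical names, offsets are assumed to be non-negative, and plain `i64` values stand in for bounds that have already been converted to a common, comparable type. Only the ordering check is shown.

```rust
/// Hypothetical stand-in for a window frame bound whose offset has already
/// been converted to a concrete, comparable type (here `i64`).
/// `None` means UNBOUNDED.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Bound {
    Preceding(Option<i64>),
    CurrentRow,
    Following(Option<i64>),
}

/// Maps a bound to a signed position relative to the current row.
/// UNBOUNDED PRECEDING / FOLLOWING map to the extremes of `i64`.
/// Assumes offsets are non-negative.
fn position(bound: Bound) -> i64 {
    match bound {
        Bound::Preceding(None) => i64::MIN,
        Bound::Preceding(Some(n)) => -n,
        Bound::CurrentRow => 0,
        Bound::Following(Some(n)) => n,
        Bound::Following(None) => i64::MAX,
    }
}

/// Rejects frames whose start lies after their end,
/// e.g. `BETWEEN 1 PRECEDING AND 2 PRECEDING`.
pub fn validate_frame(start: Bound, end: Bound) -> Result<(), String> {
    if position(start) > position(end) {
        return Err(format!(
            "Invalid window frame: start {:?} is after end {:?}",
            start, end
        ));
    }
    Ok(())
}
```

With string-typed bounds such as `'1 DAY'` and `'1 MONTH'`, a comparison like this is only possible once the bounds have been turned into typed values, which is why the check moves to a point where schema information is available.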
   
   # Are there any user-facing changes?
   
   N/A


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #3916: Support for non-u64 types for Window Bound

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916#issuecomment-1290822454

   > Hi @alamb, I sent a commit moving coercion from physical planning to the optimizer [type_coercion](https://github.com/synnada-ai/arrow-datafusion/blob/b78eb162910e31cdee6b2747f1031c0514e65b96/datafusion/optimizer/src/type_coercion.rs#L416). Is this what you had in mind, or did I misunderstand what you meant? I can change the implementation according to your suggestions. Thanks for your feedback.
   
   This is exactly right -- thanks




[GitHub] [arrow-datafusion] ozankabak commented on pull request #3916: Support for non-u64 types for Window Bound

Posted by GitBox <gi...@apache.org>.
ozankabak commented on PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916#issuecomment-1287813927

   Thank you Andrew for your careful review and comments (as always). We will go through them and update the PR soon.




[GitHub] [arrow-datafusion] alamb merged pull request #3916: Support for non-u64 types for Window Bound

Posted by GitBox <gi...@apache.org>.
alamb merged PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916




[GitHub] [arrow-datafusion] mustafasrepo commented on pull request #3916: Support for non-u64 types for Window Bound

Posted by GitBox <gi...@apache.org>.
mustafasrepo commented on PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916#issuecomment-1290375746

   > What is your thinking regarding moving coercion out of physical planning and into the coercion pass? Is that something you plan to do?
   
   Hi @alamb, I sent a commit moving coercion from physical planning to the optimizer [type_coercion](https://github.com/synnada-ai/arrow-datafusion/blob/feature/window_bound_scalar/datafusion/optimizer/src/type_coercion.rs). Was this what you had in mind, or did I misunderstand what you meant? I can change the implementation according to your suggestions. Thanks for your feedback.




[GitHub] [arrow-datafusion] alamb commented on pull request #3916: Support for non-u64 types for Window Bound

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916#issuecomment-1291952359

   Thanks again @mustafasrepo and @ozankabak -- this is epic work




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #3916: Support for non-u64 types for Window Bound

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916#discussion_r1002092173


##########
datafusion/common/src/scalar.rs:
##########
@@ -476,35 +480,144 @@ macro_rules! impl_op {
             (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
                 primitive_op!(lhs, rhs, Int8, $OPERATION)
             }
-            _ => {
-                impl_distinct_cases_op!($LHS, $RHS, $OPERATION)
+            // Binary operations on arguments with different types:
+            (ScalarValue::Date32(Some(days)), _) => {
+                let value = date32_add(*days, $RHS, get_sign!($OPERATION))?;

Review Comment:
   Since this is a generic macro, I am surprised to see `date32_add` here. I think this would likely surprise someone who tried to use this macro for `mul` or `div`.
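
To make the concern concrete, here is a minimal, self-contained sketch of how an operator token can be turned into a sign. `get_sign!` mirrors the macro in the diff; `shift_days!`, `add_days`, and `sub_days` are hypothetical simplifications introduced only for this illustration and are not part of the PR.

```rust
/// Maps an operator token to a sign, mirroring `get_sign!` in the diff.
/// Only `+` and `-` have arms, so any other token fails to compile.
macro_rules! get_sign {
    (+) => {
        1
    };
    (-) => {
        -1
    };
}

/// Hypothetical, simplified stand-in for the date branch of a generic
/// operator macro: it always adds, scaled by the sign of the operator.
macro_rules! shift_days {
    ($days:expr, $delta:expr, $op:tt) => {
        $days + get_sign!($op) * $delta
    };
}

pub fn add_days(days: i32, delta: i32) -> i32 {
    shift_days!(days, delta, +)
}

pub fn sub_days(days: i32, delta: i32) -> i32 {
    shift_days!(days, delta, -)
}

// `shift_days!(10, 3, *)` would not compile, because `get_sign!`
// has no arm for `*`.
```

In this sketch, a caller who passes `*` gets a compile error that points at the sign macro rather than at the unsupported operation, which is one way the "addition-only" assumption can leak out of a macro that looks generic.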



##########
datafusion/common/src/scalar.rs:
##########
@@ -476,35 +480,144 @@ macro_rules! impl_op {
             (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
                 primitive_op!(lhs, rhs, Int8, $OPERATION)
             }
-            _ => {
-                impl_distinct_cases_op!($LHS, $RHS, $OPERATION)
+            // Binary operations on arguments with different types:
+            (ScalarValue::Date32(Some(days)), _) => {
+                let value = date32_add(*days, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::Date32(Some(value)))
+            }
+            (ScalarValue::Date64(Some(ms)), _) => {
+                let value = date64_add(*ms, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::Date64(Some(value)))
+            }
+            (ScalarValue::TimestampSecond(Some(ts_s), zone), _) => {
+                let value = seconds_add(*ts_s, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampSecond(Some(value), zone.clone()))
+            }
+            (ScalarValue::TimestampMillisecond(Some(ts_ms), zone), _) => {
+                let value = milliseconds_add(*ts_ms, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampMillisecond(Some(value), zone.clone()))
             }
+            (ScalarValue::TimestampMicrosecond(Some(ts_us), zone), _) => {
+                let value = microseconds_add(*ts_us, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampMicrosecond(Some(value), zone.clone()))
+            }
+            (ScalarValue::TimestampNanosecond(Some(ts_ns), zone), _) => {
+                let value = nanoseconds_add(*ts_ns, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampNanosecond(Some(value), zone.clone()))
+            }
+            _ => Err(DataFusionError::Internal(format!(
+                "Operator {} is not implemented for types {:?} and {:?}",
+                stringify!($OPERATION),
+                $LHS,
+                $RHS
+            ))),
         }
     };
 }
 
-// If we want a special implementation for an operation this is the place to implement it.
-// For instance, in the future we may want to implement subtraction for dates but not addition.
-// We can implement such special cases here.
-macro_rules! impl_distinct_cases_op {
-    ($LHS:expr, $RHS:expr, +) => {
-        match ($LHS, $RHS) {
-            e => Err(DataFusionError::Internal(format!(
-                "Addition is not implemented for {:?}",
-                e
-            ))),
-        }
+macro_rules! get_sign {
+    (+) => {
+        1
     };
-    ($LHS:expr, $RHS:expr, -) => {
-        match ($LHS, $RHS) {
-            e => Err(DataFusionError::Internal(format!(
-                "Subtraction is not implemented for {:?}",
-                e
-            ))),
-        }
+    (-) => {
+        -1
     };
 }
 
+#[inline]
+pub fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {

Review Comment:
   In general I would like to consolidate date / time arithmetic into a single location, and ideally that location is arrow-rs.
   
   Thus I think we should be using the date math functions in arrow-rs -- specifically https://docs.rs/arrow/25.0.0/arrow/datatypes/struct.Date32Type.html 
   
   I see that this code is simply moved in this PR, but in general what do you think?
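
For readers unfamiliar with what this date math involves, the following is a rough, standard-library-only sketch of adding a day-time interval to a `Date32` value (days since the Unix epoch). It is not the arrow-rs or DataFusion implementation. The function names are hypothetical, and the packed layout (days in the upper 32 bits, milliseconds in the lower 32 bits of an `i64`) is an assumption made for the example.

```rust
const MILLIS_PER_DAY: i64 = 86_400_000;

/// Packs (days, milliseconds) into one `i64`: days in the upper 32 bits,
/// milliseconds in the lower 32 bits. This layout is assumed for the sketch.
pub fn pack_day_time(days: i32, millis: i32) -> i64 {
    ((days as i64) << 32) | (millis as u32 as i64)
}

/// Splits a packed interval back into (days, milliseconds).
pub fn unpack_day_time(interval: i64) -> (i32, i32) {
    ((interval >> 32) as i32, interval as i32)
}

/// Adds (`sign = 1`) or subtracts (`sign = -1`) a packed interval to a
/// date expressed as days since the epoch. Any remaining sub-day part is
/// dropped by flooring to whole days (a choice made for this sketch).
pub fn date32_add_day_time(days: i32, interval: i64, sign: i32) -> i32 {
    let (d, ms) = unpack_day_time(interval);
    let delta_ms = (d as i64) * MILLIS_PER_DAY + (ms as i64);
    let total_ms = (days as i64) * MILLIS_PER_DAY + (sign as i64) * delta_ms;
    total_ms.div_euclid(MILLIS_PER_DAY) as i32
}
```

Keeping helpers like these in a single crate means that choices such as the rounding of sub-day remainders are made once rather than separately in each project.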



##########
datafusion/common/src/test_util.rs:
##########
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   👍



##########
datafusion/physical-expr/src/window/aggregate.rs:
##########
@@ -163,19 +160,23 @@ fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
             .map(|value| {
                 if value.is_null() {
                     return Ok(value.clone());
-                };
-                let offset = ScalarValue::try_from_value(&value.get_datatype(), delta)?;
+                }
                 if SEARCH_SIDE == is_descending {
                     // TODO: Handle positive overflows
-                    value.add(&offset)
-                } else if value.is_unsigned() && value < &offset {
-                    ScalarValue::try_from_value(&value.get_datatype(), 0)
+                    value.add(delta)

Review Comment:
   👍



##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -1367,6 +1367,69 @@ fn get_physical_expr_pair(
     let physical_name = physical_name(expr)?;
     Ok((physical_expr, physical_name))
 }
+/// Casts the ScalarValue `value` to column type once we have schema information
+/// The resulting type is not necessarily same type with the `column_type`. For instance
+/// if `column_type` is Timestamp the result is casted to Interval type. The reason is that
+/// Operation between Timestamps is not meaningful, However operation between Timestamp and
+/// Interval is valid. For basic types `column_type` is indeed the resulting type.
+fn convert_to_column_type(

Review Comment:
   This operation is typically called "coercion" and is handled in datafusion for other expression types here: https://github.com/apache/arrow-datafusion/blob/master/datafusion/expr/src/type_coercion.rs#L18-L32 (and submodule).
   
   Did you consider doing this conversion as part of coercion?
   
   One reason to do it as part of the normal coercion is that then the proper types will be present for operations such as constant folding / constant propagation. This might allow for expressions like
   
   ```sql
                   COUNT(*) OVER (ORDER BY ts RANGE BETWEEN INTERVAL '1' DAY + INTERVAL '1' DAY PRECEDING AND INTERVAL '3 DAY' FOLLOWING),
   ```
   
   Eventually
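
As a rough illustration of the coercion being discussed, the sketch below turns a string-typed frame bound into a typed value once the ORDER BY column type is known. All names here (`ColumnType`, `Bound`, `coerce_bound`) are hypothetical, the interval parser only understands `'<n> DAY'`, and none of this is DataFusion's actual API.

```rust
/// Hypothetical subset of column types an ORDER BY column might have.
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnType {
    Int64,
    Date32,
    Timestamp,
}

/// Hypothetical typed frame bound produced by coercion.
#[derive(Debug, Clone, PartialEq)]
pub enum Bound {
    Int64(i64),
    IntervalDays(i32),
}

/// Coerces a string literal to the bound type implied by the column type.
/// Date and timestamp columns get an interval bound rather than a bound of
/// their own type, because adding two timestamps is not meaningful.
pub fn coerce_bound(literal: &str, column: &ColumnType) -> Result<Bound, String> {
    match column {
        ColumnType::Int64 => literal
            .trim()
            .parse::<i64>()
            .map(Bound::Int64)
            .map_err(|e| format!("cannot coerce {:?} to Int64: {}", literal, e)),
        ColumnType::Date32 | ColumnType::Timestamp => {
            let mut parts = literal.split_whitespace();
            match (parts.next(), parts.next(), parts.next()) {
                (Some(n), Some(unit), None) if unit.eq_ignore_ascii_case("day") => n
                    .parse::<i32>()
                    .map(Bound::IntervalDays)
                    .map_err(|e| format!("cannot coerce {:?} to interval: {}", literal, e)),
                _ => Err(format!("unsupported interval literal: {:?}", literal)),
            }
        }
    }
}
```

Running a step like this in an early coercion pass, rather than during physical planning, means later passes already see typed bounds.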
   
   



##########
datafusion/expr/src/window_frame.rs:
##########
@@ -110,16 +104,15 @@ impl Default for WindowFrame {
 /// 4. <expr> FOLLOWING
 /// 5. UNBOUNDED FOLLOWING
 ///
-/// in this implementation we'll only allow <expr> to be u64 (i.e. no dynamic boundary)
-#[derive(Debug, Clone, Copy, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]

Review Comment:
   👍



##########
datafusion/expr/src/window_frame.rs:
##########
@@ -132,70 +125,72 @@ pub enum WindowFrameBound {
     ///
     /// 5. UNBOUNDED FOLLOWING
     /// The frame boundary is the last row in the partition.
-    Following(Option<u64>),
+    Following(ScalarValue),
 }
 
-impl From<ast::WindowFrameBound> for WindowFrameBound {
-    fn from(value: ast::WindowFrameBound) -> Self {
-        match value {
-            ast::WindowFrameBound::Preceding(v) => Self::Preceding(v),
-            ast::WindowFrameBound::Following(v) => Self::Following(v),
+impl TryFrom<ast::WindowFrameBound> for WindowFrameBound {
+    type Error = DataFusionError;
+
+    fn try_from(value: ast::WindowFrameBound) -> Result<Self> {
+        Ok(match value {
+            ast::WindowFrameBound::Preceding(Some(v)) => {
+                Self::Preceding(convert_frame_bound_to_scalar_value(*v)?)
+            }
+            ast::WindowFrameBound::Preceding(None) => {
+                Self::Preceding(ScalarValue::Utf8(None))
+            }
+            ast::WindowFrameBound::Following(Some(v)) => {
+                Self::Following(convert_frame_bound_to_scalar_value(*v)?)
+            }
+            ast::WindowFrameBound::Following(None) => {
+                Self::Following(ScalarValue::Utf8(None))
+            }
             ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
-        }
+        })
     }
 }
 
-impl fmt::Display for WindowFrameBound {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
-            WindowFrameBound::Preceding(None) => f.write_str("UNBOUNDED PRECEDING"),
-            WindowFrameBound::Following(None) => f.write_str("UNBOUNDED FOLLOWING"),
-            WindowFrameBound::Preceding(Some(n)) => write!(f, "{} PRECEDING", n),
-            WindowFrameBound::Following(Some(n)) => write!(f, "{} FOLLOWING", n),
+pub fn convert_frame_bound_to_scalar_value(v: ast::Expr) -> Result<ScalarValue> {
+    Ok(ScalarValue::Utf8(Some(match v {
+        ast::Expr::Value(ast::Value::Number(value, false))
+        | ast::Expr::Value(ast::Value::SingleQuotedString(value)) => value,
+        ast::Expr::Interval {
+            value,
+            leading_field,
+            ..
+        } => {
+            let result = match *value {
+                ast::Expr::Value(ast::Value::SingleQuotedString(item)) => item,
+                e => {
+                    let msg = format!("INTERVAL expression cannot be {:?}", e);
+                    return Err(DataFusionError::Internal(msg));

Review Comment:
   Should this error be ParserError, or unsupported (rather than internal)? 



##########
datafusion/common/src/scalar.rs:
##########
@@ -476,35 +480,144 @@ macro_rules! impl_op {
             (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
                 primitive_op!(lhs, rhs, Int8, $OPERATION)
             }
-            _ => {
-                impl_distinct_cases_op!($LHS, $RHS, $OPERATION)
+            // Binary operations on arguments with different types:
+            (ScalarValue::Date32(Some(days)), _) => {
+                let value = date32_add(*days, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::Date32(Some(value)))
+            }
+            (ScalarValue::Date64(Some(ms)), _) => {
+                let value = date64_add(*ms, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::Date64(Some(value)))
+            }
+            (ScalarValue::TimestampSecond(Some(ts_s), zone), _) => {
+                let value = seconds_add(*ts_s, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampSecond(Some(value), zone.clone()))
+            }
+            (ScalarValue::TimestampMillisecond(Some(ts_ms), zone), _) => {
+                let value = milliseconds_add(*ts_ms, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampMillisecond(Some(value), zone.clone()))
             }
+            (ScalarValue::TimestampMicrosecond(Some(ts_us), zone), _) => {
+                let value = microseconds_add(*ts_us, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampMicrosecond(Some(value), zone.clone()))
+            }
+            (ScalarValue::TimestampNanosecond(Some(ts_ns), zone), _) => {
+                let value = nanoseconds_add(*ts_ns, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampNanosecond(Some(value), zone.clone()))
+            }
+            _ => Err(DataFusionError::Internal(format!(
+                "Operator {} is not implemented for types {:?} and {:?}",
+                stringify!($OPERATION),
+                $LHS,
+                $RHS
+            ))),
         }
     };
 }
 
-// If we want a special implementation for an operation this is the place to implement it.
-// For instance, in the future we may want to implement subtraction for dates but not addition.
-// We can implement such special cases here.
-macro_rules! impl_distinct_cases_op {
-    ($LHS:expr, $RHS:expr, +) => {
-        match ($LHS, $RHS) {
-            e => Err(DataFusionError::Internal(format!(
-                "Addition is not implemented for {:?}",
-                e
-            ))),
-        }
+macro_rules! get_sign {
+    (+) => {
+        1
     };
-    ($LHS:expr, $RHS:expr, -) => {
-        match ($LHS, $RHS) {
-            e => Err(DataFusionError::Internal(format!(
-                "Subtraction is not implemented for {:?}",
-                e
-            ))),
-        }
+    (-) => {
+        -1
     };
 }
 
+#[inline]
+pub fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::days(days as i64));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_days() as i32)
+}
+
+#[inline]
+pub fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::milliseconds(ms));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_milliseconds())
+}
+
+#[inline]
+pub fn seconds_add(ts_s: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    Ok(do_date_time_math(ts_s, 0, scalar, sign)?.timestamp())
+}
+
+#[inline]
+pub fn milliseconds_add(ts_ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_ms / 1000;
+    let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
+    Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_millis())
+}
+
+#[inline]
+pub fn microseconds_add(ts_us: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_us / 1_000_000;
+    let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
+    Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos() / 1000)
+}
+
+#[inline]
+pub fn nanoseconds_add(ts_ns: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_ns / 1_000_000_000;
+    let nsecs = (ts_ns % 1_000_000_000) as u32;
+    Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos())
+}
+
+#[inline]
+fn do_date_time_math(
+    secs: i64,
+    nsecs: u32,
+    scalar: &ScalarValue,
+    sign: i32,
+) -> Result<NaiveDateTime> {
+    let prior = NaiveDateTime::from_timestamp(secs, nsecs);
+    do_date_math(prior, scalar, sign)
+}
+
+fn do_date_math<D>(prior: D, scalar: &ScalarValue, sign: i32) -> Result<D>
+where
+    D: Datelike + Add<Duration, Output = D>,
+{
+    Ok(match scalar {
+        ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
+        ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign),
+        ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
+        other => Err(DataFusionError::Execution(format!(
+            "DateIntervalExpr does not support non-interval type {:?}",
+            other
+        )))?,
+    })
+}
+
+// Can remove once https://github.com/apache/arrow-rs/pull/2031 is released

Review Comment:
   This has been released



##########
datafusion/expr/src/window_frame.rs:
##########
@@ -252,103 +247,32 @@ mod tests {
         };
         let result = WindowFrame::try_from(window_frame);
         assert_eq!(
-      result.err().unwrap().to_string(),
-      "Execution error: Invalid window frame: start bound cannot be unbounded following"
-        .to_owned()
-    );
+            result.err().unwrap().to_string(),
+            "Execution error: Invalid window frame: start bound cannot be unbounded following".to_owned()
+        );

Review Comment:
   You may be able to make this cleaner via https://doc.rust-lang.org/std/result/enum.Result.html#method.unwrap_err
   
   
   ```suggestion
           let err = WindowFrame::try_from(window_frame).unwrap_err();
           assert_eq!(
               err.to_string(),
               "Execution error: Invalid window frame: start bound cannot be unbounded following".to_owned()
           );
   ```
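
A self-contained version of the same pattern, using only the standard library, might look like the following. `check_start_bound` is a hypothetical stand-in for `WindowFrame::try_from`, and it returns a plain `String` error so that the example has no dependencies.

```rust
/// Hypothetical stand-in for a fallible conversion such as
/// `WindowFrame::try_from`.
pub fn check_start_bound(start_is_unbounded_following: bool) -> Result<(), String> {
    if start_is_unbounded_following {
        Err("Invalid window frame: start bound cannot be unbounded following".to_string())
    } else {
        Ok(())
    }
}

#[test]
fn start_bound_cannot_be_unbounded_following() {
    // `unwrap_err` panics (printing the `Ok` value) if the call unexpectedly
    // succeeds, so no separate `is_err` assertion is needed.
    let err = check_start_bound(true).unwrap_err();
    assert_eq!(
        err,
        "Invalid window frame: start bound cannot be unbounded following"
    );
}
```

`Result::expect_err` works the same way but takes a custom panic message, which can help when several such assertions live in one test.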



##########
datafusion/common/src/delta.rs:
##########
@@ -49,7 +49,7 @@ fn normalise_day(year: i32, month: u32, day: u32) -> u32 {
 
 /// Shift a date by the given number of months.
 /// Ambiguous month-ends are shifted backwards as necessary.
-pub(crate) fn shift_months<D: Datelike>(date: D, months: i32) -> D {
+pub fn shift_months<D: Datelike>(date: D, months: i32) -> D {

Review Comment:
   Moving these to common makes sense -- eventually I hope to move all this code into arrow-rs
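
The doc comment in the hunk above says that ambiguous month-ends are shifted backwards. A minimal sketch of that behavior on plain `(year, month, day)` tuples is shown below. It is written from scratch for illustration, does not use `chrono`'s `Datelike`, and is not the implementation in `delta.rs`.

```rust
fn is_leap_year(year: i32) -> bool {
    (year % 4 == 0 && year % 100 != 0) || year % 400 == 0
}

fn days_in_month(year: i32, month: u32) -> u32 {
    match month {
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        4 | 6 | 9 | 11 => 30,
        2 if is_leap_year(year) => 29,
        2 => 28,
        _ => panic!("month out of range: {}", month),
    }
}

/// Shifts `(year, month, day)` by `months`, which may be negative.
/// If the target month is shorter, the day is clamped to its last day,
/// e.g. January 31 plus one month becomes February 28 (or 29).
pub fn shift_months(date: (i32, u32, u32), months: i32) -> (i32, u32, u32) {
    let (year, month, day) = date;
    // Use a zero-based month count so Euclidean division handles negatives.
    let total = year * 12 + (month as i32 - 1) + months;
    let new_year = total.div_euclid(12);
    let new_month = total.rem_euclid(12) as u32 + 1;
    let new_day = day.min(days_in_month(new_year, new_month));
    (new_year, new_month, new_day)
}
```

Because of the clamping, shifting forward and then back by the same number of months does not always return the original date.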





[GitHub] [arrow-datafusion] ozankabak commented on pull request #3916: Support for non-u64 types for Window Bound

Posted by GitBox <gi...@apache.org>.
ozankabak commented on PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916#issuecomment-1289875706

   @mustafasrepo is trying to solidify his understanding of your suggestion. He will probably circle back tomorrow to make sure he gets what you meant right -- and then follow through if you guys are on the same page. He may ask for some clarifying directions from you on how to do this.




[GitHub] [arrow-datafusion] ursabot commented on pull request #3916: Support for non-u64 types for Window Bound

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916#issuecomment-1292447456

   Benchmark runs are scheduled for baseline = 4e298353c592cccae3cbed20c64051809c4923fd and contender = 3940e36957c8a78c75da159f4836580f82b0360f. 3940e36957c8a78c75da159f4836580f82b0360f is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/50798c6ef2b442d59a90dc694b82f205...cde961b0b4ae43c4b350ecbe2d704c3f/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] [test-mac-arm](https://conbench.ursa.dev/compare/runs/37e968a86a6a4a0b81e215e24a6dae5c...6f30bea6be9446df9b1de3a03e655847/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/957132ff37b142159d8f47bd146276ba...a6ef304a4a134035a1cee232dee13f46/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/582344bf8ec64fbe929fa0a20147e196...c48f30312c7a449caff8bbee27b42bbf/)
   Buildkite builds:
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   




[GitHub] [arrow-datafusion] alamb commented on pull request #3916: Support for non-u64 types for Window Bound

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916#issuecomment-1289616506

   What is your thinking regarding moving coercion out of physical planning and into the coercion pass? Is that something you plan to do?




[GitHub] [arrow-datafusion] avantgardnerio commented on a diff in pull request #3916: Support for non-u64 types for Window Bound

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on code in PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916#discussion_r1002112849


##########
datafusion/common/src/scalar.rs:
##########
@@ -476,35 +480,144 @@ macro_rules! impl_op {
             (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
                 primitive_op!(lhs, rhs, Int8, $OPERATION)
             }
-            _ => {
-                impl_distinct_cases_op!($LHS, $RHS, $OPERATION)
+            // Binary operations on arguments with different types:
+            (ScalarValue::Date32(Some(days)), _) => {
+                let value = date32_add(*days, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::Date32(Some(value)))
+            }
+            (ScalarValue::Date64(Some(ms)), _) => {
+                let value = date64_add(*ms, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::Date64(Some(value)))
+            }
+            (ScalarValue::TimestampSecond(Some(ts_s), zone), _) => {
+                let value = seconds_add(*ts_s, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampSecond(Some(value), zone.clone()))
+            }
+            (ScalarValue::TimestampMillisecond(Some(ts_ms), zone), _) => {
+                let value = milliseconds_add(*ts_ms, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampMillisecond(Some(value), zone.clone()))
             }
+            (ScalarValue::TimestampMicrosecond(Some(ts_us), zone), _) => {
+                let value = microseconds_add(*ts_us, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampMicrosecond(Some(value), zone.clone()))
+            }
+            (ScalarValue::TimestampNanosecond(Some(ts_ns), zone), _) => {
+                let value = nanoseconds_add(*ts_ns, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampNanosecond(Some(value), zone.clone()))
+            }
+            _ => Err(DataFusionError::Internal(format!(
+                "Operator {} is not implemented for types {:?} and {:?}",
+                stringify!($OPERATION),
+                $LHS,
+                $RHS
+            ))),
         }
     };
 }
 
-// If we want a special implementation for an operation this is the place to implement it.
-// For instance, in the future we may want to implement subtraction for dates but not addition.
-// We can implement such special cases here.
-macro_rules! impl_distinct_cases_op {
-    ($LHS:expr, $RHS:expr, +) => {
-        match ($LHS, $RHS) {
-            e => Err(DataFusionError::Internal(format!(
-                "Addition is not implemented for {:?}",
-                e
-            ))),
-        }
+macro_rules! get_sign {
+    (+) => {
+        1
     };
-    ($LHS:expr, $RHS:expr, -) => {
-        match ($LHS, $RHS) {
-            e => Err(DataFusionError::Internal(format!(
-                "Subtraction is not implemented for {:?}",
-                e
-            ))),
-        }
+    (-) => {
+        -1
     };
 }
 
+#[inline]
+pub fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::days(days as i64));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_days() as i32)
+}
+
+#[inline]
+pub fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::milliseconds(ms));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_milliseconds())
+}
+
+#[inline]
+pub fn seconds_add(ts_s: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    Ok(do_date_time_math(ts_s, 0, scalar, sign)?.timestamp())
+}
+
+#[inline]
+pub fn milliseconds_add(ts_ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_ms / 1000;
+    let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
+    Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_millis())
+}
+
+#[inline]
+pub fn microseconds_add(ts_us: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_us / 1_000_000;
+    let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
+    Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos() / 1000)
+}
+
+#[inline]
+pub fn nanoseconds_add(ts_ns: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_ns / 1_000_000_000;
+    let nsecs = (ts_ns % 1_000_000_000) as u32;
+    Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos())
+}
+
+#[inline]
+fn do_date_time_math(
+    secs: i64,
+    nsecs: u32,
+    scalar: &ScalarValue,
+    sign: i32,
+) -> Result<NaiveDateTime> {
+    let prior = NaiveDateTime::from_timestamp(secs, nsecs);
+    do_date_math(prior, scalar, sign)
+}
+
+fn do_date_math<D>(prior: D, scalar: &ScalarValue, sign: i32) -> Result<D>
+where
+    D: Datelike + Add<Duration, Output = D>,
+{
+    Ok(match scalar {
+        ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
+        ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign),
+        ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
+        other => Err(DataFusionError::Execution(format!(
+            "DateIntervalExpr does not support non-interval type {:?}",
+            other
+        )))?,
+    })
+}
+
+// Can remove once https://github.com/apache/arrow-rs/pull/2031 is released

Review Comment:
   This comment should be updated. I think we're waiting on `chrono` 0.4.23 now.
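
For context, the comment under review sits above a month-shifting helper (`shift_months` in the hunk). The sketch below shows what such a helper typically has to do: move a date by a signed number of months and clamp the day to the end of the target month. It uses only the standard library; `shift_months_ymd` and `days_in_month` are hypothetical names for illustration and are not the code in this PR or in `chrono`.

```rust
/// Number of days in the given month (proleptic Gregorian calendar).
fn days_in_month(year: i32, month: u32) -> u32 {
    match month {
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        4 | 6 | 9 | 11 => 30,
        2 => {
            let leap = (year % 4 == 0 && year % 100 != 0) || year % 400 == 0;
            if leap {
                29
            } else {
                28
            }
        }
        _ => panic!("month must be in 1..=12, got {}", month),
    }
}

/// Hypothetical stand-in for a `shift_months`-style helper: shifts a
/// (year, month, day) triple by `months` (which may be negative) and clamps
/// the day to the last valid day of the resulting month.
fn shift_months_ymd(year: i32, month: u32, day: u32, months: i32) -> (i32, u32, u32) {
    // Use a zero-based month index so Euclidean division handles negative shifts.
    let total = year * 12 + (month as i32 - 1) + months;
    let new_year = total.div_euclid(12);
    let new_month = total.rem_euclid(12) as u32 + 1;
    let new_day = day.min(days_in_month(new_year, new_month));
    (new_year, new_month, new_day)
}

fn main() {
    // `sign` plays the role of `get_sign!(-)` in the diff: -1 for subtraction.
    let sign = -1;
    // 2022-03-31 minus one month clamps to the end of February.
    println!("{:?}", shift_months_ymd(2022, 3, 31, 1 * sign)); // prints (2022, 2, 28)
}
```

Multiplying the interval by `sign` before shifting, as the diff does with `*i * sign`, lets addition and subtraction share one code path.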



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #3916: Support for non-u64 types for Window Bound

Posted by GitBox <gi...@apache.org>.
mustafasrepo commented on code in PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916#discussion_r1002930609


##########
datafusion/common/src/scalar.rs:
##########
@@ -476,35 +480,144 @@ macro_rules! impl_op {
             (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
                 primitive_op!(lhs, rhs, Int8, $OPERATION)
             }
-            _ => {
-                impl_distinct_cases_op!($LHS, $RHS, $OPERATION)
+            // Binary operations on arguments with different types:
+            (ScalarValue::Date32(Some(days)), _) => {
+                let value = date32_add(*days, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::Date32(Some(value)))
+            }
+            (ScalarValue::Date64(Some(ms)), _) => {
+                let value = date64_add(*ms, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::Date64(Some(value)))
+            }
+            (ScalarValue::TimestampSecond(Some(ts_s), zone), _) => {
+                let value = seconds_add(*ts_s, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampSecond(Some(value), zone.clone()))
+            }
+            (ScalarValue::TimestampMillisecond(Some(ts_ms), zone), _) => {
+                let value = milliseconds_add(*ts_ms, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampMillisecond(Some(value), zone.clone()))
             }
+            (ScalarValue::TimestampMicrosecond(Some(ts_us), zone), _) => {
+                let value = microseconds_add(*ts_us, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampMicrosecond(Some(value), zone.clone()))
+            }
+            (ScalarValue::TimestampNanosecond(Some(ts_ns), zone), _) => {
+                let value = nanoseconds_add(*ts_ns, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampNanosecond(Some(value), zone.clone()))
+            }
+            _ => Err(DataFusionError::Internal(format!(
+                "Operator {} is not implemented for types {:?} and {:?}",
+                stringify!($OPERATION),
+                $LHS,
+                $RHS
+            ))),
         }
     };
 }
 
-// If we want a special implementation for an operation this is the place to implement it.
-// For instance, in the future we may want to implement subtraction for dates but not addition.
-// We can implement such special cases here.
-macro_rules! impl_distinct_cases_op {
-    ($LHS:expr, $RHS:expr, +) => {
-        match ($LHS, $RHS) {
-            e => Err(DataFusionError::Internal(format!(
-                "Addition is not implemented for {:?}",
-                e
-            ))),
-        }
+macro_rules! get_sign {
+    (+) => {
+        1
     };
-    ($LHS:expr, $RHS:expr, -) => {
-        match ($LHS, $RHS) {
-            e => Err(DataFusionError::Internal(format!(
-                "Subtraction is not implemented for {:?}",
-                e
-            ))),
-        }
+    (-) => {
+        -1
     };
 }
 
+#[inline]
+pub fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::days(days as i64));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_days() as i32)
+}
+
+#[inline]
+pub fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::milliseconds(ms));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_milliseconds())
+}
+
+#[inline]
+pub fn seconds_add(ts_s: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    Ok(do_date_time_math(ts_s, 0, scalar, sign)?.timestamp())
+}
+
+#[inline]
+pub fn milliseconds_add(ts_ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_ms / 1000;
+    let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
+    Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_millis())
+}
+
+#[inline]
+pub fn microseconds_add(ts_us: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_us / 1_000_000;
+    let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
+    Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos() / 1000)
+}
+
+#[inline]
+pub fn nanoseconds_add(ts_ns: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_ns / 1_000_000_000;
+    let nsecs = (ts_ns % 1_000_000_000) as u32;
+    Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos())
+}
+
+#[inline]
+fn do_date_time_math(
+    secs: i64,
+    nsecs: u32,
+    scalar: &ScalarValue,
+    sign: i32,
+) -> Result<NaiveDateTime> {
+    let prior = NaiveDateTime::from_timestamp(secs, nsecs);
+    do_date_math(prior, scalar, sign)
+}
+
+fn do_date_math<D>(prior: D, scalar: &ScalarValue, sign: i32) -> Result<D>
+where
+    D: Datelike + Add<Duration, Output = D>,
+{
+    Ok(match scalar {
+        ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
+        ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign),
+        ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
+        other => Err(DataFusionError::Execution(format!(
+            "DateIntervalExpr does not support non-interval type {:?}",
+            other
+        )))?,
+    })
+}
+
+// Can remove once https://github.com/apache/arrow-rs/pull/2031 is released

Review Comment:
   @avantgardnerio I have updated the comment per your suggestion. Thanks.
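
The timestamp helpers in the hunk above (`milliseconds_add`, `microseconds_add`, `nanoseconds_add`) split an epoch offset into whole seconds plus a nanosecond remainder using `/` and `%`. The sketch below shows the same split done with Euclidean division, on the assumption that pre-epoch (negative) timestamps should yield a non-negative nanosecond part. The thread does not discuss that case, and `split_millis` is a hypothetical helper, not part of the PR.

```rust
/// Hypothetical helper: splits a millisecond epoch offset into
/// (whole seconds, nanoseconds within that second).
/// Euclidean division keeps the nanosecond part in 0..1_000_000_000
/// even when `ts_ms` is negative.
fn split_millis(ts_ms: i64) -> (i64, u32) {
    let secs = ts_ms.div_euclid(1000);
    let nsecs = (ts_ms.rem_euclid(1000) * 1_000_000) as u32;
    (secs, nsecs)
}

fn main() {
    println!("{:?}", split_millis(1_500)); // prints (1, 500000000)
    println!("{:?}", split_millis(-1)); // prints (-1, 999000000)
}
```

With plain `%`, the remainder of a negative timestamp is negative, and casting it to `u32` wraps to a large value, so the Euclidean form is the safer choice whenever negative inputs are possible.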





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #3916: Support for non-u64 types for Window Bound

Posted by GitBox <gi...@apache.org>.
mustafasrepo commented on code in PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916#discussion_r1002931293


##########
datafusion/expr/src/window_frame.rs:
##########
@@ -132,70 +125,72 @@ pub enum WindowFrameBound {
     ///
     /// 5. UNBOUNDED FOLLOWING
     /// The frame boundary is the last row in the partition.
-    Following(Option<u64>),
+    Following(ScalarValue),
 }
 
-impl From<ast::WindowFrameBound> for WindowFrameBound {
-    fn from(value: ast::WindowFrameBound) -> Self {
-        match value {
-            ast::WindowFrameBound::Preceding(v) => Self::Preceding(v),
-            ast::WindowFrameBound::Following(v) => Self::Following(v),
+impl TryFrom<ast::WindowFrameBound> for WindowFrameBound {
+    type Error = DataFusionError;
+
+    fn try_from(value: ast::WindowFrameBound) -> Result<Self> {
+        Ok(match value {
+            ast::WindowFrameBound::Preceding(Some(v)) => {
+                Self::Preceding(convert_frame_bound_to_scalar_value(*v)?)
+            }
+            ast::WindowFrameBound::Preceding(None) => {
+                Self::Preceding(ScalarValue::Utf8(None))
+            }
+            ast::WindowFrameBound::Following(Some(v)) => {
+                Self::Following(convert_frame_bound_to_scalar_value(*v)?)
+            }
+            ast::WindowFrameBound::Following(None) => {
+                Self::Following(ScalarValue::Utf8(None))
+            }
             ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
-        }
+        })
     }
 }
 
-impl fmt::Display for WindowFrameBound {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
-            WindowFrameBound::Preceding(None) => f.write_str("UNBOUNDED PRECEDING"),
-            WindowFrameBound::Following(None) => f.write_str("UNBOUNDED FOLLOWING"),
-            WindowFrameBound::Preceding(Some(n)) => write!(f, "{} PRECEDING", n),
-            WindowFrameBound::Following(Some(n)) => write!(f, "{} FOLLOWING", n),
+pub fn convert_frame_bound_to_scalar_value(v: ast::Expr) -> Result<ScalarValue> {
+    Ok(ScalarValue::Utf8(Some(match v {
+        ast::Expr::Value(ast::Value::Number(value, false))
+        | ast::Expr::Value(ast::Value::SingleQuotedString(value)) => value,
+        ast::Expr::Interval {
+            value,
+            leading_field,
+            ..
+        } => {
+            let result = match *value {
+                ast::Expr::Value(ast::Value::SingleQuotedString(item)) => item,
+                e => {
+                    let msg = format!("INTERVAL expression cannot be {:?}", e);
+                    return Err(DataFusionError::Internal(msg));

Review Comment:
   I have changed the error type to ParserError, thanks for pointing it out.
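
A minimal sketch of what such a change might look like. The types below are simplified stand-ins defined locally so the example compiles on its own. They only approximate the shape of the real `DataFusionError` and `sqlparser` types, and `interval_literal` is a hypothetical function name.

```rust
// Stand-in for the parser's error type (assumption: a single string-carrying variant).
#[derive(Debug, PartialEq)]
enum ParserError {
    ParserError(String),
}

// Stand-in for the engine's error type.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum DataFusionError {
    /// A bug inside the engine itself.
    Internal(String),
    /// A problem with the SQL text supplied by the user.
    SQL(ParserError),
}

// Heavily simplified stand-in for the SQL AST.
#[derive(Debug)]
enum Expr {
    SingleQuotedString(String),
    Identifier(String),
}

/// Extracts the string literal of an INTERVAL expression, reporting
/// malformed input as a parser error rather than an internal error.
fn interval_literal(value: Expr) -> Result<String, DataFusionError> {
    match value {
        Expr::SingleQuotedString(item) => Ok(item),
        e => {
            let msg = format!("INTERVAL expression cannot be {:?}", e);
            Err(DataFusionError::SQL(ParserError::ParserError(msg)))
        }
    }
}

fn main() {
    println!("{:?}", interval_literal(Expr::SingleQuotedString("1 DAY".to_string())));
    println!("{:?}", interval_literal(Expr::Identifier("ts".to_string())));
}
```

The point of the distinction is that an unsupported expression inside `INTERVAL` comes from the user's query, so it should surface as a SQL error rather than be reported as an engine bug.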





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #3916: Support for non-u64 types for Window Bound

Posted by GitBox <gi...@apache.org>.
mustafasrepo commented on code in PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916#discussion_r1002942852


##########
datafusion/expr/src/window_frame.rs:
##########
@@ -252,103 +247,32 @@ mod tests {
         };
         let result = WindowFrame::try_from(window_frame);
         assert_eq!(
-      result.err().unwrap().to_string(),
-      "Execution error: Invalid window frame: start bound cannot be unbounded following"
-        .to_owned()
-    );
+            result.err().unwrap().to_string(),
+            "Execution error: Invalid window frame: start bound cannot be unbounded following".to_owned()
+        );

Review Comment:
   Changed per your suggestion. Thanks.


