You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/24 06:49:24 UTC

[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #3916: Support for non-u64 types for Window Bound

mustafasrepo commented on code in PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916#discussion_r1002930609


##########
datafusion/common/src/scalar.rs:
##########
@@ -476,35 +480,144 @@ macro_rules! impl_op {
             (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
                 primitive_op!(lhs, rhs, Int8, $OPERATION)
             }
-            _ => {
-                impl_distinct_cases_op!($LHS, $RHS, $OPERATION)
+            // Binary operations on arguments with different types:
+            (ScalarValue::Date32(Some(days)), _) => {
+                let value = date32_add(*days, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::Date32(Some(value)))
+            }
+            (ScalarValue::Date64(Some(ms)), _) => {
+                let value = date64_add(*ms, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::Date64(Some(value)))
+            }
+            (ScalarValue::TimestampSecond(Some(ts_s), zone), _) => {
+                let value = seconds_add(*ts_s, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampSecond(Some(value), zone.clone()))
+            }
+            (ScalarValue::TimestampMillisecond(Some(ts_ms), zone), _) => {
+                let value = milliseconds_add(*ts_ms, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampMillisecond(Some(value), zone.clone()))
             }
+            (ScalarValue::TimestampMicrosecond(Some(ts_us), zone), _) => {
+                let value = microseconds_add(*ts_us, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampMicrosecond(Some(value), zone.clone()))
+            }
+            (ScalarValue::TimestampNanosecond(Some(ts_ns), zone), _) => {
+                let value = nanoseconds_add(*ts_ns, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampNanosecond(Some(value), zone.clone()))
+            }
+            _ => Err(DataFusionError::Internal(format!(
+                "Operator {} is not implemented for types {:?} and {:?}",
+                stringify!($OPERATION),
+                $LHS,
+                $RHS
+            ))),
         }
     };
 }
 
-// If we want a special implementation for an operation this is the place to implement it.
-// For instance, in the future we may want to implement subtraction for dates but not addition.
-// We can implement such special cases here.
-macro_rules! impl_distinct_cases_op {
-    ($LHS:expr, $RHS:expr, +) => {
-        match ($LHS, $RHS) {
-            e => Err(DataFusionError::Internal(format!(
-                "Addition is not implemented for {:?}",
-                e
-            ))),
-        }
+macro_rules! get_sign {
+    (+) => {
+        1
     };
-    ($LHS:expr, $RHS:expr, -) => {
-        match ($LHS, $RHS) {
-            e => Err(DataFusionError::Internal(format!(
-                "Subtraction is not implemented for {:?}",
-                e
-            ))),
-        }
+    (-) => {
+        -1
     };
 }
 
+#[inline]
+pub fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::days(days as i64));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_days() as i32)
+}
+
+#[inline]
+pub fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::milliseconds(ms));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_milliseconds())
+}
+
+#[inline]
+pub fn seconds_add(ts_s: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    Ok(do_date_time_math(ts_s, 0, scalar, sign)?.timestamp())
+}
+
+#[inline]
+pub fn milliseconds_add(ts_ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_ms / 1000;
+    let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
+    Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_millis())
+}
+
+#[inline]
+pub fn microseconds_add(ts_us: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_us / 1_000_000;
+    let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
+    Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos() / 1000)
+}
+
+#[inline]
+pub fn nanoseconds_add(ts_ns: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_ns / 1_000_000_000;
+    let nsecs = (ts_ns % 1_000_000_000) as u32;
+    Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos())
+}
+
+#[inline]
+fn do_date_time_math(
+    secs: i64,
+    nsecs: u32,
+    scalar: &ScalarValue,
+    sign: i32,
+) -> Result<NaiveDateTime> {
+    let prior = NaiveDateTime::from_timestamp(secs, nsecs);
+    do_date_math(prior, scalar, sign)
+}
+
+fn do_date_math<D>(prior: D, scalar: &ScalarValue, sign: i32) -> Result<D>
+where
+    D: Datelike + Add<Duration, Output = D>,
+{
+    Ok(match scalar {
+        ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
+        ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign),
+        ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
+        other => Err(DataFusionError::Execution(format!(
+            "DateIntervalExpr does not support non-interval type {:?}",
+            other
+        )))?,
+    })
+}
+
+// Can remove once https://github.com/apache/arrow-rs/pull/2031 is released

Review Comment:
   @avantgardnerio, I have updated the comment per your suggestion. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org