Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/28 06:09:00 UTC

spark git commit: [SPARK-8195] [SPARK-8196] [SQL] udf next_day last_day

Repository: spark
Updated Branches:
  refs/heads/master daa1964b6 -> 2e7f99a00


[SPARK-8195] [SPARK-8196] [SQL] udf next_day last_day

next_day returns the first date after a given date that falls on the specified day of the week.
last_day returns the last day of the month to which the given date belongs.
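
A minimal usage sketch from SQL (a Scala shell session against a hypothetical
sqlContext; the string arguments rely on the implicit cast to DateType):

    // 2015-07-27 is a Monday, so the first Sunday strictly after it is 2015-08-02.
    sqlContext.sql("SELECT next_day('2015-07-27', 'Sunday')").show()
    // The month containing 2015-07-27 ends on 2015-07-31.
    sqlContext.sql("SELECT last_day('2015-07-27')").show()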

Author: Daoyuan Wang <da...@intel.com>

Closes #6986 from adrian-wang/udfnlday and squashes the following commits:

ef7e3da [Daoyuan Wang] fix
02b3426 [Daoyuan Wang] address 2 comments
dc69630 [Daoyuan Wang] address comments from rxin
8846086 [Daoyuan Wang] address comments from rxin
d09bcce [Daoyuan Wang] multi fix
1a9de3d [Daoyuan Wang] function next_day and last_day


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e7f99a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e7f99a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e7f99a0

Branch: refs/heads/master
Commit: 2e7f99a004f08a42e86f6f603e4ba35cb52561c4
Parents: daa1964
Author: Daoyuan Wang <da...@intel.com>
Authored: Mon Jul 27 21:08:56 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Jul 27 21:08:56 2015 -0700

----------------------------------------------------------------------
 .../catalyst/analysis/FunctionRegistry.scala    |  4 +-
 .../expressions/datetimeFunctions.scala         | 72 ++++++++++++++++++++
 .../spark/sql/catalyst/util/DateTimeUtils.scala | 46 +++++++++++++
 .../expressions/DateExpressionsSuite.scala      | 28 ++++++++
 .../scala/org/apache/spark/sql/functions.scala  | 17 +++++
 .../apache/spark/sql/DateFunctionsSuite.scala   | 22 ++++++
 6 files changed, 188 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2e7f99a0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index aa05f44..61ee6f6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -219,8 +219,10 @@ object FunctionRegistry {
     expression[DayOfYear]("dayofyear"),
     expression[DayOfMonth]("dayofmonth"),
     expression[Hour]("hour"),
-    expression[Month]("month"),
+    expression[LastDay]("last_day"),
     expression[Minute]("minute"),
+    expression[Month]("month"),
+    expression[NextDay]("next_day"),
     expression[Quarter]("quarter"),
     expression[Second]("second"),
     expression[WeekOfYear]("weekofyear"),

http://git-wip-us.apache.org/repos/asf/spark/blob/2e7f99a0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
index 9e55f05..b00a1b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
@@ -265,3 +265,75 @@ case class DateFormatClass(left: Expression, right: Expression) extends BinaryEx
     })
   }
 }
+
+/**
+ * Returns the last day of the month to which the given date belongs.
+ */
+case class LastDay(startDate: Expression) extends UnaryExpression with ImplicitCastInputTypes {
+  override def child: Expression = startDate
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(DateType)
+
+  override def dataType: DataType = DateType
+
+  override def prettyName: String = "last_day"
+
+  override def nullSafeEval(date: Any): Any = {
+    val days = date.asInstanceOf[Int]
+    DateTimeUtils.getLastDayOfMonth(days)
+  }
+
+  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+    defineCodeGen(ctx, ev, (sd) => {
+      s"$dtu.getLastDayOfMonth($sd)"
+    })
+  }
+}
+
+/**
+ * Returns the first date which is later than startDate and falls on the given dayOfWeek.
+ * For example, NextDay(2015-07-27, Sunday) would return 2015-08-02, which is the first
+ * Sunday later than 2015-07-27.
+ */
+case class NextDay(startDate: Expression, dayOfWeek: Expression)
+  extends BinaryExpression with ImplicitCastInputTypes {
+
+  override def left: Expression = startDate
+  override def right: Expression = dayOfWeek
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(DateType, StringType)
+
+  override def dataType: DataType = DateType
+
+  override def nullSafeEval(start: Any, dayOfW: Any): Any = {
+    val dow = DateTimeUtils.getDayOfWeekFromString(dayOfW.asInstanceOf[UTF8String])
+    if (dow == -1) {
+      null
+    } else {
+      val sd = start.asInstanceOf[Int]
+      DateTimeUtils.getNextDateForDayOfWeek(sd, dow)
+    }
+  }
+
+  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    nullSafeCodeGen(ctx, ev, (sd, dowS) => {
+      val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+      val dow = ctx.freshName("dow")
+      val genDow = if (right.foldable) {
+        val dowVal = DateTimeUtils.getDayOfWeekFromString(
+          dayOfWeek.eval(InternalRow.empty).asInstanceOf[UTF8String])
+        s"int $dow = $dowVal;"
+      } else {
+        s"int $dow = $dtu.getDayOfWeekFromString($dowS);"
+      }
+      genDow + s"""
+        if ($dow == -1) {
+          ${ev.isNull} = true;
+        } else {
+          ${ev.primitive} = $dtu.getNextDateForDayOfWeek($sd, $dow);
+        }
+       """
+    })
+  }
+}
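
A rough standalone model of the constant-folding choice in NextDay.genCode above: when the
day-of-week argument is foldable (a literal), the string-to-int lookup is resolved once at
code-generation time, so only the date arithmetic remains per row. The sketch below is plain
Scala against the DateTimeUtils helpers added further down, not the actual generated code:

    import org.apache.spark.sql.catalyst.util.DateTimeUtils
    import org.apache.spark.unsafe.types.UTF8String

    // Resolve the literal day name once; the returned closure does only per-row work.
    def nextDayForLiteral(dayOfWeek: String): Int => Option[Int] = {
      val dow = DateTimeUtils.getDayOfWeekFromString(UTF8String.fromString(dayOfWeek))
      (startDate: Int) =>
        if (dow == -1) None  // unrecognized day name, surfaces as null
        else Some(DateTimeUtils.getNextDateForDayOfWeek(startDate, dow))
    }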

http://git-wip-us.apache.org/repos/asf/spark/blob/2e7f99a0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 07412e7..2e28fb9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -573,4 +573,50 @@ object DateTimeUtils {
       dayInYear - 334
     }
   }
+
+  /**
+   * Returns the day of week parsed from a String, with Thursday encoded as 0
+   * (because 1970-01-01 was a Thursday), or -1 if the String is not recognized.
+   */
+  def getDayOfWeekFromString(string: UTF8String): Int = {
+    val dowString = string.toString.toUpperCase
+    dowString match {
+      case "SU" | "SUN" | "SUNDAY" => 3
+      case "MO" | "MON" | "MONDAY" => 4
+      case "TU" | "TUE" | "TUESDAY" => 5
+      case "WE" | "WED" | "WEDNESDAY" => 6
+      case "TH" | "THU" | "THURSDAY" => 0
+      case "FR" | "FRI" | "FRIDAY" => 1
+      case "SA" | "SAT" | "SATURDAY" => 2
+      case _ => -1
+    }
+  }
+
+  /**
+   * Returns the first date which is later than startDate and is of the given dayOfWeek.
+   * dayOfWeek is an integer in the range [0, 6], where 0 is Thu, 1 is Fri, etc.
+   */
+  def getNextDateForDayOfWeek(startDate: Int, dayOfWeek: Int): Int = {
+    startDate + 1 + ((dayOfWeek - 1 - startDate) % 7 + 7) % 7
+  }
+
+  /**
+   * Number of days in each month of a non-leap year.
+   */
+  private[this] val daysInNormalYear = Array(31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
+
+  /**
+   * Returns last day of the month for the given date. The date is expressed in days
+   * since 1.1.1970.
+   */
+  def getLastDayOfMonth(date: Int): Int = {
+    val dayOfMonth = getDayOfMonth(date)
+    val month = getMonth(date)
+    if (month == 2 && isLeapYear(getYear(date))) {
+      date + daysInNormalYear(month - 1) + 1 - dayOfMonth
+    } else {
+      date + daysInNormalYear(month - 1) - dayOfMonth
+    }
+  }
+
 }
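
A quick standalone check of the day arithmetic above (plain Scala, no Spark needed). Day 0
is 1970-01-01, a Thursday, which is why Thursday is encoded as 0 and Sunday as 3:

    // Same formula as getNextDateForDayOfWeek, copied here only as a sanity check.
    def nextDate(startDate: Int, dayOfWeek: Int): Int =
      startDate + 1 + ((dayOfWeek - 1 - startDate) % 7 + 7) % 7

    nextDate(0, 3)  // 3, i.e. 1970-01-04, the first Sunday after day 0
    nextDate(0, 0)  // 7, i.e. 1970-01-08: "next" is strict, so a Thursday maps a week ahead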

http://git-wip-us.apache.org/repos/asf/spark/blob/2e7f99a0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index bdba6ce..4d2d337 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -22,6 +22,7 @@ import java.text.SimpleDateFormat
 import java.util.Calendar
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types.{StringType, TimestampType, DateType}
 
 class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -246,4 +247,31 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     }
   }
 
+  test("last_day") {
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-02-28"))), Date.valueOf("2015-02-28"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-03-27"))), Date.valueOf("2015-03-31"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-04-26"))), Date.valueOf("2015-04-30"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-05-25"))), Date.valueOf("2015-05-31"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-06-24"))), Date.valueOf("2015-06-30"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-07-23"))), Date.valueOf("2015-07-31"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-08-01"))), Date.valueOf("2015-08-31"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-09-02"))), Date.valueOf("2015-09-30"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-10-03"))), Date.valueOf("2015-10-31"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-11-04"))), Date.valueOf("2015-11-30"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-12-05"))), Date.valueOf("2015-12-31"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2016-01-06"))), Date.valueOf("2016-01-31"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2016-02-07"))), Date.valueOf("2016-02-29"))
+  }
+
+  test("next_day") {
+    checkEvaluation(
+      NextDay(Literal(Date.valueOf("2015-07-23")), Literal("Thu")),
+      DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-30")))
+    checkEvaluation(
+      NextDay(Literal(Date.valueOf("2015-07-23")), Literal("THURSDAY")),
+      DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-30")))
+    checkEvaluation(
+      NextDay(Literal(Date.valueOf("2015-07-23")), Literal("th")),
+      DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-30")))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2e7f99a0/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index cab3db6..d18558b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2033,6 +2033,13 @@ object functions {
   def hour(columnName: String): Column = hour(Column(columnName))
 
   /**
+   * Returns the last day of the month to which the given date belongs.
+   * @group datetime_funcs
+   * @since 1.5.0
+   */
+  def last_day(e: Column): Column = LastDay(e.expr)
+
+  /**
    * Extracts the minutes as an integer from a given date/timestamp/string.
    * @group datetime_funcs
    * @since 1.5.0
@@ -2047,6 +2054,16 @@ object functions {
   def minute(columnName: String): Column = minute(Column(columnName))
 
   /**
+   * Returns the first date which is later than the given date sd and falls on the given
+   * day of week. For example, `next_day('2015-07-27', "Sunday")` would return 2015-08-02,
+   * which is the first Sunday later than 2015-07-27. The dayOfWeek parameter may be the
+   * 2-letter, 3-letter, or full name of the day of the week (e.g. Mo, tue, FRIDAY).
+   * @group datetime_funcs
+   * @since 1.5.0
+   */
+  def next_day(sd: Column, dayOfWeek: String): Column = NextDay(sd.expr, lit(dayOfWeek).expr)
+
+  /**
    * Extracts the seconds as an integer from a given date/timestamp/string.
    * @group datetime_funcs
    * @since 1.5.0
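
A minimal DataFrame-side sketch of the two new functions (the DataFrame df and its column
"d" are illustrative, not part of this change):

    import org.apache.spark.sql.functions.{col, last_day, next_day}

    // For a row where d = 2015-07-23 (a Thursday):
    //   last_day -> 2015-07-31, next_day(..., "Sunday") -> 2015-07-26.
    val result = df.select(last_day(col("d")), next_day(col("d"), "Sunday"))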

http://git-wip-us.apache.org/repos/asf/spark/blob/2e7f99a0/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 9e80ae8..ff1c756 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -184,4 +184,26 @@ class DateFunctionsSuite extends QueryTest {
       Row(15, 15, 15))
   }
 
+  test("function last_day") {
+    val df1 = Seq((1, "2015-07-23"), (2, "2015-07-24")).toDF("i", "d")
+    val df2 = Seq((1, "2015-07-23 00:11:22"), (2, "2015-07-24 11:22:33")).toDF("i", "t")
+    checkAnswer(
+      df1.select(last_day(col("d"))),
+      Seq(Row(Date.valueOf("2015-07-31")), Row(Date.valueOf("2015-07-31"))))
+    checkAnswer(
+      df2.select(last_day(col("t"))),
+      Seq(Row(Date.valueOf("2015-07-31")), Row(Date.valueOf("2015-07-31"))))
+  }
+
+  test("function next_day") {
+    val df1 = Seq(("mon", "2015-07-23"), ("tuesday", "2015-07-20")).toDF("dow", "d")
+    val df2 = Seq(("th", "2015-07-23 00:11:22"), ("xx", "2015-07-24 11:22:33")).toDF("dow", "t")
+    checkAnswer(
+      df1.select(next_day(col("d"), "MONDAY")),
+      Seq(Row(Date.valueOf("2015-07-27")), Row(Date.valueOf("2015-07-27"))))
+    checkAnswer(
+      df2.select(next_day(col("t"), "th")),
+      Seq(Row(Date.valueOf("2015-07-30")), Row(null)))
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org