You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ru...@apache.org on 2023/06/20 05:32:25 UTC

[spark] branch master updated: [SPARK-44073][SQL][PYTHON][CONNECT] Add date time functions to Scala, Python and Connect - part 2

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a8f2ec53fd [SPARK-44073][SQL][PYTHON][CONNECT] Add date time functions to Scala, Python and Connect - part 2
7a8f2ec53fd is described below

commit 7a8f2ec53fdeee4119c62ed8e1143987ff751482
Author: Jiaan Geng <be...@163.com>
AuthorDate: Tue Jun 20 13:32:02 2023 +0800

    [SPARK-44073][SQL][PYTHON][CONNECT] Add date time functions to Scala, Python and Connect - part 2
    
    ### What changes were proposed in this pull request?
    This PR adds some date time functions to the Scala, Python and Connect APIs. These functions are listed below.
    
    - weekday
    - convert_timezone
    - now
    - timestamp_micros
    - timestamp_millis
    
    The original plan also included the function `extract`. This PR excludes it, since we can't get the data type for unresolved expressions. Please refer to
    https://github.com/apache/spark/blob/b97ce8b9a99c570fc57dec967e7e9db3d115c1db/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L2835 and https://github.com/apache/spark/blob/b97ce8b9a99c570fc57dec967e7e9db3d115c1db/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L2922
    
    ### Why are the changes needed?
    Add date time functions to Scala, Python and Connect API.
    
    ### Does this PR introduce _any_ user-facing change?
    'Yes'.
    New feature: new date time functions are added to the Scala, Python and Connect APIs.
    
    ### How was this patch tested?
    New test cases.
    
    Closes #41651 from beliefer/SPARK-44073.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
 .../scala/org/apache/spark/sql/functions.scala     |  74 ++++++++-
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  24 +++
 ..._convert_timezone_with_source_time_zone.explain |   2 +
 ...nvert_timezone_without_source_time_zone.explain |   2 +
 .../explain-results/function_now.explain           |   2 +
 .../function_timestamp_micros.explain              |   2 +
 .../function_timestamp_millis.explain              |   2 +
 .../explain-results/function_weekday.explain       |   2 +
 ...ion_convert_timezone_with_source_time_zone.json |  33 ++++
 ...onvert_timezone_with_source_time_zone.proto.bin | Bin 0 -> 170 bytes
 ..._convert_timezone_without_source_time_zone.json |  29 ++++
 ...ert_timezone_without_source_time_zone.proto.bin | Bin 0 -> 150 bytes
 .../query-tests/queries/function_now.json          |  20 +++
 .../query-tests/queries/function_now.proto.bin     | Bin 0 -> 110 bytes
 .../queries/function_timestamp_micros.json         |  25 +++
 .../queries/function_timestamp_micros.proto.bin    | Bin 0 -> 130 bytes
 .../queries/function_timestamp_millis.json         |  25 +++
 .../queries/function_timestamp_millis.proto.bin    | Bin 0 -> 130 bytes
 .../query-tests/queries/function_weekday.json      |  25 +++
 .../query-tests/queries/function_weekday.proto.bin | Bin 0 -> 121 bytes
 .../source/reference/pyspark.sql/functions.rst     |   5 +
 python/pyspark/sql/connect/functions.py            |  40 +++++
 python/pyspark/sql/functions.py                    | 176 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/functions.scala     |  64 ++++++++
 .../org/apache/spark/sql/DateFunctionsSuite.scala  |  53 +++++++
 25 files changed, 597 insertions(+), 8 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index ccd46c2d267..93cf8f521b2 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -4198,6 +4198,14 @@ object functions {
    */
   def current_timestamp(): Column = Column.fn("current_timestamp")
 
+  /**
+   * Returns the current timestamp at the start of query evaluation (SQL function `now`).
+   *
+   * @group datetime_funcs
+   * @since 3.5.0
+   */
+  def now(): Column = Column.fn("now")
+
   /**
    * Returns the current timestamp without time zone at the start of query evaluation as a
    * timestamp without time zone column. All calls of localtimestamp within the same query return
@@ -4459,6 +4467,14 @@ object functions {
    */
   def minute(e: Column): Column = Column.fn("minute", e)
 
+  /**
+   * Returns the day of the week for a date/timestamp (0 = Monday, 1 = Tuesday, ..., 6 = Sunday).
+   *
+   * @group datetime_funcs
+   * @since 3.5.0
+   */
+  def weekday(e: Column): Column = Column.fn("weekday", e)
+
   /**
    * @return
    *   A date created from year, month and day fields.
@@ -5072,6 +5088,22 @@ object functions {
    */
   def timestamp_seconds(e: Column): Column = Column.fn("timestamp_seconds", e)
 
+  /**
+   * Creates a timestamp from the number of milliseconds since the UTC epoch.
+   *
+   * @group datetime_funcs
+   * @since 3.5.0
+   */
+  def timestamp_millis(e: Column): Column = Column.fn("timestamp_millis", e)
+
+  /**
+   * Creates a timestamp from the number of microseconds since the UTC epoch.
+   *
+   * @group datetime_funcs
+   * @since 3.5.0
+   */
+  def timestamp_micros(e: Column): Column = Column.fn("timestamp_micros", e)
+
   /**
    * Parses the `timestamp` expression with the `format` expression to a timestamp without time
    * zone. Returns null with invalid input.
@@ -6493,8 +6525,7 @@ object functions {
    * @group partition_transforms
    * @since 3.4.0
    */
-  def years(e: Column): Column =
-    Column.fn("years", e)
+  def years(e: Column): Column = Column.fn("years", e)
 
   /**
    * A transform for timestamps and dates to partition data into months.
@@ -6502,8 +6533,7 @@ object functions {
    * @group partition_transforms
    * @since 3.4.0
    */
-  def months(e: Column): Column =
-    Column.fn("months", e)
+  def months(e: Column): Column = Column.fn("months", e)
 
   /**
    * A transform for timestamps and dates to partition data into days.
@@ -6511,8 +6541,7 @@ object functions {
    * @group partition_transforms
    * @since 3.4.0
    */
-  def days(e: Column): Column =
-    Column.fn("days", e)
+  def days(e: Column): Column = Column.fn("days", e)
 
   /**
    * A transform for timestamps to partition data into hours.
@@ -6520,8 +6549,37 @@ object functions {
    * @group partition_transforms
    * @since 3.4.0
    */
-  def hours(e: Column): Column =
-    Column.fn("hours", e)
+  def hours(e: Column): Column = Column.fn("hours", e)
+
+  /**
+   * Converts the timestamp without time zone `sourceTs` from the `sourceTz` time zone to
+   * `targetTz`.
+   *
+   * @param sourceTz
+   *   the time zone for the input timestamp. To use the current session time zone as the source
+   *   time zone, call the two-argument overload instead.
+   * @param targetTz
+   *   the time zone to which the input timestamp should be converted.
+   * @param sourceTs
+   *   a timestamp without time zone.
+   * @group datetime_funcs
+   * @since 3.5.0
+   */
+  def convert_timezone(sourceTz: Column, targetTz: Column, sourceTs: Column): Column =
+    Column.fn("convert_timezone", sourceTz, targetTz, sourceTs)
+
+  /**
+   * Converts the timestamp without time zone `sourceTs` from the session time zone to `targetTz`.
+   *
+   * @param targetTz
+   *   the time zone to which the input timestamp should be converted.
+   * @param sourceTs
+   *   a timestamp without time zone.
+   * @group datetime_funcs
+   * @since 3.5.0
+   */
+  def convert_timezone(targetTz: Column, sourceTs: Column): Column =
+    Column.fn("convert_timezone", targetTz, sourceTs)
 
   /**
    * Make DayTimeIntervalType duration from days, hours, mins and secs.
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 0b3bcf8a79c..e212720b84e 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -1717,6 +1717,14 @@ class PlanGenerationTestSuite
     fn.hours(Column("a"))
   }
 
+  temporalFunctionTest("convert_timezone with source time zone") {
+    fn.convert_timezone(fn.lit("\"Africa/Dakar\""), fn.lit("\"Asia/Urumqi\""), fn.col("t"))
+  }
+
+  temporalFunctionTest("convert_timezone without source time zone") {
+    fn.convert_timezone(fn.lit("\"Asia/Urumqi\""), fn.col("t"))
+  }
+
   functionTest("make_dt_interval days hours mins secs") {
     fn.make_dt_interval(fn.col("a"), fn.col("a"), fn.col("a"), fn.col("b"))
   }
@@ -1909,6 +1917,10 @@ class PlanGenerationTestSuite
     fn.current_timestamp()
   }
 
+  temporalFunctionTest("now") {
+    fn.now()
+  }
+
   temporalFunctionTest("localtimestamp") {
     fn.localtimestamp()
   }
@@ -1981,6 +1993,10 @@ class PlanGenerationTestSuite
     fn.minute(fn.col("t"))
   }
 
+  temporalFunctionTest("weekday") {
+    fn.weekday(fn.col("d"))
+  }
+
   temporalFunctionTest("make_date") {
     fn.make_date(fn.lit(2018), fn.lit(5), fn.lit(14))
   }
@@ -2120,6 +2136,14 @@ class PlanGenerationTestSuite
     fn.timestamp_seconds(fn.col("x"))
   }
 
+  temporalFunctionTest("timestamp_millis") {
+    fn.timestamp_millis(fn.col("x"))
+  }
+
+  temporalFunctionTest("timestamp_micros") {
+    fn.timestamp_micros(fn.col("x"))
+  }
+
   // Array of Long
   // Array of Long
   // Array of Array of Long
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_convert_timezone_with_source_time_zone.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_convert_timezone_with_source_time_zone.explain
new file mode 100644
index 00000000000..bdae1acca70
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_convert_timezone_with_source_time_zone.explain
@@ -0,0 +1,2 @@
+Project [convert_timezone("Africa/Dakar", "Asia/Urumqi", cast(t#0 as timestamp_ntz)) AS convert_timezone("Africa/Dakar", "Asia/Urumqi", t)#0]
++- LocalRelation <empty>, [d#0, t#0, s#0, x#0L, wt#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_convert_timezone_without_source_time_zone.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_convert_timezone_without_source_time_zone.explain
new file mode 100644
index 00000000000..5d141976008
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_convert_timezone_without_source_time_zone.explain
@@ -0,0 +1,2 @@
+Project [convert_timezone(current_timezone(), "Asia/Urumqi", cast(t#0 as timestamp_ntz)) AS convert_timezone(current_timezone(), "Asia/Urumqi", t)#0]
++- LocalRelation <empty>, [d#0, t#0, s#0, x#0L, wt#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_now.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_now.explain
new file mode 100644
index 00000000000..058d1f60f38
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_now.explain
@@ -0,0 +1,2 @@
+Project [now() AS now()#0]
++- LocalRelation <empty>, [d#0, t#0, s#0, x#0L, wt#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_timestamp_micros.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_timestamp_micros.explain
new file mode 100644
index 00000000000..5c8e30e6e5e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_timestamp_micros.explain
@@ -0,0 +1,2 @@
+Project [timestamp_micros(x#0L) AS timestamp_micros(x)#0]
++- LocalRelation <empty>, [d#0, t#0, s#0, x#0L, wt#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_timestamp_millis.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_timestamp_millis.explain
new file mode 100644
index 00000000000..7902cd8a9e0
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_timestamp_millis.explain
@@ -0,0 +1,2 @@
+Project [timestamp_millis(x#0L) AS timestamp_millis(x)#0]
++- LocalRelation <empty>, [d#0, t#0, s#0, x#0L, wt#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_weekday.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_weekday.explain
new file mode 100644
index 00000000000..20bbaf80b5f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_weekday.explain
@@ -0,0 +1,2 @@
+Project [weekday(d#0) AS weekday(d)#0]
++- LocalRelation <empty>, [d#0, t#0, s#0, x#0L, wt#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_convert_timezone_with_source_time_zone.json b/connector/connect/common/src/test/resources/query-tests/queries/function_convert_timezone_with_source_time_zone.json
new file mode 100644
index 00000000000..b27d7e2b55f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_convert_timezone_with_source_time_zone.json
@@ -0,0 +1,33 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cd:date,t:timestamp,s:string,x:bigint,wt:struct\u003cstart:timestamp,end:timestamp\u003e\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "convert_timezone",
+        "arguments": [{
+          "literal": {
+            "string": "\"Africa/Dakar\""
+          }
+        }, {
+          "literal": {
+            "string": "\"Asia/Urumqi\""
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "t"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_convert_timezone_with_source_time_zone.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_convert_timezone_with_source_time_zone.proto.bin
new file mode 100644
index 00000000000..8ef4e3bdce2
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_convert_timezone_with_source_time_zone.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_convert_timezone_without_source_time_zone.json b/connector/connect/common/src/test/resources/query-tests/queries/function_convert_timezone_without_source_time_zone.json
new file mode 100644
index 00000000000..b072c89d42b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_convert_timezone_without_source_time_zone.json
@@ -0,0 +1,29 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cd:date,t:timestamp,s:string,x:bigint,wt:struct\u003cstart:timestamp,end:timestamp\u003e\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "convert_timezone",
+        "arguments": [{
+          "literal": {
+            "string": "\"Asia/Urumqi\""
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "t"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_convert_timezone_without_source_time_zone.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_convert_timezone_without_source_time_zone.proto.bin
new file mode 100644
index 00000000000..c6d1db9b8fb
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_convert_timezone_without_source_time_zone.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_now.json b/connector/connect/common/src/test/resources/query-tests/queries/function_now.json
new file mode 100644
index 00000000000..98556585c3e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_now.json
@@ -0,0 +1,20 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cd:date,t:timestamp,s:string,x:bigint,wt:struct\u003cstart:timestamp,end:timestamp\u003e\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "now"
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_now.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_now.proto.bin
new file mode 100644
index 00000000000..a8fcd67fa19
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_now.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_micros.json b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_micros.json
new file mode 100644
index 00000000000..e43aa6d7115
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_micros.json
@@ -0,0 +1,25 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cd:date,t:timestamp,s:string,x:bigint,wt:struct\u003cstart:timestamp,end:timestamp\u003e\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "timestamp_micros",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "x"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_micros.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_micros.proto.bin
new file mode 100644
index 00000000000..c8ca8eedef3
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_micros.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_millis.json b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_millis.json
new file mode 100644
index 00000000000..afcdf42d7b3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_millis.json
@@ -0,0 +1,25 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cd:date,t:timestamp,s:string,x:bigint,wt:struct\u003cstart:timestamp,end:timestamp\u003e\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "timestamp_millis",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "x"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_millis.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_millis.proto.bin
new file mode 100644
index 00000000000..bbe401c39f3
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_millis.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_weekday.json b/connector/connect/common/src/test/resources/query-tests/queries/function_weekday.json
new file mode 100644
index 00000000000..b7577002917
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_weekday.json
@@ -0,0 +1,25 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cd:date,t:timestamp,s:string,x:bigint,wt:struct\u003cstart:timestamp,end:timestamp\u003e\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "weekday",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "d"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_weekday.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_weekday.proto.bin
new file mode 100644
index 00000000000..1954103269e
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_weekday.proto.bin differ
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst
index 92422818737..f41443b8274 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -116,6 +116,7 @@ Datetime Functions
     :toctree: api/
 
     add_months
+    convert_timezone
     curdate
     current_date
     current_timestamp
@@ -150,6 +151,7 @@ Datetime Functions
     next_day
     hour
     make_date
+    now
     from_unixtime
     unix_timestamp
     to_unix_timestamp
@@ -160,8 +162,11 @@ Datetime Functions
     trunc
     from_utc_timestamp
     to_utc_timestamp
+    weekday
     window
     session_window
+    timestamp_micros
+    timestamp_millis
     timestamp_seconds
     unix_date
     unix_micros
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index 3f4395f9297..90800106cf7 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -2628,6 +2628,13 @@ def current_timestamp() -> Column:
 current_timestamp.__doc__ = pysparkfuncs.current_timestamp.__doc__
 
 
+def now() -> Column:
+    return _invoke_function("now")
+
+
+now.__doc__ = pysparkfuncs.now.__doc__
+
+
 def current_timezone() -> Column:
     return _invoke_function("current_timezone")
 
@@ -2726,6 +2733,13 @@ def weekofyear(col: "ColumnOrName") -> Column:
 weekofyear.__doc__ = pysparkfuncs.weekofyear.__doc__
 
 
+def weekday(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("weekday", col)
+
+
+weekday.__doc__ = pysparkfuncs.weekday.__doc__
+
+
 def make_date(year: "ColumnOrName", month: "ColumnOrName", day: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("make_date", year, month, day)
 
@@ -2995,6 +3009,20 @@ def timestamp_seconds(col: "ColumnOrName") -> Column:
 timestamp_seconds.__doc__ = pysparkfuncs.timestamp_seconds.__doc__
 
 
+def timestamp_millis(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("timestamp_millis", col)
+
+
+timestamp_millis.__doc__ = pysparkfuncs.timestamp_millis.__doc__
+
+
+def timestamp_micros(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("timestamp_micros", col)
+
+
+timestamp_micros.__doc__ = pysparkfuncs.timestamp_micros.__doc__
+
+
 def window(
     timeColumn: "ColumnOrName",
     windowDuration: str,
@@ -3159,6 +3187,18 @@ def hours(col: "ColumnOrName") -> Column:
 hours.__doc__ = pysparkfuncs.hours.__doc__
 
 
+def convert_timezone(
+    sourceTz: Optional[Column], targetTz: Column, sourceTs: "ColumnOrName"
+) -> Column:
+    if sourceTz is None:
+        return _invoke_function_over_columns("convert_timezone", targetTz, sourceTs)
+    else:
+        return _invoke_function_over_columns("convert_timezone", sourceTz, targetTz, sourceTs)
+
+
+convert_timezone.__doc__ = pysparkfuncs.convert_timezone.__doc__
+
+
 def make_dt_interval(
     days: Optional["ColumnOrName"] = None,
     hours: Optional["ColumnOrName"] = None,
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 1a5633e3c5e..3eaccdc1ea1 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -5581,6 +5581,31 @@ def current_timestamp() -> Column:
     return _invoke_function("current_timestamp")
 
 
+@try_remote_functions
+def now() -> Column:
+    """
+    Returns the current timestamp at the start of query evaluation.
+
+    .. versionadded:: 3.5.0
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        current timestamp at the start of query evaluation.
+
+    Examples
+    --------
+    >>> df = spark.range(1)
+    >>> df.select(now()).show(truncate=False) # doctest: +SKIP
+    +-----------------------+
+    |now()                  |
+    +-----------------------+
+    |2022-08-26 21:23:22.716|
+    +-----------------------+
+    """
+    return _invoke_function("now")
+
+
 @try_remote_functions
 def localtimestamp() -> Column:
     """
@@ -5974,6 +5999,36 @@ def weekofyear(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("weekofyear", col)
 
 
+@try_remote_functions
+def weekday(col: "ColumnOrName") -> Column:
+    """
+    Returns the day of the week for a date/timestamp (0 = Monday, 1 = Tuesday, ..., 6 = Sunday).
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target date/timestamp column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the day of the week for the date/timestamp (0 = Monday, 1 = Tuesday, ..., 6 = Sunday).
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([('2015-04-08',)], ['dt'])
+    >>> df.select(weekday('dt').alias('day')).show()
+    +---+
+    |day|
+    +---+
+    |  2|
+    +---+
+    """
+    return _invoke_function_over_columns("weekday", col)
+
+
 @try_remote_functions
 def make_date(year: "ColumnOrName", month: "ColumnOrName", day: "ColumnOrName") -> Column:
     """
@@ -6970,6 +7025,76 @@ def timestamp_seconds(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("timestamp_seconds", col)
 
 
+@try_remote_functions
+def timestamp_millis(col: "ColumnOrName") -> Column:
+    """
+    Creates a timestamp from the number of milliseconds since the UTC epoch.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        number of milliseconds since the UTC epoch.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        converted timestamp value.
+
+    Examples
+    --------
+    >>> spark.conf.set("spark.sql.session.timeZone", "UTC")
+    >>> time_df = spark.createDataFrame([(1230219000,)], ['unix_time'])
+    >>> time_df.select(timestamp_millis(time_df.unix_time).alias('ts')).show()
+    +-------------------+
+    |                 ts|
+    +-------------------+
+    |1970-01-15 05:43:39|
+    +-------------------+
+    >>> time_df.select(timestamp_millis('unix_time').alias('ts')).printSchema()
+    root
+     |-- ts: timestamp (nullable = true)
+    >>> spark.conf.unset("spark.sql.session.timeZone")
+    """
+    return _invoke_function_over_columns("timestamp_millis", col)
+
+
+@try_remote_functions
+def timestamp_micros(col: "ColumnOrName") -> Column:
+    """
+    Creates a timestamp from the number of microseconds since the UTC epoch.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        number of microseconds since the UTC epoch.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        converted timestamp value.
+
+    Examples
+    --------
+    >>> spark.conf.set("spark.sql.session.timeZone", "UTC")
+    >>> time_df = spark.createDataFrame([(1230219000,)], ['unix_time'])
+    >>> time_df.select(timestamp_micros(time_df.unix_time).alias('ts')).show()
+    +--------------------+
+    |                  ts|
+    +--------------------+
+    |1970-01-01 00:20:...|
+    +--------------------+
+    >>> time_df.select(timestamp_micros('unix_time').alias('ts')).printSchema()
+    root
+     |-- ts: timestamp (nullable = true)
+    >>> spark.conf.unset("spark.sql.session.timeZone")
+    """
+    return _invoke_function_over_columns("timestamp_micros", col)
+
+
 @try_remote_functions
 def window(
     timeColumn: "ColumnOrName",
@@ -12952,6 +13077,57 @@ def hours(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("hours", col)
 
 
+@try_remote_functions
+def convert_timezone(
+    sourceTz: Optional[Column], targetTz: Column, sourceTs: "ColumnOrName"
+) -> Column:
+    """
+    Converts the timestamp without time zone `sourceTs`
+    from the `sourceTz` time zone to `targetTz`.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    sourceTz : :class:`~pyspark.sql.Column` or None
+        the time zone for the input timestamp. If it is None,
+        the current session time zone is used as the source time zone.
+    targetTz : :class:`~pyspark.sql.Column`
+        the time zone to which the input timestamp should be converted.
+    sourceTs : :class:`~pyspark.sql.Column` or str
+        a timestamp without time zone.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the timestamp converted to the target time zone.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([('2015-04-08',)], ['dt'])
+    >>> df.select(convert_timezone(   # doctest: +SKIP
+    ...     None, lit('Asia/Hong_Kong'), 'dt').alias('ts')
+    ... ).show()
+    +-------------------+
+    |                 ts|
+    +-------------------+
+    |2015-04-08 00:00:00|
+    +-------------------+
+    >>> df.select(convert_timezone(
+    ...     lit('America/Los_Angeles'), lit('Asia/Hong_Kong'), 'dt').alias('ts')
+    ... ).show()
+    +-------------------+
+    |                 ts|
+    +-------------------+
+    |2015-04-08 15:00:00|
+    +-------------------+
+    """
+    if sourceTz is None:
+        return _invoke_function_over_columns("convert_timezone", targetTz, sourceTs)
+    else:
+        return _invoke_function_over_columns("convert_timezone", sourceTz, targetTz, sourceTs)
+
+
 @try_remote_functions
 def make_dt_interval(
     days: Optional["ColumnOrName"] = None,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 582e3b9e363..a18c6969d47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -4343,6 +4343,14 @@ object functions {
    */
   def current_timestamp(): Column = withExpr { CurrentTimestamp() }
 
+  /**
+   * Returns the current timestamp at the start of query evaluation (Catalyst `Now` expression).
+   *
+   * @group datetime_funcs
+   * @since 3.5.0
+   */
+  def now(): Column = withExpr { Now() }
+
   /**
    * Returns the current timestamp without time zone at the start of query evaluation
    * as a timestamp without time zone column.
@@ -4570,6 +4578,14 @@ object functions {
    */
   def minute(e: Column): Column = withExpr { Minute(e.expr) }
 
+  /**
+   * Returns the day of the week for date/timestamp (0 = Monday, 1 = Tuesday, ..., 6 = Sunday).
+   * @param e the date/timestamp column whose day of the week is returned.
+   * @group datetime_funcs
+   * @since 3.5.0
+   */
+  def weekday(e: Column): Column = withExpr { WeekDay(e.expr) }
+
   /**
    * @return A date created from year, month and day fields.
    * @group datetime_funcs
@@ -5192,6 +5208,26 @@ object functions {
     SecondsToTimestamp(e.expr)
   }
 
+  /**
+   * Creates a timestamp from the number of milliseconds since the UTC epoch.
+   * @param e a column of milliseconds since the UTC epoch (e.g. int or long).
+   * @group datetime_funcs
+   * @since 3.5.0
+   */
+  def timestamp_millis(e: Column): Column = withExpr {
+    MillisToTimestamp(e.expr)
+  }
+
+  /**
+   * Creates a timestamp from the number of microseconds since the UTC epoch.
+   * @param e a column of microseconds since the UTC epoch (e.g. int or long).
+   * @group datetime_funcs
+   * @since 3.5.0
+   */
+  def timestamp_micros(e: Column): Column = withExpr {
+    MicrosToTimestamp(e.expr)
+  }
+
   /**
    * Parses the `timestamp` expression with the `format` expression
    * to a timestamp without time zone. Returns null with invalid input.
@@ -6593,6 +6629,34 @@ object functions {
    */
   def hours(e: Column): Column = withExpr { Hours(e.expr) }
 
+  /**
+   * Converts the timestamp without time zone `sourceTs`
+   * from the `sourceTz` time zone to `targetTz`.
+   *
+   * @param sourceTz the time zone of the input timestamp. To use the current session time zone,
+   *                 call `convert_timezone(targetTz, sourceTs)` instead.
+   * @param targetTz the time zone to which the input timestamp should be converted.
+   * @param sourceTs a timestamp without time zone.
+   * @group datetime_funcs
+   * @since 3.5.0
+   */
+  def convert_timezone(sourceTz: Column, targetTz: Column, sourceTs: Column): Column = withExpr {
+    ConvertTimezone(sourceTz.expr, targetTz.expr, sourceTs.expr)
+  }
+
+  /**
+   * Converts the timestamp without time zone `sourceTs`
+   * from the current session time zone to `targetTz`.
+   *
+   * @param targetTz the time zone to which the input timestamp should be converted.
+   * @param sourceTs a timestamp without time zone.
+   * @group datetime_funcs
+   * @since 3.5.0
+   */
+  def convert_timezone(targetTz: Column, sourceTs: Column): Column = withExpr {
+    new ConvertTimezone(targetTz.expr, sourceTs.expr)
+  }
+
   /**
    * Make DayTimeIntervalType duration from days, hours, mins and secs.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 8a50e935847..7b13af8bf7d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -74,6 +74,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
   test("function current_timestamp and now") {
     val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
     checkAnswer(df1.select(count_distinct(current_timestamp())), Row(1))
+    checkAnswer(df1.select(count_distinct(now())), Row(1))
 
     // Execution in one query should return the same value
     checkAnswer(sql("""SELECT CURRENT_TIMESTAMP() = CURRENT_TIMESTAMP()"""), Row(true))
@@ -85,6 +86,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
     assert(got >= before && got <= after)
 
     // Now alias
+    checkAnswer(df1.select(current_timestamp().equalTo(now())), Seq(Row(true), Row(true)))
     checkAnswer(sql("""SELECT CURRENT_TIMESTAMP() = NOW()"""), Row(true))
   }
 
@@ -253,6 +255,18 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
       Row(15, 15, 15))
   }
 
+  test("weekday") {
+    val df = Seq((d, sdfDate.format(d), ts)).toDF("a", "b", "c")
+    // Expect 2 (Wednesday) for `d` and its formatted string, 0 (Monday) for `ts`.
+    checkAnswer(
+      df.select(weekday($"a"), weekday($"b"), weekday($"c")),
+      Row(2, 2, 0))
+    // The SQL function must return the same values as the Column API.
+    checkAnswer(
+      df.selectExpr("weekday(a)", "weekday(b)", "weekday(c)"),
+      Row(2, 2, 0))
+  }
+
   test("function date_add & dateadd") {
     val st1 = "2015-06-01 12:34:56"
     val st2 = "2015-06-02 12:34:56"
@@ -1024,6 +1038,32 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
         Row(Timestamp.valueOf("2015-07-24 22:00:00"))))
   }
 
+  test("timestamp_millis") {
+    val df = Seq((123456789, 1230219000123L)).toDF("a", "b") // Int and Long inputs
+    checkAnswer(
+      df.select(timestamp_millis(col("a")), timestamp_millis(col("b"))),
+      Row(Timestamp.valueOf("1970-01-02 02:17:36.789"),
+        Timestamp.valueOf("2008-12-25 07:30:00.123")))
+    // SQL must match. Expected values assume a UTC-8 default TZ (presumably LA; confirm).
+    checkAnswer(
+      df.selectExpr("timestamp_millis(a)", "timestamp_millis(b)"),
+      Row(Timestamp.valueOf("1970-01-02 02:17:36.789"),
+        Timestamp.valueOf("2008-12-25 07:30:00.123")))
+  }
+
+  test("timestamp_micros") {
+    val df = Seq((123456789, 1230219000123L)).toDF("a", "b") // Int and Long inputs
+    checkAnswer(
+      df.select(timestamp_micros(col("a")), timestamp_micros(col("b"))),
+      Row(Timestamp.valueOf("1969-12-31 16:02:03.456789"),
+        Timestamp.valueOf("1970-01-14 21:43:39.000123")))
+    // SQL must match. Expected values assume a UTC-8 default TZ (presumably LA; confirm).
+    checkAnswer(
+      df.selectExpr("timestamp_micros(a)", "timestamp_micros(b)"),
+      Row(Timestamp.valueOf("1969-12-31 16:02:03.456789"),
+        Timestamp.valueOf("1970-01-14 21:43:39.000123")))
+  }
+
   test("SPARK-30668: use legacy timestamp parser in to_timestamp") {
     val confKey = SQLConf.LEGACY_TIME_PARSER_POLICY.key
     val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts")
@@ -1111,6 +1151,19 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
     }
   }
 
+  test("convert_timezone") {
+    val df = Seq("1990-11-22").toDF("d") // NOTE(review): checks SQL/API parity, not values
+    checkAnswer(
+      df.selectExpr(s"convert_timezone('${CEST.getId}', '${LA.getId}', d)"),
+      df.select(convert_timezone(lit(CEST.getId), lit(LA.getId), col("d")))
+    )
+    // Two-argument form: the current session time zone is used as the source time zone.
+    checkAnswer(
+      df.selectExpr(s"convert_timezone('${LA.getId}', d)"),
+      df.select(convert_timezone(lit(LA.getId), col("d")))
+    )
+  }
+
   test("make_dt_interval") {
     val df = Seq((1, 12, 30, 01.001001)).toDF("day", "hour", "min", "sec")
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org