Posted to dev@spark.apache.org by Kris Mo <kr...@databricks.com> on 2020/06/09 20:30:10 UTC

[OSS DIGEST] The major changes of Apache Spark from Apr 22 to May 5

Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team.
For each API/configuration/behavior change, there will be an *[API]* tag in
the title.

CORE
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#65spark-31485core-avoid-application-hang-if-only-partial-barrier-tasks-launched-53--19>[2.4][SPARK-31485][CORE]
Avoid application hang if only partial barrier tasks launched (+53, -19)>
<https://github.com/apache/spark/commit/263f04db865920d9c10251517b00a1b477b58ff1>

The application could hang when only some of the barrier tasks were launched.
For example (the snippet below uses Spark's test helpers
initLocalClusterSparkContext and MyRDD):

initLocalClusterSparkContext(2)
val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
val rdd = new MyRDD(sc, 2, List(dep),
  Seq(Seq("executor_h_0"), Seq("executor_h_0")))
rdd.barrier().mapPartitions { iter =>
  BarrierTaskContext.get().barrier()
  iter
}.collect()

The original approach assumed that an exception thrown in
TaskSchedulerImpl.resourceOffers could fail the application, so the barrier
stage would never really execute. However, resourceOffers runs within the
Spark RPC framework, which swallows any non-fatal exception, and this breaks
the assumption. This PR fixes the bug by using dagScheduler.taskSetFailed to
abort a barrier stage.

<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#api80spark-31518core-expose-filterbyrange-in-javapairrdd-57--0>[API][3.1][SPARK-31518][CORE]
Expose filterByRange in JavaPairRDD (+57, -0)>
<https://github.com/apache/spark/commit/497024956aa1d94b2d952de2ed24ee02e4637372>

To improve the consistency between the Scala and Java APIs, this PR exposes
the filterByRange method from OrderedRDDFunctions in the Java API (as a
method of JavaPairRDD).

  /**
   * Return an RDD containing only the elements in the inclusive range
   * `lower` to `upper`. If the RDD has been partitioned using a
   * `RangePartitioner`, then this operation can be performed efficiently
   * by only scanning the partitions that might contain matching elements.
   * Otherwise, a standard `filter` is applied to all partitions.
   *
   * @since 3.1.0
   */
  @Since("3.1.0")
  def filterByRange(lower: K, upper: K): JavaPairRDD[K, V]

  /**
   * Return an RDD containing only the elements in the inclusive range
   * `lower` to `upper`. If the RDD has been partitioned using a
   * `RangePartitioner`, then this operation can be performed efficiently
   * by only scanning the partitions that might contain matching elements.
   * Otherwise, a standard `filter` is applied to all partitions.
   *
   * @since 3.1.0
   */
  @Since("3.1.0")
  def filterByRange(comp: Comparator[K], lower: K, upper: K): JavaPairRDD[K, V]
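
For context, the Scala RDD API already provides this method through
OrderedRDDFunctions; a minimal spark-shell sketch of the behavior the new Java
methods mirror (the data and key range below are illustrative):

import org.apache.spark.RangePartitioner

// A pair RDD keyed by Int; range partitioning lets filterByRange prune partitions.
val pairs = sc.parallelize((1 to 100).map(i => (i, s"value-$i")))
val ranged = pairs.partitionBy(new RangePartitioner(4, pairs))

// Keep only keys in the inclusive range [20, 40]; only the partitions that can
// contain such keys are scanned, thanks to the RangePartitioner.
val subset = ranged.filterByRange(20, 40)
subset.collect()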

[API][3.0][SPARK-31619][CORE] Rename config
spark.dynamicAllocation.shuffleTimeout to
spark.dynamicAllocation.shuffleTracking.timeout (+13, -13)>
<https://github.com/apache/spark/commit/b7cde42b04b21c9bfee6535199cf385855c15853>

Rename the config spark.dynamicAllocation.shuffleTimeout (which has not
been released yet) to spark.dynamicAllocation.shuffleTracking.timeout.

spark.dynamicAllocation.shuffleTracking.timeout (Default: Long.MaxValue)

   - When shuffle tracking is enabled, controls the timeout for executors
   that are holding shuffle data. The default value means that Spark will rely
   on the shuffles being garbage collected to be able to release executors. If
   for some reason garbage collection is not cleaning up shuffles quickly
   enough, this option can be used to control when to time out executors even
   when they are storing shuffle data.
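
For illustration, a hedged sketch of how this timeout could be set together
with shuffle tracking when building a session (the app name and the 30-minute
value are placeholders):

import org.apache.spark.sql.SparkSession

// Illustrative only: enable dynamic allocation with shuffle tracking, and time
// out executors that are holding shuffle data after 30 minutes.
val spark = SparkSession.builder()
  .appName("shuffle-tracking-timeout-demo")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .config("spark.dynamicAllocation.shuffleTracking.timeout", "30min")
  .getOrCreate()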

<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-29-~-May-5,-2020#api65spark-31582yarn-being-able-to-not-populate-hadoop-classpath-43--1>[API][2.4][SPARK-31582][YARN]
Being able to not populate Hadoop classpath (+43, -1)>
<https://github.com/apache/spark/commit/ecfee82fda5f0403024ff64f16bc767b8d1e3e3d>

Add a new Spark YARN config spark.yarn.populateHadoopClasspath, to avoid
jar conflicts when running a Spark distribution with its own embedded
Hadoop on an external YARN cluster.

spark.yarn.populateHadoopClasspath (Default: True)

   - Whether to populate the Hadoop classpath from yarn.application.classpath
   and mapreduce.application.classpath. Note that if this is set to false,
   it requires either a with-Hadoop Spark distribution that bundles the Hadoop
   runtime, or a Hadoop installation provided separately by the user.
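
A hedged sketch of turning this off for a with-Hadoop Spark distribution
running against an external YARN cluster (illustrative; in practice this is
usually passed at submit time, e.g. spark-submit --conf
spark.yarn.populateHadoopClasspath=false):

import org.apache.spark.SparkConf

// Illustrative: do not inherit the YARN cluster's Hadoop jars on the classpath,
// relying instead on the Hadoop runtime bundled with the Spark distribution.
val conf = new SparkConf()
  .setAppName("no-hadoop-classpath-demo")
  .set("spark.yarn.populateHadoopClasspath", "false")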

<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-29-~-May-5,-2020#api80spark-31235yarn-separates-different-categories-of-applications-97--4>[API][3.1][SPARK-31235][YARN]
Separates different categories of applications (+97, -4)>
<https://github.com/apache/spark/commit/f3891e377f320ad212d198521dcdf5414830c063>

Add a new config spark.yarn.applicationType to identify the application
type when submitted to a YARN cluster.

spark.yarn.applicationType (Default: SPARK)

   - Defines more specific application types, e.g. SPARK, SPARK-SQL,
   SPARK-STREAMING, SPARK-MLLIB and SPARK-GRAPH. Please be careful not to
   exceed 20 characters.

<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#sql>
SQL
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#api65spark-31532sql-builder-should-not-propagate-static-sql-configs-to-the-existing-active-or-default-sparksession-67--10>[API][2.4][SPARK-31532][SQL]
Builder should not propagate static sql configs to the existing active or
default SparkSession (+67, -10)>
<https://github.com/apache/spark/commit/8424f552293677717da7411ed43e68e73aa7f0d6>

This PR fixes a long-standing bug for static SQL configurations. In the
original approach, SparkSessionBuilder propagated the static SQL
configurations to the existing active/default SparkSession. This breaks the
semantics of static configs, whose values must not change at runtime. For
example, before the fix, users could still modify a static conf by going
through a new session's builder:

scala> spark.sql("set spark.sql.warehouse.dir=2");
org.apache.spark.sql.AnalysisException: Cannot modify the value of a
static config: spark.sql.warehouse.dir;
  at org.apache.spark.sql.RuntimeConfig.requireNonStaticConf(RuntimeConfig.scala:154)
  at org.apache.spark.sql.RuntimeConfig.set(RuntimeConfig.scala:42)

  ...

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").getOrCreate
20/04/23 23:49:13 WARN SparkSession$Builder: Using an existing
SparkSession; some configuration may not take effect.
res7: org.apache.spark.sql.SparkSession =
org.apache.spark.sql.SparkSession@6403d574

scala> spark.sql("set spark.sql.warehouse.dir").show
+--------------------+-----+
|                 key|value|
+--------------------+-----+
|spark.sql.warehou...|  xyz|
+--------------------+-----+
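
After the fix, the builder no longer propagates static SQL configs to an
existing session; a hedged sketch of the expected behavior (the "xyz" value is
illustrative):

import org.apache.spark.sql.SparkSession

// With an active SparkSession already created, passing a static SQL conf to the
// builder no longer overwrites it on the existing session.
val before = spark.conf.get("spark.sql.warehouse.dir")
SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").getOrCreate()

// The static conf keeps its original value (a warning about reusing the
// existing session is still logged).
assert(spark.conf.get("spark.sql.warehouse.dir") == before)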

<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#70spark-31495sql-support-formatted-explain-for-aqe-981--37>[3.0][SPARK-31495][SQL]
Support formatted explain for AQE (+981, -37)>
<https://github.com/apache/spark/commit/8fbfdb38c0baff7bc5d1ce1e3d6f1df70c25da70>

This PR adds support for the formatted explain mode when AQE is enabled. For example:

Before:

== Physical Plan ==
AdaptiveSparkPlan (1)
+- * HashAggregate (unknown)
   +- CustomShuffleReader (unknown)
      +- ShuffleQueryStage (unknown)
         +- Exchange (unknown)
            +- * HashAggregate (unknown)
               +- * Project (unknown)
                  +- * BroadcastHashJoin Inner BuildRight (unknown)
                     :- * LocalTableScan (unknown)
                     +- BroadcastQueryStage (unknown)
                        +- BroadcastExchange (unknown)
                           +- LocalTableScan (unknown)


(1) AdaptiveSparkPlan
Output [4]: [k#7, count(v1)#32L, sum(v1)#33L, avg(v2)#34]
Arguments: HashAggregate(keys=[k#7], functions=[count(1),
sum(cast(v1#8 as bigint)), avg(cast(v2#19 as bigint))]),
AdaptiveExecutionContext(org.apache.spark.sql.SparkSession@104ab57b),
[PlanAdaptiveSubqueries(Map())], false

After the support:

== Physical Plan ==
 AdaptiveSparkPlan (14)
 +- * HashAggregate (13)
    +- CustomShuffleReader (12)
       +- ShuffleQueryStage (11)
          +- Exchange (10)
             +- * HashAggregate (9)
                +- * Project (8)
                   +- * BroadcastHashJoin Inner BuildRight (7)
                      :- * Project (2)
                      :  +- * LocalTableScan (1)
                      +- BroadcastQueryStage (6)
                         +- BroadcastExchange (5)
                            +- * Project (4)
                               +- * LocalTableScan (3)


 (1) LocalTableScan [codegen id : 2]
 Output [2]: [_1#x, _2#x]
 Arguments: [_1#x, _2#x]

 ... [skipped in this digest]

 (13) HashAggregate [codegen id : 3]
 Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
 Keys [1]: [k#x]
 Functions [3]: [count(1), sum(cast(v1#x as bigint)), avg(cast(v2#x as bigint))]
 Aggregate Attributes [3]: [count(1)#xL, sum(cast(v1#x as bigint))#xL,
avg(cast(v2#x as bigint))#x]
 Results [4]: [k#x, count(1)#xL AS count(v1)#xL, sum(cast(v1#x as
bigint))#xL AS sum(v1)#xL, avg(cast(v2#x as bigint))#x AS avg(v2)#x]

 (14) AdaptiveSparkPlan
 Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
 Arguments: isFinalPlan=true
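
A minimal sketch of producing such output in spark-shell (assumes AQE is
enabled; the DataFrame is illustrative):

import spark.implicits._

spark.conf.set("spark.sql.adaptive.enabled", "true")

// A simple aggregation that goes through a shuffle, so AQE query stages show up.
val df = spark.range(100).groupBy(($"id" % 10).as("k")).count()

// Dataset.explain accepts an explain mode string; "formatted" prints the
// two-part output shown above.
df.explain("formatted")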

<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#api70spark-31507sql-remove-uncommon-fields-support-and-update-some-fields-with-meaningful-names-for-extract-function-576--1806>[API][3.0][SPARK-31507][SQL]
Remove uncommon fields support and update some fields with meaningful names
for extract function (+576, -1806)>
<https://github.com/apache/spark/commit/37d2e037ed804a414ed874c829e0139a277a5ae8>

This PR removes support for extracting millennium, century, decade,
millisecond, microsecond, and epoch from datetimes, since these fields are
neither part of the ANSI standard nor common in other SQL platforms. It also
renames some field values for dates and timestamps in the EXTRACT function:

   - ISOYEAR is replaced by YEAROFWEEK
   - ISODOW is replaced by DOW_ISO and DAYOFWEEK_ISO

This change is made on the unreleased features in Spark 3.0.
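
A hedged sketch of the renamed fields in use (illustrative, run via spark.sql
in spark-shell):

// YEAROFWEEK replaces the old ISOYEAR field name, and DOW_ISO / DAYOFWEEK_ISO
// replace ISODOW.
spark.sql("SELECT EXTRACT(YEAROFWEEK FROM DATE '2020-01-01')").show()
spark.sql("SELECT EXTRACT(DAYOFWEEK_ISO FROM DATE '2020-01-01')").show()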
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#api70spark-31528sql-remove-millennium-century-decade-from-truncdate_trunc-fucntions-79--101>[API][3.0][SPARK-31528][SQL]
Remove millennium, century, decade from trunc/date_trunc functions (+79,
-101)>
<https://github.com/apache/spark/commit/f92652d0b51a38f57b70d7287525b3c24014f283>

Similar to SPARK-31507, this PR removes support for these formats in the
trunc and date_trunc functions. This change is made on the unreleased
features in Spark 3.0.
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#65spark-31519sql-cast-in-having-aggregate-expressions-returns-the-wrong-result-135--80>[2.4][SPARK-31519][SQL]
Cast in having aggregate expressions returns the wrong result (+135, -80)>
<https://github.com/apache/spark/commit/6ed2dfbba193d29436dccae4c379dae7b5ba5bdb>

Before the fix, the following query returned an empty result:

SELECT SUM(a) AS b, CAST('2020-01-01' AS DATE) AS fake FROM VALUES (1,
10), (2, 20) AS T(a, b) GROUP BY b HAVING b > 10

Before the fix, the SQL parser creates Filter(..., Aggregate(...)) when
parsing the HAVING clause, and the analyzer rule
ResolveAggregateFunctions resolves
the aggregate functions and grouping columns in the Filter operator.

It works for simple cases, but in a very tricky way, because it relies on the
execution order of the analyzer rules:

   1. Rule ResolveReferences handles the unresolved Aggregate operator and
   resolves the attributes inside aggregate functions, but the functions
   themselves remain unresolved (they are still UnresolvedFunctions). This
   stops the resolution of the Filter operator, as its child Aggregate
   operator is still unresolved.
   2. Rule ResolveFunctions resolves UnresolvedFunction, which makes the
   Aggregate operator resolved.
   3. Rule ResolveAggregateFunctions resolves the Filter operator if its
   child is a resolved Aggregate. This rule can correctly resolve the
   grouping columns.

In the example query, the CAST expression needs to be resolved by the rule
ResolveTimeZone, which runs after ResolveAggregateFunctions. This breaks Step
3, because the Aggregate operator is still unresolved at that point. The
analyzer then starts the next round, and the Filter operator is resolved by
ResolveReferences, which wrongly resolves the grouping columns.

This PR fixes the bug by adding a new logical node, AggregateWithHaving, which
the analyzer resolves to Filter(..., Aggregate(...)).
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#api70spark-31522sql-hive-metastore-client-initialization-related-configurations-should-be-static-15--6>[API][3.0][SPARK-31522][SQL]
Hive metastore client initialization related configurations should be
static (+15, -6)>
<https://github.com/apache/spark/commit/8dc2c0247be0be370d764653d5a68b7aa7948e39>

This PR makes the following SQL configs static:

   1. spark.sql.hive.metastore.version - used to determine the Hive version
   in Spark
   2. spark.sql.hive.metastore.jars - the location of the Hive metastore jars
   that Spark uses to create the Hive client
   3. spark.sql.hive.metastore.sharedPrefixes and
   spark.sql.hive.metastore.barrierPrefixes - the package names of classes
   that are shared or isolated between the Spark class loader and the Hive
   client class loader
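
Because these configs are now static, they must be supplied before the first
SparkSession is created; a hedged sketch (the version and jars value are
placeholders):

import org.apache.spark.sql.SparkSession

// Static configs are fixed once the session exists; calling spark.conf.set(...)
// on them afterwards fails with an AnalysisException.
val spark = SparkSession.builder()
  .appName("hive-metastore-demo")
  .enableHiveSupport()
  .config("spark.sql.hive.metastore.version", "2.3.7")  // placeholder version
  .config("spark.sql.hive.metastore.jars", "maven")     // or a classpath of jars
  .getOrCreate()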

<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#api70spark-31527sql-date-addsubtract-interval-only-allow-those-day-precision-in-ansi-mode-888--21>[API][3.0][SPARK-31527][SQL]
date add/subtract interval only allow those day precision in ansi mode
(+888, -21)>
<https://github.com/apache/spark/commit/ebc8fa50d0422f3db47b2c45025c7f2efe6ee39a>

To follow the ANSI standard, this PR makes the expressions date + interval,
interval + date and date - interval accept only intervals whose microseconds
part is 0 when ANSI mode is enabled.
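
A hedged sketch of the behavior under ANSI mode (the literals are
illustrative):

spark.conf.set("spark.sql.ansi.enabled", "true")

// Day-precision intervals are still accepted for date +/- interval...
spark.sql("SELECT DATE '2020-01-01' + INTERVAL 3 DAYS").show()

// ...but an interval with a sub-day (microseconds) part is expected to be
// rejected while ANSI mode is on:
// spark.sql("SELECT DATE '2020-01-01' + INTERVAL 3 HOURS").show()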
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#api80spark-30724sql-support-like-any-and-like-all-operators-405--10>[API][3.1][SPARK-30724][SQL]
Support 'LIKE ANY' and 'LIKE ALL' operators (+405, -10)>
<https://github.com/apache/spark/commit/b10263b8e5106409467e0115968bbaf0b9141cd1>

This PR adds support for the LIKE ANY/SOME and LIKE ALL operators, which are
useful when matching a text column against one or more patterns.
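
A hedged sketch of the new operators (the inline table and patterns are
illustrative):

// LIKE ANY matches if at least one pattern matches; LIKE ALL requires every
// pattern to match.
spark.sql("""
  SELECT name
  FROM VALUES ('spark-core'), ('spark-sql'), ('hadoop-hdfs') AS t(name)
  WHERE name LIKE ANY ('%spark%', '%flink%')
""").show()

spark.sql("""
  SELECT name
  FROM VALUES ('spark-core'), ('spark-sql'), ('hadoop-hdfs') AS t(name)
  WHERE name LIKE ALL ('%spark%', '%sql%')
""").show()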
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#80spark-31272sql-support-db2-kerberos-login-in-jdbc-connector-281--33>[3.1][SPARK-31272][SQL]
Support DB2 Kerberos login in JDBC connector (+281, -33)>
<https://github.com/apache/spark/commit/c619990c1d7dc7b21f09b410a68aa3f230a12075>

When loading DataFrames from a JDBC data source with Kerberos authentication,
remote executors (in yarn-client/cluster and similar modes) fail to establish
a connection due to the lack of a Kerberos ticket or the ability to generate
one.

This PR adds this support for DB2.
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#80spark-31524sql-add-metric-to-the-split-task-number-for-skew-optimization-20--7>[3.1][SPARK-31524][SQL]
Add metric to the split task number for skew optimization (+20, -7)>
<https://github.com/apache/spark/commit/079b3623c85192ff61a35cc99a4dae7ba6c599f0>

This PR adds a metric for the number of split tasks produced by the skew join
optimization, so the number of splits for the skewed partitions is now visible
in the SQL metrics UI.
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#80spark-31586sql-replace-expression-timesubl-r-with-timeaddl--r-45--91>[3.1][SPARK-31586][SQL]
Replace expression TimeSub(l, r) with TimeAdd(l, -r) (+45, -91)>
<https://github.com/apache/spark/commit/beec8d535f093f1867678fe5afeb02453464f90d>

The implementation of TimeSub (subtracting an interval from a timestamp)
largely duplicates TimeAdd. This PR replaces it with TimeAdd(l, -r), since the
two are equivalent.

[2.4][SPARK-31500][SQL] collect_set() of BinaryType returns duplicate
elements (+55, -6)>
<https://github.com/apache/spark/commit/4fecc20f6ecdfe642890cf0a368a85558c40a47c>

Fix a bug where collect_set() on BinaryType columns could return duplicate
elements.
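
A quick sketch of the fixed behavior in spark-shell (the byte arrays are
illustrative):

import spark.implicits._
import org.apache.spark.sql.functions.collect_set

// Two identical binary payloads and one distinct payload.
val df = Seq(Array[Byte](1, 2), Array[Byte](1, 2), Array[Byte](3)).toDF("b")

// After the fix, collect_set deduplicates the binary values, so the resulting
// set has 2 elements instead of 3.
df.select(collect_set($"b")).show(false)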
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-29-~-May-5,-2020#api70spark-30282sqlfollowup-show-tblproperties-should-support-views-162--29>[API][3.0][SPARK-30282][SQL][FOLLOWUP]
SHOW TBLPROPERTIES should support views (+162, -29)>
<https://github.com/apache/spark/commit/36803031e850b08d689df90d15c75e1a1eeb28a8>

Fix a Spark 3.0 regression where SHOW TBLPROPERTIES did not work well for
views. This PR adds back the view support for SHOW TBLPROPERTIES.

Examples:

scala> sql("CREATE VIEW view TBLPROPERTIES('p1'='v1', 'p2'='v2') AS
SELECT 1 AS c1")
scala> sql("SHOW TBLPROPERTIES
view").show(truncate=false)+---------------------------------+-------------+|key
                             |value
|+---------------------------------+-------------+|view.catalogAndNamespace.numParts|2
           ||view.query.out.col.0             |c1
||view.query.out.numCols           |1            ||p2
             |v2           ||view.catalogAndNamespace.part.0
|spark_catalog||p1                               |v1
||view.catalogAndNamespace.part.1  |default
|+---------------------------------+-------------+

scala> sql("CREATE TEMPORARY VIEW tview TBLPROPERTIES('p1'='v1',
'p2'='v2') AS SELECT 1 AS c1")
scala> sql("SHOW TBLPROPERTIES
tview").show(truncate=false)+---+-----+|key|value|+---+-----++---+-----+

<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-29-~-May-5,-2020#api70spark-31557sql-fix-timestamps-rebasing-in-legacy-parsers-40--8>[API][3.0][SPARK-31557][SQL]
Fix timestamps rebasing in legacy parsers (+40, -8)>
<https://github.com/apache/spark/commit/c09cfb9808d0e399b97781aae0da50332ba4b49b>

The PR fixes the two legacy timestamp formatters,
LegacySimpleTimestampFormatter and LegacyFastTimestampFormatter, to perform
microsecond rebasing when parsing/formatting from/to strings. The legacy
timestamp formatters operate on the hybrid calendar (Julian + Gregorian), so
the input microseconds should be rebased to have the same date-time fields as
in the Proleptic Gregorian calendar used by Spark SQL.
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-29-~-May-5,-2020#api70spark-31597sql-extracting-day-from-intervals-should-be-intervaldays--days-in-intervalmicrosecond-22--19>[API][3.0][SPARK-31597][SQL]
extracting day from intervals should be interval.days + days in
interval.microsecond (+22, -19)>
<https://github.com/apache/spark/commit/ea525fe8c0cc6336a7ba8d98bada3198795f8aed>

When extracting the day from an interval, the days carried over from the
interval's time part should be added to the interval's day field. The new
behavior satisfies both the ANSI standard and common behavior in modern SQL
platforms.

Examples:

spark-sql> SELECT EXTRACT(DAY FROM (cast('2020-01-15 00:00:00' as timestamp) - cast('2020-01-01 00:00:01' as timestamp)));
13
spark-sql> SELECT EXTRACT(DAY FROM (cast('2020-01-15 00:00:00' as timestamp) - cast('2020-01-01 00:00:00' as timestamp)));
14

<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-29-~-May-5,-2020#70spark-31606sql-reduce-the-perf-regression-of-vectorized-parquet-reader-caused-by-datetime-rebase-295--160>[3.0][SPARK-31606][SQL]
Reduce the perf regression of vectorized parquet reader caused by Datetime
rebase (+295, -160)>
<https://github.com/apache/spark/commit/f72220b8ab256e8e6532205a4ce51d50b69c26e9>

The recently added datetime rebase degrades the performance of the Parquet
vectorized reader because it breaks vectorization, even when the datetime
values don't need rebasing. The PR pushes the rebase logic down to a lower
level of the Parquet vectorized reader to make the code more
vectorization-friendly. According to the benchmark:

   - The Date type is 30% faster if the values don't need rebasing, and 20%
   faster if they do.
   - The Timestamp type is 60% faster if the values don't need rebasing, and
   there is no difference if they do.

<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-29-~-May-5,-2020#api70spark-31626sql-port-hive-10415-hivestartcleanupscratchdir-configuration-is-not-taking-effect-36--0>[API][3.0][SPARK-31626][SQL]
Port HIVE-10415: hive.start.cleanup.scratchdir configuration is not taking
effect (+36, -0)>
<https://github.com/apache/spark/commit/7ef0b69a92db91d0c09e65eb9dcfb973def71814>

The PR makes hive.start.cleanup.scratchdir effective, so the Hive scratch
directory is cleaned up when the Hive ThriftServer starts.
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-29-~-May-5,-2020#70spark-31630sql-fix-perf-regression-by-skipping-timestamps-rebasing-after-some-threshold-431--410>[3.0][SPARK-31630][SQL]
Fix perf regression by skipping timestamps rebasing after some threshold
(+431, -410)>
<https://github.com/apache/spark/commit/bef5828e12500630d7efc8e0c005182b25ef2b7f>

The PR skips timestamp rebasing after a global threshold beyond which there
is no difference between the Julian and Gregorian calendars. This avoids
checking the hash maps of switch points and fixes perf regressions in
toJavaTimestamp() and fromJavaTimestamp(). According to the benchmark:

   - Conversions from the external type java.sql.Timestamp are ~34% faster.
   - Conversions to the external type java.sql.Timestamp are ~16% faster.

<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-29-~-May-5,-2020#70spark-31641sql-fix-days-conversions-by-json-legacy-parser-9--5>[3.0][SPARK-31641][SQL]
Fix days conversions by JSON legacy parser (+9, -5)>
<https://github.com/apache/spark/commit/bd264299317bba91f2dc1dc27fd51e6bc0609d66>

The PR fixes a days conversion bug in the JSON legacy parser, which could
produce wrong results. In Spark 2.4 and earlier, days are interpreted as days
since the epoch in the hybrid calendar (Julian + Gregorian since 1582-10-15).
Since Spark 3.0, the base calendar has been switched to the Proleptic
Gregorian calendar, so the days should be rebased to represent the same local
date.
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-29-~-May-5,-2020#api80spark-31594sql-do-not-display-the-seed-of-randrandn-with-no-argument-in-output-schema-52--6>[API][3.1][SPARK-31594][SQL]
Do not display the seed of rand/randn with no argument in output schema
(+52, -6)>
<https://github.com/apache/spark/commit/97f2c03d3b010372b6ba52b07f94643962943808>

This PR updates the column name of the Spark SQL rand()/randn() functions.
Previously, the random seed was included in the column name; after the change
the seed is hidden unless it is given explicitly.

Examples:

scala> sql("select rand()").show()+------------------+|
rand()|+------------------+|0.7137935639522275|+------------------+
// If a seed given, it is still shown in a column name// (the same
with the current behavior)
scala> sql("select rand(1)").show()+------------------+|
rand(1)|+------------------+|0.6363787615254752|+------------------+
// We can still check a seed in explain output:
scala> sql("select rand()").explain()== Physical Plan ==*(1) Project
[rand(-2282124938778456838) AS rand()#0]+- *(1) Scan OneRowRelation[]

<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-29-~-May-5,-2020#70spark-31607sql-improve-the-perf-of-ctesubstitution-29--21>[3.0][SPARK-31607][SQL]
Improve the perf of CTESubstitution (+29, -21)>
<https://github.com/apache/spark/commit/636119c54bf292d9f000bc96f240171516cc4276>

Previously, the rule CTESubstitution traversed the main query many times when
there were many CTE relations. This PR resolves the CTE relations first, so
the main plan only needs to be traversed once to substitute them.
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-29-~-May-5,-2020#api70spark-31639-revert-spark-27528-use-parquet-logical-type-timestamp_micros-by-default-4--10>[API][3.0][SPARK-31639]
Revert SPARK-27528 Use Parquet logical type TIMESTAMP_MICROS by default
(+4, -10)>
<https://github.com/apache/spark/commit/372ccba0632a76a7b02cb2c558a3ecd4fae839e5>

The PR reverts the commit that made TIMESTAMP_MICROS the default logical type
when saving timestamps to Parquet files. The change is needed for
compatibility with Hive and Presto, whose current stable releases don't
support the TIMESTAMP_MICROS type.
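
Users who still want the previous behavior can opt in explicitly; a hedged
sketch (the output path is a placeholder):

// INT96 is the default again after the revert; TIMESTAMP_MICROS can still be
// chosen explicitly via the existing output type config.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
spark.range(1).selectExpr("current_timestamp() AS ts")
  .write.mode("overwrite").parquet("/tmp/ts_micros_demo")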
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#ml>
ML
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#70spark-31497mlpyspark-fix-pyspark-crossvalidatortrainvalidationsplit-with-pipeline-estimator-cannot-save-and-load-model-189--3>[3.0][SPARK-31497][ML][PYSPARK]
Fix PySpark CrossValidator/TrainValidationSplit with pipeline estimator
cannot save and load model (+189, -3)>
<https://github.com/apache/spark/commit/4a21c4cc92805b034ade0593eea3c4a9b6122083>

Fix a bug where a PySpark CrossValidator/TrainValidationSplit with a Pipeline
estimator cannot save and load its model.

Most PySpark estimators/transformers inherit JavaParams, but some
estimators are special (in order to support pure python implemented nested
estimators/transformers):

   - Pipeline
   - OneVsRest
   - CrossValidator
   - TrainValidationSplit

Note, however, that the model readers/writers of the estimators listed above
currently do NOT support pure-Python nested estimators/transformers, because
they use the Java reader/writer wrappers as the Python-side reader/writer.

The PySpark CrossValidator/TrainValidationSplit model reader/writer requires
all estimators to define _transfer_param_map_to_java and
_transfer_param_map_from_java (used in model read/write).

The OneVsRest class already defines these two methods, but Pipeline does not,
which led to this bug.

This PR adds _transfer_param_map_to_java and _transfer_param_map_from_java to
the Pipeline class.
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#80spark-31007ml-kmeans-optimization-based-on-triangle-inequality-390--45>[3.1][SPARK-31007][ML]
KMeans optimization based on triangle-inequality (+390, -45)>
<https://github.com/apache/spark/commit/0ede08bcb21266739aab86b8af3228adc8239eb0>

This PR applies Lemma 1 in Using the Triangle Inequality to Accelerate
K-Means <https://www.aaai.org/Papers/ICML/2003/ICML03-022.pdf>:

Let x be a point, and let b and c be centers. If d(b,c)>=2d(x,b) then
d(x,c) >= d(x,b);
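
For intuition, the lemma is a one-line consequence of the triangle inequality:

  d(x,c) >= d(b,c) - d(x,b) >= 2d(x,b) - d(x,b) = d(x,b)

so when a candidate center c satisfies the condition relative to the current
closest center b, the distance d(x,c) never has to be computed.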

It can be directly applied in EuclideanDistance, but not in CosineDistance.
However, for CosineDistance we can luckily get a variant in the space of
radian/angle.
[3.1][SPARK-31603][ML] AFT uses common functions in RDDLossFunction (+173,
-226)>
<https://github.com/apache/spark/commit/701deac88d09690ddf9d28b9c79814aecfd3277d>

Make AFT reuse the common functions in ml.optim, since the logic for
optimizing AFT is quite similar to that of other algorithms based on
RDDLossFunction.
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#ss>
SS
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#70spark-27340ss-alias-on-timewindow-expression-cause-watermark-metadata-lost-38--13>[3.0][SPARK-27340][SS]
Alias on TimeWindow expression cause watermark metadata lost (+38, -13)>
<https://github.com/apache/spark/commit/ba7adc494923de8104ab37d412edd78afe540f45>

This PR fixes the bug by avoiding setting explicitMetadata in the Column.name
API. The root cause of the bug is that the original implementation of
Column.name (which backs Column.as) did not propagate the metadata correctly:

def name(alias: String): Column = withExpr {
  normalizedExpr() match {
    case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
    case other => Alias(other, alias)()
  }
}

In Structured Streaming, an Alias is added for the TimeWindow column by
default. When the user adds another Alias, this function is entered twice: the
first time for the internal Alias, which takes the branch that does not set
explicitMetadata; the second time, explicitMetadata is set to None, since the
metadata for the TimeWindow column is only set after the analysis rule
TimeWindowing.
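
A minimal sketch of the affected pattern, where a user-supplied alias on the
time window previously lost the watermark metadata (the streaming DataFrame
`events` and its columns are illustrative):

import org.apache.spark.sql.functions.{col, window}

// events is a streaming DataFrame with an eventTime timestamp column; "w" is
// the extra alias on the window expression that used to drop the watermark
// metadata.
val counts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes").as("w"))
  .count()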
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#python>
PYTHON
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#70spark-29664pythonsqlfollow-up-add-deprecation-warnings-for-getitem-instead-18--21>[3.0][SPARK-29664][PYTHON][SQL][FOLLOW-UP]
Add deprecation warnings for getItem instead (+18, -21)>
<https://github.com/apache/spark/commit/5dd581c88ab111377175b673994153072fe9ec77>

Instead of removing the behavior outright, this PR deprecates it for now, per
the rubric added at https://spark.apache.org/versioning-policy.html. It will
be gradually removed in future releases.

After this change,

import warnings
warnings.simplefilter("always")
from pyspark.sql.functions import *

df = spark.range(2)
map_col = create_map(lit(0), lit(100), lit(1), lit(200))
df.withColumn("mapped", map_col.getItem(col('id'))).show()

/.../python/pyspark/sql/column.py:311: DeprecationWarning: A column as
'key' in getItem is
deprecated as of Spark 3.0, and will not be supported in the future
release. Use `column[key]`
or `column.key` syntax instead.
  DeprecationWarning)
...

import warnings
warnings.simplefilter("always")
from pyspark.sql.functions import *

df = spark.range(2)
struct_col = struct(lit(0), lit(100), lit(1), lit(200))
df.withColumn("struct", struct_col.getField(lit("col1"))).show()

/.../spark/python/pyspark/sql/column.py:336: DeprecationWarning: A
column as 'name'
in getField is deprecated as of Spark 3.0, and will not be supported
in the future release. Use
`column[name]` or `column.name` syntax instead.
  DeprecationWarning)

<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#api80spark-29641pythoncore-stage-level-sched-add-python-apis-and-tests-791--27>[API][3.1][SPARK-29641][PYTHON][CORE]
Stage Level Sched: Add python api's and tests (+791, -27)>
<https://github.com/apache/spark/commit/95aec091e4d8a45e648ce84d32d912f585eeb151>

As part of the stage-level scheduling feature, this adds the Python APIs to
set resource profiles. It also adds the functionality to properly apply the
PySpark memory configuration when it is specified in the ResourceProfile. The
PySpark memory configuration is passed through the task-local properties,
which was an easy way to get it to the PythonRunner that needs it.
[API][3.0][SPARK-31549][PYSPARK] Add a develop API invoking collect on
Python RDD with user-specified job group (+90, -0)>
<https://github.com/apache/spark/commit/ee1de66fe4e05754ea3f33b75b83c54772b00112>

The PR adds a new API to the PySpark RDD class: def collectWithJobGroup(self,
groupId, description, interruptOnCancel=False). This new API does the same
thing as rdd.collect(), but it can specify the job group when collecting.
Previously, setting the job group before rdd.collect() would not take effect
as expected, due to Java thread-local variable issues.

    def collectWithJobGroup(self, groupId, description, interruptOnCancel=False):
        """
        .. note:: Experimental

        When collect rdd, use this method to specify job group.

        .. versionadded:: 3.0.0
        """

<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#ui>
UI
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#70spark-31534webui-text-for-tooltip-should-be-escaped-51--30>[3.0][SPARK-31534][WEBUI]
Text for tooltip should be escaped (+51, -30)>
<https://github.com/apache/spark/commit/d61c6219cd692c3e6c70cfcc0467d875201d4268>

This PR fixes the bug by escaping the tooltip text for the DAG visualization
and the Timeline View.

Before:
[image: dag-viz-tooltip-before-fixed] [image: timeline-tooltip-before-fixed]

After:
[image: dag-viz-tooltip-fixed] [image: timeline-tooltip-fixed]
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#build>
BUILD
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-22-~-Apr-28,-2020#70spark-31580build-upgrade-apache-orc-to-1510-23--10>[3.0][SPARK-31580][BUILD]
Upgrade Apache ORC to 1.5.10 (+23, -10)>
<https://github.com/apache/spark/commit/79eaaaf6daeff6b048a81f4ef60fcc48395a2772>

This PR aims to upgrade Apache ORC to 1.5.10.

Apache ORC 1.5.10 is a maintenance release with the following patches.

   - ORC-621 <https://issues.apache.org/jira/browse/ORC-621> Need reader
   fix for ORC-569
   - ORC-616 <https://issues.apache.org/jira/browse/ORC-616> In Patched
   Base encoding, the value of headerThirdByte goes beyond the range of byte
   - ORC-613 <https://issues.apache.org/jira/browse/ORC-613>
OrcMapredRecordReader
   mis-reuse struct object when actual children schema differs
   - ORC-610 <https://issues.apache.org/jira/browse/ORC-610> Updated
   Copyright year in the NOTICE file

The following is the release note.

   -
   https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12318320&version=12346912

[3.0][SPARK-31633][BUILD] Upgrade SLF4J from 1.7.16 to 1.7.30 (+13, -13)>
<https://github.com/apache/spark/commit/e7995c2ddcfd43c8cd99d2a54009139752b66e69>

SLF4J 1.7.23+ is required for slf4j-log4j12 with the MDC feature to run under
Java 9. The upgrade also brings in the latest bug fixes.
<https://github.com/databricks/runtime/wiki/OSS-Digest-Apr-29-~-May-5,-2020#80minorinfra-add-a-guide-to-clarify-releaseunreleased-spark-versions-of-user-facing-change-in-the-github-pr-template-3--1>[3.1][MINOR][INFRA]
Add a guide to clarify release/unreleased Spark versions of user-facing
change in the Github PR template (+3, -1)>
<https://github.com/apache/spark/commit/f0c79ad88a4a572da70c76925850f2dc3e350c2e>

Add a guide to clarify the Spark version when describing "Does this PR
introduce any user-facing change?".
--

Kris Mok

Software Engineer Databricks Inc.

kris.mok@databricks.com

databricks.com


<http://databricks.com/>