You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Bruce Robbins (Jira)" <ji...@apache.org> on 2020/09/02 23:20:00 UTC
[jira] [Created] (SPARK-32779) Spark/Hive3 interaction potentially
causes deadlock
Bruce Robbins created SPARK-32779:
-------------------------------------
Summary: Spark/Hive3 interaction potentially causes deadlock
Key: SPARK-32779
URL: https://issues.apache.org/jira/browse/SPARK-32779
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.1.0
Reporter: Bruce Robbins
This is an issue for applications that share a Spark Session across multiple threads.
sessionCatalog.loadPartition (after checking that the table exists) grabs locks in this order:
- HiveExternalCatalog
- HiveSessionCatalog (in Shim_v3_0)
Other operations (e.g., sessionCatalog.tableExists), grab locks in this order:
- HiveSessionCatalog
- HiveExternalCatalog
[This|https://github.com/apache/spark/blob/ad6b887541bf90cc3ea830a1a3322b71ccdd80ee/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala#L1332] appears to be the culprit. Maybe db name should be defaulted _before_ the call to HiveClient so that Shim_v3_0 doesn't have to call back into SessionCatalog. Or possibly this is not needed at all, since loadPartition in Shim_v2_1 doesn't worry about the default db name, but that might be because of differences between Hive client libraries.
Reproduction case:
- You need to have a running Hive 3.x HMS instance and the appropriate hive-site.xml for your Spark instance
- Adjust your spark.sql.hive.metastore.version accordingly
- It might take more than one try to hit the deadlock
Launch Spark:
{noformat}
bin/spark-shell --conf "spark.sql.hive.metastore.jars=${HIVE_HOME}/lib/*" --conf spark.sql.hive.metastore.version=3.1
{noformat}
Then use the following code:
{noformat}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
val tableCount = 4
for (i <- 0 until tableCount) {
val tableName = s"partitioned${i+1}"
sql(s"drop table if exists $tableName")
sql(s"create table $tableName (a bigint) partitioned by (b bigint) stored as orc")
}
val threads = new ArrayBuffer[Thread]
for (i <- 0 until tableCount) {
threads.append(new Thread( new Runnable {
override def run: Unit = {
val tableName = s"partitioned${i + 1}"
val rand = Random
val df = spark.range(0, 20000).toDF("a")
val location = s"/tmp/${rand.nextLong.abs}"
df.write.mode("overwrite").orc(location)
sql(
s"""
LOAD DATA LOCAL INPATH '$location' INTO TABLE $tableName partition (b=$i)""")
}
}, s"worker$i"))
threads(i).start()
}
for (i <- 0 until tableCount) {
println(s"Joining with thread $i")
threads(i).join()
}
println("All done")
{noformat}
The job often gets stuck after one or two "Joining..." lines.
{{kill -3}} shows something like this:
{noformat}
Found one Java-level deadlock:
=============================
"worker3":
waiting to lock monitor 0x00007fdc3cde6798 (object 0x0000000784d98ac8, a org.apache.spark.sql.hive.HiveSessionCatalog),
which is held by "worker0"
"worker0":
waiting to lock monitor 0x00007fdc441d1b88 (object 0x00000007861d1208, a org.apache.spark.sql.hive.HiveExternalCatalog),
which is held by "worker3"
{noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org