Posted to commits@spark.apache.org by td...@apache.org on 2018/02/21 22:56:17 UTC
spark git commit: [SPARK-23484][SS] Fix possible race condition in KafkaContinuousReader
Repository: spark
Updated Branches:
refs/heads/master e836c27ce -> 3fd0ccb13
[SPARK-23484][SS] Fix possible race condition in KafkaContinuousReader
## What changes were proposed in this pull request?
The var `KafkaContinuousReader.knownPartitions` should be thread-safe because it is accessed from multiple threads: the query thread when the reader factories are created, and the epoch tracking thread when `needsReconfiguration` is called. This patch marks the field `@volatile` so that a write from one thread is visible to reads from the other.
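
For illustration only, here is a minimal, self-contained sketch of the access pattern described above. It is not the Spark code: `PartitionTracker`, `createFactories`, the `String` partition names, and the body of `needsReconfiguration` are hypothetical stand-ins, and only the `@volatile var knownPartitions` field mirrors the patch. One thread publishes the set while another polls it, which is the situation where a non-volatile field is not guaranteed to become visible.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the reader; only the volatile field mirrors the patch.
final class PartitionTracker {
  // @volatile makes a write by one thread visible to subsequent reads by other threads.
  @volatile var knownPartitions: Set[String] = _

  // Plays the role of reader factory creation on the "query" thread (assumed name).
  def createFactories(current: Set[String]): Unit = {
    knownPartitions = current
  }

  // Plays the role of the check made on the "epoch tracking" thread (assumed logic).
  def needsReconfiguration(latest: Set[String]): Boolean = {
    val known = knownPartitions // read the volatile field once
    known != null && known != latest
  }
}

object VolatileSketch {
  // Returns true if the polling thread observed the published set and saw it diverge.
  def demo(): Boolean = {
    val tracker = new PartitionTracker
    val sawChange = new AtomicBoolean(false)

    val epochThread = new Thread(new Runnable {
      def run(): Unit = {
        // Poll until the other thread publishes the set; without @volatile this
        // loop is not guaranteed to ever observe the write.
        while (tracker.knownPartitions == null) Thread.sleep(1)
        sawChange.set(tracker.needsReconfiguration(Set("t-0", "t-1", "t-2")))
      }
    })
    val queryThread = new Thread(new Runnable {
      def run(): Unit = tracker.createFactories(Set("t-0", "t-1"))
    })

    epochThread.start()
    queryThread.start()
    epochThread.join()
    queryThread.join()
    sawChange.get
  }

  def main(args: Array[String]): Unit = {
    println(s"needsReconfiguration observed: ${demo()}")
  }
}
```

Because the field holds an immutable `Set` that is replaced wholesale rather than mutated in place, `@volatile` is sufficient in this sketch; a lock or atomic reference would only be needed for compound read-modify-write updates.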
## How was this patch tested?
Existing tests.
Author: Tathagata Das <ta...@gmail.com>
Closes #20655 from tdas/SPARK-23484.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fd0ccb1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fd0ccb1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fd0ccb1
Branch: refs/heads/master
Commit: 3fd0ccb13fea44727d970479af1682ef00592147
Parents: e836c27
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Feb 21 14:56:13 2018 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Feb 21 14:56:13 2018 -0800
----------------------------------------------------------------------
.../org/apache/spark/sql/kafka010/KafkaContinuousReader.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3fd0ccb1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 97a0f66..ecd1170 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -66,7 +66,7 @@ class KafkaContinuousReader(
   // Initialized when creating reader factories. If this diverges from the partitions at the latest
   // offsets, we need to reconfigure.
   // Exposed outside this object only for unit tests.
-  private[sql] var knownPartitions: Set[TopicPartition] = _
+  @volatile private[sql] var knownPartitions: Set[TopicPartition] = _
   override def readSchema: StructType = KafkaOffsetReader.kafkaSchema