You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2018/02/21 22:56:17 UTC

spark git commit: [SPARK-23484][SS] Fix possible race condition in KafkaContinuousReader

Repository: spark
Updated Branches:
  refs/heads/master e836c27ce -> 3fd0ccb13


[SPARK-23484][SS] Fix possible race condition in KafkaContinuousReader

## What changes were proposed in this pull request?

var `KafkaContinuousReader.knownPartitions` should be threadsafe as it is accessed from multiple threads - the query thread at the time of reader factory creation, and the epoch tracking thread at the time of `needsReconfiguration`.

## How was this patch tested?

Existing tests.

Author: Tathagata Das <ta...@gmail.com>

Closes #20655 from tdas/SPARK-23484.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fd0ccb1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fd0ccb1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fd0ccb1

Branch: refs/heads/master
Commit: 3fd0ccb13fea44727d970479af1682ef00592147
Parents: e836c27
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Feb 21 14:56:13 2018 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Feb 21 14:56:13 2018 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/kafka010/KafkaContinuousReader.scala      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3fd0ccb1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 97a0f66..ecd1170 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -66,7 +66,7 @@ class KafkaContinuousReader(
   // Initialized when creating reader factories. If this diverges from the partitions at the latest
   // offsets, we need to reconfigure.
   // Exposed outside this object only for unit tests.
-  private[sql] var knownPartitions: Set[TopicPartition] = _
+  @volatile private[sql] var knownPartitions: Set[TopicPartition] = _
 
   override def readSchema: StructType = KafkaOffsetReader.kafkaSchema
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org