Posted to dev@spark.apache.org by Evo Eftimov <ev...@isecc.com> on 2018/01/10 15:56:17 UTC

spark streaming direct receiver offset initialization

In the class CachedKafkaConsumer.scala

https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala

 

what is the purpose of the following condition check in the method
get(offset: Long, timeout: Long): ConsumerRecord[K, V]?

    assert(record.offset == offset,
      s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")

 

I have a production Spark Streaming job which worked for a while
(consuming Kafka messages and recording offsets in Kafka using
rdd.asInstanceOf[HasOffsetRanges].offsetRanges
and dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)).
On restart, during the first attempt to resume message consumption, it seems
to hit the above assertion.

 

What is the purpose of that assertion, i.e. what system conditions, related
to e.g. the operation and interaction of message brokers and message
consumers, is it supposed to detect? The assertion appears only in the
Spark Streaming direct consumer library class and seems to compare the
offset passed to Kafka as the position to start reading from with the offset
of the record Kafka returns (i.e. the offset stored as a field in the record
itself).
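
To make the comparison concrete, here is a toy model in plain Scala of what
the check appears to do. It is only a sketch: ToyLog, Record, fetchFrom and
OffsetCheckSketch are hypothetical names, not Spark or Kafka classes, and the
behaviour that a read after a seek returns the first record at or after the
requested offset is an assumption of the model, not something confirmed from
the Spark source.

```scala
import scala.collection.immutable.TreeMap

// Toy model only: hypothetical names, not Spark or Kafka classes.
object OffsetCheckSketch {
  final case class Record(offset: Long, value: String)

  // A partition log keyed by offset. Offsets are allowed to have gaps.
  final class ToyLog(records: TreeMap[Long, String]) {
    // Assumption of this model: a read after a seek returns the first
    // record whose offset is >= the requested offset, if there is one.
    def fetchFrom(offset: Long): Option[Record] = {
      val it = records.iteratorFrom(offset)
      if (it.hasNext) {
        val (o, v) = it.next()
        Some(Record(o, v))
      } else None
    }
  }

  // Same shape as the check in the question: the record handed back
  // must carry exactly the offset that was asked for.
  def get(log: ToyLog, offset: Long): Record = {
    val record = log.fetchFrom(offset).getOrElse(
      throw new NoSuchElementException(s"No record at or after offset $offset"))
    assert(record.offset == offset,
      s"Got wrong record even after seeking to offset $offset (got ${record.offset})")
    record
  }

  def main(args: Array[String]): Unit = {
    // Offset 2 is deliberately missing from the log.
    val log = new ToyLog(TreeMap(0L -> "a", 1L -> "b", 3L -> "d"))
    println(get(log, 1L))
    try {
      get(log, 2L)
    } catch {
      case e: AssertionError => println(s"assertion tripped: ${e.getMessage}")
    }
  }
}
```

In this model the assertion trips only when the log has no record at exactly
the requested offset but does have a later one. Whether the real brokers can
end up in that state after a restart is the open question here.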

 

For example, is it something like the following, i.e. consumer offset
misalignment after a leader failure and subsequent leader election?

http://mkuthan.github.io/blog/2016/01/29/spark-kafka-integration2/ 

The last important Kafka cluster configuration property is
unclean.leader.election.enable. It should be disabled (by default it is
enabled) to avoid unrecoverable exceptions from Kafka consumer. Consider the
situation when the latest committed offset is N, but after leader failure,
the latest offset on the new leader is M < N. M < N because the new leader
was elected from the lagging follower (not in-sync replica). When the
streaming engine asks for data from offset N using Kafka consumer, it will
get an exception because the offset N does not exist yet. Someone will have
to fix offsets manually.

So the minimal recommended Kafka setup for reliable message processing is:

*	4 nodes in the cluster
*	unclean.leader.election.enable=false in the brokers configuration
*	replication factor for the topics - 3
*	min.insync.replicas=2 property in topic configuration
*	acks=all property in the producer configuration
*	block.on.buffer.full=true property in the producer configuration

With the above setup your configuration should be resistant to single broker
failure, and Kafka consumers will survive new leader election.

You could also take a look at replica.lag.max.messages and
replica.lag.time.max.ms properties for tuning when the follower is removed
from ISR by the leader. But this is outside the scope of this blog post.