Posted to dev@spark.apache.org by Venkataramanan V <ve...@skyhighnetworks.com> on 2017/11/25 04:31:32 UTC
[Spark-streaming] Inconsistent error: No partition assignment error on seek
Hi,
We run Spark Streaming on AWS, processing incoming messages from Kafka topics,
and all was well.
Recently we migrated from the 0.8 to the 0.11 version of the Spark Kafka
library, along with Kafka 0.11 on the server side.
With the new versions we intermittently hit a 'No current assignment for
partition' error for one of the topics. I construct four DStreams with
different group.ids, as suggested.
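For reference, each of the four streams is wired up roughly like this. This is
a minimal sketch: the streaming context, topic names, base parameter map, and
the group.id naming scheme are illustrative, and getCommittedOffsets is the
helper shown further down.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;

    private JavaInputDStream<ConsumerRecord<String, String>> createStream(
        JavaStreamingContext jssc, String topic, Map<String, Object> baseParams) {
      // Each of the four streams gets its own group.id so the consumers
      // do not rebalance against one another.
      Map<String, Object> kafkaParams = new HashMap<>(baseParams);
      kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "tp-preprocessor-" + topic);
      return KafkaUtils.createDirectStream(
          jssc,
          LocationStrategies.PreferConsistent(),
          ConsumerStrategies.<String, String>Subscribe(
              Collections.singletonList(topic),
              kafkaParams,
              getCommittedOffsets(topic)));  // starting offsets, helper shown below
    }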
The code that triggers the issue is this block from Spark's Subscribe.onStart
in ConsumerStrategy.scala (the consumer.seek call at the bottom is what throws):
    if (!toSeek.isEmpty) {
      // work around KAFKA-3370 when reset is none
      // poll will throw if no position, i.e. auto offset reset none and no explicit position
      // but can't seek to a position before poll, because poll is what gets subscription partitions
      // So, poll, suppress the first exception, then seek
      val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
      val shouldSuppress =
        aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
      try {
        consumer.poll(0)
      } catch {
        case x: NoOffsetForPartitionException if shouldSuppress =>
          logWarning("Catching NoOffsetForPartitionException since " +
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
      }
      toSeek.asScala.foreach { case (topicPartition, offset) =>
        consumer.seek(topicPartition, offset)
      }
    }
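As far as I can tell, the same exception can be reproduced with a bare
KafkaConsumer whenever seek() races the group rebalance, since poll(0) can
return before any partitions have been assigned. A standalone sketch (broker
address, group.id, and the topic/partition are placeholders):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SeekRaceRepro {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "seek-race-repro");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(Collections.singletonList("genericEvents"));
          // poll(0) may return before the rebalance completes, i.e. with an
          // empty assignment().
          consumer.poll(0);
          // If genericEvents-1 has not been assigned yet, this throws
          // IllegalStateException: No current assignment for partition genericEvents-1
          consumer.seek(new TopicPartition("genericEvents", 1), 0L);
        }
      }
    }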
At the start of the job, I also make sure we supply all of the required
starting offsets:
    private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
      Map<TopicPartition, Long> offsets = new HashMap<>();
      List<TopicPartition> topicPartitions =
          consumer.partitionsFor(topic).stream()
              .map(partitionInfo ->
                  new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
              .collect(Collectors.toList());
      Map<TopicPartition, Long> earliestOffsets =
          consumer.beginningOffsets(topicPartitions);
      // pick committed offsets
      for (TopicPartition topicAndPartition : topicPartitions) {
        final OffsetAndMetadata committed = consumer.committed(topicAndPartition);
        Long earliestOffset = earliestOffsets.get(topicAndPartition);
        if (committed != null && committed.offset() > earliestOffset) {
          logger.warn(
              "Committed offset found for: {} offset:{} -> Hence adding committed offset",
              topicAndPartition, committed.offset());
          offsets.put(topicAndPartition, committed.offset());
        } else {
          logger.warn(
              "New partition/stale offset found for: {} offset:{} -> Hence adding earliest offset",
              topicAndPartition, earliestOffset);
          offsets.put(topicAndPartition, earliestOffset);
        }
      }
      return offsets;
    }
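If I read ConsumerStrategy.scala correctly, the map returned here is what the
Scala block above receives as toSeek, so every entry eventually goes through
consumer.seek(). To see exactly which seeks will be attempted, the map can be
logged before building each stream, e.g.:

    getCommittedOffsets(topic).forEach((tp, offset) ->
        logger.info("Will seek {} to offset {}", tp, offset));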
The actual stack trace:
Caused by: java.lang.IllegalStateException: No current assignment for partition genericEvents-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
    at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
    at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:106)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
Consumer properties:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [172.16.202.58:9092, 172.16.201.212:9092, 172.16.202.57:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = tp-preprocessor-venkat
heartbeat.interval.ms = 100000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 300000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
Thanks
Venkat