You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Venkataramanan V <ve...@skyhighnetworks.com> on 2017/11/25 04:31:32 UTC

[Spark-streaming] Inconsistent Error: No partition assignment error on seek

Hi,


We are running spark streaming on AWS and trying to process incoming
messages on Kafka topics. All was well.
Recently we wanted to migrate from 0.8 to 0.11 version of Spark library and
Kafka 0.11 version of server.

With this new version of software we are facing issues with regard to 'No
assignment to partition for a topic and it happens intermittently'. I
construct four DStreams with different group.ids as suggested.

The main source of code thats causing the issue is this one

if (!toSeek.isEmpty) {
      // work around KAFKA-3370 when reset is none
      // poll will throw if no position, i.e. auto offset reset none and no
explicit position
      // but cant seek to a position before poll, because poll is what gets
subscription partitions
      // So, poll, suppress the first exception, then seek
      val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
      val shouldSuppress = aor != null &&
aor.asInstanceOf[String].toUpperCase == "NONE"
      try {
        consumer.poll(0)
      } catch {
        case x: NoOffsetForPartitionException if shouldSuppress =>
          logWarning("Catching NoOffsetForPartitionException since " +
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See
KAFKA-3370")
      }
      toSeek.asScala.foreach { case (topicPartition, offset) =>
          *consumer.seek(topicPartition, offset)*
      }
    }

At the start of the job, I also ensure we are supplying all required offsets
correctly

private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
    Map<TopicPartition, Long> offsets = new HashMap<>();
    List<TopicPartition> topicPartitions =
        consumer.partitionsFor(topic).stream().map(partitionInfo ->
            new TopicPartition(partitionInfo.topic(),
partitionInfo.partition()))
            .collect(Collectors.toList());
    Map<TopicPartition, Long> earliestOffsets =
consumer.beginningOffsets(topicPartitions);
    // pick committed offsets
    for (TopicPartition topicAndPartition : topicPartitions) {
      final OffsetAndMetadata committed =
consumer.committed(topicAndPartition);
      Long earliestOffset = earliestOffsets.get(topicAndPartition);
      if (committed != null && committed.offset() > earliestOffset) {
        logger
            .warn(
                "Committed offset found for: {} offset:{} -> Hence adding
committed offset",
                topicAndPartition, committed.offset());
        offsets.put(topicAndPartition, committed.offset());
      } else {
        logger
            .warn(
                "New partition/stale offset found for: {} offset:{} -> Hence
adding earliest offset",
                topicAndPartition, earliestOffset);
        offsets.put(topicAndPartition, earliestOffset);
      }
    }
    return offsets;
  }

The actual stack trace:

Caused by: java.lang.IllegalStateException: No current assignment for
partition genericEvents-1
2017-11-23 10:35:24,677 -    at
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
2017-11-23 10:35:24,677 -    at
org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
2017-11-23 10:35:24,677 -    at
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
2017-11-23 10:35:24,678 -    at
org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
2017-11-23 10:35:24,678 -    at
org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
2017-11-23 10:35:24,678 -    at
scala.collection.Iterator$class.foreach(Iterator.scala:893)
2017-11-23 10:35:24,678 -    at
scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
2017-11-23 10:35:24,678 -    at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
2017-11-23 10:35:24,678 -    at
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
2017-11-23 10:35:24,678 -    at
org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:106)
2017-11-23 10:35:24,679 -    at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
2017-11-23 10:35:24,679 -    at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
2017-11-23 10:35:24,679 -    at
org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
2017-11-23 10:35:24,679 -    at
org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
2017-11-23 10:35:24,679 -    at
scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
2017-11-23 10:35:24,679 -    at
scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
2017-11-23 10:35:24,679 -    at
scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
2017-11-23 10:35:24,680 -    at
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
2017-11-23 10:35:24,680 -    at
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
2017-11-23 10:35:24,680 -    at
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
2017-11-23 10:35:24,680 -    at
scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
2017-11-23 10:35:24,680 -    at
scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
2017-11-23 10:35:24,680 -    at
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
2017-11-23 10:35:24,680 -    at
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
2017-11-23 10:35:24,681 -    at
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
2017-11-23 10:35:24,681 -    at
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
2017-11-23 10:35:24,681 -    at
scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
2017-11-23 10:35:24,681 -    at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

Consumer properties

auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [172.16.202.58:9092, 172.16.201.212:9092, 172.16.202.57:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = tp-preprocessor-venkat
    heartbeat.interval.ms = 100000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 300000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

Thanks
Venkat