Posted to user@spark.apache.org by Hster Geguri <hs...@gmail.com> on 2016/11/19 17:12:39 UTC

Mac vs cluster Re: kafka 0.10 with Spark 2.0.2 auto.offset.reset=earliest will only read from a single partition on a multi-partition topic

Hi Cody,

Thank you for testing this on a Saturday morning!  I failed to mention that
when our data engineer runs our drivers (even complex ones) locally on his
Mac, the drivers work fine. However, when we launch them onto the cluster (4
machines, either as a YARN cluster or Spark standalone), we get this issue.

Heji


On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger <co...@koeninger.org> wrote:

> I ran your example using the versions of kafka and spark you are
> using, against a standalone cluster.  This is what I observed:
>
> (in kafka working directory)
>
> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> --broker-list 'localhost:9092' --topic simple_logtest --time -2
> simple_logtest:2:0
> simple_logtest:4:0
> simple_logtest:1:0
> simple_logtest:3:0
> simple_logtest:0:0
>
> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> --broker-list 'localhost:9092' --topic simple_logtest --time -1
> simple_logtest:2:31
> simple_logtest:4:31
> simple_logtest:1:31
> simple_logtest:3:31
> simple_logtest:0:31
>
> So in other words, there are 5 partitions, and they all have messages in them.
>
> (in spark working directory)
>
> bash-3.2$ ./bin/spark-submit --master
> spark://Codys-MacBook-Pro.local:7077 --class
> example.SimpleKafkaLoggingDriver
> /private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
> localhost:9092 simple_logtest mygroup earliest
>
>
> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
> 1479574025000 ms.0 from job set of time 1479574025000 ms
>
> simple_logtest 3 offsets: 0 to 31
> simple_logtest 0 offsets: 0 to 31
> simple_logtest 1 offsets: 0 to 31
> simple_logtest 2 offsets: 0 to 31
> simple_logtest 4 offsets: 0 to 31
>
> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
> 1479574025000 ms.0 from job set of time 1479574025000 ms
> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
> 1479574025000 ms (execution: 0.005 s)
> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
> 1479574030000 ms.0 from job set of time 1479574030000 ms
>
> simple_logtest 3 offsets: 31 to 31
> simple_logtest 0 offsets: 31 to 31
> simple_logtest 1 offsets: 31 to 31
> simple_logtest 2 offsets: 31 to 31
> simple_logtest 4 offsets: 31 to 31
>
> So in other words, spark is indeed seeing offsets for each partition.
>
>
> The results you posted look to me like there aren't any messages going
> into the other partitions, which looks like a misbehaving producer.
>
> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
> <hs...@gmail.com> wrote:
> > Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have been
> > struggling with this showstopper problem.
> >
> > When we run our drivers with auto.offset.reset=latest, ingesting from a
> > single Kafka topic with 10 partitions, the driver reads correctly from all
> > 10 partitions.
> >
> > However, when we use auto.offset.reset=earliest, the driver will read only a
> > single partition.
> >
> > When we turn on the debug logs, we sometimes see different partitions being
> > reset to different policies (some to the earliest offset, some to the latest)
> > even though the consumer config correctly indicates auto.offset.reset=earliest.
> > (In the ListOffsetRequest protocol, a timestamp of -2 requests the earliest
> > offset and -1 the latest, which is what the TRACE lines below show.)
> >
> >> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset.
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset.
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 8 TRACE Sending ListOffsetRequest
> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
> >> to broker 10.102.20.12:9092 (id: 12 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 TRACE Sending ListOffsetRequest
> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
> >> to broker 10.102.20.13:9092 (id: 13 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 8 TRACE Received ListOffsetResponse
> >> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
> >> from broker 10.102.20.12:9092 (id: 12 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 TRACE Received ListOffsetResponse
> >> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
> >> from broker 10.102.20.13:9092 (id: 13 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >
> >
> >
> > I've enclosed below the completely stripped-down trivial test driver that
> > shows this behavior. We normally run on YARN 2.7.3 but have also tried
> > Spark standalone mode, which shows the same behavior. Our drivers are
> > normally Java, but we have tried the Scala version, which also has the same
> > incorrect behavior. We have tried different LocationStrategies and partition
> > assignment strategies, all without success. Any insight would be greatly
> > appreciated.
> >
> > package com.xxxxx.labs.analytics.diagnostics.spark.drivers
> >
> > import org.apache.kafka.common.serialization.StringDeserializer
> > import org.apache.spark.SparkConf
> > import org.apache.spark.streaming.{Seconds, StreamingContext}
> > import org.apache.spark.streaming.kafka010._
> > import org.apache.spark.streaming.kafka010.LocationStrategies
> > import org.apache.spark.streaming.kafka010.ConsumerStrategies
> >
> >
> > /**
> >   *
> >   * This driver is only for pulling data from the stream and logging to
> > output just to isolate single partition bug
> >   */
> > object SimpleKafkaLoggingDriver {
> >   def main(args: Array[String]) {
> >     if (args.length != 4) {
> >       System.err.println("Usage: SimpleTestDriver <broker bootstrap servers> <topic> <groupId> <offsetReset>")
> >       System.exit(1)
> >     }
> >
> >     val Array(brokers, topic, groupId, offsetReset) = args
> >     val preferredHosts = LocationStrategies.PreferConsistent
> >     val topics = List(topic)
> >
> >     val kafkaParams = Map(
> >       "bootstrap.servers" -> brokers,
> >       "key.deserializer" -> classOf[StringDeserializer],
> >       "value.deserializer" -> classOf[StringDeserializer],
> >       "group.id" -> groupId,
> >       "auto.offset.reset" -> offsetReset,
> >       "enable.auto.commit" -> (false: java.lang.Boolean)
> >     )
> >
> >     val sparkConf = new SparkConf().setAppName("SimpleTestDriver" + "_" + topic)
> >     val streamingContext = new StreamingContext(sparkConf, Seconds(5))
> >
> >
> >     val dstream = KafkaUtils.createDirectStream[String, String](
> >       streamingContext,
> >       preferredHosts,
> >       ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
> >
> >     dstream.foreachRDD { rdd =>
> >       // Get the offset ranges in the RDD and log
> >       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> >       for (o <- offsetRanges) {
> >         println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
> >       }
> >     }
> >
> >     streamingContext.start
> >     streamingContext.awaitTermination()
> >
> >   }
> >
> > }
> >
> >
> >
> >> 16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
> >>
> >> auto.commit.interval.ms = 5000
> >>
> >> auto.offset.reset = earliest
> >>
> >> bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]
> >>
> >> check.crcs = true
> >>
> >> client.id =
> >>
> >> connections.max.idle.ms = 540000
> >>
> >> enable.auto.commit = false
> >>
> >> exclude.internal.topics = true
> >>
> >> fetch.max.bytes = 52428800
> >>
> >> fetch.max.wait.ms = 500
> >>
> >> fetch.min.bytes = 1
> >>
> >> group.id = simple_test_group
> >>
> >> heartbeat.interval.ms = 3000
> >>
> >> interceptor.classes = null
> >>
> >> key.deserializer = class
> >> org.apache.kafka.common.serialization.StringDeserializer
> >>
> >> max.partition.fetch.bytes = 1048576
> >>
> >> max.poll.interval.ms = 300000
> >>
> >> max.poll.records = 500
> >>
> >> metadata.max.age.ms = 300000
> >>
> >> metric.reporters = []
> >>
> >> metrics.num.samples = 2
> >>
> >> metrics.sample.window.ms = 30000
> >>
> >> partition.assignment.strategy = [class
> >> org.apache.kafka.clients.consumer.RangeAssignor]
> >>
> >> receive.buffer.bytes = 65536
> >>
> >> reconnect.backoff.ms = 50
> >>
> >> request.timeout.ms = 305000
> >>
> >> retry.backoff.ms = 100
> >>
> >> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >>
> >> sasl.kerberos.min.time.before.relogin = 60000
> >>
> >> sasl.kerberos.service.name = null
> >>
> >> sasl.kerberos.ticket.renew.jitter = 0.05
> >>
> >> sasl.kerberos.ticket.renew.window.factor = 0.8
> >>
> >> sasl.mechanism = GSSAPI
> >>
> >> security.protocol = PLAINTEXT
> >>
> >> send.buffer.bytes = 131072
> >>
> >> session.timeout.ms = 10000
> >>
> >> ssl.cipher.suites = null
> >>
> >> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >>
> >> ssl.endpoint.identification.algorithm = null
> >>
> >> ssl.key.password = null
> >>
> >> ssl.keymanager.algorithm = SunX509
> >>
> >> ssl.keystore.location = null
> >>
> >> ssl.keystore.password = null
> >>
> >> ssl.keystore.type = JKS
> >>
> >> ssl.protocol = TLS
> >>
> >> ssl.provider = null
> >>
> >> ssl.secure.random.implementation = null
> >>
> >> ssl.trustmanager.algorithm = PKIX
> >>
> >> ssl.truststore.location = null
> >>
> >> ssl.truststore.password = null
> >>
> >> ssl.truststore.type = JKS
> >>
> >> value.deserializer = class
> >> org.apache.kafka.common.serialization.StringDeserializer
> >
> >
> >
> > Below is the output of the above driver for a 5-partition topic. Offsets always
> > remain 0 for all but a single partition, in this case partition 3.
> >
> > simple_logtest 3 offsets: 1623531 to 1623531
> > simple_logtest 0 offsets: 0 to 0
> > simple_logtest 1 offsets: 0 to 0
> > simple_logtest 2 offsets: 0 to 0
> > simple_logtest 4 offsets: 0 to 0
> > simple_logtest 3 offsets: 1623531 to 1623531
> > simple_logtest 0 offsets: 0 to 0
> > simple_logtest 1 offsets: 0 to 0
> > simple_logtest 2 offsets: 0 to 0
> > simple_logtest 4 offsets: 0 to 0
> >
> > simple_logtest 3 offsets: 1623531 to 1623531
> >
> >
> >
> >
>

Re: Mac vs cluster Re: kafka 0.10 with Spark 2.0.2 auto.offset.reset=earliest will only read from a single partition on a multi-partition topic

Posted by Hster Geguri <hs...@gmail.com>.
Hi Cody,

Our test producer has been vetted for producing evenly into each
partition.  We use kafka-manager to track this.
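
For illustration only (this is not our actual producer), a minimal producer that should
spread records evenly would look roughly like the sketch below. The class name is made up,
and the records are sent with no key so the Kafka 0.10 default partitioner cycles them
across partitions.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object EvenProducerSketch {
  def main(args: Array[String]): Unit = {
    val Array(brokers, topic) = args
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)
    try {
      // No key on purpose: the default partitioner then spreads records across partitions.
      (1 to 100).foreach(i => producer.send(new ProducerRecord[String, String](topic, s"msg-$i")))
      producer.flush()
    } finally {
      producer.close()
    }
  }
}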

$ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list '10.102.22.11:9092' --topic simple_logtest --time -2
> simple_logtest:2:0
> simple_logtest:4:0
> simple_logtest:1:0
> simple_logtest:3:0
> simple_logtest:0:0
> $ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list '10.102.22.11:9092' --topic simple_logtest --time -1
> simple_logtest:2:722964
> simple_logtest:4:722864
> simple_logtest:1:722957
> simple_logtest:3:722960
> simple_logtest:0:723021


We have spent two weeks trying different configurations and stripping
everything down. The only thing we have not tried is a different cloud
provider; we are using GCE. Since previous versions work properly, as does
the "latest" offset setting, we did not think the problem was in the
infrastructure layer.
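
For completeness, one more variation that would take auto.offset.reset out of the picture
entirely is handing the direct stream explicit per-partition starting offsets. Below is a
rough, untested sketch against the SimpleKafkaLoggingDriver above, reusing its topic,
topics, kafkaParams, preferredHosts, and streamingContext values; it assumes the
ConsumerStrategies.Subscribe overload that accepts an offsets map, a 5-partition topic,
and a starting offset of 0 for every partition.

import org.apache.kafka.common.TopicPartition

// Explicit starting offsets; the partition count (5) and the 0L start are
// assumptions for the simple_logtest topic, purely for illustration.
val startOffsets: Map[TopicPartition, Long] =
  (0 until 5).map(p => new TopicPartition(topic, p) -> 0L).toMap

val dstream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  preferredHosts,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, startOffsets))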

Thanks,
Heji


On Sat, Nov 19, 2016 at 9:27 AM, Cody Koeninger <co...@koeninger.org> wrote:

> This is running locally on my mac, but it's still a standalone spark
> master with multiple separate executor jvms (i.e. using --master not
> --local[2]), so it should be the same code paths.  I can't speak to
> yarn one way or the other, but you said you tried it with the
> standalone scheduler.
>
> At the very least, you should run ./bin/kafka-run-class.sh
> kafka.tools.GetOffsetShell  with -1 and -2 and compare those results
> to what you're seeing from spark.  The results you posted from spark
> didn't show any incoming messages at all.
>
> On Sat, Nov 19, 2016 at 11:12 AM, Hster Geguri
> <hs...@gmail.com> wrote:
> > Hi Cody,
> >
> > Thank you for testing this on a Saturday morning!  I failed to mention
> that
> > when our data engineer runs our drivers(even complex ones) locally on his
> > Mac, the drivers work fine. However when we launch it into the cluster (4
> > machines either for a YARN cluster or spark standalone) we get this
> issue.
> >
> > Heji
> >
> >
> > On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger <co...@koeninger.org>
> wrote:
> >>
> >> I ran your example using the versions of kafka and spark you are
> >> using, against a standalone cluster.  This is what I observed:
> >>
> >> (in kafka working directory)
> >>
> >> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> >> --broker-list 'localhost:9092' --topic simple_logtest --time -2
> >> simple_logtest:2:0
> >> simple_logtest:4:0
> >> simple_logtest:1:0
> >> simple_logtest:3:0
> >> simple_logtest:0:0
> >>
> >> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> >> --broker-list 'localhost:9092' --topic simple_logtest --time -1
> >> simple_logtest:2:31
> >> simple_logtest:4:31
> >> simple_logtest:1:31
> >> simple_logtest:3:31
> >> simple_logtest:0:31
> >>
> >> So in other words, there are 5 partitions, they all have messages in
> them
> >>
> >> (in spark working directory)
> >>
> >> bash-3.2$ ./bin/spark-submit --master
> >> spark://Codys-MacBook-Pro.local:7077 --class
> >> example.SimpleKafkaLoggingDriver
> >>
> >> /private/var/tmp/kafka-bug-report/target/scala-2.11/
> kafka-example-assembly-2.0.0.jar
> >> localhost:9092 simple_logtest mygroup earliest
> >>
> >>
> >> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
> >> 1479574025000 ms.0 from job set of time 1479574025000 ms
> >>
> >> simple_logtest 3 offsets: 0 to 31
> >> simple_logtest 0 offsets: 0 to 31
> >> simple_logtest 1 offsets: 0 to 31
> >> simple_logtest 2 offsets: 0 to 31
> >> simple_logtest 4 offsets: 0 to 31
> >>
> >> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
> >> 1479574025000 ms.0 from job set of time 1479574025000 ms
> >> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
> >> 1479574025000 ms (execution: 0.005 s)
> >> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
> >> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
> >> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000
> ms
> >> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
> >> 1479574030000 ms.0 from job set of time 1479574030000 ms
> >>
> >> simple_logtest 3 offsets: 31 to 31
> >> simple_logtest 0 offsets: 31 to 31
> >> simple_logtest 1 offsets: 31 to 31
> >> simple_logtest 2 offsets: 31 to 31
> >> simple_logtest 4 offsets: 31 to 31
> >>
> >> So in other words, spark is indeed seeing offsets for each partition.
> >>
> >>
> >> The results you posted look to me like there aren't any messages going
> >> into the other partitions, which looks like a misbehaving producer.
> >>
> >> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
> >> <hs...@gmail.com> wrote:
> >> > Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we
> have
> >> > been
> >> > struggling with this show stopper problem.
> >> >
> >> > When we run our drivers with auto.offset.reset=latest ingesting from a
> >> > single kafka topic with 10 partitions, the driver reads correctly from
> >> > all
> >> > 10 partitions.
> >> >
> >> > However when we use auto.offset.reset=earliest, the driver will read
> >> > only a
> >> > single partition.
> >> >
> >> > When we turn on the debug logs, we sometimes see partitions being set
> to
> >> > different offset configuration even though the consumer config
> correctly
> >> > indicates auto.offset.reset=earliest.
> >> >
> >> >> 8 DEBUG Resetting offset for partition simple_test-8 to earliest
> >> >> offset.
> >> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> >> 9 DEBUG Resetting offset for partition simple_test-9 to latest
> offset.
> >> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> >> 8 TRACE Sending ListOffsetRequest
> >> >>
> >> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{
> partition=8,timestamp=-2}]}]}
> >> >> to broker 10.102.20.12:9092 (id: 12 rack: null)
> >> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> >> 9 TRACE Sending ListOffsetRequest
> >> >>
> >> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{
> partition=9,timestamp=-1}]}]}
> >> >> to broker 10.102.20.13:9092 (id: 13 rack: null)
> >> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> >> 8 TRACE Received ListOffsetResponse
> >> >>
> >> >> {responses=[{topic=simple_test,partition_responses=[{
> partition=8,error_code=0,timestamp=-1,offset=0}]}]}
> >> >> from broker 10.102.20.12:9092 (id: 12 rack: null)
> >> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> >> 9 TRACE Received ListOffsetResponse
> >> >>
> >> >> {responses=[{topic=simple_test,partition_responses=[{
> partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
> >> >> from broker 10.102.20.13:9092 (id: 13 rack: null)
> >> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> >> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
> >> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> >> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition
> >> >> simple_test-9
> >> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> >
> >> >
> >> >
> >> > I've enclosed below the completely stripped down trivial test driver
> >> > that
> >> > shows this behavior. We normally run with YARN 2.7.3 but have also
> tried
> >> > running spark standalone mode which has the same behavior. Our drivers
> >> > are
> >> > normally java but we have tried the scala version which also has the
> >> > same
> >> > incorrect behavior. We have tried different LocationStrategies and
> >> > partition
> >> > assignment strategies all without success.  Any insight would be
> greatly
> >> > appreciated.
> >> >
> >> > package com.xxxxx.labs.analytics.diagnostics.spark.drivers
> >> >
> >> > import org.apache.kafka.common.serialization.StringDeserializer
> >> > import org.apache.spark.SparkConf
> >> > import org.apache.spark.streaming.{Seconds, StreamingContext}
> >> > import org.apache.spark.streaming.kafka010._
> >> > import org.apache.spark.streaming.kafka010.LocationStrategies
> >> > import org.apache.spark.streaming.kafka010.ConsumerStrategies
> >> >
> >> >
> >> > /**
> >> >   *
> >> >   * This driver is only for pulling data from the stream and logging
> to
> >> > output just to isolate single partition bug
> >> >   */
> >> > object SimpleKafkaLoggingDriver {
> >> >   def main(args: Array[String]) {
> >> >     if (args.length != 4) {
> >> >       System.err.println("Usage: SimpleTestDriver <broker bootstrap
> >> > servers>
> >> > <topic> <groupId> <offsetReset>")
> >> >       System.exit(1)
> >> >     }
> >> >
> >> >     val Array(brokers, topic, groupId, offsetReset) = args
> >> >     val preferredHosts = LocationStrategies.PreferConsistent
> >> >     val topics = List(topic)
> >> >
> >> >     val kafkaParams = Map(
> >> >       "bootstrap.servers" -> brokers,
> >> >       "key.deserializer" -> classOf[StringDeserializer],
> >> >       "value.deserializer" -> classOf[StringDeserializer],
> >> >       "group.id" -> groupId,
> >> >       "auto.offset.reset" -> offsetReset,
> >> >       "enable.auto.commit" -> (false: java.lang.Boolean)
> >> >     )
> >> >
> >> >     val sparkConf = new SparkConf().setAppName("SimpleTestDriver"+"_"
> >> > +topic)
> >> >     val streamingContext = new StreamingContext(sparkConf, Seconds(5))
> >> >
> >> >
> >> >     val dstream = KafkaUtils.createDirectStream[String, String](
> >> >       streamingContext,
> >> >       preferredHosts,
> >> >       ConsumerStrategies.Subscribe[String, String](topics,
> kafkaParams))
> >> >
> >> >     dstream.foreachRDD { rdd =>
> >> >       // Get the offset ranges in the RDD and log
> >> >       val offsetRanges = rdd.asInstanceOf[
> HasOffsetRanges].offsetRanges
> >> >       for (o <- offsetRanges) {
> >> >         println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset}
> to
> >> > ${o.untilOffset}")
> >> >       }
> >> >     }
> >> >
> >> >     streamingContext.start
> >> >     streamingContext.awaitTermination()
> >> >
> >> >   }
> >> >
> >> > }
> >> >
> >> >
> >> >
> >> >> 16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
> >> >>
> >> >> auto.commit.interval.ms = 5000
> >> >>
> >> >> auto.offset.reset = earliest
> >> >>
> >> >> bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]
> >> >>
> >> >> check.crcs = true
> >> >>
> >> >> client.id =
> >> >>
> >> >> connections.max.idle.ms = 540000
> >> >>
> >> >> enable.auto.commit = false
> >> >>
> >> >> exclude.internal.topics = true
> >> >>
> >> >> fetch.max.bytes = 52428800
> >> >>
> >> >> fetch.max.wait.ms = 500
> >> >>
> >> >> fetch.min.bytes = 1
> >> >>
> >> >> group.id = simple_test_group
> >> >>
> >> >> heartbeat.interval.ms = 3000
> >> >>
> >> >> interceptor.classes = null
> >> >>
> >> >> key.deserializer = class
> >> >> org.apache.kafka.common.serialization.StringDeserializer
> >> >>
> >> >> max.partition.fetch.bytes = 1048576
> >> >>
> >> >> max.poll.interval.ms = 300000
> >> >>
> >> >> max.poll.records = 500
> >> >>
> >> >> metadata.max.age.ms = 300000
> >> >>
> >> >> metric.reporters = []
> >> >>
> >> >> metrics.num.samples = 2
> >> >>
> >> >> metrics.sample.window.ms = 30000
> >> >>
> >> >> partition.assignment.strategy = [class
> >> >> org.apache.kafka.clients.consumer.RangeAssignor]
> >> >>
> >> >> receive.buffer.bytes = 65536
> >> >>
> >> >> reconnect.backoff.ms = 50
> >> >>
> >> >> request.timeout.ms = 305000
> >> >>
> >> >> retry.backoff.ms = 100
> >> >>
> >> >> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >> >>
> >> >> sasl.kerberos.min.time.before.relogin = 60000
> >> >>
> >> >> sasl.kerberos.service.name = null
> >> >>
> >> >> sasl.kerberos.ticket.renew.jitter = 0.05
> >> >>
> >> >> sasl.kerberos.ticket.renew.window.factor = 0.8
> >> >>
> >> >> sasl.mechanism = GSSAPI
> >> >>
> >> >> security.protocol = PLAINTEXT
> >> >>
> >> >> send.buffer.bytes = 131072
> >> >>
> >> >> session.timeout.ms = 10000
> >> >>
> >> >> ssl.cipher.suites = null
> >> >>
> >> >> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >> >>
> >> >> ssl.endpoint.identification.algorithm = null
> >> >>
> >> >> ssl.key.password = null
> >> >>
> >> >> ssl.keymanager.algorithm = SunX509
> >> >>
> >> >> ssl.keystore.location = null
> >> >>
> >> >> ssl.keystore.password = null
> >> >>
> >> >> ssl.keystore.type = JKS
> >> >>
> >> >> ssl.protocol = TLS
> >> >>
> >> >> ssl.provider = null
> >> >>
> >> >> ssl.secure.random.implementation = null
> >> >>
> >> >> ssl.trustmanager.algorithm = PKIX
> >> >>
> >> >> ssl.truststore.location = null
> >> >>
> >> >> ssl.truststore.password = null
> >> >>
> >> >> ssl.truststore.type = JKS
> >> >>
> >> >> value.deserializer = class
> >> >> org.apache.kafka.common.serialization.StringDeserializer
> >> >
> >> >
> >> >
> >> > Below is the output of above driver for 5 partition topic.  Offsets
> >> > always
> >> > remain 0 for all but a single partition in this case 3
> >> >
> >> > simple_logtest 3 offsets: 1623531 to 1623531
> >> > simple_logtest 0 offsets: 0 to 0
> >> > simple_logtest 1 offsets: 0 to 0
> >> > simple_logtest 2 offsets: 0 to 0
> >> > simple_logtest 4 offsets: 0 to 0
> >> > simple_logtest 3 offsets: 1623531 to 1623531
> >> > simple_logtest 0 offsets: 0 to 0
> >> > simple_logtest 1 offsets: 0 to 0
> >> > simple_logtest 2 offsets: 0 to 0
> >> > simple_logtest 4 offsets: 0 to 0
> >> >
> >> > simple_logtest 3 offsets: 1623531 to 1623531
> >> >
> >> >
> >> >
> >> >
> >
> >
>

Re: Mac vs cluster Re: kafka 0.10 with Spark 2.0.2 auto.offset.reset=earliest will only read from a single partition on a multi-partition topic

Posted by Cody Koeninger <co...@koeninger.org>.
This is running locally on my Mac, but it's still a standalone Spark
master with multiple separate executor JVMs (i.e. using --master with the
standalone master URL, not local[2]), so it should exercise the same code
paths. I can't speak to YARN one way or the other, but you said you tried
it with the standalone scheduler.

At the very least, you should run ./bin/kafka-run-class.sh
kafka.tools.GetOffsetShell with --time -1 and --time -2 and compare those
results to what you're seeing from Spark. The results you posted from Spark
didn't show any incoming messages at all.
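
If it helps, roughly the same check can be done programmatically with the plain Kafka
consumer. This is a sketch only; it assumes kafka-clients 0.10.1.0 (where beginningOffsets
and endOffsets are available), and the class name is made up.

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object OffsetShellEquivalent {
  def main(args: Array[String]): Unit = {
    val Array(brokers, topic) = args
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("group.id", "offset_check")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    try {
      // List every partition of the topic, then ask the brokers for the
      // earliest (--time -2) and latest (--time -1) offset of each one.
      val partitions = consumer.partitionsFor(topic).asScala
        .map(pi => new TopicPartition(pi.topic, pi.partition))
      val earliest = consumer.beginningOffsets(partitions.asJava).asScala
      val latest = consumer.endOffsets(partitions.asJava).asScala
      partitions.sortBy(_.partition).foreach { tp =>
        println(s"$topic ${tp.partition} earliest=${earliest(tp)} latest=${latest(tp)}")
      }
    } finally {
      consumer.close()
    }
  }
}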

On Sat, Nov 19, 2016 at 11:12 AM, Hster Geguri
<hs...@gmail.com> wrote:
> Hi Cody,
>
> Thank you for testing this on a Saturday morning!  I failed to mention that
> when our data engineer runs our drivers(even complex ones) locally on his
> Mac, the drivers work fine. However when we launch it into the cluster (4
> machines either for a YARN cluster or spark standalone) we get this issue.
>
> Heji
>
>
> On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger <co...@koeninger.org> wrote:
>>
>> I ran your example using the versions of kafka and spark you are
>> using, against a standalone cluster.  This is what I observed:
>>
>> (in kafka working directory)
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
>> --broker-list 'localhost:9092' --topic simple_logtest --time -2
>> simple_logtest:2:0
>> simple_logtest:4:0
>> simple_logtest:1:0
>> simple_logtest:3:0
>> simple_logtest:0:0
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
>> --broker-list 'localhost:9092' --topic simple_logtest --time -1
>> simple_logtest:2:31
>> simple_logtest:4:31
>> simple_logtest:1:31
>> simple_logtest:3:31
>> simple_logtest:0:31
>>
>> So in other words, there are 5 partitions, they all have messages in them
>>
>> (in spark working directory)
>>
>> bash-3.2$ ./bin/spark-submit --master
>> spark://Codys-MacBook-Pro.local:7077 --class
>> example.SimpleKafkaLoggingDriver
>>
>> /private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
>> localhost:9092 simple_logtest mygroup earliest
>>
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
>> 1479574025000 ms.0 from job set of time 1479574025000 ms
>>
>> simple_logtest 3 offsets: 0 to 31
>> simple_logtest 0 offsets: 0 to 31
>> simple_logtest 1 offsets: 0 to 31
>> simple_logtest 2 offsets: 0 to 31
>> simple_logtest 4 offsets: 0 to 31
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
>> 1479574025000 ms.0 from job set of time 1479574025000 ms
>> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
>> 1479574025000 ms (execution: 0.005 s)
>> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
>> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
>> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
>> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
>> 1479574030000 ms.0 from job set of time 1479574030000 ms
>>
>> simple_logtest 3 offsets: 31 to 31
>> simple_logtest 0 offsets: 31 to 31
>> simple_logtest 1 offsets: 31 to 31
>> simple_logtest 2 offsets: 31 to 31
>> simple_logtest 4 offsets: 31 to 31
>>
>> So in other words, spark is indeed seeing offsets for each partition.
>>
>>
>> The results you posted look to me like there aren't any messages going
>> into the other partitions, which looks like a misbehaving producer.
>>
>> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
>> <hs...@gmail.com> wrote:
>> > Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have
>> > been
>> > struggling with this show stopper problem.
>> >
>> > When we run our drivers with auto.offset.reset=latest ingesting from a
>> > single kafka topic with 10 partitions, the driver reads correctly from
>> > all
>> > 10 partitions.
>> >
>> > However when we use auto.offset.reset=earliest, the driver will read
>> > only a
>> > single partition.
>> >
>> > When we turn on the debug logs, we sometimes see partitions being set to
>> > different offset configuration even though the consumer config correctly
>> > indicates auto.offset.reset=earliest.
>> >
>> >> 8 DEBUG Resetting offset for partition simple_test-8 to earliest
>> >> offset.
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset.
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 8 TRACE Sending ListOffsetRequest
>> >>
>> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>> >> to broker 10.102.20.12:9092 (id: 12 rack: null)
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 TRACE Sending ListOffsetRequest
>> >>
>> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>> >> to broker 10.102.20.13:9092 (id: 13 rack: null)
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 8 TRACE Received ListOffsetResponse
>> >>
>> >> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>> >> from broker 10.102.20.12:9092 (id: 12 rack: null)
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 TRACE Received ListOffsetResponse
>> >>
>> >> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>> >> from broker 10.102.20.13:9092 (id: 13 rack: null)
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition
>> >> simple_test-9
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >
>> >
>> >
>> > I've enclosed below the completely stripped down trivial test driver
>> > that
>> > shows this behavior. We normally run with YARN 2.7.3 but have also tried
>> > running spark standalone mode which has the same behavior. Our drivers
>> > are
>> > normally java but we have tried the scala version which also has the
>> > same
>> > incorrect behavior. We have tried different LocationStrategies and
>> > partition
>> > assignment strategies all without success.  Any insight would be greatly
>> > appreciated.
>> >
>> > package com.xxxxx.labs.analytics.diagnostics.spark.drivers
>> >
>> > import org.apache.kafka.common.serialization.StringDeserializer
>> > import org.apache.spark.SparkConf
>> > import org.apache.spark.streaming.{Seconds, StreamingContext}
>> > import org.apache.spark.streaming.kafka010._
>> > import org.apache.spark.streaming.kafka010.LocationStrategies
>> > import org.apache.spark.streaming.kafka010.ConsumerStrategies
>> >
>> >
>> > /**
>> >   *
>> >   * This driver is only for pulling data from the stream and logging to
>> > output just to isolate single partition bug
>> >   */
>> > object SimpleKafkaLoggingDriver {
>> >   def main(args: Array[String]) {
>> >     if (args.length != 4) {
>> >       System.err.println("Usage: SimpleTestDriver <broker bootstrap
>> > servers>
>> > <topic> <groupId> <offsetReset>")
>> >       System.exit(1)
>> >     }
>> >
>> >     val Array(brokers, topic, groupId, offsetReset) = args
>> >     val preferredHosts = LocationStrategies.PreferConsistent
>> >     val topics = List(topic)
>> >
>> >     val kafkaParams = Map(
>> >       "bootstrap.servers" -> brokers,
>> >       "key.deserializer" -> classOf[StringDeserializer],
>> >       "value.deserializer" -> classOf[StringDeserializer],
>> >       "group.id" -> groupId,
>> >       "auto.offset.reset" -> offsetReset,
>> >       "enable.auto.commit" -> (false: java.lang.Boolean)
>> >     )
>> >
>> >     val sparkConf = new SparkConf().setAppName("SimpleTestDriver"+"_"
>> > +topic)
>> >     val streamingContext = new StreamingContext(sparkConf, Seconds(5))
>> >
>> >
>> >     val dstream = KafkaUtils.createDirectStream[String, String](
>> >       streamingContext,
>> >       preferredHosts,
>> >       ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
>> >
>> >     dstream.foreachRDD { rdd =>
>> >       // Get the offset ranges in the RDD and log
>> >       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >       for (o <- offsetRanges) {
>> >         println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to
>> > ${o.untilOffset}")
>> >       }
>> >     }
>> >
>> >     streamingContext.start
>> >     streamingContext.awaitTermination()
>> >
>> >   }
>> >
>> > }
>> >
>> >
>> >
>> >> 16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
>> >>
>> >> auto.commit.interval.ms = 5000
>> >>
>> >> auto.offset.reset = earliest
>> >>
>> >> bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]
>> >>
>> >> check.crcs = true
>> >>
>> >> client.id =
>> >>
>> >> connections.max.idle.ms = 540000
>> >>
>> >> enable.auto.commit = false
>> >>
>> >> exclude.internal.topics = true
>> >>
>> >> fetch.max.bytes = 52428800
>> >>
>> >> fetch.max.wait.ms = 500
>> >>
>> >> fetch.min.bytes = 1
>> >>
>> >> group.id = simple_test_group
>> >>
>> >> heartbeat.interval.ms = 3000
>> >>
>> >> interceptor.classes = null
>> >>
>> >> key.deserializer = class
>> >> org.apache.kafka.common.serialization.StringDeserializer
>> >>
>> >> max.partition.fetch.bytes = 1048576
>> >>
>> >> max.poll.interval.ms = 300000
>> >>
>> >> max.poll.records = 500
>> >>
>> >> metadata.max.age.ms = 300000
>> >>
>> >> metric.reporters = []
>> >>
>> >> metrics.num.samples = 2
>> >>
>> >> metrics.sample.window.ms = 30000
>> >>
>> >> partition.assignment.strategy = [class
>> >> org.apache.kafka.clients.consumer.RangeAssignor]
>> >>
>> >> receive.buffer.bytes = 65536
>> >>
>> >> reconnect.backoff.ms = 50
>> >>
>> >> request.timeout.ms = 305000
>> >>
>> >> retry.backoff.ms = 100
>> >>
>> >> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> >>
>> >> sasl.kerberos.min.time.before.relogin = 60000
>> >>
>> >> sasl.kerberos.service.name = null
>> >>
>> >> sasl.kerberos.ticket.renew.jitter = 0.05
>> >>
>> >> sasl.kerberos.ticket.renew.window.factor = 0.8
>> >>
>> >> sasl.mechanism = GSSAPI
>> >>
>> >> security.protocol = PLAINTEXT
>> >>
>> >> send.buffer.bytes = 131072
>> >>
>> >> session.timeout.ms = 10000
>> >>
>> >> ssl.cipher.suites = null
>> >>
>> >> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> >>
>> >> ssl.endpoint.identification.algorithm = null
>> >>
>> >> ssl.key.password = null
>> >>
>> >> ssl.keymanager.algorithm = SunX509
>> >>
>> >> ssl.keystore.location = null
>> >>
>> >> ssl.keystore.password = null
>> >>
>> >> ssl.keystore.type = JKS
>> >>
>> >> ssl.protocol = TLS
>> >>
>> >> ssl.provider = null
>> >>
>> >> ssl.secure.random.implementation = null
>> >>
>> >> ssl.trustmanager.algorithm = PKIX
>> >>
>> >> ssl.truststore.location = null
>> >>
>> >> ssl.truststore.password = null
>> >>
>> >> ssl.truststore.type = JKS
>> >>
>> >> value.deserializer = class
>> >> org.apache.kafka.common.serialization.StringDeserializer
>> >
>> >
>> >
>> > Below is the output of above driver for 5 partition topic.  Offsets
>> > always
>> > remain 0 for all but a single partition in this case 3
>> >
>> > simple_logtest 3 offsets: 1623531 to 1623531
>> > simple_logtest 0 offsets: 0 to 0
>> > simple_logtest 1 offsets: 0 to 0
>> > simple_logtest 2 offsets: 0 to 0
>> > simple_logtest 4 offsets: 0 to 0
>> > simple_logtest 3 offsets: 1623531 to 1623531
>> > simple_logtest 0 offsets: 0 to 0
>> > simple_logtest 1 offsets: 0 to 0
>> > simple_logtest 2 offsets: 0 to 0
>> > simple_logtest 4 offsets: 0 to 0
>> >
>> > simple_logtest 3 offsets: 1623531 to 1623531
>> >
>> >
>> >
>> >
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org